
One Up: Building Machine Learning Pipelines with Mario

As data engineers, we are often constructing large-scale pipelines to learn models of user behavior. These model learning pipelines are crucial to our business, so we need them to run reliably and efficiently at scale.

The challenges we encounter developing and maintaining sophisticated data processing pipelines are often around code clarity, testability, and extensibility. Based on our experiences with these challenges, we’ve developed solutions for various common pipeline construction problems, one of which we’re open-sourcing today. In this blog post, we’ll introduce Mario, a Scala library for constructing type-safe pipelines using functional idioms. It’s a small library that asks very little of its users but provides a great deal in terms of composability, safety, and concurrency.

Down the Warp Pipe: Introducing Mario

Mario is a Scala library focused on defining complex data pipelines in a functional, type-safe, and efficient way. It is applicable in all sorts of data pipelines, but we’re currently focusing on Spark as a primary use case.

We recently transitioned to using Spark after years of using Hadoop for our data pipelines. Our continued investment in Spark is driven by our overall happiness with Spark as a platform. It offers powerful abstractions that allow us to more rapidly develop large-scale machine learning systems.

For example, Spark’s Resilient Distributed Datasets (RDDs) provide functional and type-safe representations of datasets. The RDD is an incredibly useful abstraction, and the same principles of functional composition and type safety can be used to construct further abstractions on top of it.

Another feature present in Mario is concurrent pipeline execution. This feature may not be crucial for pipelines built using Spark, because Spark has its own execution manager. In other contexts, however, concurrency can be useful for costly computations like querying a database or calling a remote API.

World 1-1: An Example Pipeline

For a motivating example, let’s suppose that we want to extract data from two different sources, operate on each one, and then join those results. This can be represented by the following diagram.

   LoadA     LoadB
     |         |
     |         |
TransformA  TransformB
      \      /
       \    /
       JoinAB

As a way of approximating a large-scale data processing job, we will construct our steps as transformations on Spark RDDs.

First, we will define the pipeline steps in terms of Scala functions.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

case class DataA(id: String, data: Iterable[String])
case class DataB(id: String, data: Iterable[String])

val sc: SparkContext = ??? // defined somewhere else

def loadA = sc.textFile("examples/data/a.text")
def loadB = sc.textFile("examples/data/b.text")

def transformA(a: RDD[String]) =
  a.map(_.split(" "))
   .map(x => (x(0), x(1)))  // (id, value) pairs, one per line
   .groupByKey()
   .map { case (id, data) => (id, DataA(id, data)) }

def transformB(b: RDD[String]) =
  b.map(_.split(" "))
   .map(x => (x(0), x(1)))
   .groupByKey()
   .map { case (id, data) => (id, DataB(id, data)) }

def joinAB(a: RDD[(String, DataA)], b: RDD[(String, DataB)]) =
  a join b

Snippet #1

Here is a naive approach to composing these steps.

val loadAStep      = loadA
val loadBStep      = loadB
val transformAStep = transformA(loadAStep)
val transformBStep = transformB(loadBStep)
val result         = joinAB(transformAStep, transformBStep)

Snippet #2

This naive composition strategy can be improved with Mario, which composes the same steps via the pipe method.

val loadAStep      = pipe(loadA)
val loadBStep      = pipe(loadB)
val transformAStep = pipe(transformA, loadAStep)
val transformBStep = pipe(transformB, loadBStep)
val joinABStep     = pipe(joinAB, transformAStep, transformBStep)
val result         = joinABStep.run()

Snippet #3

Dependencies

Each of these vals is a pipeline, an object that encapsulates a function. A pipeline may also depend on other pipelines, and those dependencies are inferred from the type signature of the function. In some cases, such as anonymous functions, you may need to annotate the function’s parameter types. A fully composed pipeline will start with a pipeline that has no dependencies. If you merge different pipelines that share dependent steps, Mario will execute those steps only once.
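For example, a step built from an anonymous function needs its parameter type written out so the dependency can be inferred. Here is a minimal sketch reusing loadAStep from Snippet #3; the two transforms themselves are hypothetical:

// Both steps depend on loadAStep. Per the shared-dependency behavior
// described above, loadA executes only once in a pipeline containing both.
val wordsStep = pipe((a: RDD[String]) => a.flatMap(_.split(" ")), loadAStep)
val linesStep = pipe((a: RDD[String]) => a.map(_.trim), loadAStep)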

One distinguishing feature of Mario’s pipelines is that cyclic dependencies are impossible to define. This is a consequence of the functional nature of its design.  Pipeline values are immutable and thus can’t rely on themselves for their own creation.
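Concretely, a step would have to be passed to pipe before its own val exists, which the compiler rejects:

// A hypothetical cycle simply does not compile:
// val selfStep = pipe(transformA, selfStep) // error: forward reference to selfStep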

Idiomatic Mario

For greater expressiveness, you can also use for comprehensions to define pipelines:

for {
  loadAStep      <- pipe(loadA)
  loadBStep      <- pipe(loadB)
  transformAStep <- pipe(transformA, loadAStep)
  transformBStep <- pipe(transformB, loadBStep)
  joinABStep     <- pipe(joinAB, transformAStep, transformBStep)
} yield joinABStep.run()

Snippet #4

This is equivalent to the pipeline shown above, but it obviates the need to declare a separate val for each step.

Execution

In the above example, all of the step vals are lazy, in that they do not apply their functions until the final line where .run() is called. This is in contrast to Snippet #2, where functions are immediately applied. Even if you attempted to use lazy vals in the Snippet #2 approach, the transformer steps would force function application.

Decoupling the composition of pipelines from their execution is a major design goal of Mario. By separating these two concerns, the type system can ensure valid pipeline composition without needing to actually execute a pipeline.
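To make the laziness concrete, here is a minimal sketch using only the pipe and run calls from Snippet #3, assuming pipe composes the same way when nested inline:

// Composing builds a description of the computation; no work happens yet.
val pipeline = pipe(joinAB,
                    pipe(transformA, pipe(loadA)),
                    pipe(transformB, pipe(loadB)))

// Only this call triggers execution of the whole graph.
val joined = pipeline.run()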

Concurrency

As mentioned before, Mario provides built-in concurrency without the need to explicitly define which steps are to be executed concurrently. Which steps can run concurrently is inferred purely from the dependencies expressed in the pipeline composition. For example, in Snippet #3, loadAStep and loadBStep will be executed independently and concurrently.
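The same applies outside of Spark. A hedged sketch with hypothetical stand-in functions: two expensive, independent calls become steps with no dependency between them, so Mario is free to run them concurrently:

def fetchUsers: Seq[String]  = Seq("alice", "bob") // stands in for a slow database query
def fetchPrices: Seq[Double] = Seq(9.99, 12.50)    // stands in for a remote API call
def combine(users: Seq[String], prices: Seq[Double]) = users zip prices

val usersStep  = pipe(fetchUsers)
val pricesStep = pipe(fetchPrices)
val combined   = pipe(combine, usersStep, pricesStep)
val result     = combined.run() // usersStep and pricesStep run concurrently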

Isolation

Each of these steps is a pipeline itself that can be run in isolation. This isolation is crucial for testing and prototyping: testing small, composable functions is much easier than testing a monolithic pipeline, and dependency injection becomes simple. These features are extremely useful when you’re constructing things like large-scale machine learning pipelines that can take hours to run even on massive clusters.
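For instance, with the steps from Snippet #3, the A branch can be exercised entirely on its own, and a test can inject a small in-memory fixture in place of the real load step (a sketch; loadFixture is our own stand-in, not part of the example data):

// Run just the A branch; loadB, transformB, and the join are never touched.
val aBranch = transformAStep.run()

// Dependency injection: compose transformA against an in-memory fixture
// instead of the real input file.
def loadFixture = sc.parallelize(Seq("id1 x", "id1 y", "id2 z"))
val testResult  = pipe(transformA, pipe(loadFixture)).run()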

Multiplayer: Comparison

Although there are good tools already available, like Luigi, Cascalog, and Spark’s ML Pipelines, we feel that they only solve some of the problems of pipeline construction. Our motivation in developing Mario was to take a more functional, declarative, and composable approach, focused on use with Spark.

Luigi

Luigi (AKA Player 2) is often used for pipeline construction in big data workflows. The main difference between Mario and Luigi is that Luigi only offers task-level concurrency and composition, while Mario is focused on the composition of steps within a given task. When used with Spark, this has substantial implications for runtime performance, because steps can communicate through in-memory RDDs rather than by persisting their outputs to disk; the RDD-based approach never has to perform the I/O-intensive operation of reading intermediates back from disk. Moreover, there is no need to manually determine how tasks should be distributed when submitting a pipeline to Spark using Mario.

Cascalog

Our previous approach to constructing pipelines at Intent Media relied on Cascalog’s checkpoint workflows combined with an in-house job chaining tool. The key difficulty with Cascalog workflows is that they provide very few static guarantees, so testing is required to verify the correct composition of pipelines. Moreover, the Hadoop style of step communication, persisting intermediates to disk, was inefficient and resulted in even more uncertainty about the impact of changes.

Spark’s ML Pipelines

Spark’s ML Pipelines also offer some overlapping functionality. However, ML Pipelines do not offer strong typing guarantees; rather, they merely enforce schemas. They also have no support for complex dependency graphs; all dependency graphs are linear. Composition using ML Pipelines results in a code structure that does not allow for the isolated execution of an arbitrary pipeline step.

Obviously, Mario doesn’t address all of the use cases that these other tools do. It is intentionally small and tightly focused on the problems of pipeline construction. Mario could, in theory, be used in combination with these and other pipelining tools; there is no intrinsic conflict.

Goal Pole: Summary

So, that’s Mario: a simple and generic tool for defining composable and parallel computations with strong type safety guarantees. It can either coexist with your existing data pipeline or be a drop-in replacement.

The current API is tightly focused on solving the pain points we’ve experienced in building pipelines, but we’re not done.

Here are some of our ideas around future Mario development:

  • Fault tolerance
  • Implicit caching (in Spark)
  • Web UI with graph visualization

We’ll keep digging into more details and features in future posts. Check it out on GitHub; we are always open to bug reports and pull requests.


Sebastian Rojas is a data engineer at Intent Media, where he works on big data processing and machine learning problems. He is constantly dreaming about dystopian retro-futuristic technology and sometimes composes music about it. You can find him on GitHub.

Jeff Smith is a data engineer at Intent Media working on large scale machine learning systems. He used to build warehouses and sequence genomes (not at the same time, though).  Intent Media is the fifth startup he’s worked at, and it’s easily the most fun one. You can find him tweeting, blogging, and drawing comics all over the internet.

Comments (3)

Posted by ittay on Jun 23 2015

How is Mario related to Scalding? Scalding also offers type-safe pipeline construction. With https://github.com/tresata/spark-scalding, one can write Scalding pipelines on top of Spark, and I believe Cascading wants to add out-of-the-box support for Spark.
    Posted by Jeff Smith on Jun 23 2015

    Scalding has much broader ambitions than Mario. The number of abstractions that Cascading and thus Scalding employ is quite large. Mario tries to be much simpler given its narrower scope. It's tightly focused on how steps can be composed into pipelines, with an understanding of what their dependencies are. So type-safety is just a part of that. You could certainly use Mario and Scalding in combination, as well. It just depends on what you're trying to do with Scalding in comparison to what else you're trying to do in your other Scala pipeline code. In such a scenario, I could see Mario being useful glue code that allowed you to build up a top-level pipeline abstraction that united your Scalding and other code in your pipeline.
Posted by dave on Oct 10 2015

Hi, I like your idea of using for comprehensions. But it looks like it's not the right way to do a for comprehension? Pipeline.flatMap should decompose the Pipeline[T] type, so the signature should be something like: Pipeline.flatMap(f: Out => A)