Serve Spark ML Models Using Play Framework and S3

We have talked about various ways to serve machine learning results in production (see the earlier post Predictive Model Deployment with Spark, for example). That article outlines three architectures for model serving systems. In particular, I found using Spark’s internal serialization logic to persist and load models to be both flexible and reliable. As a follow-up, in this post I am going to show how to serve a simple Spark MLlib machine learning model, using a Naive Bayes classifier as an example, in a web application built with the Play Framework, one of the most popular web frameworks in Scala/Java.

Offline model training

Today machine learning models are often trained on a large Spark cluster to take advantage of its powerful distributed computing capabilities and easy-to-use APIs. Since the offline training part is not the focus of this post, I will simply build on top of a toy training pipeline from the official Spark tutorial.

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file (assumes sc is an existing SparkContext, e.g. in spark-shell).
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val Array(training, test) = data.randomSplit(Array(0.6, 0.4))

// Train a multinomial Naive Bayes model with additive smoothing.
val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")

// Evaluate the trained model on the held-out test set.
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()

S3 storage

There are a few model storage options, and I decided to use S3 for its simplicity and availability. Moreover, Spark provides nice support for saving serialized models directly to S3. All you need to do is configure your SparkContext with the right S3 credentials and then add the following lines to the previous code snippet.

// replace these with your own AWS credentials
sc.hadoopConfiguration.set("fs.s3a.access.key", YourAWSAccessKeyId)
sc.hadoopConfiguration.set("fs.s3a.secret.key", YourAWSSecretKey)

// replace it with your bucket path
model.save(sc, "s3a://persisted-models/naivebayesexample")

If you go to your AWS console, you will find .parquet files stored under your S3 bucket persisted-models/naivebayesexample; they are the output of Spark’s internal model serialization logic.
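For reference, the persisted model is simply a directory of objects under that path. The exact file names vary by Spark version, but the layout looks roughly like this:

persisted-models/naivebayesexample/
  metadata/   # JSON describing the model class, format version and parameters
  data/       # parquet file(s) holding the fitted model parameters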

Note that I am using the s3a URI scheme to interact with S3. There are in total three variants, as described in https://wiki.apache.org/hadoop/AmazonS3:

S3 Native FileSystem (URI scheme: s3n) A native filesystem for reading and writing regular files on S3. The advantage of this filesystem is that you can access files on S3 that were written with other tools. Conversely, other tools can access files written using Hadoop. The disadvantage is the 5GB limit on file size imposed by S3.

S3A (URI scheme: s3a) A successor to the S3 Native, s3n fs, the S3a: system uses Amazon’s libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for/successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.

S3 Block FileSystem (URI scheme: s3) A block-based filesystem backed by S3. Files are stored as blocks, just like they are in HDFS. This permits efficient implementation of renames. This filesystem requires you to dedicate a bucket for the filesystem - you should not use an existing bucket containing files, or write other files to the same bucket. The files stored by this filesystem can be larger than 5GB, but they are not interoperable with other S3 tools.

I chose S3A because 1) it is an object-based overlay on top of S3, unlike the S3 Block FileSystem, and 2) it offers better performance and supports objects up to 5TB, compared to the 5GB object size limit of the S3 Native FileSystem.
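Depending on how your Spark distribution was built, the s3a filesystem may not be wired up out of the box. A minimal sketch of the extra setup, assuming the hadoop-aws jar (and a matching AWS SDK jar) is on the classpath, is to point Hadoop at the S3A implementation explicitly, in addition to the credentials shown above:

// explicitly register the S3A filesystem implementation
// (a no-op if your Hadoop build already defines it in core-default.xml)
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")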

Online model serving

Now let’s get to the online serving part. To save some time, I am assuming you already have a Play application up and running (I may write a follow-up post explaining how to set up a minimal web application with the Play Framework, which should be fairly straightforward). For now, let’s also assume the Play application is named yoda and its skeleton looks like the following.

yoda/
  app/
    controllers/
      Application.scala
    views/
      index.scala.html
      main.scala.html
  conf/
    application.conf
    routes
  public/
  build.sbt
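As an aside, the conf/routes file in the skeleton above is what maps incoming HTTP requests to controller actions. A minimal version for this example, assuming the single index action defined later in this post, could look like this:

# conf/routes: map an HTTP GET on the root path to the index action
GET     /        controllers.Application.index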

Add dependencies

The trained model is already saved on S3 at this point; what we want is to load the model into memory for yoda. To achieve this, additional dependencies need to be added to the web application. Specifically, we need to add

  • guava: in-memory caching for the loaded model
  • hadoop: read files from AWS S3
  • spark: deserialize parquet files into a Spark ML model and make predictions

You can add these dependencies to the build.sbt file with the following lines.

libraryDependencies ++= Seq(
  jdbc,
  anorm,
  cache,
  "com.google.guava" % "guava" % "19.0",
  "org.apache.spark" %% "spark-core" % "2.0.0",
  "org.apache.spark" %% "spark-hive" % "2.0.0",
  "org.apache.spark" %% "spark-sql" % "2.0.0",
  "org.apache.spark" %% "spark-mllib" % "2.0.0",
  "org.apache.hadoop" % "hadoop-aws" % "2.7.3"
)

Load model from S3

With these dependencies, we can now load the trained model from S3 into memory. Note that we can use a caching mechanism to keep the model in memory for prediction and refresh it when the model is updated. For simplicity, in this toy example we are going to keep the model in memory and refresh it every 24 hours. More sophisticated caching logic will be use case specific.

The logic can be added to CacheProvider.scala under controllers/, i.e.

controllers/
  Application.scala
  CacheProvider.scala

such that

package controllers

import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.NaiveBayesModel


object CacheProvider {

  // run Spark in local mode inside the web application
  val conf = new SparkConf().setMaster("local").setAppName("yoda")
    .set("spark.driver.host", "localhost")
    .set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(conf)

  // replace these with your own AWS credentials
  sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", YourAWSAccessKeyId)
  sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", YourAWSSecretKey)

  // keep the loaded model in memory and refresh it every 24 hours
  val naiveBayesModelCache = CacheBuilder.newBuilder()
    .maximumSize(2)
    .refreshAfterWrite(24, TimeUnit.HOURS)
    .build(
      new CacheLoader[String, NaiveBayesModel] {
        def load(path: String): NaiveBayesModel = {
          NaiveBayesModel.load(sc, path)
        }
      }
    )

  def getNaiveBayesModel: NaiveBayesModel = {
    naiveBayesModelCache.get("s3n://persisted-models/naivebayesexample")
  }
}

Essentially, the web application yoda runs Spark in local mode and uses Spark’s built-in functionality to load the saved model from S3. This approach is fairly generic in the sense that any model saved to S3 with .save can be loaded into memory by a web application with minimal code changes.
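As a quick illustration, here is a sketch of how the same pattern could cover another model type inside the same CacheProvider object; the logistic regression model and its bucket path are purely hypothetical placeholders, only the model class and path change:

import org.apache.spark.mllib.classification.LogisticRegressionModel

// same caching pattern as above, different model class
// (the S3 path below is a placeholder for wherever such a model was persisted)
val logisticRegressionModelCache = CacheBuilder.newBuilder()
  .maximumSize(2)
  .refreshAfterWrite(24, TimeUnit.HOURS)
  .build(
    new CacheLoader[String, LogisticRegressionModel] {
      def load(path: String): LogisticRegressionModel = {
        LogisticRegressionModel.load(sc, path)
      }
    }
  )

def getLogisticRegressionModel: LogisticRegressionModel = {
  logisticRegressionModelCache.get("s3n://persisted-models/yourlogisticregressionmodel")
}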

Make online prediction

Once the Spark ML model is loaded into memory, making predictions based on incoming requests is straightforward. Most Spark MLlib models have a built-in .predict method that takes a feature vector and returns a prediction score.

You can add the following lines to your Application.scala to make predictions on randomly generated features.

package controllers

import play.api._
import play.api.mvc._

import org.apache.spark.mllib.linalg.Vectors

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits._
import scala.util.Random

object Application extends Controller {

  def index = Action.async {
    val f: Future[Double] = Future {

      // randomly generate an input feature vector (692 features, matching the sample data)
      val testInput = Vectors.dense(Array.fill(692)(Random.nextInt).map(_.toDouble))

      // score the cached model against the generated features
      CacheProvider.getNaiveBayesModel.predict(testInput)
    }

    f.map { i => Ok(views.html.index(i.toString)) }
  }
}

Summary

Building machine learning models is fun and challenging, but, at the end of the day, we want to serve them to our users. This often means exposing some endpoints in a web service. The method described in this post aims to provide a generic way to serve all kinds of machine learning models in order to improve the experience of our users.

As always, I would really appreciate your thoughts/comments. Feel free to leave them following this post or tweet me @_LeiG.