
Finally, after using Apache Spark internally in standalone mode, we had a reason to deploy it on Amazon's EMR to do some funky machine learning with MLlib!

This brought a lot of pain, swearing and tears, but ended with a cheerful "HELL YEAH!". Here's a semi-random list of things I wish I'd known before I started. Some are Spark-related, some will concern only Scala users and some are just general tips.

1. DON'T use the App trait!

This isn't really a Spark problem, but it can result in NullPointerExceptions, as discussed in this issue, and believe me, it's not easy to debug or figure out what's happening. The problem lies in the delayed initialisation of fields: in the example below str1 will be null on the workers, even though it works fine on the driver... Even simple apps like the one in the issue report will have problems:

object DemoBug extends App {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List("A","B","C","D"))
    val str1 = "A"

    val rslt = rdd.filter(x => { str1 != null && x != "A" }).count
    
    println(s"DemoBug: rslt = $rslt")
}

Will produce:

DemoBug: rslt = 0

Instead use:

def main(args:Array[String])
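
For reference, here's the example above rewritten with an explicit main method, which sidesteps the delayed initialisation entirely (just a sketch; the DemoFixed name is mine):

import org.apache.spark.{SparkConf, SparkContext}

object DemoFixed {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List("A", "B", "C", "D"))
    // str1 is initialised before the closure is created, so the workers see "A", not null
    val str1 = "A"

    val rslt = rdd.filter(x => str1 != null && x != "A").count()
    println(s"DemoFixed: rslt = $rslt") // prints 3
  }
}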

2. DO use a lot of logging.

Logging in moderation is always a good thing, and this might sound pretty obvious, but... with Spark, and Hadoop in general, it's a vital part of your app. Debugging Spark jobs in production is ridiculously hard, and well-placed log messages might save you from a redeploy, which can take minutes or even hours. Remember, though, that many loggers are not Serializable, so you might need to use one of the tricks I wrote about here.
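
One common trick looks roughly like this (only a sketch, not necessarily the exact approach from the linked post; the Worker class and process method are just for illustration): keep the logger out of the serialized closure by marking it @transient lazy, so each JVM creates its own instance.

import org.apache.log4j.Logger

class Worker extends Serializable {
  // @transient keeps the non-serializable logger out of the serialized object,
  // lazy means every JVM (driver and each executor) creates its own instance on first use
  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)

  def process(x: String): String = {
    log.info(s"Processing $x")
    x.toLowerCase
  }
}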

3a. DO pay attention to serialization.

Spark sends a lot of objects over the wire, which is why plenty of your classes will have to implement the Serializable interface. That's the reason this will fail:

class NotSerializablePrinter { def print(msg:String) = println(msg) }

val printer = new NotSerializablePrinter
val rdd = sc.parallelize(Array("1","2","3"))
rdd.foreach(msg => printer.print(msg))

You'll get a beautiful java.io.NotSerializableException: $iwC$$iwC$NotSerializablePrinter. That's because everything used inside RDD methods is shipped to the workers to be executed there, and therefore has to be transferred over the wire. Since the printer instance isn't serializable, it cannot be sent from the driver node to the workers.

Whenever you see this exception you have a few options:

  1. The easiest is to make the class serializable: class NotSerializablePrinter extends Serializable. Do remember that all its member variables also have to be serializable, though! (A full sketch of this option follows right after this list.)
  2. Sometimes you cannot modify the code. In that case you can create the object on the worker nodes, inside the closure itself. The code below works because no objects are sent over the wire:

    rdd.foreach(msg => new NotSerializablePrinter().print(msg))

  3. The problem with the above is that you create a new object on every iteration. To work around it you can create a singleton object on each worker; in Java that would require a static variable, in Scala this should work:

    object printer {
      val instance = new NotSerializablePrinter
    }
    rdd.foreach(msg => printer.instance.print(msg))


3b. DO use non-default serializers.

Java serialization, which is the default serialization method in Spark, isn't the fastest; I don't think that's something we need to argue about.

Fortunately you can very easily switch from native Java serialization to Kryo by setting the "spark.serializer" property in SparkConf to "org.apache.spark.serializer.KryoSerializer". This will be used both during shuffles between worker nodes and when serializing RDDs to disk.

Kryo does require some more configuration and has some limitations, so go check out the Spark documentation to see exactly what you need to do. Actually, read that whole part of the docs right now!
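
As a quick illustration, switching to Kryo (and, optionally, registering your own classes, as the docs recommend) might look roughly like this; the Point case class is just a made-up example:

import org.apache.spark.{SparkConf, SparkContext}

// a made-up class, only here to show the registration call
case class Point(x: Double, y: Double)

val conf = new SparkConf()
  .setAppName("kryo-demo")
  // swap the default Java serializer for Kryo
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optional but recommended: register the classes you will be serializing
  .registerKryoClasses(Array(classOf[Point]))

val sc = new SparkContext(conf)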

4. DON'T use different modes/schedulers/data for testing and prod!

The thought of using the standalone scheduler + client mode for tests and YARN/Mesos + cluster mode in prod sounds very tempting. The same goes for testing your Spark apps on smaller datasets than the ones in production. Do not give in to that temptation, for a false one it is :-) There are scheduler-specific bugs, like this one I encountered: everything worked fine in local mode on a small dataset, but a big dataset + YARN = disaster.

5. DO cache/persist

Consider this piece of code:

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Declare and initialise the spark context

// Load a HUGE dataset
val data = sc.textFile("kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))

val clusters = KMeans.train(parsedData, 4, 500)

All fine and dandy, right? Wrong! It's extremely important to read the docs of every method you use to see if it uses your RDD more than once. In the above case parsedData will be re-read from the file and the whole RDD will be recreated hundreds of times inside the KMeans.train method!

While writing your own algorithms, or when using third-party libraries, remember about persist(level) and its shorthand version cache().

These methods store the computed RDD either in memory only (faster access) or in memory and on disk (more space, but slower), in a serialized (less storage, more CPU-intensive) or non-serialized format, so that Spark does not recreate it every time the RDD is used. My advice is to use the memory-only storage level if possible; you really don't want to hit the disk, although sometimes that's not an option.

The above example can be quickly fixed by adding one line of code before the KMeans call (or by calling the method during the assignment):

val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
parsedData.cache()
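
And if the dataset doesn't comfortably fit in memory, you can call persist with an explicit storage level instead of cache() (just a sketch of the API; pick the level that matches your cluster's resources):

import org.apache.spark.storage.StorageLevel

// keep what fits in memory, spill the rest to disk, store it serialized to save space
parsedData.persist(StorageLevel.MEMORY_AND_DISK_SER)

// cache() is simply a shorthand for persist(StorageLevel.MEMORY_ONLY)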

6. BONUS1: AWS EMR mode....

It seems that AWS EMR natively supports only YARN scheduling and there's no (easy) way to use Spark's standalone mode, so if you need it for some reason, you're better off going with the spark-ec2 scripts and saving yourself a lot of trouble.

7. BONUS2: ES HADOOP

Elasticsearch Hadoop is a very cool library which will allow you to read data from Elasticsearch as a DataFrame or an RDD. Two pieces of advice here:

  1. Remember that each field in ES can be an array by default, without any additional configuration? Well, yes: a field can hold a single value or an array straight out of the box, and ES-Hadoop might have a hard time figuring this out. If you have arrays in your documents, you'd better use the es.read.field.as.array.include option, which is fully described in the documentation.
  2. Even though all the issues regarding "Position for 'FIELDNAME' not found in row; typically this is caused by a mapping inconsistency" with nested fields that I found in the tracker seem to be resolved, I still had some problems when using DataFrames. If you encounter them, consider using EsSpark.esRDD and going with an RDD instead (see the sketch right after this list).
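
Here's a rough sketch of both suggestions; the ES host, index/type and field name are of course placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

val conf = new SparkConf()
  .setAppName("es-demo")
  .set("es.nodes", "localhost:9200")             // placeholder ES host
  .set("es.read.field.as.array.include", "tags") // tell ES-Hadoop that "tags" is an array field

val sc = new SparkContext(conf)

// read the index as an RDD of (documentId, fieldMap) pairs instead of a DataFrame
val docs = EsSpark.esRDD(sc, "myindex/mytype")   // placeholder index/type
docs.take(5).foreach(println)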

That's it for now; it's not much, but I hope it will make people's lives easier! Now it's time to make the whole thing actually fast, so be on the lookout for performance-tuning tips!
