Dependency Injection in Apache Spark Applications
Sandeep Prabhakar & Shu Das
Overview
● Signals overview
● How we use Spark in the Signals team at Salesforce
● An example of a simple Spark application
● Injecting dependencies into a Spark application
● Pitfalls of Dependency Injection in Spark and how to overcome them
Signals: a platform for making sense of activity streams
● A platform for extracting important insights from a large volume of activity
● Activity includes emails, meetings, phone calls, web clicks, news, SMS
[Diagram: example signals include urgent email, suggested follow-up, topic alert, pricing mentioned, meeting request, negative sentiment]
Spark in the Signals team at Salesforce
● Spark Structured Streaming applications
● Applications process emails read from Kafka and S3
● Perform enrichment and aggregations on data over time windows
● Write to Kafka and Postgres
● Deployed on Apache Mesos
[Diagram: pipeline reading from Apache Kafka and AWS S3, writing back to Apache Kafka]
Simple Spark application
Simple Spark application

val spark = SparkSession.builder.appName("Example").master("local").getOrCreate()
import spark.implicits._

// Create DataFrame representing the stream of input lines
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

// Start running the query that saves word counts to Redis
val query = wordCounts.writeStream
  .foreach(new BasicRedisWriter)
  .outputMode("update")
  .start()

query.awaitTermination()
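BasicRedisWriter is referenced but not shown on the slide. A minimal sketch of what such a writer might look like, assuming the scala-redis client (com.redis.RedisClient, which later slides use) and the word-count schema above; the class body and host/port are assumptions:

import com.redis.RedisClient
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical sketch of the writer used above. The Redis connection is
// created in open() on the executor, once per partition and epoch.
class BasicRedisWriter extends ForeachWriter[Row] {
  private var redis: RedisClient = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    redis = new RedisClient("localhost", 6379) // assumed host/port
    true
  }

  // Rows carry the word-count schema: value (the word) and count
  override def process(row: Row): Unit =
    redis.set(row.getAs[String]("value"), row.getAs[Long]("count"))

  override def close(errorOrNull: Throwable): Unit =
    if (redis != null) redis.disconnect
}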
Desired functionalities
● Easily testable
● Components with single responsibility (separation of concerns)
● Ability to compose and reuse dependencies
● Ability to use different implementations of dependencies
● Configurable between dev, test, and prod environments
Using Guice to inject dependencies
Dependency Injection
● Technique where one object supplies the dependencies of another object
● A dependency is an object that can be used (a service)
● An injection is the passing of a dependency to a dependent object (a client) that would use it
Summarized from Wikipedia: https://en.wikipedia.org/wiki/Dependency_injection
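A minimal illustration of the definitions above (the names here are illustrative, not from the deck): the client receives its service through the constructor instead of building it, so a test can substitute a fake:

trait WordStore { def save(word: String, count: Long): Unit }

// WordCounter (the client) is handed a WordStore (the service) rather
// than constructing one itself; that hand-off is the injection
class WordCounter(store: WordStore) {
  def record(word: String, count: Long): Unit = store.save(word, count)
}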
Guice
● DI framework from Google
● Easy to use and highly configurable
● Framework agnostic, implements JSR-330 (javax.inject)
Using Guice to inject dependencies

// Inject RedisClient
class GuiceRedisWriter @Inject()(redisClient: RedisClient) extends ForeachWriter[Row] {
  ...
}

// Inject the abstract ForeachWriter[Row]. Guice module will set the proper implementation
class GuiceExample @Inject()(writer: ForeachWriter[Row]) {
  def countWords(spark: SparkSession, lines: DataFrame): StreamingQuery = { ... }
}

// Guice module that provides implementations for dependencies
class GuiceExampleModule extends AbstractModule with ScalaModule {
  @Provides @Singleton
  def provideRedisClient(): RedisClient = new RedisClient("localhost", 6379)

  @Provides @Singleton
  def provideForeachWriter(redis: RedisClient): ForeachWriter[Row] = new GuiceRedisWriter(redis)
}

def main(args: Array[String]): Unit = {
  // Create the injector and get instance of class
  val injector = Guice.createInjector(new GuiceExampleModule)
  val wordCounter = injector.getInstance(classOf[GuiceExample])
  // Create Spark Session and Stream. Then call countWords of GuiceExample instance
}
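The elided countWords body would be the same query as the plain example, with the injected writer replacing the hard-coded BasicRedisWriter; a sketch under that assumption:

import javax.inject.Inject
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

class GuiceExample @Inject()(writer: ForeachWriter[Row]) {
  def countWords(spark: SparkSession, lines: DataFrame): StreamingQuery = {
    import spark.implicits._
    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()
    // The sink is the injected writer, so a test can bind a fake
    // ForeachWriter[Row] in a different Guice module
    wordCounts.writeStream
      .foreach(writer)
      .outputMode("update")
      .start()
  }
}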
Spark Serialization Exception

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  ...
Caused by: java.io.NotSerializableException: com.redis.RedisClient
Serialization stack:
  - object not serializable (class: com.redis.RedisClient, value: localhost:6379)
  - field (class: com.salesforceiq.spark.examples.guice.GuiceRedisWriter, name: redisClient, type: class com.redis.RedisClient)
  - object (class com.salesforceiq.spark.examples.guice.GuiceRedisWriter, com.salesforceiq.spark.examples.guice.GuiceRedisWriter@2aef90ef)
  ...
Why Guice fails on Spark
● Internally Spark ships tasks to executors
● These tasks must be serializable to be transmitted
● Guice instantiates dependencies on the Spark driver at application start
● Not all dependencies can be serialized
  ○ Dependencies with network connections (Redis client, Postgres client)
  ○ 3rd-party libraries that are not in our control
● Spark tries to serialize these dependencies but fails (minimal reproduction below)
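The failure needs no DI framework at all to reproduce; a minimal sketch, assuming the scala-redis client:

import com.redis.RedisClient
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Repro").master("local").getOrCreate()
import spark.implicits._

// RedisClient holds a live socket and does not implement Serializable
val redis = new RedisClient("localhost", 6379)

// The closure captures `redis`, so Spark must serialize it to ship the
// task; this fails with SparkException: Task not serializable
spark.range(10).map(_.toString).foreach(word => redis.set(word, 1))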
How to solve this
● Need to serialize non-serializable dependencies?!?!
● Serialize the configuration for a dependency (aka Guice providers)
● Construct the instances of the dependencies on the executors
● Basically, serialize the injector and inject dependencies on the executors (hand-rolled sketch below)
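Outside any framework, the same idea can be hand-rolled: ship only the serializable configuration with the task and build the client lazily on the executor. A sketch of that pattern (class name assumed):

import com.redis.RedisClient
import org.apache.spark.sql.{ForeachWriter, Row}

// Only host and port travel with the serialized task; the client is
// @transient lazy, so each executor JVM rebuilds it from the config
class LazyRedisWriter(host: String, port: Int) extends ForeachWriter[Row] {
  @transient private lazy val redis = new RedisClient(host, port)

  override def open(partitionId: Long, epochId: Long): Boolean = true
  override def process(row: Row): Unit =
    redis.set(row.getAs[String]("value"), row.getAs[Long]("count"))
  override def close(errorOrNull: Throwable): Unit = ()
}

Injector Provider generalizes this: rather than threading configuration through every class by hand, the serialized injector travels with the task and re-wires the object graph on arrival.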
Injector Provider
Injector Provider
● An internal library written at SalesforceIQ (soon to be open sourced)
● A wrapper on Guice that creates a serializable injector
● Creates an injector that can lazily load modules and dependencies
Spark with Injector Provider
● Spark ships the serialized injector along with the task to the executors
● On task deserialization
  ○ The injector is deserialized
  ○ All the dependencies are injected
Using Injector Provider

class InjectorProviderExampleModule extends AbstractModule {
  @Provides @Singleton
  def provideForeachWriter(stub: ProvidedInjectorStub, redisClient: RedisClient): ForeachWriter[Row] = {
    new InjectorProviderRedisWriter(stub, redisClient)
  }
}

class InjectorProviderRedisWriter @Inject()(stub: ProvidedInjectorStub, _redisClient: RedisClient)
    extends ForeachWriter[Row] {
  // Make the RedisClient transient and injectable so it does not get serialized by the JVM
  @Inject @transient private val redisClient = _redisClient

  // Deserialize this object and then use the stub to inject all members
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    stub.injectMembers(this)
  }
  ...
}

// Extend abstract class which internally injects all @Inject annotated objects
class InjectorProviderExample @Inject()(writer: ForeachWriter[Row]) extends ProvidedInjector { /* Same */ }

def main(args: Array[String]): Unit = {
  // Create the injector and get instance
  val injector = InjectorProvider.builder().addBootstrapModuleTypes(classOf[InjectorProviderExampleModule]).build()
  val wordCounter = injector.getInstance(classOf[InjectorProviderExample])
  // Create Spark Session and Stream. Then call countWords of InjectorProviderExample instance
}
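The readObject hook above is plain Java serialization: when the executor deserializes the writer, defaultReadObject() restores the serializable fields (including the injector stub), the @transient RedisClient arrives as null, and stub.injectMembers(this) re-populates the @Inject-annotated fields on the executor side.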
Tradeoffs of Injector Provider
● Not straightforward
● Tied to Guice as your DI framework
Conclusions
● Creating modular Spark jobs is not easy
● Dependency injection (DI) in Spark isn't straightforward
● Spark tasks must be serializable
● Plain old Guice does not work on Spark
● Using an injector that is serializable makes DI possible in Spark
● Injector Provider (soon to be open sourced) gives the ability to build serializable injectors
