DESIGNING A SCALABLE DATA PLATFORM USING AKKA, SPARK STREAMING AND KAFKA
Alex Silva
MMM… DATA!!!!
Data Ingestion
INGESTION
BUT HOW DO WE START?
REAL-TIME VIDEO DATA STREAMING AND ANALYSIS
- Distributed
- Elastic
- Location Agnostic
- Open
- Message Driven
- Self-Healing
REACTIVE
The Reactive Manifesto
- Responsive
- Elastic
- Resilient
- Message Driven
Why Akka?
- Reactive
- Scalable
- Fault Tolerant
- Load Management: both up and out
- Location Transparency
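A minimal sketch of the message-driven model Akka provides; the Greeter actor and its message are illustrative, not part of Hydra:

import akka.actor.{Actor, ActorSystem, Props}

case class Greet(name: String)

class Greeter extends Actor {
  def receive = {
    case Greet(name) => println(s"Hello, $name")
  }
}

val system = ActorSystem("demo")
val greeter = system.actorOf(Props[Greeter], "greeter")
greeter ! Greet("Hydra") // fire-and-forget tell: the caller never blocks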
Challenges with Akka
- Learning Curve
- Type Safety
- Debugging
- Dead Letters
Why Kafka?
- Distributed Log
- High Throughput
- Reliable
- Concurrency
Producers vs. Consumers
[Diagram: multiple producers writing to a Kafka cluster; Topic 1 is split into partitions 0, 1, and 3 spread across brokers 1, 2, and 3]
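A minimal producer sketch against the plain Kafka client API; the broker address, topic, key, and value here are assumptions:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// Records with the same key always land in the same partition.
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord("topic-1", "session-42", "segment-started"))
producer.close()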
Kafka: A CP or CA System?
[Diagram: the CAP triangle (Consistency, Availability, Partition Tolerance) and its opposites (Inconsistency, Unavailability, Partition Intolerance)]
Why Spark?
- Fast!
- Unified Platform
- Functional Paradigm
- Rich Library Set
- Active Community
MODULES
HYDRA CORE
- Metrics
- HTTP Server
- Bootstrapping Utilities
- System Actors
HYDRA INGESTION
- Actor Hierarchy
- Supervision
- Kafka Gateway
- Message Protocol
MESSAGE HANDLERS
Handler Registry
- Monitors registered handlers for errors/stops
- Broadcasts messages
- Handler Lifecycle
Ingestion Flow
[Diagram: a request with metadata and a JSON payload arrives at /ingest and flows from the Coordinator to the Registry to the Handlers]
Ingestion Actors: Coordinator
- Supervises ingestion at the request level
- Coordinates protocol flow
- Reports errors and metrics
Ingestion Protocol
[Diagram: Publish goes to the message handlers, which reply with Join or stay silent (<<Silence>>); joined handlers receive Validate and answer Valid or Invalid; Valid leads to Ingest, Invalid to STOP]
Ingestion Protocol: Publish
[Diagram: the Handler Registry publishes the request to all message handlers; interested handlers reply with Join, the rest ignore it]
Ingestion Protocol: Validation
[Diagram: the Ingestion Coordinator sends Validate to the joined message handlers, which reply Valid or Invalid]
Ingestion Protocol: Invalid Message
[Diagram: on an Invalid reply, the Ingestion Coordinator sends ReportError to the Error Reporter]
Ingestion Protocol: Ingest
[Diagram: for each joined handler, the Coordinator sends Ingest; the handler encodes and persists the message]
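The protocol as message types, sketched below; the names mirror the slides, but the exact fields are assumptions rather than Hydra's definitions:

sealed trait IngestionProtocol
case class IngestionRequest(metadata: Map[String, String], payload: String)

case class Publish(request: IngestionRequest) extends IngestionProtocol
case object Join extends IngestionProtocol // a handler opts in; silence means no
case class Validate(request: IngestionRequest) extends IngestionProtocol
case object Validated extends IngestionProtocol
case class Invalid(reason: String) extends IngestionProtocol
case class Ingest(request: IngestionRequest) extends IngestionProtocol
case object HandlerCompleted extends IngestionProtocol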
abstract class BaseMessageHandler extends Actor with ActorConfigSupport with ActorLogging
  with IngestionFlow with ProducerSupport with MessageHandler {

  ingest {
    case Initialize =>
      // nothing required by default
    case Publish(request) =>
      // handlers that do not join simply log and stay silent
      log.info(s"Publish message was not handled by ${self}. Will not join.")
    case Validate(request) =>
      sender ! Validated
    case Ingest(request) =>
      log.warning(s"Ingest message was not handled by ${self}.")
      sender ! HandlerCompleted
    case Shutdown =>
      // nothing required by default
    case Heartbeat =>
      Health.get(self).getChecks
  }
}
Elasticity
Elasticity
- Asynchronous
- Share Nothing
- Divide and Conquer
- Location Transparency
akka {
  actor {
    deployment {
      /services-manager/handler_registry/segment_handler {
        router = round-robin-pool
        optimal-size-exploring-resizer {
          enabled = on
          action-interval = 5s
          downsize-after-underutilized-for = 2h
        }
      }
      /services-manager/kafka_producer {
        router = round-robin-pool
        resizer {
          lower-bound = 5
          upper-bound = 50
          messages-per-resize = 500
        }
      }
    }
  }
}
akka {
  actor {
    deployment {
      /services-manager/handler_registry/segment_handler {
        router = round-robin-pool
        optimal-size-exploring-resizer {
          enabled = on
          action-interval = 5s
          downsize-after-underutilized-for = 2h
        }
      }
    }
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  cluster {
    seed-nodes = ["akka.tcp://Hydra@127.0.0.1:2552", "akka.tcp://Hydra@172.0.0.1:2553"]
  }
}
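A sketch of how such a deployment section is consumed; HandlerRegistry and SegmentHandler are stand-ins, and the routed actor must be created at the path the deployment key names:

import akka.actor.{Actor, Props}
import akka.routing.FromConfig

class SegmentHandler extends Actor {
  def receive = { case _ => () }
}

class HandlerRegistry extends Actor {
  // FromConfig makes Akka read the router type and resizer from the
  // /services-manager/handler_registry/segment_handler deployment entry,
  // so pool sizing is tuned in configuration, not code.
  val segmentHandler = context.actorOf(
    FromConfig.props(Props[SegmentHandler]), name = "segment_handler")

  def receive = {
    case msg => segmentHandler forward msg
  }
}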
Message Driven
Messages vs. Events
[Diagram: a message ("SAVE THIS!") is addressed to a specific recipient; an event ("SOMEBODY LOGGED IN!") is not]
Events are not addressed to a specific recipient.
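The distinction in code, sketched with Akka's event stream; the UserLoggedIn event and Auditor actor are illustrative:

import akka.actor.{Actor, ActorSystem, Props}

case class UserLoggedIn(userId: String)

class Auditor extends Actor {
  def receive = {
    case UserLoggedIn(id) => println(s"audit: $id logged in")
  }
}

val system = ActorSystem("demo")
val auditor = system.actorOf(Props[Auditor], "auditor")

// The publisher addresses no one; subscribers opt in by event type.
system.eventStream.subscribe(auditor, classOf[UserLoggedIn])
system.eventStream.publish(UserLoggedIn("u-123"))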
HYDRA PERSISTENCE
- Kafka Producers
- Offset Management
- Message Serialization
Message Serialization
- Happens at the message level
- Binds Messages to Producers
- Pluggable
trait KafkaMessage[K, P] {
  val timestamp = System.currentTimeMillis

  def key: K

  def payload: P

  def retryOnFailure: Boolean = true
}

case class JsonMessage(key: String, payload: JsonNode) extends KafkaMessage[String, JsonNode]

object JsonMessage {
  val mapper = new ObjectMapper()

  def apply(key: String, json: String) = {
    val payload: JsonNode = mapper.readTree(json)
    new JsonMessage(key, payload)
  }
}

case class AvroMessage(schema: SchemaHolder, key: String, json: String)
  extends KafkaMessage[String, GenericRecord] {

  def payload: GenericRecord = {
    val converter: JsonConverter[GenericRecord] = new JsonConverter[GenericRecord](schema.schema)
    converter.convert(json)
  }
}
Why Avro?
- Binary Format
- Space Efficient
- Evolutionary Schemas
- Automatic Tables
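A small sketch of what evolutionary schemas buy you, using Avro's SchemaBuilder; the VideoSegment record and its fields are hypothetical:

import org.apache.avro.{Schema, SchemaBuilder}

val v1: Schema = SchemaBuilder.record("VideoSegment").fields()
  .requiredString("userId")
  .requiredLong("timestamp")
  .endRecord()

// v2 adds a field with a default value, so records written with v1
// still deserialize: the schema evolves without breaking consumers.
val v2: Schema = SchemaBuilder.record("VideoSegment").fields()
  .requiredString("userId")
  .requiredLong("timestamp")
  .name("cdn").`type`().stringType().stringDefault("unknown")
  .endRecord()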
Fault Tolerance
Resiliency
“The ability of something to return to its original shape, after it has been pulled, stretched, pressed, or bent.” (Merriam-Webster)
Resilient Protocols
- Message Loss
- Message Reordering
- Message Duplication
Resilient Protocols
Asynchronous Communication + Eventual Consistency
ACID 2.0
- Associative
- Commutative
- Idempotent
- Distributed
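A minimal sketch of those algebraic properties; GSet is an illustrative grow-only set, not a Hydra type. Because set union is associative, commutative, and idempotent, replicas can merge in any order, any number of times, and still converge:

final case class GSet[A](elems: Set[A]) {
  def add(a: A): GSet[A] = GSet(elems + a)
  def merge(other: GSet[A]): GSet[A] = GSet(elems ++ other.elems)
}

val a = GSet(Set("s1")).add("s2")
val b = GSet(Set("s3"))

assert(a.merge(b) == b.merge(a)) // commutative: order does not matter
assert(a.merge(a) == a)          // idempotent: duplicates do not matter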
Resilient Protocols
- Replication
- Delegation
- Isolation
- Containment
override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
    case _: ActorInitializationException => akka.actor.SupervisorStrategy.Stop
    case _: FailedToSendMessageException => Restart
    case _: ProducerClosedException => Restart
    case _: NoBrokersForPartitionException => Escalate
    case _: KafkaException => Escalate
    case _: ConnectException => Escalate
    case _: Exception => Escalate
  }

val kafkaProducerSupervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    kafkaProducerProps,
    childName = actorName[KafkaProducerActor],
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ))
class KafkaProducerActor extends Actor with LoggingAdapter with ActorConfigSupport
  with NotificationSupport[KafkaMessage[Any, Any]] {

  import KafkaProducerActor._

  implicit val ec = context.dispatcher

  override def preRestart(cause: Throwable, message: Option[Any]) = {
    // No ack from Kafka: send the message to ourselves again after an
    // exponentially increasing delay.
    message match {
      case Some(rp: RetryingProduce) =>
        notifyObservers(KafkaMessageNotDelivered(rp.msg))
        val nextBackOff = rp.backOff.nextBackOff
        val retry = RetryingProduce(rp.topic, rp.msg)
        retry.backOff = nextBackOff
        context.system.scheduler.scheduleOnce(nextBackOff.waitTime, self, retry)
      case Some(produce: Produce) =>
        notifyObservers(KafkaMessageNotDelivered(produce.msg))
        if (produce.msg.retryOnFailure) {
          context.system.scheduler.scheduleOnce(initialDelay, self,
            RetryingProduce(produce.topic, produce.msg))
        }
      case _ => // nothing to retry
    }
  }
}
The Error Kernel Pattern
- Error kernel per request
- No processing; delegation
- Ingestion errors
- Timeouts
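A sketch of the pattern with illustrative names: the kernel keeps the valuable state and does no risky work itself, delegating each request to a disposable child whose failure cannot corrupt it:

import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Stop

class IngestionKernel extends Actor {
  // A failed request dies alone; the kernel keeps running.
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => Stop
  }

  def receive = {
    case request: String =>
      // one short-lived worker per request; no processing in the kernel
      context.actorOf(Props(new RequestWorker(request, sender())))
  }
}

class RequestWorker(request: String, replyTo: ActorRef) extends Actor {
  // the risky processing happens here, isolated from the kernel
  replyTo ! s"processed: $request"
  context.stop(self)
  def receive = Actor.emptyBehavior
}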
HYDRA CONDUCTORS
- Event “Pulling”
- HTTP Conductor
Location Transparency: Akka Remoting
- Peer-to-Peer
- Serialization
- Delivery Reliability
- Latency
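Location transparency in a sketch: the caller addresses an actor path, and whether that path resolves to this JVM or a remote node is configuration, not code. The host, port, and path below are assumptions:

import akka.actor.ActorSystem

val system = ActorSystem("Hydra")
val registry = system.actorSelection(
  "akka.tcp://Hydra@10.0.0.12:2552/user/services-manager/handler_registry")
registry ! "status" // same syntax whether the registry is local or remote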
Message Delivery in Hydra
- What does guaranteed delivery mean?
- At-most-once semantics
- Can be made stronger
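One sketch of "made stronger": resend until the receiver acknowledges, turning at-most-once into at-least-once (the receiver must then deduplicate). The message types and retry interval are assumptions:

import akka.actor.{Actor, ActorRef}
import scala.concurrent.duration._

case class Deliver(id: Long, payload: String)
case class Ack(id: Long)

class AtLeastOnceSender(target: ActorRef) extends Actor {
  import context.dispatcher
  private var pending = Map.empty[Long, Deliver]

  // periodically resend anything not yet acknowledged
  context.system.scheduler.schedule(3.seconds, 3.seconds, self, "redeliver")

  def receive = {
    case d: Deliver =>
      pending += (d.id -> d)
      target ! d
    case Ack(id) =>
      pending -= id
    case "redeliver" =>
      pending.values.foreach(target ! _)
  }
}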
Reliable Proxy
@throws(classOf[Exception])
override def init: Future[Boolean] = Future {
  val useProxy = config.getBoolean("message.proxy", false)
  val ingestorPath = config.getRequiredString("ingestor.path")
  // Route through the reliable proxy when configured; otherwise talk to
  // the ingestor directly.
  ingestionActor =
    if (useProxy) context.actorOf(ReliableIngestionProxy.props(ingestorPath))
    else context.actorSelection(ingestorPath)
  val cHeaders = config.getOptionalList("headers")
  topic = config.getRequiredString("kafka.topic")
  headers = cHeaders match {
    case Some(ch) =>
      List(ch.unwrapped.asScala.map { header =>
        val sh = header.toString.split(":")
        RawHeader(sh(0), sh(1))
      }: _*)
    case None => List.empty[HttpHeader]
  }
  true
}
What’s streaming into Hydra today?
- Conductors
- Webhooks
What’s streaming into Hydra in the next few days?
- Internal ETL
- Many others…
HYDRA SPARK
- Kafka Dispatching
- Spark Job Server
- Streaming Analytics
Dispatching
[Diagram: a Kafka topic's offsets (1 through 100) laid out on a timeline; a dispatch job replays a selected range of offsets]
Dispatchers
[Diagram: Kafka feeding dispatchers that write to HDFS, Hive, Impala, RDBMS, and Cassandra]
Dispatching
[Diagram: a JSON job spec posted to /dispatch produces ad hoc tables]
WHAT ABOUT ORDERING?
Kafka orders records only within a partition, so there are two options:
- Post-dispatch ordering
- A single-partition topic
Spark
- Defined via Hydra configuration and bootstrapped
- Both batch and streaming contexts
- Ad hoc via job server
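A minimal streaming sketch against the 2016-era direct Kafka API; this is illustrative, not Hydra's SegmentJob, and the broker list and topic name are assumptions:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("segment-sketch")
val ssc = new StreamingContext(conf, Seconds(10))

// Read the topic directly from Kafka, with Spark tracking the offsets.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc,
  Map("metadata.broker.list" -> "localhost:9092"),
  Set("segment"))

stream.map(_._2).count().print() // events per 10-second batch

ssc.start()
ssc.awaitTermination()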
Spark Job Server
- Spark as a REST Service
- Spark SQL, Streaming, etc.
- Async and Sync Job APIs
- Jar Management
- Context Management
Jar Manager Endpoint
- Storage
- Retrieval

GET  /jars            - lists jars and the upload timestamp
POST /jars/<appName>  - uploads a new jar under <appName>
Context Manager Endpoint
- CRUD
- Lifecycle Management

GET    /contexts         - lists all current contexts
POST   /contexts/<name>  - creates a new context
DELETE /contexts/<name>  - stops a context and all jobs running in it
Job Manager Endpoint
- Configuration
- Job repository
- Tracking
- Lifecycle management

GET    /jobs?limit=N         - lists the last N jobs
POST   /jobs                 - starts a new job; 'sync=true' to wait for results
GET    /jobs/<jobId>         - gets the result or status of a job
DELETE /jobs/<jobId>         - kills the job
GET    /jobs/<jobId>/config  - gets the job configuration
Creating Spark Jobs

# Ad-hoc jobs through Hydra: run-once jobs with transient contexts
curl --data-binary @/etc/local/hydra/video-segment-fx.jar localhost:9091/jars/segment

curl -d "kafka.topic=segment" 'localhost:9091/jobs?appName=segment&classPath=hydra.SegmentJob&sync=false'
{
  "status": "STARTED",
  "result": {
    "jobId": "3156120b-f001-56cf-d22a-b40ebf0a9af1",
    "context": "f5ed0ec1-hydra.spark.analytics.segment.SegmentJob"
  }
}
Persistent Context Jobs

# Required for related jobs
# Create a new context
curl -X POST 'localhost:9091/contexts/video-032116-ctx?num-cpu-cores=10&memory-per-node=512m'
OK

# Verify creation
curl localhost:9091/contexts
["video-032116-ctx"]

# Run a job using the context
curl -d "kafka.topic=segment" 'localhost:9091/jobs?appName=segment&classPath=hydra.SegmentJob&sync=true&context=video-032116-ctx'
{
  "result": {
    "active-sessions": 24476221
  }
}
Dispatcher Jobs

curl -X POST localhost:9091/dispatch
{
  "dispatch": {
    "kafka.topic": "youbora",
    "offsets": {
      "start": "2016-03-15T01:18:01",
      "end": "2016-03-15T03:18:01"
    },
    "avro.schema": "http://172.16.10.220:8085/schemas/youbora",
    "hdfs-serializer": {
      "output.path": "/tmp/hydra/youbora/${c:offsets.start}"
    }
  }
}
Dispatcher Jobs

curl -X POST localhost:9091/dispatch
{
  "dispatch": {
    "kafka.topic": "youbora",
    "offsets": {
      "start": "2016-03-15T01:18:01",
      "end": "2016-03-15T03:18:01"
    },
    "avro.schema": "http://172.16.10.220:8085/schemas/youbora",
    "hdfs-serializer": {
      "url": "jdbc:postgresql://localhost/test",
      "driver": "org.postgresql.Driver",
      "username": "${c:dispatchers.yb_pg_username}",
      "password": "${c:dispatchers.yb_pg_password}",
      "sql": "insert into video_raw values ('${total}','${type}','${subtype}','${user_id}','${ip}','${country}','${city}','${start}')"
    }
  }
}
Job Status

# Job results/status
curl localhost:9091/jobs/3156120b-f001-56cf-d22a-b40ebf0a9af1
{
  "duration": "16.264 secs",
  "classPath": "hydra.spark.analytics.segment.SegmentJob",
  "startTime": "2016-03-26T01:18:01.256Z",
  "context": "f5ed0ec1-hydra.spark.analytics.segment.SegmentJob",
  "result": {
    "active-sessions": 24476221
  },
  "status": "FINISHED",
  "jobId": "3156120b-f001-56cf-d22a-b40ebf0a9af1"
}
HOW IT LOOKS IN THE WILD
[Diagram: production topology connecting, over Akka Remoting, Ingestion nodes (Hydra Core, Ingestors, HTTP), Persistence nodes running Kafka (Hydra Core, Persistence, HTTP), Conductor nodes (Hydra Core, Conductors, HTTP), and Spark (batch and streaming) with Dispatcher nodes (Hydra Core, Dispatchers, HTTP) writing to RDBMS and HDFS; node counts on the slide: 3, 2, 2]
AWS

Model       vCPU   Mem (GiB)   SSD Storage (GB)
m3.medium   1      3.75        1 x 4
m3.large    2      7.5         1 x 32
m3.xlarge   4      15          2 x 40
m3.2xlarge  8      30          2 x 80
[Chart: average ingestions per second (requests), December 2015 through March 2016, on a scale of 0 to 2,500]
Some Facts
- 9,730 lines of Scala code
- Production platform since Jan 2016
- C.I. through Jenkins and Salt
What’s next?
Remote Handlers
[Diagram: a handler marked <<REMOTE>> registers with the Handler Registry, which creates a ZNode for it and includes it in the ingestion protocol]
QUESTIONS? Thank You!
roarking
Location Transparency in Hydra
- Distributed by Default
- Akka Remoting
- Configuration-Driven
- Peer-to-Peer
