DESIGNING A REACTIVE DATA PLATFORM: CHALLENGES, PATTERNS AND ANTI-PATTERNS Alex Silva
Me! Me! Me!
Distributed Elastic Location Agnostic Open Message Driven Self-Healing
REACTIVE
The Reactive Manifesto Responsive Elastic Resilient Message Driven
Responsiveness
Elasticity
Scaling OUT vs. Scaling UP
Elasticity Asynchronous Share Nothing Divide and Conquer Location Transparency
Synchronous Messaging: Inherent ordering introduces implicit back pressure on the sender. [Diagram: Synchronous, messages 1, 2, 3]
Asynchronous Messaging [Diagram: Asynchronous, messages 1, 2, 3; message 4 marked "Invalid!"]
Resiliency: “The ability of something to return to its original shape, after it has been pulled, stretched, pressed, or bent.” (Merriam-Webster)
What about software systems?
WHAT IF I TOLD YOU IT IS COMPLEX BUT NOT THAT COMPLICATED
Software Systems are Complex Systems
“Complex systems run in degraded mode.” “Complex systems run as broken systems.” Richard Cook
Asynchronous Communication + Eventual Consistency Resilient Protocols
Failures Contained Observed Managed Reified as messages
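One way to read "reified as messages" is that a failure becomes an ordinary reply value instead of an exception that escapes the component. The following is a minimal sketch of that idea in plain Scala, not Hydra code; `Reply`, `Done`, `Failed`, and `handle` are hypothetical names chosen for illustration.

```scala
import scala.util.{Failure, Success, Try}

object FailureAsMessageSketch {
  // Hypothetical reply protocol: success and failure are both plain messages.
  sealed trait Reply
  final case class Done(value: Int) extends Reply
  final case class Failed(reason: String) extends Reply

  // The failure is contained inside handle and handed back as data,
  // so the caller can observe and manage it like any other message.
  def handle(input: String): Reply =
    Try(input.trim.toInt) match {
      case Success(n) => Done(n)
      case Failure(e) => Failed(s"${e.getClass.getSimpleName}: ${e.getMessage}")
    }
}
```

A caller can then pattern match on `Done` and `Failed` the same way it would on any other reply.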
Message Driven
Messages vs Events. Messages ("SAVE THIS!"): addressable, specific. Events ("SOMEBODY LOGGED IN!"): facts, topic, past.
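The distinction can be sketched with a tiny in-memory bus: a message is delivered to the one recipient it names, while an event is a fact published to a topic and seen by whoever subscribed. This is an illustrative sketch only; `Bus`, `Message`, `Event`, and the method names are assumptions, not part of Hydra or Akka.

```scala
object MessagesVsEventsSketch {
  final case class Message(to: String, body: String)   // addressed to one recipient
  final case class Event(topic: String, fact: String)  // a past fact, no recipient

  final class Bus {
    private var mailboxes = Map.empty[String, Vector[String]]
    private var subscriptions = Map.empty[String, Set[String]]

    def subscribe(who: String, topic: String): Unit =
      subscriptions = subscriptions.updated(
        topic, subscriptions.getOrElse(topic, Set.empty[String]) + who)

    private def deliver(who: String, body: String): Unit =
      mailboxes = mailboxes.updated(
        who, mailboxes.getOrElse(who, Vector.empty[String]) :+ body)

    // A message goes to exactly the recipient it names.
    def send(m: Message): Unit = deliver(m.to, m.body)

    // An event goes to every subscriber of its topic; the emitter knows none of them.
    def publish(e: Event): Unit =
      subscriptions.getOrElse(e.topic, Set.empty[String]).foreach(deliver(_, e.fact))

    def mailbox(who: String): Vector[String] =
      mailboxes.getOrElse(who, Vector.empty[String])
  }
}
```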
REAL-TIME DATA INGESTION PLATFORM
Why Akka? Reactive Elastic Fault Tolerant Load Management Both up and out Location Transparency
Akka Actors Lightweight Reactive Asynchronous Resilient
Challenges with Akka Learning Curve Type Safety Debugging Dead Letters
Why Kafka? Distributed Log High Throughput Replicated Concurrency
Kafka [Diagram: two Producers; a Kafka Cluster with Broker 1 (Topic 1, Partition 0), Broker 2 (Topic 1, Partition 1), and Broker 3 (Topic 1, Partition 3); three Clients]
Why Spark? Fast! Unified Platform Functional Paradigm Rich Library Set Active Community
PATTERNS AND ANTI-PATTERNS
Hydra Topology [Diagram: Ingestion (Hydra Core | Ingestors | HTTP); Spark (Batch and Streaming); Dispatchers (Hydra Core | Dispatchers | HTTP); RDBMS; HDFS; Conductors (Hydra Core | Conductors | HTTP); Persistence :: Kafka (Hydra Core | Persistence | HTTP); Akka Remoting; labels 3, 2, 2]
GOOD PRACTICE: DECENTRALIZE THE PROCESSING OF KEY TASKS
HYDRA INGESTION MODULE Actor Hierarchy Supervision Kafka Gateway Message Protocol
MESSAGE HANDLERS
Hydra Ingestion Flow [Diagram: < META > and { } request, /ingest, Coordinator, Registry, Handlers]
Handler Registry: monitors registered handlers for errors/stops; broadcasts messages; handler lifecycle
GOOD PRACTICE: DESIGN AN INCREMENTAL COMMUNICATION PROTOCOL
Hydra Ingestion Protocol [Diagram: Publish to MESSAGE HANDLERS; reply is Join or <<Silence>> (STOP); then Validate; reply is Valid (then Ingest) or Invalid]
Hydra Ingestion Protocol: Publish. The Handler Registry sends Publish to the message handlers ("HEY GUYS! CHECK THIS OUT!"). Some reply Join ("NICE!!", "BRING IT!!"); the others do not ("HUH?!", "NAH…").
Hydra Ingestion Protocol: Validation. The Ingestion Coordinator sends Validate to the message handlers ("HOW DOES IT LOOK?"). Each replies Valid ("GOOD!") or Invalid ("BAD!").
Hydra Ingestion Protocol: Invalid Message. The Ingestion Coordinator sends ReportError to the Error Reporter ("GOT A BAD ONE"). [Diagram labels: ReportError, Ingest]
Hydra Ingestion Protocol: Ingest. For each handler: Ingest ("SHIP IT!"), Encode, Persist.
abstract class BaseMessageHandler extends Actor
  with ActorConfigSupport
  with ActorLogging
  with IngestionFlow
  with ProducerSupport
  with MessageHandler {

  // Default reaction to each protocol message; concrete handlers override what they need.
  ingest {
    case Initialize => {
      // nothing required by default
    }
    case Publish(request) => {
      // Not replying with Join keeps this handler out of the request.
      log.info(s"Publish message was not handled by ${self}. Will not join.")
    }
    case Validate(request) => {
      sender ! Validated
    }
    case Ingest(request) => {
      // The s prefix is required for ${self} to be interpolated.
      log.warning(s"Ingest message was not handled by ${self}.")
      sender ! HandlerCompleted
    }
    case Shutdown => {
      // nothing required by default
    }
    case Heartbeat => {
      Health.get(self).getChecks
    }
  }
}
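The Publish / Join / Validate / Ingest steps can be sketched as a plain, synchronous function. This is a simplified model under stated assumptions, not the Hydra implementation: the real handlers are actors exchanging messages, `Request`, `Handler`, `Outcome`, and `run` are hypothetical names, and the sketch assumes that validation is decided per handler (handlers that report Invalid are reported as errors, the rest receive Ingest).

```scala
object IngestionProtocolSketch {
  // Hypothetical stand-in for HydraRequest, which the slides do not show.
  final case class Request(topic: String, payload: String)

  // A handler reduced to three decisions, one per protocol step.
  trait Handler {
    def name: String
    def joins(r: Request): Boolean                  // Publish: reply Join, or stay silent
    def validate(r: Request): Either[String, Unit]  // Validate: Invalid(reason) or Valid
    def ingest(r: Request): Unit                    // Ingest: do the work
  }

  final case class Outcome(ingestedBy: List[String], errors: List[String])

  def run(r: Request, handlers: List[Handler]): Outcome = {
    // Step 1: only handlers that answer Publish with Join take part.
    val joined = handlers.filter(_.joins(r))
    // Step 2: each joined handler validates the request.
    val replies = joined.map(h => (h, h.validate(r)))
    val errors = replies.collect { case (h, Left(reason)) => s"${h.name}: $reason" }
    val valid = replies.collect { case (h, Right(_)) => h }
    // Step 3: Ingest is sent only to handlers that answered Valid.
    valid.foreach(_.ingest(r))
    Outcome(valid.map(_.name), errors)
  }
}
```

Each step narrows the set of participants, so a handler that never joins costs nothing beyond the initial broadcast.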
GOOD PRACTICE: HIDE AN ELASTIC POOL OF RESOURCES BEHIND ITS OWNER
Publisher Subscriber Back pressure Less of this…
Publisher Router Workers More of this!
akka {
  actor {
    deployment {
      # Handler pool: the resizer explores pool sizes and settles on the one that performs best.
      /services-manager/handler_registry/segment_handler {
        router = round-robin-pool
        optimal-size-exploring-resizer {
          enabled = on
          action-interval = 5s
          downsize-after-underutilized-for = 2h
        }
      }
      # Kafka producer pool: resized between 5 and 50 routees.
      /services-manager/kafka_producer {
        router = round-robin-pool
        resizer {
          # Resizers are off by default and must be enabled explicitly.
          enabled = on
          lower-bound = 5
          upper-bound = 50
          messages-per-resize = 500
        }
      }
    }
  }
}
akka {
  actor {
    deployment {
      /services-manager/handler_registry/segment_handler {
        router = round-robin-pool
        optimal-size-exploring-resizer {
          enabled = on
          action-interval = 5s
          downsize-after-underutilized-for = 2h
        }
      }
    }
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  cluster {
    # The actor system name in each address must match the running system's name exactly (it is case sensitive).
    seed-nodes = ["akka.tcp://Hydra@127.0.0.1:2552", "akka.tcp://hydra@172.0.0.1:2553"]
  }
}
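Stripped of Akka, the idea behind these router settings is that callers only ever see the owner of the pool, which can grow or shrink the set of workers behind its back. Below is a minimal sketch in plain Scala; `Router`, `route`, and `resize` are hypothetical names, and the round-robin and clamping logic only approximates what a `round-robin-pool` with a bounded resizer does.

```scala
object PoolOwnerSketch {
  // The router owns its workers; callers hold a reference to the router only.
  final class Router[A, B](lower: Int, upper: Int)(newWorker: () => A => B) {
    require(lower >= 1 && upper >= lower, "bounds must satisfy 1 <= lower <= upper")

    private var workers: Vector[A => B] = Vector.fill(lower)(newWorker())
    private var next = 0

    def size: Int = workers.size

    // Grow or shrink the pool, clamped to [lower, upper]. Callers are unaffected.
    def resize(target: Int): Unit = {
      val n = math.max(lower, math.min(upper, target))
      workers =
        if (n <= workers.size) workers.take(n)
        else workers ++ Vector.fill(n - workers.size)(newWorker())
    }

    // Round-robin dispatch over whatever workers currently exist.
    def route(a: A): B = {
      val worker = workers(next % workers.size)
      next = (next + 1) % workers.size
      worker(a)
    }
  }
}
```

Because the pool is private to the router, resizing never invalidates a reference held by a publisher.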
GOOD PRACTICE: USE SELF-DESCRIBING MESSAGES
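import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.avro.generic.GenericRecord

// Every message carries what the producer needs: key, payload, timestamp, and retry policy.
trait KafkaMessage[K, P] {
  val timestamp = System.currentTimeMillis
  def key: K
  def payload: P
  def retryOnFailure: Boolean = true
}

case class JsonMessage(key: String, payload: JsonNode) extends KafkaMessage[String, JsonNode]

object JsonMessage {
  val mapper = new ObjectMapper()

  // Convenience constructor: parse the raw JSON string into a JsonNode.
  def apply(key: String, json: String): JsonMessage = {
    val payload: JsonNode = mapper.readTree(json)
    new JsonMessage(key, payload)
  }
}

// SchemaHolder and JsonConverter are not defined on the slides.
case class AvroMessage(schema: SchemaHolder, key: String, json: String)
  extends KafkaMessage[String, GenericRecord] {

  // Convert the JSON into an Avro record using the schema the message carries.
  def payload: GenericRecord = {
    val converter: JsonConverter[GenericRecord] = new JsonConverter[GenericRecord](schema.schema)
    converter.convert(json)
  }
}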
GOOD PRACTICE: PREFER BINARY DATA FORMATS FOR COMMUNICATION
Why Avro? Binary Format Space Efficient Evolutionary Schemas Automatic Tables
GOOD PRACTICE: DELEGATE AND SUPERVISE! REPEAT!
Error Kernel
Ingestion Actors: Coordinators Supervises ingestion at the request level Coordinates protocol flow Reports errors and metrics
GOOD PRACTICE: LET IT CRASH
Let it Crash Components where full restarts are always ok Transient failures are hard to find Simplified failure model
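import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import akka.pattern.{Backoff, BackoffSupervisor}
import scala.concurrent.duration._

// Restart on failures a fresh producer can recover from; escalate everything else.
override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
    case _: ActorInitializationException => akka.actor.SupervisorStrategy.Stop
    case _: FailedToSendMessageException => Restart
    case _: ProducerClosedException => Restart
    case _: NoBrokersForPartitionException => Escalate
    case _: KafkaException => Escalate
    case _: ConnectException => Escalate
    case _: Exception => Escalate
  }

// Restart the producer with an increasing delay, between 3 and 30 seconds plus up to 20% jitter.
val kafkaProducerSupervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    kafkaProducerProps,
    childName = actorName[KafkaProducerActor],
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ))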
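To make the `minBackoff`, `maxBackoff`, and `randomFactor` settings concrete, here is a minimal sketch of how a capped exponential delay with jitter can be computed. It is an illustration under assumptions, not a claim about Akka's exact formula; `delay` and its parameters are hypothetical names, and the jitter is passed in so the function stays deterministic.

```scala
import scala.concurrent.duration._

object BackoffDelaySketch {
  // restartCount: failures seen so far; jitter: a random number in [0, 1).
  def delay(restartCount: Int,
            minBackoff: FiniteDuration,
            maxBackoff: FiniteDuration,
            randomFactor: Double,
            jitter: Double): FiniteDuration = {
    // Double the base delay on every restart...
    val exponential = minBackoff.toMillis * math.pow(2, restartCount.toDouble)
    // ...but never exceed the configured ceiling.
    val capped = math.min(exponential, maxBackoff.toMillis.toDouble)
    // Stretch by up to randomFactor so restarting actors do not retry in lockstep.
    (capped * (1.0 + jitter * randomFactor)).toLong.millis
  }
}
```

With the values from the slide and no jitter, the delays in this sketch run 3 s, 6 s, 12 s, 24 s, and then stay at 30 s.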
class KafkaProducerActor extends Actor
  with LoggingAdapter
  with ActorConfigSupport
  with NotificationSupport[KafkaMessage[Any, Any]] {

  import KafkaProducerActor._

  implicit val ec = context.dispatcher

  // The receive block is omitted on the slide.

  override def preRestart(cause: Throwable, message: Option[Any]) = {
    // No ack from Kafka: send the failed message to itself again after a growing delay.
    message match {
      case Some(rp: RetryingProduce) => {
        notifyObservers(KafkaMessageNotDelivered(rp.msg))
        val nextBackOff = rp.backOff.nextBackOff
        val retry = RetryingProduce(rp.topic, rp.msg)
        retry.backOff = nextBackOff
        context.system.scheduler.scheduleOnce(nextBackOff.waitTime, self, retry)
      }
      case Some(produce: Produce) => {
        notifyObservers(KafkaMessageNotDelivered(produce.msg))
        // The message itself says whether it is worth retrying.
        if (produce.msg.retryOnFailure) {
          context.system.scheduler.scheduleOnce(initialDelay, self, RetryingProduce(produce.topic, produce.msg))
        }
      }
      case _ => {
        // No message in flight, or not a produce request: nothing to retry.
      }
    }
  }
}
Monitoring through Death Watches
WHAT ABOUT SOME ANTI-PATTERNS?
NOT SO GOOD PRACTICE: BUILDING NANO SERVICES
Hydra Topology [Diagram: Ingestion (Hydra Core | Ingestors | HTTP); Spark (Batch and Streaming); Dispatchers (Hydra Core | Dispatchers | HTTP); RDBMS; HDFS; Conductors (Hydra Core | Conductors | HTTP); Persistence :: Kafka (Hydra Core | Persistence | HTTP); Akka Remoting; labels 3, 2, 2]
NOT SO GOOD PRACTICE: TREATING LOCATION TRANSPARENCY AS A FREE-FOR-ALL
Guaranteed Delivery in Hydra What does guaranteed delivery mean? At most once semantics Can be made stronger
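"Can be made stronger" usually means layering acknowledgements and resends on top of at-most-once delivery, and making the receiver tolerate the duplicates that resends cause. The sketch below simulates that in plain Scala; it is not Hydra or Akka code, and `Envelope`, `Receiver`, `sendAtLeastOnce`, and the `channel` function are hypothetical names used to model a lossy network.

```scala
object DeliverySketch {
  final case class Envelope(id: Long, body: String)

  // The receiver remembers ids it has processed, so a resend is harmless.
  final class Receiver {
    private val seen = scala.collection.mutable.Set.empty[Long]
    val processed = scala.collection.mutable.ListBuffer.empty[String]

    // Returns the acknowledgement for the envelope.
    def receive(e: Envelope): Boolean = {
      if (seen.add(e.id)) processed += e.body
      true
    }
  }

  // channel models the network: Some(ack) if the ack made it back, None if
  // the message or its ack was lost. The sender resends until it sees an ack.
  def sendAtLeastOnce(e: Envelope, maxAttempts: Int)(channel: Envelope => Option[Boolean]): Boolean =
    (1 to maxAttempts).exists(_ => channel(e).contains(true))
}
```

Without the id check in `Receiver`, a lost acknowledgement would cause the same body to be processed twice, which is the price of moving from at-most-once to at-least-once.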
Akka Remoting Peer-to-Peer Serialization Delivery Reliability Latency
The Reliable Proxy Pattern
@throws(classOf[Exception])
override def init: Future[Boolean] = Future {
  val useProxy = config.getBoolean("message.proxy", false)
  val ingestorPath = config.getRequiredString("ingestor.path")
  // useProxy = true talks to the ingestor through a plain actor selection;
  // otherwise a ReliableIngestionProxy is created for the path.
  ingestionActor =
    if (useProxy) context.actorSelection(ingestorPath)
    else context.actorOf(ReliableIngestionProxy.props(ingestorPath))

  val cHeaders = config.getOptionalList("headers")
  topic = config.getRequiredString("kafka.topic")
  // Headers are configured as "Name:value" strings.
  headers = cHeaders match {
    case Some(ch) =>
      List(
        ch.unwrapped.asScala.map { header =>
          // Split on the first colon only, so values that contain ':' stay intact.
          val sh = header.toString.split(":", 2)
          RawHeader(sh(0), sh(1))
        }: _*
      )
    case None => List.empty[HttpHeader]
  }
  true
}
NOT SO GOOD PRACTICE: NOT KEEPING MESSAGE PROTOCOLS BOUND TO THEIR CONTEXTS
object Messages {
  case object ServiceStarted
  case class RegisterHandler(info: ActorRef)
  case class RegisteredHandler(name: String, handler: ActorRef)
  case class RemoveHandler(path: ActorPath)
  case object GetHandlers
  case object InitiateIngestion extends HydraMessage
  case class RequestCompleted(s: IngestionSummary) extends HydraMessage
  case class IngestionSummary(name: String)
  case class Produce(topic: String, msg: KafkaMessage[_, _], ack: Option[ActorRef]) extends HydraMessage
  case object HandlerTimeout extends HydraMessage
  case class Validate(req: HydraRequest) extends HydraMessage
  case class Validated(req: HydraRequest) extends HydraMessage
  case class NotValid(req: HydraRequest, reason: String) extends HydraMessage
  case object HandlingCompleted extends HydraMessage
  case class Publish(request: HydraRequest)
  case class Ingest(request: HydraRequest)
  case class Join(r: HydraRequest) extends HydraMessage
}
class HandlerRegistry extends Actor with LoggingAdapter with ActorConfigSupport {

  override def receive: Receive = { ... }

  override val supervisorStrategy = OneForOneStrategy() {
    case e: Exception => {
      report(e)
      Restart
    }
  }
}

object HandlerRegistry {
  case class RegisterHandler(info: HandlerInfo)
  case class RegisteredHandler(name: String, handler: ActorRef)
  case class RemoveHandler(path: ActorPath)
  case object GetHandlers
}
NOT SO GOOD PRACTICE: DEVELOPING OVERLY CHATTY PROTOCOLS
What’s next?
What’s streaming into Hydra today? Conductors, Webhooks
[Chart: Average Ingestions Per Second (Requests); y-axis 0 to 2500; x-axis Dec-15 through 3/1/16]
Some Facts: 9,730 lines of Scala code; Production Platform Since Jan 2016; C.I. through Jenkins and Salt
roarking QUESTIONS? Thank You!


Editor's Notes

  • #12 Fundamental design principles, not something you can accomplish in five lines of code; this is an architectural decision. There are four traits, each related to the others. One of the main goals was to create a technology-agnostic set of concerns using a shared vocabulary. Responsiveness is the first trait, from which all the others derive, and it is as old as programming itself. To stay responsive, a system must react to failure and stay available (resilient), and it must react to inputs (message-driven).
  • #13 The system must react to its users (responsive): it stays responsive in the face of variable load, partial outages, program failure, and more. We will see that this requires adjustments in the way we think about and design our applications.
  • #14 Elastic = scalability ON DEMAND: just the right amount of resources to run the system, which is a moving target. Scalability is a prerequisite for elasticity. The system must react to variable load conditions (elastic).
      - Core of the architecture
      - Machines are limited: limited CPU power, CPU cycles, etc. Limited number of users.
      - Need to process requests on more than one machine
      - Usually lives in the cloud
  • #15 Scaling up == within the same system. Scaling down is just as important.
  • #16 Share nothing -> send messages; don’t share the same resources. This increases locality and minimizes contention. Divide and conquer -> multiple entities make progress on the problem at the same time.
  • #19 Computer systems fail: software and hardware. Fault tolerance = how hard you can get hit and keep moving forward; how much you can take. Get hit, survive, and eventually you may be OK. Not always a sustainable strategy. Resilience goes beyond fault tolerance: restore full functionality instead of limping along. Continue as if nothing happened; heal from failure.
  • #20 Broken systems - social system, meerkats example
  • #21 Complex = many similar parts interacting through simple individual rules, where the interactions produce coherent behavior. A perfect example is a human cell.
  • #23 It doesn’t matter how beautiful your app is without resilience. Who cares about anything if the app just doesn’t work?
  • #27 We tend to shy away from thinking about failure because we label it as just too complicated. And it won’t happen, right? Complicated vs. complex systems.
  • #29 Complicated = many different small parts working on specific tasks. Possible but very hard to fully comprehend. Example: brain surgery.
  • #30 Complicated procedures like brain surgery and rocket launchings require engineer-designed blueprints, step-by-step algorithms, well-trained staff, and exquisite combinations of computer software running carefully calibrated equipment. A complicated system assumes expert and rational leaders, top-down planning, smooth implementation of policies, and a clock-like organization that runs smoothly. Work is specified and delegated to particular units.
  • #31 Complex systems are filled with hundreds of moving parts, scores of players of varied expertise and independence yet missing a “mission control” that runs all these different parts within an ever-changing political, economic, and societal environment. The result: constant adaptations in design and action.  Blueprints, technical experts, strategic plans and savvy managers simply are inadequate to get complex systems with thousands of reciprocal ties between people to operate effectively in such constantly changing and unpredictable environments. These web-like complex systems of interdependent units adapt continuously to turbulent surroundings. See the complexity of dealing with the Taliban in Afghanistan in this slide.
  • #33 https://www.washingtonpost.com/blogs/answer-sheet/wp/2014/08/08/the-difference-between-complex-and-complicated-and-why-it-matters-in-school-reform
  • #34 The focus is on fast data ingestion. Consistency is not as important. The world is inconsistent; things happen that we don’t know about. Eventual consistency is a good aim, and scalability and elasticity are much easier with it. Things to design for: message loss, message reordering, message duplication.
  • #35 Resilience is by design! Bounce back from failures. It is an architectural decision from day one. Failure is NATURAL… Nothing exceptional about it; it is another state in the app lifecycle. Like start/stop. DESIGN FOR FAILURE!
  • #36 Message = asynchronous delivery + resiliency (send the message again)
  • #37 Events are not addressed to a specific recipient. They are facts: somebody logged in, somebody did this. Entities emit events with no specific destination; the event sources are addressable. Messages have a specific recipient and can contain events.
  • #40 Akka is a unified runtime and programming model for scaling up (concurrency), scaling out (remoting), and fault tolerance. Akka also supplies a wide array of concurrency paradigms, allowing users to choose the right tool for the job. Actors let you manage service failures (supervisors) and load (back-off strategies, timeouts, and processing isolation), and they support both horizontal and vertical scalability (add more machines and/or add more cores).
  • #41 Lightweight: you can create millions of them; they are not tied to a specific thread. Resilient by design: fully encapsulated and contained. Asynchronous message passing. Fully thread safe.
  • #42 All messages in Akka are of type Any, and anything that is not handled becomes a dead letter. This means a loss of type safety; you can use typed actors, but you lose some flexibility.
  • #43 Kafka’s strengths are:
      - High throughput and low latency: even with very modest hardware, Kafka can support hundreds of thousands of messages per second, with latencies as low as a few milliseconds
      - Scalability: a Kafka cluster can be elastically and transparently expanded without downtime
      - Durability and reliability: messages are persisted on disk and replicated within the cluster to prevent data loss
      - Fault tolerance: the cluster tolerates the failure of individual machines
      - High concurrency: handles a large number (thousands) of diverse clients writing to and reading from Kafka simultaneously
  • #45 Unified platform for both batch and real-time analytics
  • #77 Chaos Monkey from Netflix: perform arbitrary restarts. Akka also does this very well with supervision strategies.
  • #85 No guaranteed delivery: highest performance, least implementation overhead. At the core of the problem lies the question of what exactly this guarantee shall mean:
      - The message is sent out on the network?
      - The message is received by the other host?
      - The message is put into the target actor's mailbox?
      - The message is starting to be processed by the target actor?
      - The message is processed successfully by the target actor?
  • #86 A local message is not likely to fail, and if it does, either it was meant to fail (bounded mailbox) or there was a bad JVM error, e.g. an OutOfMemoryError. Remote messages are different. In particular, Akka Remoting does not work transparently with Network Address Translation, load balancers, or Docker containers.
  • #87 Sending via a ReliableProxy makes the message send exactly as reliable as if the represented target were to live within the same JVM, provided that the remote actor system does not terminate. In effect, both ends (i.e. JVM and actor system) must be considered as one when evaluating the reliability of this communication channel. The benefit is that the network in-between is taken out of that equation.
  • #94 Should be: bound to message context; protocol delegation; supervisory needs.