Building Distributed Systems in Scala A presentation to Emerging Technologies for the Enterprise April 8, 2010 – Philadelphia, PA TM
About @al3x ‣ At Twitter since 2007 ‣ Working on the Web since 1995 ‣ Co-author of Programming Scala (O’Reilly, 2009) ‣ Into programming languages, distributed systems.
About Twitter ‣ Social messaging – a new way to communicate ‣ Launched in mid-2006 ‣ Hit the mainstream in 2008 ‣ 50+ million tweets per day (600+ per second) ‣ Millions of users worldwide
Technologies Used At Twitter Languages Frameworks ‣ Ruby, JavaScript ‣ Rails ‣ Scala ‣ jQuery ‣ lil’ bit of C, Python, Java Data Storage Misc. ‣ MySQL ‣ memcached ‣ Cassandra ‣ ZooKeeper ‣ HBase (Hadoop) ‣ Jetty ‣ so much more!
Why Scala? ‣ A language that’s both fun and productive. ‣ Great performance (on par with Java). ‣ Object-oriented and functional programming, together. ‣ Ability to reuse existing Java libraries. ‣ Flexible concurrency (Actors, threads, events). ‣ A smart community with infectious momentum.
Hawkwind A case study in (re)building a distributed system in Scala.
Requirements ‣ Search for people by name, username, eventually by other attributes. ‣ Order the results some sensible way (ex: by number of followers). ‣ Offer suggestions for misspellings/alternate names. ‣ Handle case-folding and other text normalization concerns on the query string. ‣ Return results in about a second, preferably less.
Finding People on Twitter
Finding People on Twitter results
Finding People on Twitter suggestion results
Finding People on Twitter speedy! suggestion results
First Attempt: acts_as_solr ‣ Crunched on time, so we wanted the fastest route to working user search. ‣ Uses the Solr distribution/platform from Apache Lucene. ‣ Tries to make Rails integration straightforward and idiomatic. ‣ Easy to get running, hard to operationalize.
In the Interim: A Move to SOA ‣ Stopped thinking of our architecture as just a Rails app and the components that orbit it. ‣ Started building isolated services that communicate with the rest of the system via Thrift (an RPC and server framework). ‣ Allows us freedom to change the underlying implementation of services without modifying the rest of the system.
Thrift Example struct Results { 1: list<i64> people 2: string suggestion 3: i32 processingTime /* milliseconds */ 4: list<i32> timings 5: i32 totalResults } service NameSearch { Results find(1: string name, 2: i32 maxResults, 3: bool wantSuggestion) Results find_with_ranking(1: string name, 2: i32 maxResults, 3: bool wantSuggestion, 4: Ranker ranking) }
Second Attempt: Hawkwind 1 ‣ A quick (three weeks) bespoke Scala project to “stop the bleeding”. ‣ Vertically but not horizontally scalable: no sharding, no failover, machine-level redundancy. ‣ Ran into memory and disk space limits. ‣ Reused Java code but didn’t offer nice Scala wrappers or rewrites. ‣ Still, planned to grow 10x, grew 25x!
Goals for Hawkwind 2 ‣ Horizontally scalable: sharded corpus, replication of shards, easy to grow the service. ‣ Faster. ‣ Higher-quality results. ‣ Better use of Scala (language features, programming style). ‣ Maintainable code base, make it easy to add features.
High-Level Concepts ‣ Shards: pieces of the user corpus. ‣ Replicas: copies of shards. ‣ Document Servers. ‣ Merge Servers. ‣ Every machine gets the same code, can be either a Document Server or a Merge Server.
Hawkwind 2 Internet High-Level queries for users, API requests Architecture Rails Cluster Thrift call to semi-random Merge Server Merge Merge Merge Server Server Server Thrift calls to semi-random replica of each shard Shard 1 Shard 1 Shard 2 Shard 2 Shard 3 Shard 3 Doc Server Doc Server Doc Server Doc Server Doc Server Doc Server periodic deliveries of sharded user corpus Hadoop (HBase)
Taking Care of Data ‣ A Hadoop job gathers up the user data and slices it into shards. ‣ A cron job fetches these data dumps several times per day. ‣ To load a new corpus on a Document Server, simply restart the process. ‣ Redundancy and staggered scheduling keeps the system from running too hot while restarts are in progress.
What a Document Server does ‣ On startup, load Thrift serialized User objects. ‣ Populate an Inverted Index, Map, and Trie with normalized attributes of those User objects. ‣ Once ready, listen for queries. ‣ Answering a query basically means looking stuff up in those pre-populated data structures. ‣ Maintains a connection pool for Thrift requests, wrapping org.apache.commons.pool.
What a Merge Server does ‣ Gets queries. ‣ Fans out queries to Document Servers. ‣ Waits for queries to come back using a custom ParallelFuture class, which wraps a number of java.util.concurrent classes. ‣ Merges together the result sets, re-ranks them, and ships ‘em back to the requesting client.
How to model a distributed system? ‣ Literal decomposition: classes for all architectural components (Shard, Replica, etc.). ‣ Each component knows/does as little as possible. ‣ Isolate mutable state, test carefully. ‣ Cleanly delegate calls.
Literal Decomposition: Replica case class Replica(val shard: Shard, val server: Server) { private val log = Logger.get val BACKEND_ERROR = Stats.getCounter("backend_timeout") def query(q: Query): DocResults = w3c.time("replica-query") { server.thriftCall { client => // logic goes here } } def ping(): Boolean = server.thriftCall { client => log.debug("calling ping via thrift for %s", server) val rv = client.ping() log.debug("ping returned %s from %s", rv, server) rv } }
Literal Decomposition: Server case class Server(val hostname: String, val port: Int) { val pool = ConnectionPool(hostname, port) private val log = Logger.get def thriftCall[A](f: Client => A) = { log.debug("making thriftCall for server %s", this) pool.withClient { client => f(client) } } def replica: Replica = { Replica(ShardMap.serversToShards(this), this) } }
Hawkwind 2 Query Call MergeLayer.query Graph ShardMap.query shard.replicaManager ! query shard.query randomReplica() replica.query server.thriftCall NameSearchDocumentLayerClient.find
Hawkwind 2 Query Call MergeLayer.query Graph what’s this? ShardMap.query shard.replicaManager ! query shard.query randomReplica() replica.query server.thriftCall NameSearchDocumentLayerClient.find
ShardMap: Isolating Mutable State ‣ A singleton and an Actor. ‣ Contains a map from Servers to their corresponding Shards. ‣ Also contains a map from Shards to the Replicas of those shards. ‣ Responsible for populating and managing those maps. ‣ Send it a message to evict or reinsert a Replica. ‣ Fans out queries to Shards.
ReplicaHealthChecker ‣ Much like the ShardMap, a singleton and an Actor. ‣ Maintains mutable lists of unhealthy Replicas (“the penalty box”). ‣ Constantly checking to see if evicted Replicas are healthy again (back online). ‣ Sends messages to itself – an effective Actor technique.
Challenges, Large and Small ‣ Fast importing of huge serialized Thrift object dumps. ‣ Testing the ShardMap and ReplicaHealthChecker (mutable state wants to hurt you). ‣ Efficient accent normalization and filtering for special characters. ‣ Working with the Apache Commons object pool. ‣ Breaking out different ranking mechanisms in a clean, reusable way.
Libraries & Tools Things that make working in Scala way more productive.
sbt – the Simple Build Tool ‣ Scala’s answer to Ant and Maven. ‣ Sets up new projects. ‣ Maintains project configuration, build tasks, and dependencies in pure Scala. Totally open- ended. ‣ Interactive console. ‣ Will run tasks as soon as files in your project change – automatically compile and run tests!
Ostrich ‣ Gather statistics about your application. ‣ Counters, gauges, and timings. ‣ Share stats via JMX, a plain-text socket, a web interface, or log files. ‣ Ex: Stats.time("foo") { timeConsumingOperation() }
Configgy ‣ Manages configuration files and logging. ‣ Flexible file format, can include files in other files. ‣ Inheritance, variable substitution. ‣ Tunable logging, logging with Scribe. ‣ Subscription API: push and validate configuration changes to running processes. ‣ Ex: val foo = config.getString(“foo”)
Specs + xrayspecs ‣ A behavior-driven development (BDD) testing framework for Scala. ‣ Elegant, readable, fun-to-write tests. ‣ Support for several mocking frameworks (we like Mockito). ‣ Test concurrent operations, time, much more. ‣ Ex: "suggestion with a List of null does not blow up" in { MergeLayer.suggestion("steve", List(null)) mustEqual None }
Questions? Follow me at twitter.com/al3x Learn with us at engineering.twitter.com Work with us at jobs.twitter.com TM

Building Distributed Systems in Scala

  • 1.
    Building Distributed Systemsin Scala A presentation to Emerging Technologies for the Enterprise April 8, 2010 – Philadelphia, PA TM
  • 2.
    About @al3x ‣ At Twitter since 2007 ‣ Working on the Web since 1995 ‣ Co-author of Programming Scala (O’Reilly, 2009) ‣ Into programming languages, distributed systems.
  • 3.
    About Twitter ‣ Social messaging – a new way to communicate ‣ Launched in mid-2006 ‣ Hit the mainstream in 2008 ‣ 50+ million tweets per day (600+ per second) ‣ Millions of users worldwide
  • 4.
    Technologies Used AtTwitter Languages Frameworks ‣ Ruby, JavaScript ‣ Rails ‣ Scala ‣ jQuery ‣ lil’ bit of C, Python, Java Data Storage Misc. ‣ MySQL ‣ memcached ‣ Cassandra ‣ ZooKeeper ‣ HBase (Hadoop) ‣ Jetty ‣ so much more!
  • 5.
    Why Scala? ‣ A language that’s both fun and productive. ‣ Great performance (on par with Java). ‣ Object-oriented and functional programming, together. ‣ Ability to reuse existing Java libraries. ‣ Flexible concurrency (Actors, threads, events). ‣ A smart community with infectious momentum.
  • 6.
    Hawkwind A case studyin (re)building a distributed system in Scala.
  • 7.
    Requirements ‣ Search for people by name, username, eventually by other attributes. ‣ Order the results some sensible way (ex: by number of followers). ‣ Offer suggestions for misspellings/alternate names. ‣ Handle case-folding and other text normalization concerns on the query string. ‣ Return results in about a second, preferably less.
  • 8.
  • 9.
    Finding People onTwitter results
  • 10.
    Finding People onTwitter suggestion results
  • 11.
    Finding People onTwitter speedy! suggestion results
  • 12.
    First Attempt: acts_as_solr ‣ Crunched on time, so we wanted the fastest route to working user search. ‣ Uses the Solr distribution/platform from Apache Lucene. ‣ Tries to make Rails integration straightforward and idiomatic. ‣ Easy to get running, hard to operationalize.
  • 13.
    In the Interim:A Move to SOA ‣ Stopped thinking of our architecture as just a Rails app and the components that orbit it. ‣ Started building isolated services that communicate with the rest of the system via Thrift (an RPC and server framework). ‣ Allows us freedom to change the underlying implementation of services without modifying the rest of the system.
  • 14.
    Thrift Example struct Results { 1: list<i64> people 2: string suggestion 3: i32 processingTime /* milliseconds */ 4: list<i32> timings 5: i32 totalResults } service NameSearch { Results find(1: string name, 2: i32 maxResults, 3: bool wantSuggestion) Results find_with_ranking(1: string name, 2: i32 maxResults, 3: bool wantSuggestion, 4: Ranker ranking) }
  • 15.
    Second Attempt: Hawkwind1 ‣ A quick (three weeks) bespoke Scala project to “stop the bleeding”. ‣ Vertically but not horizontally scalable: no sharding, no failover, machine-level redundancy. ‣ Ran into memory and disk space limits. ‣ Reused Java code but didn’t offer nice Scala wrappers or rewrites. ‣ Still, planned to grow 10x, grew 25x!
  • 16.
    Goals for Hawkwind2 ‣ Horizontally scalable: sharded corpus, replication of shards, easy to grow the service. ‣ Faster. ‣ Higher-quality results. ‣ Better use of Scala (language features, programming style). ‣ Maintainable code base, make it easy to add features.
  • 17.
    High-Level Concepts ‣ Shards: pieces of the user corpus. ‣ Replicas: copies of shards. ‣ Document Servers. ‣ Merge Servers. ‣ Every machine gets the same code, can be either a Document Server or a Merge Server.
  • 18.
    Hawkwind 2 Internet High-Level queries for users, API requests Architecture Rails Cluster Thrift call to semi-random Merge Server Merge Merge Merge Server Server Server Thrift calls to semi-random replica of each shard Shard 1 Shard 1 Shard 2 Shard 2 Shard 3 Shard 3 Doc Server Doc Server Doc Server Doc Server Doc Server Doc Server periodic deliveries of sharded user corpus Hadoop (HBase)
  • 19.
    Taking Care ofData ‣ A Hadoop job gathers up the user data and slices it into shards. ‣ A cron job fetches these data dumps several times per day. ‣ To load a new corpus on a Document Server, simply restart the process. ‣ Redundancy and staggered scheduling keeps the system from running too hot while restarts are in progress.
  • 20.
    What a DocumentServer does ‣ On startup, load Thrift serialized User objects. ‣ Populate an Inverted Index, Map, and Trie with normalized attributes of those User objects. ‣ Once ready, listen for queries. ‣ Answering a query basically means looking stuff up in those pre-populated data structures. ‣ Maintains a connection pool for Thrift requests, wrapping org.apache.commons.pool.
  • 21.
    What a MergeServer does ‣ Gets queries. ‣ Fans out queries to Document Servers. ‣ Waits for queries to come back using a custom ParallelFuture class, which wraps a number of java.util.concurrent classes. ‣ Merges together the result sets, re-ranks them, and ships ‘em back to the requesting client.
  • 22.
    How to modela distributed system? ‣ Literal decomposition: classes for all architectural components (Shard, Replica, etc.). ‣ Each component knows/does as little as possible. ‣ Isolate mutable state, test carefully. ‣ Cleanly delegate calls.
  • 23.
    Literal Decomposition: Replica caseclass Replica(val shard: Shard, val server: Server) { private val log = Logger.get val BACKEND_ERROR = Stats.getCounter("backend_timeout") def query(q: Query): DocResults = w3c.time("replica-query") { server.thriftCall { client => // logic goes here } } def ping(): Boolean = server.thriftCall { client => log.debug("calling ping via thrift for %s", server) val rv = client.ping() log.debug("ping returned %s from %s", rv, server) rv } }
  • 24.
    Literal Decomposition: Server case class Server(val hostname: String, val port: Int) { val pool = ConnectionPool(hostname, port) private val log = Logger.get def thriftCall[A](f: Client => A) = { log.debug("making thriftCall for server %s", this) pool.withClient { client => f(client) } } def replica: Replica = { Replica(ShardMap.serversToShards(this), this) } }
  • 25.
    Hawkwind 2 Query Call MergeLayer.query Graph ShardMap.query shard.replicaManager ! query shard.query randomReplica() replica.query server.thriftCall NameSearchDocumentLayerClient.find
  • 26.
    Hawkwind 2 Query Call MergeLayer.query Graph what’s this? ShardMap.query shard.replicaManager ! query shard.query randomReplica() replica.query server.thriftCall NameSearchDocumentLayerClient.find
  • 27.
    ShardMap: Isolating MutableState ‣ A singleton and an Actor. ‣ Contains a map from Servers to their corresponding Shards. ‣ Also contains a map from Shards to the Replicas of those shards. ‣ Responsible for populating and managing those maps. ‣ Send it a message to evict or reinsert a Replica. ‣ Fans out queries to Shards.
  • 28.
    ReplicaHealthChecker ‣ Much like the ShardMap, a singleton and an Actor. ‣ Maintains mutable lists of unhealthy Replicas (“the penalty box”). ‣ Constantly checking to see if evicted Replicas are healthy again (back online). ‣ Sends messages to itself – an effective Actor technique.
  • 29.
    Challenges, Large andSmall ‣ Fast importing of huge serialized Thrift object dumps. ‣ Testing the ShardMap and ReplicaHealthChecker (mutable state wants to hurt you). ‣ Efficient accent normalization and filtering for special characters. ‣ Working with the Apache Commons object pool. ‣ Breaking out different ranking mechanisms in a clean, reusable way.
  • 30.
    Libraries & Tools Thingsthat make working in Scala way more productive.
  • 31.
    sbt – theSimple Build Tool ‣ Scala’s answer to Ant and Maven. ‣ Sets up new projects. ‣ Maintains project configuration, build tasks, and dependencies in pure Scala. Totally open- ended. ‣ Interactive console. ‣ Will run tasks as soon as files in your project change – automatically compile and run tests!
  • 32.
    Ostrich ‣ Gather statistics about your application. ‣ Counters, gauges, and timings. ‣ Share stats via JMX, a plain-text socket, a web interface, or log files. ‣ Ex: Stats.time("foo") { timeConsumingOperation() }
  • 33.
    Configgy ‣ Manages configuration files and logging. ‣ Flexible file format, can include files in other files. ‣ Inheritance, variable substitution. ‣ Tunable logging, logging with Scribe. ‣ Subscription API: push and validate configuration changes to running processes. ‣ Ex: val foo = config.getString(“foo”)
  • 34.
    Specs + xrayspecs ‣ A behavior-driven development (BDD) testing framework for Scala. ‣ Elegant, readable, fun-to-write tests. ‣ Support for several mocking frameworks (we like Mockito). ‣ Test concurrent operations, time, much more. ‣ Ex: "suggestion with a List of null does not blow up" in { MergeLayer.suggestion("steve", List(null)) mustEqual None }
  • 35.
    Questions? Follow me at twitter.com/al3x Learn with us at engineering.twitter.com Work with us at jobs.twitter.com TM

Editor's Notes

  • #24 This is literally all there is to this class!