Connecting Akka to the Rest of the World System integration with Akka and Apache Camel by Martin Krasser
About me • Freelance software engineer, architect – eHealth application integration (Java, Scala, Groovy) – Bioinformatics, Cheminformatics (C++, Java) – … • Open source committer – Akka – Camel – Scalaz-Camel – … • Java since 1998 • Scala since 2009 • Twitter: @mrt1nz
Overview • Introduction – Camel – Akka • Akka-Camel integration – Consumer actors – Producer actors – Actor components • Comparison – Camel routes – Akka-Camel routes – Scalaz-Camel routes
Apache Camel • Integration framework written in Java – Open source, Apache 2 license – http://camel.apache.org • Enterprise Integration Patterns (EIP) – G. Hohpe, B. Woolf • Domain-specific language (DSL) – Internal: Java, Scala – External: XML/Spring • Pluggable protocol and API bindings – HTTP, FTP, JMS, File … and approx. 100 more – Camel “components” http://camel.apache.org/components.html
Apache Camel • Usage in Scala applications – Camel Java/Scala DSL • http://camel.apache.org – Akka-Camel integration module • http://akka.io/docs/akka-modules/1.1.2/modules/camel.html – Scalaz-Camel DSL • https://github.com/krasserm/scalaz-camel
Apache Camel • Architecture
Apache Camel • Routing example D D HTTP jms:queue:docs JMS HTTP Filter Endpoint Endpoint http://example.org/docs Java DSL: from("jms:queue:docs") .filter().xpath("/person[@name='Jon']") .to("http:example.org/docs"); Scala DSL: "jms:queue:docs" when(_.in == "…") to("http:example.org/docs");
Apache Camel • Messaging endpoints – Configured via endpoint URI • "jms:queue:docs" • "http://example.org/docs" • … – Endpoint consumer: from(uri) – Endpoint producer: to(uri) • Message processors … – Configured via fluent API • filter().xpath("…") • transform(body().append("…")) • … – Custom Java/Scala code (Processor interface)
Akka • Platform for event-driven, scalable and fault- tolerant architectures on the JVM • Concurrency – Actors, agents, dataflow, STM • Scalability Core – Remote actors, cluster membership Services • Fault tolerance – Local and remote actor supervision
Akka • Add-on modules – akka-camel • Camel integration (untyped actors) – akka-camel-typed • Camel integration (typed actors) – akka-kernel • Microkernel – akka-scalaz • Scalaz support – akka-spring • Spring integration – …
Akka • Actor model – Mathematical model of concurrent computation – Carl Hewitt, 1973 • Akka actor – Encapsulates state – Encapsulates behavior – Exchanges messages – Mailbox – Event-driven – Single-threaded • API – Scala – Java
Akka • Actor model benefits – Easier to reason about – Raised abstraction level – Easier to avoid • Race conditions • Dead locks • Starvation • Live locks
Akka Actors • Receive messages import akka.actor.Actor // actor definition class ExampleActor extends Actor { def receive = { case "test" => … case _ => … } } // create actor reference (ActorRef) val actor = Actor.actorOf[ExampleActor] // start actor actor.start
Akka Actors • Send messages – Fire-Forget actor ! "test"
Akka Actors • Send messages – Send-And-Receive-Eventually actor !! "test" match { case Some(result) => … // process result case None => … // timeout } uses Future under the hood (with timeout)
Akka Actors • Send messages – Send-And-Receive-Future val future = actor !!! "test" future.await val result = future.get returns Future directly
Overview • Introduction – Camel – Akka • Akka-Camel integration – Consumer actors – Producer actors – Actor components • Comparison – Camel routes – Akka-Camel routes – Scalaz-Camel routes
Akka-Camel Integration • Exchange messages with Akka actors via – HTTP, FTP, JMS, File … and approx. 100 more – Any Camel component can be used (pluggable) • Integration layer of Akka – Consumer actors: receive message from Camel endpoints – Producer actors: produce messages to Camel endpoints – Message exchange patterns (MEPs): in-out and in-only • Leverages Camel’s asynchronous routing engine – Asynchronous completion of message exchange – No threads blocked waiting for response messages, for example
Consumer Actors • Basic example – TCP consumer • import akka.actor._ import akka.camel._ class TcpConsumer extends Actor with Consumer { def endpointUri = "mina:tcp://localhost:6200?textline=true" def receive = { case Message(body, headers) => { self.reply("received %s" format body) } } } – Camel component: camel-mina – MEP: in-out
Consumer Actors • Basic example – TCP consumer activation import akka.actor._ import akka.camel._ // Consumer actors require a running CamelService CamelServiceManager.startCamelService // Activate TCP endpoint (asynchronously) Actor.actorOf[TcpConsumer].start – Asynchronous activation of consumer endpoint
Consumer Actors • Basic example – TCP consumer activation import akka.actor._ import akka.camel._ // Consumer actors require a running CamelService val service = CamelServiceManager.startCamelService // Wait for TCP endpoint activation service.awaitEndpointActivation(1) { Actor.actorOf[TcpConsumer].start } – Wait for endpoint activation – Useful for testing purposes
Consumer Actors • Basic example – TCP consumer deactivation import akka.actor._ import akka.camel._ val service: CamelService = … val tcpConsumer: ActorRef = … // deactivate TCP endpoint (asynchronously) tcpConsumer.stop // … or wait for TCP endpoint deactivation service.awaitEndpointDeactivation(1) { tcpConsumer.stop }
Consumer Actors • Basic example – HTTP consumer class HttpConsumer extends Actor with Consumer { def endpointUri = "jetty:http://localhost:8080/example" def receive = { case msg: Message => { self.reply("received %s" format msg.bodyAs[String]) } } } – Camel component: camel-jetty – MEP: in-out – Jetty continuations used internally – Camel type converter (msg.bodyAs[…])
Consumer Actors • Basic example – JMS consumer class JMSConsumer extends Actor with Consumer { def endpointUri = "jms:queue:example" def receive = { case Message(body, headers) => ... } } – Camel component: camel-jms – MEP: in-only
Consumer Actors • Basic example – IMAP consumer class MailConsumer extends Actor with Consumer { def endpointUri = "imap://admin@mymailserver.com?password=secret" def receive = { case Message(body, headers) => ... } } – Camel component: camel-mail – MEP: in-only
Consumer Actors • Basic example – Scheduled consumer class ScheduledConsumer extends Actor with Consumer { def endpointUri = "quartz://example?cron=0/2+*+*+*+*+?" def receive = { case tick => ... } } – Camel component: camel-quartz – MEP: in-only
Consumer Actors • Acknowledgements import akka.camel.Ack ... class FileConsumer extends Actor with Consumer { override def autoack = false // default is true def endpointUri = "file:messages/in?delete=true" def receive = { case Message(body, headers) => { // ... self.reply(Ack) // delete consumed file } } } – Camel component: camel-file – MEP: in-only – Application-level acknowledgement (Ack)
Consumer Actors • Failure replies import akka.camel.Failure ... class FileConsumer extends Actor with Consumer { override def autoack = false // default is true def endpointUri = "file:messages/in?delete=true" def receive = { case Message(body, headers) => { // ... self.reply(Failure(reason)) // redeliver file } } } – Application-level negative acknowledgement (Failure) – Redelivery works also with JMS endpoints, for example – Failure replies can also be used with in-out MEP
Consumer Actors • Failure replies – Should not be made within receive – Let consumer actors crash (on Exception) – Use a supervisor for failure replies
Consumer Actors • Supervised consumer class SupervisedFileConsumer extends Actor with Consumer { override def autoack = false def endpointUri = "file:messages/in?delete=true" def receive = { case Message(body, headers) => { // if exception thrown: actor is restarted or stopped // else self.reply(Ack) // delete file } } override def preRestart(reason: scala.Throwable) { self.reply_?(Failure(reason)) // redeliver file } override def postStop() { self.reply_?(Failure(…)) // or Ack to delete file } }
Consumer Actors • Supervised consumer import akka.actor._ import akka.config.Supervision._ // Create consumer actor reference val consumer = Actor.actorOf[new SupervisedFileConsumer] // Create supervisor for consumer actor val supervisor = Supervisor( SupervisorConfig( OneForOneStrategy( List(classOf[Exception]), 5, 10000), Supervise(consumer, Permanent) :: Nil)) – Restart on any Exception – 5 restart attempts within 10 seconds
Consumer Actors • Simplified failure reporting – override def blocking = true – Pros • No need for setting up a supervisor • No need to catch exception within receive • No need to self.reply(Failure(…)) – Cons • Endpoint communicates with actor via !! • Thread blocked waiting for Ack
Consumer Actors • Simplified failure reporting class FileConsumer extends Actor with Consumer { override def blocking = true override def autoack = false def endpointUri = "file:messages/in?delete=true" def receive = { case Message(body, headers) => { // if exception thrown: endpoint receives it // else self.reply(Ack) // delete file } } }
Typed Consumer Actors • Basic example – HTTP/JMS consumer import akka.actor._ import akka.camel._ import org.apache.camel.{Body, Header} trait TypedConsumer { @consume("jetty:http://localhost:8080/example") // in-out def foo(s: String): String @consume("jms:queue:example") // in-only def bar (@Body s: String, @Header("priority") p: Integer) : Unit } class TypedConsumerImpl extends TypedActor with TypedConsumer { def foo(s: String) = "received %s" format s def bar(s: String, p: Integer) = println("received %s (priority = %d)" format(s, p)) }
Typed Consumer Actors • Annotations – Consumer endpoints • @consume(endpointURI) – Parameter bindings • @Body • @Header •…
Typed Consumer Actors • Basic example – HTTP/JMS consumer activation import akka.actor._ import akka.camel._ val service = CamelServiceManager.startCamelService // Activate HTTP and JMS endpoints (asynchronously) TypedActor.newInstance( classOf[TypedConsumer], classOf[TypedConsumerImpl]) // … or wait for HTTP and JMS endpoint activation service.awaitEndpointActivation(2) { TypedActor.newInstance( classOf[TypedConsumer], classOf[TypedConsumerImpl]) }
Producer Actors • Basic example – HTTP producer import akka.actor._ import akka.camel._ class HttpProducer extends Actor with Producer { def endpointUri = "jetty:http://localhost:8080/example" } – Camel component: camel-jetty – MEP: in-out – receive inherited from Producer trait – Jetty’s async HTTP client used internally
Producer Actors • Basic example – HTTP producer activation import akka.actor._ import akka.camel._ // Producer actors require an initialized CamelContext CamelServiceManager.startCamelService // activate producer actor val producer = Actor.actorOf[HttpProducer].start // POST test message to http://localhost:8080/example producer !! "test" match { case Some(m: Message) => … case Some(f: Failure) => … case None => … } // ! and !!! can also be used
Producer Actors • Basic example – JMS producer class JmsProducer extends Actor with Producer { def endpointUri = "jms:queue:example" override def oneway = true } – Camel component: camel-jms – MEP: in-only
Producer Actors • oneway = false (default) – Initiates an in-out ME with endpoint – Replies result to initial sender • oneway = true – Initiates an in-only ME with endpoint – No reply to sender • receiveAfterProduce – Change default reply behaviour
Producer Actors • Custom replies class JmsProducer extends Actor with Producer { def endpointUri = "jms:queue:example" override def oneway = true override def receiveAfterProduce = { case m: Message => self.reply("enqueue succeeded") case f: Failure => self.reply("enqueue failed") } }
Producer Actors • Forward results class HttpProducer extends Actor with Producer { val target: ActorRef = … def endpointUri = "jetty:http://localhost:8080/example" override def receiveAfterProduce = { case msg => target forward msg } } – Chaining of producer actors (pipelines) – Related: Future composition
Producer Actors • Future composition val producer1 = Actor.actorOf[HttpProducer1].start val producer2 = Actor.actorOf[HttpProducer2].start // monadic future composition (non-blocking) val future = for { m1: Message <- producer1 !!! Message("test") m2: Message <- producer2 !!! m1 } yield m2 // blocks until result is available – Producer Message val result: pipeline = future.get
Actor Components • Are Camel components – Can be used in any Camel route • actor Camel component – Send messages to untyped actors – Provided by akka-camel module • typed-actor Camel component – Send messages to typed actors – Provided by akka-camel-typed module – Extension of Camel’s bean component • Used by akka-camel internally – Routes to consumer actors
Actor Components • actor endpoint URI – actor:uuid:[<actor-uuid>][?<params>] • Parameters – autoack: Boolean • System or application-level acknowledgements – blocking: Boolean • Use ! or !! for sending messages to actor • Supported message headers – CamelActorIdentifier • Dynamic routing to actors
Actor Components • Example import akka.actor._ // can be any actor (no need for Consumer) val actor = Actor.actorOf[SomeActor].start // ... // Camel route from JMS endpoint to actor from("jms:queue:example") .to("actor:uuid:%s?autoack=false" format actor.uuid) – actor receives messages of type Message
Overview • Introduction – Camel – Akka • Akka-Camel integration – Consumer actors – Producer actors – Actor components • Comparison – Camel routes – Akka-Camel routes – Scalaz-Camel routes
Comparison • Criteria – Connectivity – Message processing – Route composition – Other • High-level • Incomplete
Camel Routes • Connectivity – Camel components/endpoints • Message processing – Processor interface • Predefined processors (all known EIPs) • Custom processors – Concurrent execution of Processor instances – Mutable messages
Camel Routes • Route composition – Camel DSL (Java, Scala, XML) • Other – No built-in mechanism for distributing routes (except via endpoints) – Distribution addressed by ServiceMix and FuseSource Fabric, for example
Akka-Camel Routes • Connectivity – Camel components/endpoints managed by • Consumer actors • Producer actors • Message processing – Actor • Predefined (a few in akka.actor.routing) • Custom – Sequential execution of actor instance – Immutable messages
Akka-Camel Routes • Route composition – Wiring actors (low-level) – Future composition – … – No integration DSL (yet) • Other – Easy to implementing stateful EIPs (aggregator, resequencer …) – Strong built-in mechanisms for distribution, scalability and fault-tolerance – Basis for a distributed and scalable Enterprise Service Bus (?)
Scalaz-Camel Routes • Connectivity – Camel components/endpoints • Message processing – Scala functions • Predefined (some EIPs) • Custom – Concurrent execution – Immutable messages
Scalaz-Camel Routes • Route composition – Based on functional programming concepts • Message processors chained via Kleisli composition (>=>) – Continuation-based approach • scala.Responder used as continuation monad – Direct-style DSL • Other – Configurable concurrency strategies – Camel processors can be re-used – Akka integration
Scalaz-Camel Routes • Example // custom message processor val validate: MessageProcessor = ... // Kleisli composition of route (+ implicit conversions) val vroute = validate >=> oneway >=> to("jms:queue:valid") >=> { m: Message => m.setBody("order accepted") } // consume messages via HTTP and error handling routes from(jetty:http://localhost:8080/orders) attempt { vroute } fallback { case e: ValidationException => { m: Message => ... } >=> failWith(e) case _ => ... } – Details at https://github.com/krasserm/scalaz-camel
THANK YOU!

System Integration with Akka and Apache Camel

  • 1.
    Connecting Akka tothe Rest of the World System integration with Akka and Apache Camel by Martin Krasser
  • 2.
    About me • Freelancesoftware engineer, architect – eHealth application integration (Java, Scala, Groovy) – Bioinformatics, Cheminformatics (C++, Java) – … • Open source committer – Akka – Camel – Scalaz-Camel – … • Java since 1998 • Scala since 2009 • Twitter: @mrt1nz
  • 3.
    Overview • Introduction – Camel – Akka • Akka-Camel integration – Consumer actors – Producer actors – Actor components • Comparison – Camel routes – Akka-Camel routes – Scalaz-Camel routes
  • 4.
    Apache Camel • Integrationframework written in Java – Open source, Apache 2 license – http://camel.apache.org • Enterprise Integration Patterns (EIP) – G. Hohpe, B. Woolf • Domain-specific language (DSL) – Internal: Java, Scala – External: XML/Spring • Pluggable protocol and API bindings – HTTP, FTP, JMS, File … and approx. 100 more – Camel “components” http://camel.apache.org/components.html
  • 5.
    Apache Camel • Usagein Scala applications – Camel Java/Scala DSL • http://camel.apache.org – Akka-Camel integration module • http://akka.io/docs/akka-modules/1.1.2/modules/camel.html – Scalaz-Camel DSL • https://github.com/krasserm/scalaz-camel
  • 6.
  • 7.
    Apache Camel • Routingexample D D HTTP jms:queue:docs JMS HTTP Filter Endpoint Endpoint http://example.org/docs Java DSL: from("jms:queue:docs") .filter().xpath("/person[@name='Jon']") .to("http:example.org/docs"); Scala DSL: "jms:queue:docs" when(_.in == "…") to("http:example.org/docs");
  • 8.
    Apache Camel • Messagingendpoints – Configured via endpoint URI • "jms:queue:docs" • "http://example.org/docs" • … – Endpoint consumer: from(uri) – Endpoint producer: to(uri) • Message processors … – Configured via fluent API • filter().xpath("…") • transform(body().append("…")) • … – Custom Java/Scala code (Processor interface)
  • 9.
    Akka • Platform forevent-driven, scalable and fault- tolerant architectures on the JVM • Concurrency – Actors, agents, dataflow, STM • Scalability Core – Remote actors, cluster membership Services • Fault tolerance – Local and remote actor supervision
  • 10.
    Akka • Add-on modules – akka-camel • Camel integration (untyped actors) – akka-camel-typed • Camel integration (typed actors) – akka-kernel • Microkernel – akka-scalaz • Scalaz support – akka-spring • Spring integration – …
  • 11.
    Akka • Actor model – Mathematical model of concurrent computation – Carl Hewitt, 1973 • Akka actor – Encapsulates state – Encapsulates behavior – Exchanges messages – Mailbox – Event-driven – Single-threaded • API – Scala – Java
  • 12.
    Akka • Actor modelbenefits – Easier to reason about – Raised abstraction level – Easier to avoid • Race conditions • Dead locks • Starvation • Live locks
  • 13.
    Akka Actors • Receivemessages import akka.actor.Actor // actor definition class ExampleActor extends Actor { def receive = { case "test" => … case _ => … } } // create actor reference (ActorRef) val actor = Actor.actorOf[ExampleActor] // start actor actor.start
  • 14.
    Akka Actors • Sendmessages – Fire-Forget actor ! "test"
  • 15.
    Akka Actors • Sendmessages – Send-And-Receive-Eventually actor !! "test" match { case Some(result) => … // process result case None => … // timeout } uses Future under the hood (with timeout)
  • 16.
    Akka Actors • Sendmessages – Send-And-Receive-Future val future = actor !!! "test" future.await val result = future.get returns Future directly
  • 17.
    Overview • Introduction – Camel – Akka • Akka-Camel integration – Consumer actors – Producer actors – Actor components • Comparison – Camel routes – Akka-Camel routes – Scalaz-Camel routes
  • 18.
    Akka-Camel Integration • Exchangemessages with Akka actors via – HTTP, FTP, JMS, File … and approx. 100 more – Any Camel component can be used (pluggable) • Integration layer of Akka – Consumer actors: receive message from Camel endpoints – Producer actors: produce messages to Camel endpoints – Message exchange patterns (MEPs): in-out and in-only • Leverages Camel’s asynchronous routing engine – Asynchronous completion of message exchange – No threads blocked waiting for response messages, for example
  • 19.
    Consumer Actors •Basic example – TCP consumer • import akka.actor._ import akka.camel._ class TcpConsumer extends Actor with Consumer { def endpointUri = "mina:tcp://localhost:6200?textline=true" def receive = { case Message(body, headers) => { self.reply("received %s" format body) } } } – Camel component: camel-mina – MEP: in-out
  • 20.
    Consumer Actors • Basicexample – TCP consumer activation import akka.actor._ import akka.camel._ // Consumer actors require a running CamelService CamelServiceManager.startCamelService // Activate TCP endpoint (asynchronously) Actor.actorOf[TcpConsumer].start – Asynchronous activation of consumer endpoint
  • 21.
    Consumer Actors • Basicexample – TCP consumer activation import akka.actor._ import akka.camel._ // Consumer actors require a running CamelService val service = CamelServiceManager.startCamelService // Wait for TCP endpoint activation service.awaitEndpointActivation(1) { Actor.actorOf[TcpConsumer].start } – Wait for endpoint activation – Useful for testing purposes
  • 22.
    Consumer Actors • Basicexample – TCP consumer deactivation import akka.actor._ import akka.camel._ val service: CamelService = … val tcpConsumer: ActorRef = … // deactivate TCP endpoint (asynchronously) tcpConsumer.stop // … or wait for TCP endpoint deactivation service.awaitEndpointDeactivation(1) { tcpConsumer.stop }
  • 23.
    Consumer Actors • Basicexample – HTTP consumer class HttpConsumer extends Actor with Consumer { def endpointUri = "jetty:http://localhost:8080/example" def receive = { case msg: Message => { self.reply("received %s" format msg.bodyAs[String]) } } } – Camel component: camel-jetty – MEP: in-out – Jetty continuations used internally – Camel type converter (msg.bodyAs[…])
  • 24.
    Consumer Actors • Basicexample – JMS consumer class JMSConsumer extends Actor with Consumer { def endpointUri = "jms:queue:example" def receive = { case Message(body, headers) => ... } } – Camel component: camel-jms – MEP: in-only
  • 25.
    Consumer Actors • Basicexample – IMAP consumer class MailConsumer extends Actor with Consumer { def endpointUri = "imap://admin@mymailserver.com?password=secret" def receive = { case Message(body, headers) => ... } } – Camel component: camel-mail – MEP: in-only
  • 26.
    Consumer Actors • Basicexample – Scheduled consumer class ScheduledConsumer extends Actor with Consumer { def endpointUri = "quartz://example?cron=0/2+*+*+*+*+?" def receive = { case tick => ... } } – Camel component: camel-quartz – MEP: in-only
  • 27.
    Consumer Actors • Acknowledgements import akka.camel.Ack ... class FileConsumer extends Actor with Consumer { override def autoack = false // default is true def endpointUri = "file:messages/in?delete=true" def receive = { case Message(body, headers) => { // ... self.reply(Ack) // delete consumed file } } } – Camel component: camel-file – MEP: in-only – Application-level acknowledgement (Ack)
  • 28.
    Consumer Actors • Failurereplies import akka.camel.Failure ... class FileConsumer extends Actor with Consumer { override def autoack = false // default is true def endpointUri = "file:messages/in?delete=true" def receive = { case Message(body, headers) => { // ... self.reply(Failure(reason)) // redeliver file } } } – Application-level negative acknowledgement (Failure) – Redelivery works also with JMS endpoints, for example – Failure replies can also be used with in-out MEP
  • 29.
    Consumer Actors • Failurereplies – Should not be made within receive – Let consumer actors crash (on Exception) – Use a supervisor for failure replies
  • 30.
    Consumer Actors • Supervisedconsumer class SupervisedFileConsumer extends Actor with Consumer { override def autoack = false def endpointUri = "file:messages/in?delete=true" def receive = { case Message(body, headers) => { // if exception thrown: actor is restarted or stopped // else self.reply(Ack) // delete file } } override def preRestart(reason: scala.Throwable) { self.reply_?(Failure(reason)) // redeliver file } override def postStop() { self.reply_?(Failure(…)) // or Ack to delete file } }
  • 31.
    Consumer Actors • Supervisedconsumer import akka.actor._ import akka.config.Supervision._ // Create consumer actor reference val consumer = Actor.actorOf[new SupervisedFileConsumer] // Create supervisor for consumer actor val supervisor = Supervisor( SupervisorConfig( OneForOneStrategy( List(classOf[Exception]), 5, 10000), Supervise(consumer, Permanent) :: Nil)) – Restart on any Exception – 5 restart attempts within 10 seconds
  • 32.
    Consumer Actors • Simplifiedfailure reporting – override def blocking = true – Pros • No need for setting up a supervisor • No need to catch exception within receive • No need to self.reply(Failure(…)) – Cons • Endpoint communicates with actor via !! • Thread blocked waiting for Ack
  • 33.
    Consumer Actors • Simplifiedfailure reporting class FileConsumer extends Actor with Consumer { override def blocking = true override def autoack = false def endpointUri = "file:messages/in?delete=true" def receive = { case Message(body, headers) => { // if exception thrown: endpoint receives it // else self.reply(Ack) // delete file } } }
  • 34.
    Typed Consumer Actors •Basic example – HTTP/JMS consumer import akka.actor._ import akka.camel._ import org.apache.camel.{Body, Header} trait TypedConsumer { @consume("jetty:http://localhost:8080/example") // in-out def foo(s: String): String @consume("jms:queue:example") // in-only def bar (@Body s: String, @Header("priority") p: Integer) : Unit } class TypedConsumerImpl extends TypedActor with TypedConsumer { def foo(s: String) = "received %s" format s def bar(s: String, p: Integer) = println("received %s (priority = %d)" format(s, p)) }
  • 35.
    Typed Consumer Actors •Annotations – Consumer endpoints • @consume(endpointURI) – Parameter bindings • @Body • @Header •…
  • 36.
    Typed Consumer Actors •Basic example – HTTP/JMS consumer activation import akka.actor._ import akka.camel._ val service = CamelServiceManager.startCamelService // Activate HTTP and JMS endpoints (asynchronously) TypedActor.newInstance( classOf[TypedConsumer], classOf[TypedConsumerImpl]) // … or wait for HTTP and JMS endpoint activation service.awaitEndpointActivation(2) { TypedActor.newInstance( classOf[TypedConsumer], classOf[TypedConsumerImpl]) }
  • 37.
    Producer Actors • Basicexample – HTTP producer import akka.actor._ import akka.camel._ class HttpProducer extends Actor with Producer { def endpointUri = "jetty:http://localhost:8080/example" } – Camel component: camel-jetty – MEP: in-out – receive inherited from Producer trait – Jetty’s async HTTP client used internally
  • 38.
    Producer Actors • Basicexample – HTTP producer activation import akka.actor._ import akka.camel._ // Producer actors require an initialized CamelContext CamelServiceManager.startCamelService // activate producer actor val producer = Actor.actorOf[HttpProducer].start // POST test message to http://localhost:8080/example producer !! "test" match { case Some(m: Message) => … case Some(f: Failure) => … case None => … } // ! and !!! can also be used
  • 39.
    Producer Actors • Basicexample – JMS producer class JmsProducer extends Actor with Producer { def endpointUri = "jms:queue:example" override def oneway = true } – Camel component: camel-jms – MEP: in-only
  • 40.
    Producer Actors • oneway= false (default) – Initiates an in-out ME with endpoint – Replies result to initial sender • oneway = true – Initiates an in-only ME with endpoint – No reply to sender • receiveAfterProduce – Change default reply behaviour
  • 41.
    Producer Actors • Customreplies class JmsProducer extends Actor with Producer { def endpointUri = "jms:queue:example" override def oneway = true override def receiveAfterProduce = { case m: Message => self.reply("enqueue succeeded") case f: Failure => self.reply("enqueue failed") } }
  • 42.
    Producer Actors • Forwardresults class HttpProducer extends Actor with Producer { val target: ActorRef = … def endpointUri = "jetty:http://localhost:8080/example" override def receiveAfterProduce = { case msg => target forward msg } } – Chaining of producer actors (pipelines) – Related: Future composition
  • 43.
    Producer Actors • Futurecomposition val producer1 = Actor.actorOf[HttpProducer1].start val producer2 = Actor.actorOf[HttpProducer2].start // monadic future composition (non-blocking) val future = for { m1: Message <- producer1 !!! Message("test") m2: Message <- producer2 !!! m1 } yield m2 // blocks until result is available – Producer Message val result: pipeline = future.get
  • 44.
    Actor Components • AreCamel components – Can be used in any Camel route • actor Camel component – Send messages to untyped actors – Provided by akka-camel module • typed-actor Camel component – Send messages to typed actors – Provided by akka-camel-typed module – Extension of Camel’s bean component • Used by akka-camel internally – Routes to consumer actors
  • 45.
    Actor Components • actorendpoint URI – actor:uuid:[<actor-uuid>][?<params>] • Parameters – autoack: Boolean • System or application-level acknowledgements – blocking: Boolean • Use ! or !! for sending messages to actor • Supported message headers – CamelActorIdentifier • Dynamic routing to actors
  • 46.
    Actor Components • Example import akka.actor._ // can be any actor (no need for Consumer) val actor = Actor.actorOf[SomeActor].start // ... // Camel route from JMS endpoint to actor from("jms:queue:example") .to("actor:uuid:%s?autoack=false" format actor.uuid) – actor receives messages of type Message
  • 47.
    Overview • Introduction – Camel – Akka • Akka-Camel integration – Consumer actors – Producer actors – Actor components • Comparison – Camel routes – Akka-Camel routes – Scalaz-Camel routes
  • 48.
    Comparison • Criteria – Connectivity – Message processing – Route composition – Other • High-level • Incomplete
  • 49.
    Camel Routes • Connectivity – Camel components/endpoints • Message processing – Processor interface • Predefined processors (all known EIPs) • Custom processors – Concurrent execution of Processor instances – Mutable messages
  • 50.
    Camel Routes • Routecomposition – Camel DSL (Java, Scala, XML) • Other – No built-in mechanism for distributing routes (except via endpoints) – Distribution addressed by ServiceMix and FuseSource Fabric, for example
  • 51.
    Akka-Camel Routes • Connectivity – Camel components/endpoints managed by • Consumer actors • Producer actors • Message processing – Actor • Predefined (a few in akka.actor.routing) • Custom – Sequential execution of actor instance – Immutable messages
  • 52.
    Akka-Camel Routes • Routecomposition – Wiring actors (low-level) – Future composition – … – No integration DSL (yet) • Other – Easy to implementing stateful EIPs (aggregator, resequencer …) – Strong built-in mechanisms for distribution, scalability and fault-tolerance – Basis for a distributed and scalable Enterprise Service Bus (?)
  • 53.
    Scalaz-Camel Routes • Connectivity – Camel components/endpoints • Message processing – Scala functions • Predefined (some EIPs) • Custom – Concurrent execution – Immutable messages
  • 54.
    Scalaz-Camel Routes • Routecomposition – Based on functional programming concepts • Message processors chained via Kleisli composition (>=>) – Continuation-based approach • scala.Responder used as continuation monad – Direct-style DSL • Other – Configurable concurrency strategies – Camel processors can be re-used – Akka integration
  • 55.
    Scalaz-Camel Routes • Example // custom message processor val validate: MessageProcessor = ... // Kleisli composition of route (+ implicit conversions) val vroute = validate >=> oneway >=> to("jms:queue:valid") >=> { m: Message => m.setBody("order accepted") } // consume messages via HTTP and error handling routes from(jetty:http://localhost:8080/orders) attempt { vroute } fallback { case e: ValidationException => { m: Message => ... } >=> failWith(e) case _ => ... } – Details at https://github.com/krasserm/scalaz-camel
  • 56.