spark-jms is a spark dataframe reader and writer for JMS(Java Messaging Service) compatible sources .This connector works in both streaming and batch mode. It works directly or indirectly with mostly also the sources which has JMS providers implemented for them.
Currently spark-jms is not available in maven central or any other public repository.This is marked as a TODO for me.
spark-jms is used normally like other connectors just like kakfa.
For using it Clone this repo on your system.Ensure maven is installed on your system for building it.
git clone https://github.com/jksinghpro/spark-jms.git Go to root directory of the project and run
mvn clean install Once the project is build succesfully,either add it as a dependency to your spark application and add its jars to the spark classpath. You can also pass the jar to the spark submit using option
spark-submit --jars <jar_path> This connector directly supports ActiveMq, RabbitMq, Kafka (Will go on adding more direct support soon). However indirectly it supports all the messaging queues or sources which have JMS clients. Only requirement is that those JMS connection factory implementation for those sources must be in spark classpath along with implementation to the following trait.
jk.bigdata.tech.jms.ConnectionFactoryProvider This trait has an abstract method
def createConnection(options:Map[String,String]):ConnectionFactory which returns an dynamic instance of ConnectionFactory for the source under consideration. For examples on implementing this trait . Refer here
Sample code for usage of spark-jms connector
//Active MQ val dataframeAmq = spark.read .format("jk.bigdata.tech.jms") .option("connection","amq") .load //Rabbit MQ val dataframeRmq = spark.read .format("jk.bigdata.tech.jms") .option("connection","rmq") .option("queue","<<queue_name>>") .option("username","<<username>>") .option("password","<<password>>") .option("virtualhost","<<virtual_host>>") .option("host","<<host>>") .option("port","<<port>>") .load //Kafka val dataframeKafka = spark.read .format("jk.bigdata.tech.jms") .option("connection","kafka") .option("bootstrap.servers","<<bootstrap-servers>>") .option("zookeeper.connect","<<zookeeper-host>>") .option("client.id","<<client id>>") .option("queue","<<queue_name>>") .load dataframe.write .mode(SaveMode.Append) .format("jk.bigdata.tech.jms") .option("connection","jk.bigdata.tech.jms.AMQConnectionFactoryProvider") .option("queue","<<queue name>>") .save dataframe.write .mode(SaveMode.Append) .format("jk.bigdata.tech.jms") .option("connection","rmq") .option("queue","<<queue name>>") .save dataframe.write .mode(SaveMode.Append) .format("jk.bigdata.tech.jms") .option("connection","kafka") .option("queue","<<queue name>>") .save //Active MQ val dataframeAmq = spark.readStream .format("jk.bigdata.tech.jms") .option("connection","amq") .load //Rabbit MQ val dataframeRmq = spark.readStream .format("jk.bigdata.tech.jms") .option("connection","rmq") .option("queue","<<queue_name>>") .option("username","<<username>>") .option("password","<<password>>") .option("virtualhost","<<virtual_host>>") .option("host","<<host>>") .option("port","<<port>>") .load //Kafka val dataframeKafka = spark.readStream .format("jk.bigdata.tech.jms") .option("connection","kafka") .option("bootstrap.servers","<<bootstrap-servers>>") .option("zookeeper.connect","<<zookeeper-host>>") .option("client.id","<<client id>>") .option("queue","<<queue_name>>") .load | Option | Required | Description | DefaultValue |
|---|---|---|---|
| Queue | true | Name of topic or messaging queue | None |
| acknowledge | false | To acknowledge the message or not | false |
| connection | true | Fully Qualified Name of ConnectionFactoryProviderImplementation or Alias of directly supported queues | None |
Note Above configurations are common across jms clients .Other than these configs some configs are specific to JMS client under consideration
- Issue Tracker: https://github.com/jksinghpro/spark-jms/issues
The project is licensed under the Apache 2 license.