Deep Dive into Building Streaming Applications with Apache Pulsar Tim Spann / Developer Advocate #ossummit @PaaSDev
Deep Dive into Building Streaming Applications with Apache Pulsar
Tim Spann Developer Advocate ● FLiP(N) Stack = Flink, Pulsar and NiFi Stack ● Streaming Systems/ Data Architect ● Experience: ○ 15+ years of experience with batch and streaming technologies including Pulsar, Flink, Spark, NiFi, Spring, Java, Big Data, Cloud, MXNet, Hadoop, Datalakes, IoT and more.
FLiP Stack Weekly This week in Apache Flink, Apache Pulsar, Apache NiFi, Apache Spark and open source friends. https://bit.ly/32dAJft
Apache Pulsar is a Cloud-Native Messaging and Event-Streaming Platform.
Why Apache Pulsar?
● Unified Messaging Platform
● Guaranteed Message Delivery
● Resiliency
● Infinite Scalability
Pulsar Benefits
● Building Microservices
● Asynchronous Communication
● Building Real Time Applications
● Highly Resilient
● Tiered storage
Key Pulsar Concepts: Architecture
● “Bookies” (Store Messages)
  ○ Store messages and cursors
  ○ Messages are grouped in segments/ledgers
  ○ A group of bookies forms an “ensemble” to store a ledger
● “Brokers”
  ○ Handle message routing and connections
  ○ Stateless, but with caches
  ○ Automatic load-balancing
  ○ Topics are composed of multiple segments
● Metadata Storage (Metadata & Service Discovery)
  ○ Stores metadata for both Pulsar and BookKeeper
  ○ Service discovery
Messages - the basic unit of Pulsar
Component: Description
● Value / data payload: The data carried by the message. All Pulsar messages contain raw bytes, although message data can also conform to data schemas.
● Key: Messages are optionally tagged with keys, which are used in partitioning and are also useful for things like topic compaction.
● Properties: An optional key/value map of user-defined properties.
● Producer name: The name of the producer that produced the message. If you do not specify a producer name, a default name is used. Used for message de-duplication.
● Sequence ID: Each Pulsar message belongs to an ordered sequence on its topic. The sequence ID of the message is its order in that sequence. Used for message de-duplication.
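The message fields above can be sketched in plain Python. This is only an illustration of how a producer name and sequence ID together make de-duplication possible; the `Message` and `Deduplicator` classes are hypothetical stand-ins, not classes from the Pulsar client, and the "keep the highest sequence ID per producer" rule is a simplified assumption about how the broker decides.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Message:
    """Hypothetical stand-in for a Pulsar message (not the client library's class)."""
    value: bytes
    producer_name: str
    sequence_id: int
    key: Optional[str] = None
    properties: Dict[str, str] = field(default_factory=dict)


class Deduplicator:
    """Accepts a message only if its sequence ID is newer than the last one seen
    from the same producer (simplified assumption)."""

    def __init__(self) -> None:
        self._last_seen: Dict[str, int] = {}

    def accept(self, msg: Message) -> bool:
        last = self._last_seen.get(msg.producer_name, -1)
        if msg.sequence_id <= last:
            return False  # duplicate or replayed message
        self._last_seen[msg.producer_name] = msg.sequence_id
        return True


dedup = Deduplicator()
first = Message(b"reading-1", producer_name="thrm", sequence_id=0, key="sensor-a")
retry = Message(b"reading-1", producer_name="thrm", sequence_id=0, key="sensor-a")
second = Message(b"reading-2", producer_name="thrm", sequence_id=1, key="sensor-a")
results = [dedup.accept(m) for m in (first, retry, second)]
print(results)  # [True, False, True]
```

The retried message is rejected because it reuses sequence ID 0 from the same producer name.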
Key Pulsar Concepts: Messaging vs Streaming
● Message Queueing - Queueing systems are ideal for work queues that do not require tasks to be performed in a particular order.
● Streaming - Streaming works best in situations where the order of messages is important.
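A small simulation may make the difference concrete. The two functions below are illustrative only (their names are made up for this sketch): one spreads messages round-robin over several consumers, as a work queue would, and the other hands the whole ordered sequence to a single consumer, as a stream would.

```python
from collections import defaultdict
from itertools import cycle
from typing import Dict, List


def dispatch_queue(messages: List[str], consumers: List[str]) -> Dict[str, List[str]]:
    """Work-queue style: messages are spread round-robin, so no single
    consumer sees the full ordered sequence."""
    assignment: Dict[str, List[str]] = defaultdict(list)
    for msg, consumer in zip(messages, cycle(consumers)):
        assignment[consumer].append(msg)
    return dict(assignment)


def dispatch_stream(messages: List[str], consumer: str) -> Dict[str, List[str]]:
    """Streaming style: one consumer receives every message in topic order."""
    return {consumer: list(messages)}


messages = ["m1", "m2", "m3", "m4", "m5"]
queued = dispatch_queue(messages, ["c1", "c2"])
streamed = dispatch_stream(messages, "c1")
print(queued)    # {'c1': ['m1', 'm3', 'm5'], 'c2': ['m2', 'm4']}
print(streamed)  # {'c1': ['m1', 'm2', 'm3', 'm4', 'm5']}
```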
Connectivity
• Functions - Lightweight Stream Processing (Java, Python, Go)
• Connectors - Sources & Sinks (Cassandra, Kafka, …)
• Protocol Handlers - AoP (AMQP), KoP (Kafka), MoP (MQTT)
• Processing Engines - Flink, Spark, Presto/Trino via Pulsar SQL
• Data Offloaders - Tiered Storage - (S3)
hub.streamnative.io
Schema Registry
[Diagram: the schema registry holds schema-1, schema-2 and schema-3 (value=Avro/Protobuf/JSON); producers and consumers each keep a local cache of schemas keyed by schema ID.]
● Producers: send (register) the schema if it is not in the local cache, then send schema-1 data serialized per schema ID.
● Consumers: get the schema by ID if it is not in the local cache, then read schema-1 data deserialized per schema ID.
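The caching flow in the diagram can be sketched with an in-memory registry. Everything here is a toy model under stated assumptions: `SchemaRegistry` and `CachingConsumer` are hypothetical names, schema IDs are simple counters, and JSON stands in for whichever of Avro/Protobuf/JSON is actually used.

```python
import json
from typing import Dict


class SchemaRegistry:
    """Toy in-memory registry: assigns an ID per distinct schema definition."""

    def __init__(self) -> None:
        self._ids: Dict[str, int] = {}
        self._by_id: Dict[int, str] = {}
        self.lookups = 0  # how many times a client had to ask the registry

    def register(self, definition: str) -> int:
        if definition not in self._ids:
            schema_id = len(self._ids) + 1
            self._ids[definition] = schema_id
            self._by_id[schema_id] = definition
        return self._ids[definition]

    def get(self, schema_id: int) -> str:
        self.lookups += 1
        return self._by_id[schema_id]


class CachingConsumer:
    """Fetches a schema by ID only when it is missing from the local cache."""

    def __init__(self, registry: SchemaRegistry) -> None:
        self._registry = registry
        self._cache: Dict[int, str] = {}

    def decode(self, schema_id: int, payload: bytes) -> dict:
        if schema_id not in self._cache:
            self._cache[schema_id] = self._registry.get(schema_id)
        # A real client would deserialize according to the cached schema.
        return json.loads(payload)


registry = SchemaRegistry()
schema_id = registry.register('{"type": "record", "name": "thermal"}')
consumer = CachingConsumer(registry)
first = consumer.decode(schema_id, b'{"uuid": "a"}')
second = consumer.decode(schema_id, b'{"uuid": "b"}')
print(schema_id, registry.lookups)  # 1 1
```

Two messages are decoded, but the registry is contacted only once.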
Kafka On Pulsar (KoP)
MQTT On Pulsar (MoP)
AMQP On Pulsar (AoP)
Pulsar SQL
Presto/Trino workers can read segments directly from bookies (or offloaded storage) in parallel.
[Diagram: a producer and a consumer connect to Brokers 1-3, which serve Topic1-Part1, Topic1-Part2 and Topic1-Part3; Segments 1-4 … X are each stored on Bookies 1-3; a query coordinator queries topic metadata and the SQL workers read the segments.]
● Buffer ● Batch ● Route ● Filter ● Aggregate ● Enrich ● Replicate ● Dedupe ● Decouple ● Distribute
Pulsar Functions: A serverless event streaming framework
● Lightweight computation similar to AWS Lambda.
● Specifically designed to use Apache Pulsar as a message bus.
● Function runtime can be located within Pulsar Broker.
Pulsar Functions
● Consume messages from one or more Pulsar topics.
● Apply user-supplied processing logic to each message.
● Publish the results of the computation to another topic.
● Support multiple programming languages (Java, Python, Go)
● Can leverage 3rd-party libraries to support the execution of ML models on the edge.
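The consume, apply, publish cycle above can be imitated without a broker. This is a rough sketch, assuming in-memory deques in place of topics; `run_function` is a hypothetical helper for this illustration and the `alerts` topic name is made up.

```python
from collections import deque
from typing import Callable, Deque, Dict, List


def process(input: str) -> str:
    """User-supplied logic: here, normalise a chat line."""
    return input.strip().lower()


def run_function(fn: Callable[[str], str],
                 topics: Dict[str, Deque[str]],
                 inputs: List[str],
                 output: str) -> None:
    """Consume everything pending on the input topics, apply fn to each
    message, and publish each result to the output topic."""
    for name in inputs:
        pending = topics[name]
        while pending:
            topics[output].append(fn(pending.popleft()))


topics: Dict[str, Deque[str]] = {
    "chat": deque(["  Hello ", "PULSAR"]),
    "alerts": deque(["Ping"]),
    "chatresult": deque(),
}
run_function(process, topics, inputs=["chat", "alerts"], output="chatresult")
print(list(topics["chatresult"]))  # ['hello', 'pulsar', 'ping']
```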
Run a Local Standalone Bare Metal
    wget https://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz
    tar xvfz apache-pulsar-2.10.1-bin.tar.gz
    cd apache-pulsar-2.10.1
    bin/pulsar standalone
(For Pulsar SQL Support)
    bin/pulsar sql-worker start
https://pulsar.apache.org/docs/en/standalone/
<or> Run in Docker
    docker run -it -p 6650:6650 -p 8080:8080 \
      --mount source=pulsardata,target=/pulsar/data \
      --mount source=pulsarconf,target=/pulsar/conf \
      apachepulsar/pulsar:2.10.1 bin/pulsar standalone
https://pulsar.apache.org/docs/en/standalone-docker/
Building Tenant, Namespace, Topics
    bin/pulsar-admin tenants create conf
    bin/pulsar-admin namespaces create conf/europe
    bin/pulsar-admin tenants list
    bin/pulsar-admin namespaces list conf
    bin/pulsar-admin topics create persistent://conf/europe/first
    bin/pulsar-admin topics list conf/europe
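The topic names used in these commands follow the pattern domain://tenant/namespace/topic. As a quick illustration, the helper below splits such a name into its parts; `parse_topic` and `TopicName` are hypothetical names for this sketch and are not part of the Pulsar client.

```python
from typing import NamedTuple


class TopicName(NamedTuple):
    domain: str      # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    topic: str


def parse_topic(name: str) -> TopicName:
    """Split a fully qualified topic name into its four parts."""
    domain, sep, rest = name.partition("://")
    if not sep or domain not in ("persistent", "non-persistent"):
        raise ValueError(f"unsupported topic domain in {name!r}")
    parts = rest.split("/", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic in {name!r}")
    return TopicName(domain, *parts)


parsed = parse_topic("persistent://conf/europe/first")
print(parsed.tenant, parsed.namespace, parsed.topic)  # conf europe first
```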
Install Python 3 Pulsar Client
    pip3 install 'pulsar-client[all]==2.10.1'
Includes AARCH64, ARM, M2, INTEL, …
For Python on Pulsar on Pi https://github.com/tspannhw/PulsarOnRaspberryPi
https://pulsar.apache.org/docs/en/client-libraries-python/
https://pypi.org/project/pulsar-client/2.10.0/#files
Building a Python 3 Producer
    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer('persistent://conf/ete/first')
    # Payloads are raw bytes, so encode the text before sending.
    producer.send(('Simple Text Message').encode('utf-8'))
    client.close()
Building a Python 3 Cloud Producer OAuth
    python3 prod.py -su pulsar+ssl://name1.name2.snio.cloud:6651 -t persistent://public/default/pyth --auth-params '{"issuer_url":"https://auth.streamnative.cloud", "private_key":"my.json", "audience":"urn:sn:pulsar:name:myclustr"}'

    import argparse
    import pulsar
    from pulsar import AuthenticationOauth2

    parse = argparse.ArgumentParser(prog='prod.py')
    parse.add_argument('-su', '--service-url', dest='service_url', type=str, required=True)
    # -t and --auth-params are passed on the command line above, so declare them too.
    parse.add_argument('-t', '--topic', dest='topic', type=str, required=True)
    parse.add_argument('--auth-params', dest='auth_params', type=str, default='')
    args = parse.parse_args()
    client = pulsar.Client(args.service_url,
                           authentication=AuthenticationOauth2(args.auth_params))
https://github.com/streamnative/examples/blob/master/cloud/python/OAuth2Producer.py
https://github.com/tspannhw/FLiP-Pi-BreakoutGarden
Example Avro Schema Usage
    import pulsar
    from pulsar.schema import *
    from pulsar.schema import AvroSchema

    class thermal(Record):
        uuid = String()

    client = pulsar.Client('pulsar://pulsar1:6650')
    thermalschema = AvroSchema(thermal)
    producer = client.create_producer(topic='persistent://public/default/pi-thermal-avro',
                                      schema=thermalschema,
                                      properties={"producer-name": "thrm"})
    thermalRec = thermal()
    thermalRec.uuid = "unique-name"
    # Use the record's uuid as the partition key.
    producer.send(thermalRec, partition_key=thermalRec.uuid)
https://github.com/tspannhw/FLiP-Pi-Thermal
Example Json Schema Usage
    import pulsar
    from pulsar.schema import *
    from pulsar.schema import JsonSchema

    class weather(Record):
        uuid = String()

    client = pulsar.Client('pulsar://pulsar1:6650')
    wsc = JsonSchema(weather)
    producer = client.create_producer(topic='persistent://public/default/wthr',
                                      schema=wsc,
                                      properties={"producer-name": "wthr"})
    weatherRec = weather()
    weatherRec.uuid = "unique-name"
    # Use the record's uuid as the partition key.
    producer.send(weatherRec, partition_key=weatherRec.uuid)
https://github.com/tspannhw/FLiP-Pi-Weather
https://github.com/tspannhw/FLiP-PulsarDevPython101
Building a Python3 Consumer
    import pulsar

    client = pulsar.Client('pulsar://localhost:6650')
    consumer = client.subscribe('persistent://conf/ete/first', subscription_name='mine')
    try:
        while True:
            msg = consumer.receive()
            print("Received message: '%s'" % msg.data())
            # Acknowledge so the broker does not redeliver the message.
            consumer.acknowledge(msg)
    finally:
        client.close()
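A slightly more defensive receive loop might look like the sketch below. It relies only on the consumer methods `receive(timeout_millis=...)`, `acknowledge` and `negative_acknowledge`; the `drain` helper, the handler and the fake consumer classes are hypothetical and exist only so the sketch runs without a broker. It assumes a receive timeout surfaces as an exception.

```python
from typing import Callable, List


def drain(consumer, handler: Callable[[bytes], None],
          max_messages: int, timeout_millis: int = 1000) -> int:
    """Receive up to max_messages, acknowledging successes and negatively
    acknowledging failures so they can be redelivered later."""
    processed = 0
    for _ in range(max_messages):
        try:
            msg = consumer.receive(timeout_millis=timeout_millis)
        except Exception:
            break  # assumed: a receive timeout raises, so stop draining
        try:
            handler(msg.data())
        except Exception:
            consumer.negative_acknowledge(msg)
            continue
        consumer.acknowledge(msg)
        processed += 1
    return processed


class FakeMessage:
    """Test double exposing only data()."""
    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def data(self) -> bytes:
        return self._payload


class FakeConsumer:
    """Test double that mimics the three consumer methods used by drain()."""
    def __init__(self, payloads: List[bytes]) -> None:
        self._queue = [FakeMessage(p) for p in payloads]
        self.acked: List[bytes] = []
        self.nacked: List[bytes] = []

    def receive(self, timeout_millis=None) -> FakeMessage:
        if not self._queue:
            raise Exception("receive timed out")
        return self._queue.pop(0)

    def acknowledge(self, msg: FakeMessage) -> None:
        self.acked.append(msg.data())

    def negative_acknowledge(self, msg: FakeMessage) -> None:
        self.nacked.append(msg.data())


def handler(data: bytes) -> None:
    if data == b"bad":
        raise ValueError("cannot parse payload")


fake = FakeConsumer([b"ok-1", b"bad", b"ok-2"])
processed = drain(fake, handler, max_messages=10)
print(processed, fake.acked, fake.nacked)
```

With a real client, the object returned by `client.subscribe(...)` would be passed in place of `FakeConsumer`.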
MQTT from Python
    pip3 install paho-mqtt

    import json
    import paho.mqtt.client as mqtt

    client = mqtt.Client("rpi4iot")
    row = { }
    # readings: sensor values collected earlier in the script
    row['gasKO'] = str(readings)
    json_string = json.dumps(row)
    json_string = json_string.strip()
    client.connect("pulsar-server.com", 1883, 180)
    client.publish("persistent://public/default/mqtt-2", payload=json_string, qos=0, retain=True)
https://www.slideshare.net/bunkertor/data-minutes-2-apache-pulsar-with-mqtt-for-edge-computing-lightning-2022
Web Sockets from Python
    pip3 install websocket-client

    import websocket, base64, json

    topic = 'ws://server:8080/ws/v2/producer/persistent/public/default/topic1'
    ws = websocket.create_connection(topic)
    message = "Hello Philly ETE Conference"
    # The WebSocket API expects the payload as a base64-encoded string.
    message_bytes = message.encode('ascii')
    base64_bytes = base64.b64encode(message_bytes)
    base64_message = base64_bytes.decode('ascii')
    ws.send(json.dumps({'payload' : base64_message, 'properties': {'device' : 'macbook'}, 'context' : 5}))
    response = json.loads(ws.recv())
https://pulsar.apache.org/docs/en/client-libraries-websocket/
https://github.com/tspannhw/FLiP-IoT/blob/main/wspulsar.py
https://github.com/tspannhw/FLiP-IoT/blob/main/wsreader.py
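The frame built in the slide can be factored into two small helpers. This is a minimal sketch using only the standard library; `build_frame` and `read_payload` are hypothetical names, and UTF-8 is used instead of ASCII so that non-ASCII text also survives the round trip.

```python
import base64
import json
from typing import Dict, Optional


def build_frame(text: str,
                properties: Optional[Dict[str, str]] = None,
                context: str = "1") -> str:
    """Build the JSON frame sent over the producer WebSocket: the payload
    is base64-encoded, properties and context are passed through."""
    payload = base64.b64encode(text.encode("utf-8")).decode("ascii")
    return json.dumps({"payload": payload,
                       "properties": properties or {},
                       "context": context})


def read_payload(frame: str) -> str:
    """Decode the base64 payload of a frame back into text."""
    return base64.b64decode(json.loads(frame)["payload"]).decode("utf-8")


frame = build_frame("Hello Philly ETE Conference", {"device": "macbook"})
print(read_payload(frame))  # Hello Philly ETE Conference
```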
Kafka from Python
    pip3 install kafka-python

    import json
    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    row = { }
    # readings: sensor values collected earlier in the script
    row['gasKO'] = str(readings)
    json_string = json.dumps(row)
    json_string = json_string.strip()
    # KoP exposes the Kafka protocol on the Pulsar broker (port 9092 here).
    producer = KafkaProducer(bootstrap_servers='pulsar1:9092', retries=3)
    producer.send('topic-kafka-1', json_string.encode('utf-8'))
    producer.flush()
https://github.com/streamnative/kop
https://docs.streamnative.io/platform/v1.0.0/concepts/kop-concepts
Deploy Python Functions
    bin/pulsar-admin functions create --auto-ack true \
      --py py/src/sentiment.py --classname "sentiment.Chat" \
      --inputs "persistent://public/default/chat" \
      --log-topic "persistent://public/default/logs" \
      --name Chat \
      --output "persistent://public/default/chatresult"
https://github.com/tspannhw/pulsar-pychat-function
Pulsar IO Function in Python3
    from pulsar import Function
    import json

    class Chat(Function):
        def __init__(self):
            pass

        def process(self, input, context):
            logger = context.get_logger()
            msg_id = context.get_message_id()
            fields = json.loads(input)
https://github.com/tspannhw/pulsar-pychat-function
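One way to try a function class like this before deploying it is to call `process` directly with a stand-in context. The sketch below is an assumption-heavy illustration: `StubContext` is a hypothetical test double that implements only the two context methods the slide calls, the `comment` field is made up, and the fallback `Function` base class is there only so the example runs when the Pulsar client is not installed.

```python
import json
import logging

try:
    from pulsar import Function  # available when pulsar-client is installed
except ImportError:
    class Function:  # fallback so the sketch runs without the client
        pass


class StubContext:
    """Hypothetical test double exposing only get_logger and get_message_id."""

    def __init__(self, message_id: str = "stub-id") -> None:
        self._message_id = message_id

    def get_logger(self):
        return logging.getLogger("chat-function")

    def get_message_id(self):
        return self._message_id


class Chat(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        logger = context.get_logger()
        msg_id = context.get_message_id()
        fields = json.loads(input)
        logger.info("processing message %s", msg_id)
        # The returned string is what would be published to the output topic.
        return json.dumps({"id": str(msg_id),
                           "comment": fields.get("comment", "")})


result = Chat().process('{"comment": "hello"}', StubContext())
print(result)
```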
Building a Golang Pulsar App
http://pulsar.apache.org/docs/en/client-libraries-go/
    go get -u "github.com/apache/pulsar-client-go/pulsar"

    package main

    import (
        "log"
        "time"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:               "pulsar://localhost:6650",
            OperationTimeout:  30 * time.Second,
            ConnectionTimeout: 30 * time.Second,
        })
        if err != nil {
            log.Fatalf("Could not instantiate Pulsar client: %v", err)
        }
        defer client.Close()
    }
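Typed Java Client
    Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
        .topic(topic)
        .create();
    producer.newMessage()
        .value(User.builder()
            .userName("pulsar-user")
            .userId(1L)
            .build())
        .send();

    // A consumer needs a topic and a subscription name, and is created with subscribe().
    Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class))
        .topic(topic)
        .subscriptionName("subscriptionName")
        .subscribe();
    // receive() returns a Message<User>; getValue() yields the typed object.
    User user = consumer.receive().getValue();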
Pulsar Producer
    import java.util.UUID;
    import java.net.URL;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.ProducerBuilder;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;

    PulsarClient client = PulsarClient.builder()
        .serviceUrl(serviceUrl)
        .authentication(
            AuthenticationFactoryOAuth2.clientCredentials(
                new URL(issuerUrl), new URL(credentialsUrl), audience))
        .build();
Spring RabbitMQ/AMQP Producer
    rabbitTemplate.convertAndSend(topicName, DataUtility.serializeToJSON(observation));
Spring MQTT Producer
    MqttMessage mqttMessage = new MqttMessage();
    mqttMessage.setPayload(DataUtility.serialize(payload));
    mqttMessage.setQos(1);
    mqttMessage.setRetained(true);
    mqttClient.publish(topicName, mqttMessage);
Spring Kafka Producer
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,
        uuidKey.toString(), DataUtility.serializeToJSON(message));
    kafkaTemplate.send(producerRecord);
Pulsar Simple Producer
    String pulsarKey = UUID.randomUUID().toString();
    String OS = System.getProperty("os.name").toLowerCase();
    ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic)
        .producerName("demo");
    Producer<byte[]> producer = producerBuilder.create();
    MessageId msgID = producer.newMessage().key(pulsarKey).value("msg".getBytes())
        .property("device", OS).send();
    producer.close();
    client.close();
Pulsar Function Java
    import java.util.function.Function;

    public class MyFunction implements Function<String, String> {
        public String apply(String input) {
            // Your code here
            return doBusinessLogic(input);
        }
    }
Pulsar Function SDK
    import java.util.UUID;
    import org.apache.pulsar.client.impl.schema.JSONSchema;
    import org.apache.pulsar.functions.api.*;

    public class AirQualityFunction implements Function<byte[], Void> {
        @Override
        public Void process(byte[] input, Context context) throws Exception {
            context.getLogger().debug("File:" + new String(input));
            // Your code here: build `observation` (an Observation) from the input.
            context.newOutputMessage("topicname", JSONSchema.of(Observation.class))
                .key(UUID.randomUUID().toString())
                .property("prop1", "value1")
                .value(observation)
                .send();
            return null;
        }
    }
Setting Subscription Type Java
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic(topic)
        .subscriptionName("subscriptionName")
        .subscriptionType(SubscriptionType.Shared)
        .subscribe();
Subscribing to a Topic and Setting Subscription Name Java
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic(topic)
        .subscriptionName("subscriptionName")
        .subscribe();
Producing Object Events From Java
    ProducerBuilder<Observation> producerBuilder =
        pulsarClient.newProducer(JSONSchema.of(Observation.class))
            .topic(topicName)
            .producerName(producerName)
            .sendTimeout(60, TimeUnit.SECONDS);
    Producer<Observation> producer = producerBuilder.create();
    MessageId msgID = producer.newMessage()
        .key(someUniqueKey)
        .value(observation)
        .send();
Monitoring and Metrics Check
    curl http://pulsar1:8080/admin/v2/persistent/conf/europe/first/stats | python3 -m json.tool
    bin/pulsar-admin topics stats-internal persistent://conf/europe/first
    curl http://pulsar1:8080/metrics/
    bin/pulsar-admin topics peek-messages --count 5 --subscription ete-reader persistent://conf/europe/first
    bin/pulsar-admin topics subscriptions persistent://conf/europe/first
Metrics: Broker
Broker metrics are exposed under "/metrics" at port 8080. You can change the port by setting webServicePort to a different value in the broker.conf configuration file.
All the metrics exposed by a broker are labeled with cluster=${pulsar_cluster}, where ${pulsar_cluster} is the Pulsar cluster name configured in the broker.conf file.
For more information: https://pulsar.apache.org/docs/en/reference-metrics/#broker
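Since the "/metrics" endpoint returns Prometheus-style text, its output can be filtered by the cluster label with a few lines of Python. The parser below is a simplified sketch (it ignores timestamps and escaped quotes inside label values), and the sample text, including the metric names and values, is illustrative rather than captured from a real broker.

```python
import re
from typing import Dict, List, Tuple

Sample = Tuple[str, Dict[str, str], float]

LINE = re.compile(r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(?P<labels>.*)\})?\s+(?P<value>\S+)')
LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse_metrics(text: str) -> List[Sample]:
    """Parse metric lines of the form name{label="value",...} number."""
    samples: List[Sample] = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = LINE.match(line)
        if not match:
            continue
        labels = dict(LABEL.findall(match.group("labels") or ""))
        samples.append((match.group("name"), labels, float(match.group("value"))))
    return samples


def for_cluster(samples: List[Sample], cluster: str) -> List[Sample]:
    """Keep only samples whose cluster label matches."""
    return [s for s in samples if s[1].get("cluster") == cluster]


# Illustrative sample only; real output comes from http://pulsar1:8080/metrics/
SAMPLE = '''
# TYPE pulsar_topics_count gauge
pulsar_topics_count{cluster="standalone",namespace="conf/europe"} 1
pulsar_rate_in{cluster="standalone",namespace="conf/europe"} 12.5
pulsar_rate_in{cluster="other",namespace="conf/europe"} 3
'''

samples = parse_metrics(SAMPLE)
standalone = for_cluster(samples, "standalone")
print(len(samples), len(standalone))  # 3 2
```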
Metrics: Broker
These metrics are available for brokers:
● Namespace metrics
  ○ Replication metrics
● Topic metrics
  ○ Replication metrics
● ManagedLedgerCache metrics
● ManagedLedger metrics
● LoadBalancing metrics
  ○ BundleUnloading metrics
  ○ BundleSplit metrics
● Subscription metrics
● Consumer metrics
● ManagedLedger bookie client metrics
Cleanup
    bin/pulsar-admin topics delete persistent://conf/europe/first
    bin/pulsar-admin namespaces delete conf/europe
    bin/pulsar-admin tenants delete conf
Use Cases
• Unified Messaging Platform
• AdTech
• Fraud Detection
• Connected Car
• IoT Analytics
• Microservices Development
Java for Pulsar
● https://github.com/tspannhw/airquality
● https://github.com/tspannhw/FLiPN-AirQuality-REST
● https://github.com/tspannhw/pulsar-airquality-function
● https://github.com/tspannhw/FLiPN-DEVNEXUS-2022
● https://github.com/tspannhw/FLiP-Py-ADS-B
● https://github.com/tspannhw/pulsar-adsb-function
● https://github.com/tspannhw/airquality-amqp-consumer
● https://github.com/tspannhw/airquality-mqtt-consumer
● https://github.com/tspannhw/airquality-consumer
● https://github.com/tspannhw/airquality-kafka-consumer
Python For Pulsar on Pi
● https://github.com/tspannhw/FLiP-Pi-BreakoutGarden
● https://github.com/tspannhw/FLiP-Pi-Thermal
● https://github.com/tspannhw/FLiP-Pi-Weather
● https://github.com/tspannhw/FLiP-RP400
● https://github.com/tspannhw/FLiP-Py-Pi-GasThermal
● https://github.com/tspannhw/FLiP-PY-FakeDataPulsar
● https://github.com/tspannhw/FLiP-Py-Pi-EnviroPlus
● https://github.com/tspannhw/PythonPulsarExamples
● https://github.com/tspannhw/pulsar-pychat-function
● https://github.com/tspannhw/FLiP-PulsarDevPython101
● https://github.com/tspannhw/airquality
OSS EU: Deep Dive into Building Streaming Applications with Apache Pulsar
