Skip to content

Commit 17d18a6

Browse files
author
Vitalii Cherkashyn
committed
kafka example
1 parent ee455cb commit 17d18a6

File tree

21 files changed

+1212
-0
lines changed

21 files changed

+1212
-0
lines changed

kafka/README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# API Tutorial
2+
3+
You can use this sample code to learn more about the Apache Kafka 0.9 API, this code should run without change on the MapR Streams
4+
5+
6+
## Create topics for sample
7+
8+
maprcli stream create -path /user/user01/pump
9+
maprcli stream edit -path /user/user01/pump -produceperm u:user01 -consumeperm u:user01 -topicperm u:user01
10+
maprcli stream topic create -path /user/user01/pump -topic input -partitions 3
11+
maprcli stream topic create -path /user/user01/pump -topic alert -partitions 3
12+
13+
14+
## Simple Producer and Consumer
15+
16+
java -cp ms-lab-1.0.jar:`mapr classpath` example.simple.Producer /user/user01/pump:input
17+
18+
java -cp ms-lab-1.0.jar:`mapr classpath` example.simple.Consumer /user/user01/pump:input 0
19+
20+
java -cp ms-lab-1.0.jar:`mapr classpath` example.simple.Consumer /user/user01/pump:input 0
21+
22+
java -cp ms-lab-1.0.jar:`mapr classpath` example.simple.Consumer /user/user01/pump:input 1
23+
24+
java -cp ms-lab-1.0.jar:`mapr classpath` example.simple.Consumer /user/user01/pump:input 1
25+
26+
27+
## Partitions
28+
29+
java -cp ms-lab-1.0.jar:`mapr classpath` example.partition.Producer /user/user01/pump:input
30+
31+
java -cp ms-lab-1.0.jar:`mapr classpath` example.partition.Consumer /user/user01/pump:input
32+
33+
java -cp ms-lab-1.0.jar:`mapr classpath` example.partition.Consumer /user/user01/pump:input
34+
```
35+
36+
## Consumer Groups
37+
```sh
38+
java -cp ms-lab-1.0.jar:`mapr classpath` example.group.Producer /user/user01/pump:input
39+
40+
java -cp ms-lab-1.0.jar:`mapr classpath` example.group.Consumer /user/user01/pump:input group2
41+
42+
java -cp ms-lab-1.0.jar:`mapr classpath` example.group.Consumer /user/user01/pump:input group1
43+
44+
java -cp ms-lab-1.0.jar:`mapr classpath` example.group.Consumer /user/user01/pump:input group1
45+
```
46+
47+
## Sequence
48+
49+
java -cp ms-lab-1.0.jar:`mapr classpath` example.sequence.Sample send /user/user01/pump:input 5
50+
java -cp ms-lab-1.0.jar:`mapr classpath` example.sequence.Sample get /user/user01/pump:input 5

kafka/pom.xml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>ms-lab</groupId>
8+
<artifactId>ms-lab</artifactId>
9+
<version>1.0</version>
10+
11+
<repositories>
12+
<repository>
13+
<id>central</id>
14+
<url>http://repo.maven.apache.org/maven2/</url>
15+
<snapshots>
16+
<enabled>true</enabled>
17+
</snapshots>
18+
<releases>
19+
<enabled>true</enabled>
20+
</releases>
21+
</repository>
22+
23+
<repository>
24+
<id>mapr-releases</id>
25+
<url>http://repository.mapr.com/maven/</url>
26+
<snapshots>
27+
<enabled>false</enabled>
28+
</snapshots>
29+
<releases>
30+
<enabled>true</enabled>
31+
</releases>
32+
</repository>
33+
34+
35+
</repositories>
36+
37+
<dependencies>
38+
39+
<dependency>
40+
<groupId>org.apache.kafka</groupId>
41+
<artifactId>kafka-clients</artifactId>
42+
<version>0.9.0.0-mapr-1602-streams-5.1.0</version>
43+
44+
</dependency>
45+
46+
47+
</dependencies>
48+
<name>ms-lab</name>
49+
<properties>
50+
<maven.compiler.source>1.7</maven.compiler.source>
51+
<maven.compiler.target>1.7</maven.compiler.target>
52+
</properties>
53+
</project>

kafka/src/.DS_Store

6 KB
Binary file not shown.

kafka/src/main/.DS_Store

6 KB
Binary file not shown.

kafka/src/main/java/.DS_Store

6 KB
Binary file not shown.
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package example.group;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.apache.kafka.clients.consumer.ConsumerRecords;
6+
import org.apache.kafka.clients.consumer.KafkaConsumer;
7+
import org.apache.kafka.common.TopicPartition;
8+
9+
import java.util.Arrays;
10+
import java.util.Collection;
11+
import java.util.Properties;
12+
import java.util.Scanner;
13+
14+
/**
15+
*
16+
*/
17+
public class Consumer {
18+
19+
private static Scanner in;
20+
private static boolean stop = false;
21+
22+
public static void main(String[] argv) throws Exception {
23+
if (argv.length != 2) {
24+
System.err.println("Please specify 2 parameters ");
25+
System.exit(-1);
26+
}
27+
in = new Scanner(System.in);
28+
final String topicName = argv[0];
29+
final String groupId = argv[1];
30+
31+
Consumer.GroupConsumerThread ct = new Consumer.GroupConsumerThread(topicName, groupId);
32+
ct.start();
33+
String line = "";
34+
while (!line.equals("exit")) {
35+
line = in.next();
36+
}
37+
stop = true;
38+
System.out.println("Stopping consumer .....");
39+
ct.join();
40+
41+
}
42+
43+
private static class GroupConsumerThread extends Thread {
44+
45+
private String topicName;
46+
private String groupId;
47+
48+
public GroupConsumerThread(String topicName, String groupId) {
49+
this.topicName = topicName;
50+
this.groupId = groupId;
51+
}
52+
53+
@Override
54+
public void run() {
55+
//Configure Stream Consumer
56+
Properties configProperties = new Properties();
57+
58+
configProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
59+
configProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
60+
configProperties.put("group.id", groupId);
61+
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configProperties);
62+
63+
// Generate Debug message about which TOPIC the consumer is listening to
64+
consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
65+
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
66+
System.out.println("Stopped listening to " + Arrays.toString(collection.toArray()));
67+
}
68+
69+
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
70+
System.out.println("Started listening to " + Arrays.toString(collection.toArray()));
71+
}
72+
});
73+
74+
//Start processing messages
75+
while (true) {
76+
ConsumerRecords<String, String> records = consumer.poll(100);
77+
if (stop == true) {
78+
consumer.close();
79+
System.exit(0);
80+
}
81+
for (ConsumerRecord<String, String> record : records) {
82+
System.out.println("Offset -> " + record.offset() + ", Message -> " + record.value());
83+
}
84+
}
85+
}
86+
}
87+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package example.group;
2+
3+
import org.apache.kafka.clients.producer.*;
4+
import org.apache.log4j.Logger;
5+
6+
import java.util.Properties;
7+
import java.util.Scanner;
8+
9+
/**
10+
*
11+
*/
12+
public class Producer {
13+
private static final Logger logger = Logger.getLogger(Producer.class);
14+
private static Scanner in;
15+
16+
public static void main(String[] argv){
17+
if(argv.length !=1){
18+
System.err.println("Please specify 1 parameters ");
19+
System.exit(-1);
20+
}
21+
String topicName = argv[0];
22+
in = new Scanner(System.in);
23+
System.out.println("Enter message(type exit to quit)");
24+
25+
//Configure the Producer
26+
Properties configProperties = new Properties();
27+
28+
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
29+
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
30+
org.apache.kafka.clients.producer.Producer producer = new KafkaProducer<String, String>(configProperties);
31+
String line = in.nextLine();
32+
33+
//Listen to user input and send every message to server
34+
while(!line.equals("exit")) {
35+
System.out.println("Sending message -> " + line);
36+
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line);
37+
producer.send(rec, new Callback() {
38+
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
39+
System.out.println( "Message Sent Successfully Topic -> " + recordMetadata.topic() + " Partition -> " + recordMetadata.partition() +" Offset-> " + recordMetadata.offset());
40+
}
41+
});
42+
line = in.nextLine();
43+
}
44+
in.close();
45+
producer.close();
46+
}
47+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package example.partition;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.apache.kafka.clients.consumer.ConsumerRecords;
6+
import org.apache.kafka.clients.consumer.KafkaConsumer;
7+
import org.apache.kafka.common.TopicPartition;
8+
9+
import java.util.Arrays;
10+
import java.util.Collection;
11+
import java.util.Properties;
12+
import java.util.Scanner;
13+
14+
/**
15+
*
16+
*/
17+
public class Consumer {
18+
19+
private static Scanner in;
20+
private static boolean stop = false;
21+
public static void main(String[] argv)throws Exception{
22+
if (argv.length != 1) {
23+
System.err.println("Please specify 1 parameters ");
24+
System.exit(-1);
25+
}
26+
in = new Scanner(System.in);
27+
final String topicName = argv[0];
28+
29+
Consumer.PartitionConumserThread ct = new Consumer.PartitionConumserThread(topicName);
30+
ct.start();
31+
String line = "";
32+
while (!line.equals("exit")) {
33+
line = in.next();
34+
}
35+
stop = true;
36+
System.out.println("Stopping consumer .....");
37+
ct.join();
38+
39+
40+
}
41+
42+
43+
public static class PartitionConumserThread extends Thread{
44+
private String topicName;
45+
public PartitionConumserThread(String topicName) {
46+
this.topicName = topicName;
47+
}
48+
@Override
49+
public void run() {
50+
//Initialize the Consumer configuration
51+
Properties configProperties = new Properties();
52+
53+
configProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
54+
configProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
55+
configProperties.put("group.id", "partition");
56+
57+
// Generate Debug message about which TOPIC the consumer is listening to
58+
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configProperties);
59+
consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
60+
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
61+
System.out.println("Stopped listening to " + Arrays.toString(collection.toArray()));
62+
}
63+
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
64+
System.out.println("Started listening to " + Arrays.toString(collection.toArray()));
65+
}
66+
});
67+
68+
//Start processing messages
69+
while (true) {
70+
ConsumerRecords<String, String> records = consumer.poll(100);
71+
if (stop == true) {
72+
consumer.close();
73+
System.exit(0);
74+
}
75+
for (ConsumerRecord<String, String> record : records)
76+
System.out.println("Offset -> " +record.offset() + ", Message -> " + record.value());
77+
}
78+
}
79+
}
80+
81+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package example.partition;
2+
3+
import java.util.Map;
4+
5+
import org.apache.kafka.common.Cluster;
6+
7+
/**
8+
*
9+
* TOPIC - The TOPIC name key - The key to partition on (or null if no key)
10+
* keyBytes - The serialized key to partition on( or null if no key) value - The
11+
* value to partition on or null valueBytes - The serialized value to partition
12+
* on or null cluster - The current cluster metadata
13+
*/
14+
public class DemoPartitioner implements org.apache.kafka.clients.producer.StreamsPartitioner {
15+
16+
@Override
17+
public void close() {
18+
19+
}
20+
21+
@Override
22+
public void configure(Map<String, ?> map) {
23+
24+
}
25+
26+
@Override
27+
public int partition(java.lang.String topic,
28+
java.lang.Object key,
29+
byte[] keyBytes,
30+
java.lang.Object value,
31+
byte[] valueBytes,
32+
int numPartitions) {
33+
System.out.println("DemoPartitioner TOPIC " + topic + " key " + key + " value " + value + " partitions " + numPartitions);
34+
if (value == null) {
35+
return 0;
36+
} else {
37+
38+
int p = Math.abs(value.hashCode() % numPartitions);
39+
System.out.println("DemoPartitioner returning partition " + p);
40+
return p;
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)