package com.hyzs.spark.streaming

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

/**
  * Created by xk on 2018/11/23.
  */
object DirectKafkaWordCount {
  def main(args: Array[String]) {

    val Array(brokers, groupId, topics) = Array("111.230.17.36:9094", "testGroup02", "jd_data01")

    // Create context with a 10 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
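    // Each micro-batch covers 10 seconds of records. The direct Kafka stream
    // has no long-running receiver, so a single-core "local" master suffices
    // here; receiver-based streams would need "local[n]" with n > 1.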

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
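    // "earliest" makes a consumer group with no committed offsets start from
    // the beginning of each partition; the String deserializers decode both
    // key and value as UTF-8 text.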
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
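    // PreferConsistent spreads Kafka partitions evenly across the available
    // executors; Subscribe consumes a fixed set of topics.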

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
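    // print() writes the first ten elements of each batch to the driver's stdout.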

    /* messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { item =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)

        println(s"The records from topic [${o.topic}] are in partition ${o.partition}, with offsets from ${o.fromOffset} to ${o.untilOffset}")
        println(s"The record content is ${item.toList.mkString}")
      }
      rdd.count()
    } */
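    // A minimal sketch (left commented out, like the block above) of committing
    // the processed offsets back to Kafka so this group resumes where it left
    // off after a restart. CanCommitOffsets comes from the kafka010 package;
    // when committing manually, "enable.auto.commit" should be set to false in
    // kafkaParams.
    /* messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch here, then commit its offsets ...
      messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    } */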

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}