Commit 9107756

kafka test update
1 parent 7056e53

File tree

1 file changed: +5 −5 lines changed

src/main/scala/com/hyzs/spark/streaming/DirectKafkaWordCount.scala

Lines changed: 5 additions & 5 deletions
@@ -12,7 +12,7 @@ import org.apache.spark.streaming.kafka010._
 object DirectKafkaWordCount {
   def main(args: Array[String]) {
 
-    val Array(brokers, groupId, topics) = Array("111.230.17.36:9094","testGroup02","jd_data01")
+    val Array(brokers, groupId, topics) = Array("111.230.17.36:9094","testGroup","kylin_streaming_topic")
 
     // Create context with 2 second batch interval
     val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local")
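
The new group id and topic feed into the consumer config used by the subscription in the next hunk. The config map itself sits outside this diff's context; as a point of reference, a minimal sketch of what it presumably looks like, assuming the standard spark-streaming-kafka-0-10 setup (none of this is part of the commit):

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer

    // Assumed consumer config built from the brokers/groupId/topics values
    // above; not shown in this commit's diff.
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
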
@@ -32,12 +32,12 @@ object DirectKafkaWordCount {
       ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
 
     // Get the lines, split them into words, count the words and print
-    val lines = messages.map(_.value)
+    /* val lines = messages.map(_.value)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
-    wordCounts.print()
+    wordCounts.print()*/
 
-    /* messages.foreachRDD{rdd =>
+    messages.foreachRDD{rdd =>
       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
       rdd.foreachPartition { item =>
         val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
@@ -46,7 +46,7 @@ object DirectKafkaWordCount {
         println(s"The record content is ${item.toList.mkString}")
       }
       rdd.count()
-    }*/
+    }
 
     // Start the computation
     ssc.start()
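
One note on the re-enabled foreachRDD block: it reads the per-partition offset ranges via HasOffsetRanges but never commits them, so the consumer group's position in Kafka does not advance between runs. If that is not intended, a minimal sketch of committing offsets after the batch is processed, using the 0-10 integration's CanCommitOffsets (this is not part of the commit):

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch as above ...
      // Commit the consumed offsets back to Kafka once the batch succeeds.
      // The cast must be applied to the original stream, not the RDD.
      messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
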
