File tree Expand file tree Collapse file tree 2 files changed +67
-0
lines changed Expand file tree Collapse file tree 2 files changed +67
-0
lines changed Original file line number Diff line number Diff line change @@ -22,6 +22,7 @@ def method_missing(method, *args)
2222end
2323
2424require 'fluent/plugin/out_kafka'
25+ require 'fluent/plugin/in_kafka'
2526
2627class Test ::Unit ::TestCase
2728end
Original file line number Diff line number Diff line change 1+ require 'helper'
2+ require 'fluent/test/driver/input'
3+ require 'securerandom'
4+
5+ class KafkaInputTest < Test ::Unit ::TestCase
6+ def setup
7+ Fluent ::Test . setup
8+ end
9+
10+ TOPIC_NAME = "kafka-input-#{ SecureRandom . uuid } "
11+
12+ CONFIG = %[
13+ @type kafka
14+ brokers localhost:9092
15+ format text
16+ @label @kafka
17+ topics #{ TOPIC_NAME }
18+ ]
19+
20+ def create_driver ( conf = CONFIG )
21+ Fluent ::Test ::Driver ::Input . new ( Fluent ::KafkaInput ) . configure ( conf )
22+ end
23+
24+
25+ def test_configure
26+ d = create_driver
27+ assert_equal TOPIC_NAME , d . instance . topics
28+ assert_equal 'text' , d . instance . format
29+ assert_equal 'localhost:9092' , d . instance . brokers
30+ end
31+
32+ def test_multi_worker_support
33+ d = create_driver
34+ assert_false d . instance . multi_workers_ready?
35+ end
36+
37+ class ConsumeTest < self
38+ def setup
39+ @kafka = Kafka . new ( [ "localhost:9092" ] , client_id : 'kafka' )
40+ @producer = @kafka . producer
41+ end
42+
43+ def teardown
44+ @kafka . delete_topic ( TOPIC_NAME )
45+ @kafka . close
46+ end
47+
48+ def test_consume
49+ conf = %[
50+ @type kafka
51+ brokers localhost:9092
52+ format text
53+ @label @kafka
54+ topics #{ TOPIC_NAME }
55+ ]
56+ d = create_driver
57+
58+ d . run ( expect_records : 1 , timeout : 10 ) do
59+ @producer . produce ( "Hello, fluent-plugin-kafka!" , topic : TOPIC_NAME )
60+ @producer . deliver_messages
61+ end
62+ expected = { 'message' => 'Hello, fluent-plugin-kafka!' }
63+ assert_equal expected , d . events [ 0 ] [ 2 ]
64+ end
65+ end
66+ end
You can’t perform that action at this time.
0 commit comments