File tree Expand file tree Collapse file tree 2 files changed +69
-0
lines changed Expand file tree Collapse file tree 2 files changed +69
-0
lines changed Original file line number Diff line number Diff line change @@ -23,6 +23,7 @@ def method_missing(method, *args)
2323end
2424
2525require 'fluent/plugin/out_kafka'
26+ require 'fluent/plugin/out_kafka_buffered'
2627require 'fluent/plugin/out_kafka2'
2728require 'fluent/plugin/in_kafka'
2829require 'fluent/plugin/in_kafka_group'
Original file line number Diff line number Diff line change 1+ require 'helper'
2+ require 'fluent/output'
3+
4+ class KafkaBufferedOutputTest < Test ::Unit ::TestCase
5+ def setup
6+ Fluent ::Test . setup
7+ end
8+
9+ BASE_CONFIG = %[
10+ type kafka_buffered
11+ ]
12+
13+ CONFIG = BASE_CONFIG + %[
14+ default_topic kitagawakeiko
15+ brokers localhost:9092
16+ ]
17+
18+ def create_driver ( conf = CONFIG , tag = 'test' )
19+ Fluent ::Test ::BufferedOutputTestDriver . new ( Fluent ::KafkaOutputBuffered , tag ) . configure ( conf )
20+ end
21+
22+ def test_configure
23+ assert_nothing_raised ( Fluent ::ConfigError ) {
24+ create_driver ( BASE_CONFIG )
25+ }
26+
27+ assert_nothing_raised ( Fluent ::ConfigError ) {
28+ create_driver ( CONFIG )
29+ }
30+
31+ assert_nothing_raised ( Fluent ::ConfigError ) {
32+ create_driver ( CONFIG + %[
33+ buffer_type memory
34+ ] )
35+ }
36+
37+ d = create_driver
38+ assert_equal 'kitagawakeiko' , d . instance . default_topic
39+ assert_equal 'localhost:9092' , d . instance . brokers
40+ end
41+
42+ def test_format
43+ d = create_driver
44+ end
45+
46+ data ( "crc32" => "crc32" ,
47+ "murmur2" => "murmur2" )
48+ def test_partitioner_hash_function ( data )
49+ hash_type = data
50+ d = create_driver ( CONFIG + %[partitioner_hash_function #{ hash_type } ] )
51+ assert_nothing_raised do
52+ d . instance . refresh_client
53+ end
54+ end
55+
56+ def test_mutli_worker_support
57+ d = create_driver
58+ assert_equal true , d . instance . multi_workers_ready?
59+
60+ end
61+
62+ def test_write
63+ d = create_driver
64+ time = Time . parse ( "2011-01-02 13:14:15 UTC" ) . to_i
65+ d . emit ( { "a" => 1 } , time )
66+ d . emit ( { "a" => 2 } , time )
67+ end
68+ end
You can’t perform that action at this time.
0 commit comments