Skip to content

Commit ecf93e1

Browse files
clone codec per worker
added line codec tests missing spec
1 parent 991b999 commit ecf93e1

File tree

4 files changed

+50
-26
lines changed

4 files changed

+50
-26
lines changed

lib/logstash/inputs/udp.rb

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
3737
# before packets will start dropping.
3838
config :queue_size, :validate => :number, :default => 2000
3939

40+
HOST_FIELD = "host".freeze
41+
4042
public
4143
def initialize(params)
4244
super
@@ -86,7 +88,7 @@ def udp_listener(output_queue)
8688

8789
@input_workers = @workers.times do |i|
8890
@logger.debug("Starting UDP worker thread", :worker => i)
89-
Thread.new { inputworker(i) }
91+
Thread.new(i, @codec.clone) { |i, codec| inputworker(i, codec) }
9092
end
9193

9294
while !stop?
@@ -109,17 +111,15 @@ def udp_listener(output_queue)
109111
end
110112
end # def udp_listener
111113

112-
def inputworker(number)
114+
def inputworker(number, codec)
113115
LogStash::Util::set_thread_name("<udp.#{number}")
114116
begin
115117
while true
116118
payload, client = @input_to_worker.pop
119+
host = client[3]
117120

118-
@codec.decode(payload) do |event|
119-
decorate(event)
120-
event.set("host", client[3]) if event.get("host").nil?
121-
@output_queue.push(event)
122-
end
121+
codec.decode(payload) { |event| push_decoded_event(host, event) }
122+
codec.flush { |event| push_decoded_event(host, event) }
123123
end
124124
rescue => e
125125
@logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
@@ -136,4 +136,12 @@ def stop
136136
@udp.close rescue nil
137137
end
138138

139+
private
140+
141+
def push_decoded_event(host, event)
142+
decorate(event)
143+
event.set(HOST_FIELD, host) if event.get(HOST_FIELD).nil?
144+
@output_queue.push(event)
145+
end
146+
139147
end # class LogStash::Inputs::Udp

logstash-input-udp.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Gem::Specification.new do |s|
2424

2525
s.add_runtime_dependency 'logstash-codec-plain'
2626
s.add_runtime_dependency 'stud', '~> 0.0.22'
27+
s.add_development_dependency 'logstash-codec-line'
2728
s.add_development_dependency 'logstash-devutils'
2829
end
2930

spec/inputs/udp_spec.rb

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
end
1010

1111
let!(:helper) { UdpHelpers.new }
12-
let(:port) { rand(1024..65535) }
13-
subject { LogStash::Plugin.lookup("input", "udp").new({ "port" => port }) }
12+
let(:client) { LogStash::Inputs::Test::UDPClient.new(port) }
13+
let(:port) { rand(1024..65535) }
14+
let(:config) { { "port" => port } }
15+
subject { LogStash::Plugin.lookup("input","udp").new(config) }
1416

1517
after :each do
1618
subject.close rescue nil
@@ -23,8 +25,6 @@
2325
end
2426

2527
describe "receive" do
26-
27-
let(:client) { LogStash::Inputs::Test::UDPClient.new(port) }
2828
let(:nevents) { 10 }
2929

3030
let(:events) do
@@ -46,10 +46,32 @@
4646
expect(message).to match(/msg \d+/)
4747
end
4848
end
49+
end
50+
51+
describe "multiple lines per datagram using line codec" do
52+
# 3 workers for 3 datagrams send below
53+
let(:config) { { "port" => port, "workers" => 3, "codec" => "line" } }
54+
55+
let(:events) do
56+
helper.input(subject, 8) do
57+
client.send("line1\nline2")
58+
client.send("line3\nline4")
59+
client.send("line5\nline6\nline7\nline8")
60+
end
61+
end
4962

63+
before(:each) do
64+
subject.register
65+
end
66+
67+
it "should receive events been generated" do
68+
expect(events.size).to be(8)
69+
messages = events.map { |event| event.get("message") }.sort # important to sort here because order is unpredictable
70+
messages.each_index {|i| expect(messages[i]).to match("line#{i + 1}")}
71+
end
5072
end
5173

5274
it_behaves_like "an interruptible input plugin" do
53-
let(:config) { { "port" => port } }
75+
# see https://github.com/elastic/logstash-devutils/blob/9c4a1fbf2b0c4547e428c5a40ed84f60aad17f97/lib/logstash/devutils/rspec/shared_examples.rb
5476
end
55-
end
77+
end

spec/support/client.rb

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
module LogStash::Inputs::Test
55

66
class UDPClient
7-
87
attr_reader :host, :port, :socket
98

109
def initialize(port)
@@ -14,24 +13,18 @@ def initialize(port)
1413
socket.connect(host, port)
1514
end
1615

17-
def send(msg="")
16+
def send(msg)
1817
begin
19-
send(msg)
20-
rescue Exception => e
21-
puts "send.exception", e
18+
socket.connect(host, port) if socket.closed?
19+
socket.send(msg, 0)
20+
rescue => e
21+
puts("send exception, retrying", e.inspect)
2222
retry
2323
end
2424
end
2525

26-
def send(msg)
27-
socket.connect(host, port) if socket.closed?
28-
socket.send(msg, 0)
29-
end
30-
3126
def close
32-
socket.close
27+
socket.close unless socket.closed?
3328
end
34-
3529
end
36-
3730
end

0 commit comments

Comments
 (0)