Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
880d77a
Refactor: review/normalize (exception) logging
kares Mar 29, 2022
d315921
Refactor: code for improved readability
kares Mar 29, 2022
ce5a90c
Test: add some happy/error path tests
kares Mar 29, 2022
30ddbe7
a close operation should release client sockets
kares Mar 29, 2022
87afddc
Feat: ssl_supported_protocols option
kares Mar 30, 2022
dd4072f
Refactor: less noise on warnings
kares Mar 30, 2022
4490a22
Docs: copy-pasta for ssl_supported_protocols
kares Mar 30, 2022
33b61ae
jruby-openssl >= 0.12.2 dependency needed
kares Mar 30, 2022
7cae392
minor
kares Mar 30, 2022
63e7813
Test: unit test for ssl_supported_protocols
kares Mar 30, 2022
5909151
a note on the current limitation
kares Mar 30, 2022
3cca9ea
Test: JSON dependency
kares Mar 30, 2022
d391254
Test: let's just skip the test on 6.x
kares Mar 30, 2022
dbdf66d
Revert test container detection hack
kares Apr 18, 2022
3737657
bump + changelog
kares Apr 18, 2022
b2210b6
require latest LS 8.1 due jruby-openssl pinning
kares Apr 28, 2022
0c94ec0
redundant docs related to Java 8
kares Apr 28, 2022
0d68ece
one only
kares Apr 28, 2022
53e138d
Update lib/logstash/outputs/tcp.rb
kares May 5, 2022
d5b7dc3
remove limitation on setting only ranges
kares May 5, 2022
1193e63
might happen multiple times
kares May 5, 2022
c063e2e
close em all!
kares May 5, 2022
051ae4c
Refactor: logging helpers to do ~ same
kares May 5, 2022
69115c2
(more) thread-safe closing of client threads
kares May 5, 2022
b775674
nonblock accept - to not be waiting while closing
kares May 5, 2022
7b4d61f
a bit of thead naming convention - yay!
kares May 5, 2022
c4f1628
Update lib/logstash/outputs/tcp.rb
kares May 9, 2022
656a9af
restore filtering out dead threads from i-var
kares May 9, 2022
5481865
Review: use pipeline_id when naming threads
kares May 9, 2022
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
import:
- logstash-plugins/.ci:travis/travis.yml@1.x
- logstash-plugins/.ci:travis/defaults.yml@1.x
- logstash-plugins/.ci:travis/exec.yml@1.x

env:
jobs:
- ELASTIC_STACK_VERSION=8.x
- SNAPSHOT=true ELASTIC_STACK_VERSION=8.x
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 6.1.0
- Feat: ssl_supported_protocols (TLSv1.3) [#47](https://github.com/logstash-plugins/logstash-output-tcp/pull/47)
- Fix: close server and client sockets on plugin close

## 6.0.2
- Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45)

Expand Down
15 changes: 15 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-ssl_enable>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-ssl_key>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-ssl_key_passphrase>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-ssl_supported_protocols>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-ssl_verify>> |<<boolean,boolean>>|No
|=======================================================================

Expand Down Expand Up @@ -130,6 +131,20 @@ SSL key path

SSL key passphrase

[id="plugins-{type}s-{plugin}-ssl_supported_protocols"]
===== `ssl_supported_protocols`

* Value type is <<string,string>>
* Allowed values are: `'TLSv1.1'`, `'TLSv1.2'`, `'TLSv1.3'`
* Default depends on the JDK being used. With up-to-date Logstash, the default is `['TLSv1.2', 'TLSv1.3']`.
`'TLSv1.1'` is not considered secure and is only provided for legacy applications.

List of allowed SSL/TLS versions to use when establishing a secure connection.

NOTE: If you configure the plugin to use `'TLSv1.1'` on any recent JVM, such as the one packaged with Logstash,
the protocol is disabled by default and needs to be enabled manually by changing `jdk.tls.disabledAlgorithms` in
the *$JDK_HOME/conf/security/java.security* configuration file. That is, `TLSv1.1` needs to be removed from the list.

[id="plugins-{type}s-{plugin}-ssl_verify"]
===== `ssl_verify`

Expand Down
121 changes: 89 additions & 32 deletions lib/logstash/outputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,38 +51,43 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
# SSL key passphrase
config :ssl_key_passphrase, :validate => :password, :default => nil

# NOTE: the default setting [] uses SSL engine defaults
config :ssl_supported_protocols, :validate => ['TLSv1.1', 'TLSv1.2', 'TLSv1.3'], :default => [], :list => true

class Client
public

def initialize(socket, logger)
@socket = socket
@logger = logger
@queue = Queue.new
end

public
def run
loop do
begin
@socket.write(@queue.pop)
rescue => e
@logger.warn("tcp output exception", :socket => @socket,
:exception => e)
log_warn 'socket write failed:', e, socket: (@socket ? @socket.to_s : nil)
break
end
end
end # def run

public
def write(msg)
@queue.push(msg)
end # def write

def close
@socket.close
rescue => e
log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil)
end
end # class Client

private
def setup_ssl
require "openssl"

@ssl_context = OpenSSL::SSL::SSLContext.new
@ssl_context = new_ssl_context
if @ssl_cert
@ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
if @ssl_key
Expand All @@ -104,50 +109,74 @@ def setup_ssl
@ssl_context.cert_store = @cert_store
@ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
end
end # def setup_ssl

public
@ssl_context.min_version = :TLS1_1 # not strictly required - JVM should have disabled TLSv1
if ssl_supported_protocols.any?
disabled_protocols = ['TLSv1.1', 'TLSv1.2', 'TLSv1.3'] - ssl_supported_protocols
unless OpenSSL::SSL.const_defined? :OP_NO_TLSv1_3 # work-around JRuby-OpenSSL bug - missing constant
@ssl_context.max_version = :TLS1_2 if disabled_protocols.delete('TLSv1.3')
end
# mapping 'TLSv1.2' -> OpenSSL::SSL::OP_NO_TLSv1_2
disabled_protocols.map! { |v| OpenSSL::SSL.const_get "OP_NO_#{v.sub('.', '_')}" }
@ssl_context.options = disabled_protocols.reduce(@ssl_context.options, :|)
end
@ssl_context
end
private :setup_ssl

# @note to be able to hook up into #ssl_context from tests
def new_ssl_context
OpenSSL::SSL::SSLContext.new
end
private :new_ssl_context

# @overload Base#register
def register
require "socket"
require "stud/try"
if @ssl_enable
setup_ssl
end # @ssl_enable
@closed = Concurrent::AtomicBoolean.new(false)
setup_ssl if @ssl_enable

if server?
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
begin
@server_socket = TCPServer.new(@host, @port)
rescue Errno::EADDRINUSE
@logger.error("Could not start TCP server: Address in use",
:host => @host, :port => @port)
@logger.error("Could not start tcp server: Address in use", host: @host, port: @port)
raise
end
if @ssl_enable
@server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context)
end # @ssl_enable
@client_threads = []
@client_threads = Concurrent::Array.new

@accept_thread = Thread.new(@server_socket) do |server_socket|
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept")
loop do
Thread.start(server_socket.accept) do |client_socket|
break if @closed.value
client_socket = server_socket.accept_nonblock exception: false
if client_socket == :wait_readable
IO.select [ server_socket ]
next
end
Thread.start(client_socket) do |client_socket|
# monkeypatch a 'peer' method onto the socket.
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Accepted connection", :client => client_socket.peer,
:server => "#{@host}:#{@port}")
@logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}")
client = Client.new(client_socket, @logger)
Thread.current[:client] = client
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@client_threads.size}")
@client_threads << Thread.current
client.run
client.run unless @closed.value
end
end
end

@codec.on_event do |event, payload|
@client_threads.select!(&:alive?)
@client_threads.each do |client_thread|
client_thread[:client].write(payload)
end
@client_threads.reject! {|t| !t.alive? }
end
else
client_socket = nil
Expand All @@ -163,18 +192,35 @@ def register
# Now send the payload
client_socket.syswrite(payload) if w.any?
rescue => e
@logger.warn("tcp output exception", :host => @host, :port => @port,
:exception => e, :backtrace => e.backtrace)
log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil)
client_socket.close rescue nil
client_socket = nil
sleep @reconnect_interval
retry
end
end
end
end # def register
end

# @overload Base#receive
def receive(event)
@codec.encode(event)
end

# @overload Base#close
def close
@closed.make_true
@server_socket.close rescue nil if @server_socket

return unless @client_threads
@client_threads.each do |thread|
client = thread[:client]
client.close rescue nil if client
end
end

private

def connect
begin
client_socket = TCPSocket.new(@host, @port)
Expand All @@ -183,29 +229,40 @@ def connect
begin
client_socket.connect
rescue OpenSSL::SSL::SSLError => ssle
@logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
log_error 'connect ssl failure:', ssle, backtrace: false
# NOTE(mrichar1): Hack to prevent hammering peer
sleep(5)
raise
end
end
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Opened connection", :client => "#{client_socket.peer}")
@logger.debug("opened connection", :client => client_socket.peer)
return client_socket
rescue StandardError => e
@logger.error("Failed to connect: #{e.message}", :exception => e.class, :backtrace => e.backtrace)
rescue => e
log_error 'failed to connect:', e
sleep @reconnect_interval
retry
end
end # def connect

private
def server?
@mode == "server"
end # def server?

public
def receive(event)
@codec.encode(event)
end # def receive
def pipeline_id
execution_context.pipeline_id || 'main'
end

def log_warn(msg, e, backtrace: @logger.debug?, **details)
details = details.merge message: e.message, exception: e.class
details[:backtrace] = e.backtrace if backtrace
@logger.warn(msg, details)
end

def log_error(msg, e, backtrace: @logger.info?, **details)
details = details.merge message: e.message, exception: e.class
details[:backtrace] = e.backtrace if backtrace
@logger.error(msg, details)
end

end # class LogStash::Outputs::Tcp
7 changes: 5 additions & 2 deletions logstash-output-tcp.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-tcp'
s.version = '6.0.2'
s.version = '6.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Writes events over a TCP socket"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -21,11 +21,14 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"

s.add_runtime_dependency 'logstash-core', '>= 8.1.0'
s.add_runtime_dependency 'logstash-codec-json'
s.add_runtime_dependency 'stud'

s.add_runtime_dependency 'jruby-openssl', '>= 0.12.2' # 0.12 supports TLSv1.3

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'logstash-codec-plain'
s.add_development_dependency 'flores'
end

Loading