Skip to content

Commit 77ff664

Browse files
author
Richard Pijnenburg
committed
This came from elasticsearch/logstash-contrib at d183d9cdcb07a56ce916b40b65786dafe6a3255e
0 parents commit 77ff664

File tree

7 files changed

+407
-0
lines changed

7 files changed

+407
-0
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
*.gem
2+
Gemfile.lock
3+
.bundle
4+
vendor

Gemfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
source 'https://rubygems.org'
2+
gem 'rake'
3+
gem 'gem_publisher'
4+
gem 'archive-tar-minitar'

Rakefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
@files=[]
2+
3+
task :default do
4+
system("rake -T")
5+
end
6+

lib/logstash/inputs/sqlite.rb

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
# encoding: utf-8
2+
require "logstash/inputs/base"
3+
require "logstash/namespace"
4+
require "socket"
5+
6+
7+
# Read rows from an sqlite database.
8+
#
9+
# This is most useful in cases where you are logging directly to a table.
10+
# Any tables being watched must have an 'id' column that is monotonically
11+
# increasing.
12+
#
13+
# All tables are read by default except:
14+
# * ones matching 'sqlite_%' - these are internal/adminstrative tables for sqlite
15+
# * 'since_table' - this is used by this plugin to track state.
16+
#
17+
# ## Example
18+
#
19+
# % sqlite /tmp/example.db
20+
# sqlite> CREATE TABLE weblogs (
21+
# id INTEGER PRIMARY KEY AUTOINCREMENT,
22+
# ip STRING,
23+
# request STRING,
24+
# response INTEGER);
25+
# sqlite> INSERT INTO weblogs (ip, request, response)
26+
# VALUES ("1.2.3.4", "/index.html", 200);
27+
#
28+
# Then with this logstash config:
29+
#
30+
# input {
31+
# sqlite {
32+
# path => "/tmp/example.db"
33+
# type => weblogs
34+
# }
35+
# }
36+
# output {
37+
# stdout {
38+
# debug => true
39+
# }
40+
# }
41+
#
42+
# Sample output:
43+
#
44+
# {
45+
# "@source" => "sqlite://sadness/tmp/x.db",
46+
# "@tags" => [],
47+
# "@fields" => {
48+
# "ip" => "1.2.3.4",
49+
# "request" => "/index.html",
50+
# "response" => 200
51+
# },
52+
# "@timestamp" => "2013-05-29T06:16:30.850Z",
53+
# "@source_host" => "sadness",
54+
# "@source_path" => "/tmp/x.db",
55+
# "@message" => "",
56+
# "@type" => "foo"
57+
# }
58+
#
59+
class LogStash::Inputs::Sqlite < LogStash::Inputs::Base
60+
config_name "sqlite"
61+
milestone 1
62+
63+
# The path to the sqlite database file.
64+
config :path, :validate => :string, :required => true
65+
66+
# Any tables to exclude by name.
67+
# By default all tables are followed.
68+
config :exclude_tables, :validate => :array, :default => []
69+
70+
# How many rows to fetch at a time from each SELECT call.
71+
config :batch, :validate => :number, :default => 5
72+
73+
SINCE_TABLE = :since_table
74+
75+
public
76+
def init_placeholder_table(db)
77+
begin
78+
db.create_table SINCE_TABLE do
79+
String :table
80+
Int :place
81+
end
82+
rescue
83+
@logger.debug("since tables already exists")
84+
end
85+
end
86+
87+
public
88+
def get_placeholder(db, table)
89+
since = db[SINCE_TABLE]
90+
x = since.where(:table => "#{table}")
91+
if x[:place].nil?
92+
init_placeholder(db, table)
93+
return 0
94+
else
95+
@logger.debug("placeholder already exists, it is #{x[:place]}")
96+
return x[:place][:place]
97+
end
98+
end
99+
100+
public
101+
def init_placeholder(db, table)
102+
@logger.debug("init placeholder for #{table}")
103+
since = db[SINCE_TABLE]
104+
since.insert(:table => table, :place => 0)
105+
end
106+
107+
public
108+
def update_placeholder(db, table, place)
109+
@logger.debug("set placeholder to #{place}")
110+
since = db[SINCE_TABLE]
111+
since.where(:table => table).update(:place => place)
112+
end
113+
114+
public
115+
def get_all_tables(db)
116+
return db["SELECT * FROM sqlite_master WHERE type = 'table' AND tbl_name != '#{SINCE_TABLE}' AND tbl_name NOT LIKE 'sqlite_%'"].map { |t| t[:name] }.select { |n| !@exclude_tables.include?(n) }
117+
end
118+
119+
public
120+
def get_n_rows_from_table(db, table, offset, limit)
121+
dataset = db["SELECT * FROM #{table}"]
122+
return db["SELECT * FROM #{table} WHERE (id > #{offset}) ORDER BY 'id' LIMIT #{limit}"].map { |row| row }
123+
end
124+
125+
public
126+
def register
127+
require "sequel"
128+
require "jdbc/sqlite3"
129+
@host = Socket.gethostname
130+
@logger.info("Registering sqlite input", :database => @path)
131+
@db = Sequel.connect("jdbc:sqlite:#{@path}")
132+
@tables = get_all_tables(@db)
133+
@table_data = {}
134+
@tables.each do |table|
135+
init_placeholder_table(@db)
136+
last_place = get_placeholder(@db, table)
137+
@table_data[table] = { :name => table, :place => last_place }
138+
end
139+
end # def register
140+
141+
public
142+
def run(queue)
143+
sleep_min = 0.01
144+
sleep_max = 5
145+
sleeptime = sleep_min
146+
147+
begin
148+
@logger.debug("Tailing sqlite db", :path => @path)
149+
loop do
150+
count = 0
151+
@table_data.each do |k, table|
152+
table_name = table[:name]
153+
offset = table[:place]
154+
@logger.debug("offset is #{offset}", :k => k, :table => table_name)
155+
rows = get_n_rows_from_table(@db, table_name, offset, @batch)
156+
count += rows.count
157+
rows.each do |row|
158+
event = LogStash::Event.new("host" => @host, "db" => @db)
159+
decorate(event)
160+
# store each column as a field in the event.
161+
row.each do |column, element|
162+
next if column == :id
163+
event[column.to_s] = element
164+
end
165+
queue << event
166+
@table_data[k][:place] = row[:id]
167+
end
168+
# Store the last-seen row in the database
169+
update_placeholder(@db, table_name, @table_data[k][:place])
170+
end
171+
172+
if count == 0
173+
# nothing found in that iteration
174+
# sleep a bit
175+
@logger.debug("No new rows. Sleeping.", :time => sleeptime)
176+
sleeptime = [sleeptime * 2, sleep_max].min
177+
sleep(sleeptime)
178+
else
179+
sleeptime = sleep_min
180+
end
181+
end # loop
182+
end # begin/rescue
183+
end #run
184+
185+
end # class Logtstash::Inputs::EventLog
186+

logstash-input-sqlite.gemspec

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
Gem::Specification.new do |s|
2+
3+
s.name = 'logstash-input-sqlite'
4+
s.version = '0.1.0'
5+
s.licenses = ['Apache License (2.0)']
6+
s.summary = "Read rows from an sqlite database."
7+
s.description = "Read rows from an sqlite database."
8+
s.authors = ["Elasticsearch"]
9+
s.email = 'richard.pijnenburg@elasticsearch.com'
10+
s.homepage = "http://logstash.net/"
11+
s.require_paths = ["lib"]
12+
13+
# Files
14+
s.files = `git ls-files`.split($\)+::Dir.glob('vendor/*')
15+
16+
# Tests
17+
s.test_files = s.files.grep(%r{^(test|spec|features)/})
18+
19+
# Special flag to let us know this is actually a logstash plugin
20+
s.metadata = { "logstash_plugin" => "true", "group" => "input" }
21+
22+
# Gem dependencies
23+
s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'
24+
25+
s.add_runtime_dependency 'sequel'
26+
s.add_runtime_dependency 'jdbc-sqlite3'
27+
28+
end
29+

rakelib/publish.rake

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
require "gem_publisher"
2+
3+
desc "Publish gem to RubyGems.org"
4+
task :publish_gem do |t|
5+
gem_file = Dir.glob(File.expand_path('../*.gemspec',File.dirname(__FILE__))).first
6+
gem = GemPublisher.publish_if_updated(gem_file, :rubygems)
7+
puts "Published #{gem}" if gem
8+
end
9+

0 commit comments

Comments
 (0)