Skip to content

Commit 3eed8f1

Browse files
committed
Condition can be specified for consumers
1 parent d58d6c7 commit 3eed8f1

File tree

3 files changed

+26
-3
lines changed

3 files changed

+26
-3
lines changed

lib/consumer/postgres/postgres.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ def self.included(cls)
66
end
77
end
88

9-
def configure(session: nil, batch_size: nil, position_store: nil)
9+
def configure(session: nil, batch_size: nil, position_store: nil, condition: nil)
1010
MessageStore::Postgres::Get.configure(
1111
self,
12-
session: session,
13-
batch_size: batch_size
12+
batch_size: batch_size,
13+
condition: condition,
14+
session: session
1415
)
1516

1617
PositionStore.configure(

test/automated/condition.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
require_relative './automated_init'
2+
3+
context "Condition" do
4+
stream_name = Controls::StreamName.example
5+
6+
condition = 'position = 0'
7+
8+
consumer = Controls::Consumer::Example.build(stream_name, condition: condition)
9+
10+
context "Get Dependency" do
11+
get = consumer.get
12+
13+
get_condition = get.condition
14+
15+
test "Condition is configured" do
16+
assert(get_condition == condition)
17+
end
18+
end
19+
end

test/interactive/start_consumer.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010
end
1111
end
1212

13+
condition = ENV['CONDITION']
14+
1315
Actor::Supervisor.start do
1416
Controls::Consumer::Incrementing.start(
1517
category,
18+
condition: condition,
1619
position_update_interval: position_update_interval
1720
)
1821
end

0 commit comments

Comments
 (0)