Skip to content

Commit ccaf082

Browse files
committed
Add idle callback to the Consumer
1 parent 7d30e64 commit ccaf082

File tree

3 files changed

+82
-1
lines changed

3 files changed

+82
-1
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ messages. Simply pass it a message you wish to commit. For most cases it's recom
6767
and manually commit each message, so that you don't ever "lose" a message if your application dies
6868
in the middle of the processing.
6969

70+
You can also (optionally) set an idle callback using `setIdleCallback` method. This callback will be called whenever
71+
there are no messages to return. The idle interval is then equal to the `timeout` parameter if provided, or to the
72+
`consumerRequestTimeoutMs` option if set, otherwise to the proxy configuration option `consumer.request.timeout.ms`.
73+
7074
```php
7175
$consumerFactory = new ConsumerFactory($restClient);
7276
$consumer = $consumerFactory->create('your-consumer-group', Subscription::topic('your-topic'));

src/Consumer.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Grongor\KafkaRest\Api\Value\Response\Message;
1010
use Grongor\KafkaRest\Exception\ConsumerClosed;
1111
use Throwable;
12+
use function count;
1213

1314
final class Consumer
1415
{
@@ -18,6 +19,9 @@ final class Consumer
1819
/** @var Api\Value\Response\Consumer */
1920
private $consumer;
2021

22+
/** @var callable|null */
23+
private $idleCallback;
24+
2125
public function __construct(RestClient $client, Api\Value\Response\Consumer $consumer)
2226
{
2327
$this->client = $client;
@@ -33,6 +37,11 @@ public function __destruct()
3337
$this->close();
3438
}
3539

40+
public function setIdleCallback(callable $idleCallback) : void
41+
{
42+
$this->idleCallback = $idleCallback;
43+
}
44+
3645
/**
3746
* @return Message[]
3847
*/
@@ -44,7 +53,14 @@ public function consume(?int $timeout = null, ?int $maxBytes = null) : iterable
4453

4554
try {
4655
while (true) {
47-
yield from $this->client->getConsumerMessages($this->consumer, $timeout, $maxBytes);
56+
$messages = $this->client->getConsumerMessages($this->consumer, $timeout, $maxBytes);
57+
if ($this->idleCallback !== null && count($messages) === 0) {
58+
($this->idleCallback)();
59+
60+
continue;
61+
}
62+
63+
yield from $messages;
4864
}
4965
} catch (Throwable $throwable) {
5066
$this->close();

tests/ConsumerTest.php

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,67 @@ public function testConsume(?int $timeout, ?int $maxBytes) : void
7171
}
7272
}
7373

74+
/**
75+
* @dataProvider providerConsume
76+
*/
77+
public function testConsumeWithIdleCallback(?int $timeout, ?int $maxBytes) : void
78+
{
79+
$consumerVO = new \Grongor\KafkaRest\Api\Value\Response\Consumer();
80+
81+
$firstMessages = [];
82+
$secondMessages = [new Message(), new Message()];
83+
$thirdMessages = [];
84+
85+
$client = Mockery::mock(RestClient::class);
86+
$client->shouldReceive('getConsumerMessages')
87+
->once()
88+
->with($consumerVO, $timeout, $maxBytes)
89+
->andReturns($firstMessages);
90+
$client->shouldReceive('getConsumerMessages')
91+
->once()
92+
->with($consumerVO, $timeout, $maxBytes)
93+
->andReturns($secondMessages);
94+
$client->shouldReceive('getConsumerMessages')
95+
->once()
96+
->with($consumerVO, $timeout, $maxBytes)
97+
->andReturns($thirdMessages);
98+
$client->shouldReceive('getConsumerMessages')
99+
->once()
100+
->andThrow(new Exception('some exception'));
101+
102+
$client->shouldReceive('deleteConsumer')
103+
->once()
104+
->with($consumerVO);
105+
106+
$callbackCalledTimes = 0;
107+
108+
$consumer = new Consumer($client, $consumerVO);
109+
$consumer->setIdleCallback(static function () use (&$callbackCalledTimes) : void {
110+
$callbackCalledTimes++;
111+
});
112+
$messages = $consumer->consume($timeout, $maxBytes);
113+
114+
self::assertInstanceOf(Generator::class, $messages);
115+
116+
self::assertTrue($messages->valid());
117+
self::assertSame($secondMessages[0], $messages->current());
118+
119+
$messages->next();
120+
self::assertTrue($messages->valid());
121+
self::assertSame($secondMessages[1], $messages->current());
122+
123+
self::assertTrue($messages->valid());
124+
125+
try {
126+
$messages->next();
127+
self::fail('Expected an exception');
128+
} catch (ConsumerClosed $exception) {
129+
self::assertFalse($messages->valid());
130+
}
131+
132+
self::assertSame(2, $callbackCalledTimes);
133+
}
134+
74135
/**
75136
* @return iterable<array<mixed>>
76137
*/

0 commit comments

Comments
 (0)