Skip to content

Commit 8484909

Browse files
authored
Merge pull request arnaud-lb#179 from awons/awons/allow-message-deletion
Allow for null key and null message
2 parents 24ed215 + b84e8fc commit 8484909

File tree

3 files changed

+97
-5
lines changed

3 files changed

+97
-5
lines changed

tests/allow_null_payload.phpt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
--TEST--
2+
Allow null payload
3+
--SKIPIF--
4+
<?php
5+
file_exists(__DIR__."/test_env.php") || die("skip");
6+
--FILE--
7+
<?php
8+
9+
require __DIR__."/test_env.php";
10+
11+
$topicName = sprintf('test_rdkafka_%s', uniqid());
12+
13+
$producer = new RdKafka\Producer();
14+
$producer->addBrokers(TEST_KAFKA_BROKERS);
15+
$topic = $producer->newTopic($topicName);
16+
17+
$topic->produce(0, 0, NULL, 'message_key_1');
18+
19+
while ($producer->getOutQLen() > 0) {
20+
$producer->poll(50);
21+
}
22+
23+
$consumer = new RdKafka\Consumer();
24+
$consumer->addBrokers(TEST_KAFKA_BROKERS);
25+
26+
$topic = $consumer->newTopic($topicName);
27+
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
28+
29+
while (true) {
30+
$message = $topic->consume(0, 1000);
31+
if ($message === null) {
32+
continue;
33+
}
34+
switch ($message->err) {
35+
case RD_KAFKA_RESP_ERR_NO_ERROR:
36+
var_dump($message->payload);
37+
var_dump($message->key);
38+
break 2;
39+
}
40+
}
41+
42+
$topic->consumeStop(0);
43+
44+
--EXPECTF--
45+
NULL
46+
string(13) "message_key_1"
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
--TEST--
2+
Allow null payload
3+
--SKIPIF--
4+
<?php
5+
file_exists(__DIR__."/test_env.php") || die("skip");
6+
--FILE--
7+
<?php
8+
9+
require __DIR__."/test_env.php";
10+
11+
$topicName = sprintf('test_rdkafka_%s', uniqid());
12+
13+
$producer = new RdKafka\Producer();
14+
$producer->addBrokers(TEST_KAFKA_BROKERS);
15+
$topic = $producer->newTopic($topicName);
16+
17+
$topic->produce(0, 0);
18+
19+
while ($producer->getOutQLen() > 0) {
20+
$producer->poll(50);
21+
}
22+
23+
$consumer = new RdKafka\Consumer();
24+
$consumer->addBrokers(TEST_KAFKA_BROKERS);
25+
26+
$topic = $consumer->newTopic($topicName);
27+
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
28+
29+
while (true) {
30+
$message = $topic->consume(0, 1000);
31+
if ($message === null) {
32+
continue;
33+
}
34+
switch ($message->err) {
35+
case RD_KAFKA_RESP_ERR_NO_ERROR:
36+
var_dump($message->payload);
37+
var_dump($message->key);
38+
break 2;
39+
}
40+
}
41+
42+
$topic->consumeStop(0);
43+
44+
--EXPECTF--
45+
NULL
46+
NULL

topic.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -353,10 +353,10 @@ static const zend_function_entry kafka_kafka_consumer_topic_fe[] = {
353353
PHP_FE_END
354354
};
355355

356-
/* {{{ proto void RdKafka\ProducerTopic::produce(int $partition, int $msgflags, string $payload[, string $key])
356+
/* {{{ proto void RdKafka\ProducerTopic::produce(int $partition, int $msgflags[, string $payload, string $key])
357357
Produce and send a single message to broker. */
358358

359-
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_produce, 0, 0, 3)
359+
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_produce, 0, 0, 2)
360360
ZEND_ARG_INFO(0, partition)
361361
ZEND_ARG_INFO(0, msgflags)
362362
ZEND_ARG_INFO(0, payload)
@@ -367,15 +367,15 @@ PHP_METHOD(RdKafka__ProducerTopic, produce)
367367
{
368368
long partition;
369369
long msgflags;
370-
char *payload;
371-
arglen_t payload_len;
370+
char *payload = NULL;
371+
arglen_t payload_len = 0;
372372
char *key = NULL;
373373
arglen_t key_len = 0;
374374
int ret;
375375
rd_kafka_resp_err_t err;
376376
kafka_topic_object *intern;
377377

378-
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lls|s!", &partition, &msgflags, &payload, &payload_len, &key, &key_len) == FAILURE) {
378+
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll|s!s!", &partition, &msgflags, &payload, &payload_len, &key, &key_len) == FAILURE) {
379379
return;
380380
}
381381

0 commit comments

Comments
 (0)