Skip to content

Commit e87dde2

Browse files
author
Clement Thuault
committed
dr_msg_cb: Added support for message delivery callback
1 parent 8e5d3b4 commit e87dde2

File tree

5 files changed

+87
-0
lines changed

5 files changed

+87
-0
lines changed

conf.c

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs TSRMLS_DC) /* {{{ */
4545
{
4646
kafka_conf_callback_dtor(&cbs->error TSRMLS_CC);
4747
kafka_conf_callback_dtor(&cbs->rebalance TSRMLS_CC);
48+
kafka_conf_callback_dtor(&cbs->dr_msg TSRMLS_CC);
4849
} /* }}} */
4950

5051
static void kafka_conf_callback_copy(kafka_conf_callback *to, kafka_conf_callback *from TSRMLS_DC) /* {{{ */
@@ -59,6 +60,7 @@ void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *f
5960
{
6061
kafka_conf_callback_copy(&to->error, &from->error TSRMLS_CC);
6162
kafka_conf_callback_copy(&to->rebalance, &from->rebalance TSRMLS_CC);
63+
kafka_conf_callback_copy(&to->dr_msg, &from->dr_msg TSRMLS_CC);
6264
} /* }}} */
6365

6466
static void kafka_conf_free(void *object TSRMLS_DC) /* {{{ */
@@ -158,6 +160,45 @@ static void kafka_conf_error_cb(rd_kafka_t *rk, int err, const char *reason, voi
158160
zval_ptr_dtor(&zreason);
159161
}
160162

163+
static void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
164+
{
165+
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
166+
zval *retval;
167+
zval **args[2];
168+
zval *zrk;
169+
zval *zrkmsg;
170+
TSRMLS_FETCH();
171+
172+
if (!opaque) {
173+
return;
174+
}
175+
176+
if (!cbs->dr_msg.fci.function_name) {
177+
return;
178+
}
179+
180+
ALLOC_INIT_ZVAL(zrk);
181+
KAFKA_ZVAL_ZVAL(zrk, &cbs->rk, 1, 0);
182+
183+
ALLOC_INIT_ZVAL(zrkmsg);
184+
kafka_message_new(zrkmsg, msg);
185+
186+
args[0] = &zrk;
187+
args[1] = &zrkmsg;
188+
189+
cbs->dr_msg.fci.retval_ptr_ptr = &retval;
190+
cbs->dr_msg.fci.params = args;
191+
cbs->dr_msg.fci.param_count = 2;
192+
193+
zend_call_function(&cbs->dr_msg.fci, &cbs->dr_msg.fcc TSRMLS_CC);
194+
195+
if (retval) {
196+
zval_ptr_dtor(&retval);
197+
}
198+
zval_ptr_dtor(&zrk);
199+
zval_ptr_dtor(&zrkmsg);
200+
}
201+
161202
#ifdef HAVE_NEW_KAFKA_CONSUMER
162203
static void kafka_conf_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
163204
{
@@ -396,6 +437,41 @@ PHP_METHOD(RdKafka__Conf, setErrorCb)
396437
}
397438
/* }}} */
398439

440+
/* {{{ proto void RdKafka\Conf::setDrMsgCb(callable $callback)
441+
Sets the delivery report callback */
442+
443+
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_conf_set_dr_msg_cb, 0, 0, 1)
444+
ZEND_ARG_INFO(0, callback)
445+
ZEND_END_ARG_INFO()
446+
447+
PHP_METHOD(RdKafka__Conf, setDrMsgCb)
448+
{
449+
zend_fcall_info fci;
450+
zend_fcall_info_cache fcc;
451+
kafka_conf_object *intern;
452+
453+
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", &fci, &fcc) == FAILURE) {
454+
return;
455+
}
456+
457+
intern = get_kafka_conf_object(getThis() TSRMLS_CC);
458+
if (!intern) {
459+
return;
460+
}
461+
462+
Z_ADDREF_P(fci.function_name);
463+
464+
if (intern->cbs.dr_msg.fci.function_name) {
465+
zval_ptr_dtor(&intern->cbs.dr_msg.fci.function_name);
466+
}
467+
468+
intern->cbs.dr_msg.fci = fci;
469+
intern->cbs.dr_msg.fcc = fcc;
470+
471+
rd_kafka_conf_set_dr_msg_cb(intern->u.conf, kafka_conf_dr_msg_cb);
472+
}
473+
/* }}} */
474+
399475
#ifdef HAVE_NEW_KAFKA_CONSUMER
400476
/* {{{ proto void RdKafka\Conf::setRebalanceCb(mixed $callback)
401477
Set rebalance callback for use with coordinated consumer group balancing */
@@ -509,6 +585,7 @@ static const zend_function_entry kafka_conf_fe[] = {
509585
PHP_ME(RdKafka__Conf, setDefaultTopicConf, arginfo_kafka_conf_set_default_topic_conf, ZEND_ACC_PUBLIC)
510586
#endif /* HAVE_NEW_KAFKA_CONSUMER */
511587
PHP_ME(RdKafka__Conf, setErrorCb, arginfo_kafka_conf_set_error_cb, ZEND_ACC_PUBLIC)
588+
PHP_ME(RdKafka__Conf, setDrMsgCb, arginfo_kafka_conf_set_dr_msg_cb, ZEND_ACC_PUBLIC)
512589
#ifdef HAVE_NEW_KAFKA_CONSUMER
513590
PHP_ME(RdKafka__Conf, setRebalanceCb, arginfo_kafka_conf_set_rebalance_cb, ZEND_ACC_PUBLIC)
514591
#endif /* HAVE_NEW_KAFKA_CONSUMER */

conf.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ typedef struct _kafka_conf_callbacks {
4040
zval rk;
4141
kafka_conf_callback error;
4242
kafka_conf_callback rebalance;
43+
kafka_conf_callback dr_msg;
4344
} kafka_conf_callbacks;
4445

4546
typedef struct _kafka_conf_object {

kafka_consumer.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,4 +658,5 @@ void kafka_kafka_consumer_minit(TSRMLS_D) /* {{{ */
658658

659659
zend_declare_property_null(ce, ZEND_STRL("error_cb"), ZEND_ACC_PRIVATE TSRMLS_CC);
660660
zend_declare_property_null(ce, ZEND_STRL("rebalance_cb"), ZEND_ACC_PRIVATE TSRMLS_CC);
661+
zend_declare_property_null(ce, ZEND_STRL("dr_msg_cb"), ZEND_ACC_PRIVATE TSRMLS_CC);
661662
} /* }}} */

rdkafka.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,7 @@ PHP_MINIT_FUNCTION(rdkafka)
648648
ce_kafka->create_object = kafka_new;
649649

650650
zend_declare_property_null(ce_kafka, ZEND_STRL("error_cb"), ZEND_ACC_PRIVATE TSRMLS_CC);
651+
zend_declare_property_null(ce_kafka, ZEND_STRL("dr_cb"), ZEND_ACC_PRIVATE TSRMLS_CC);
651652

652653
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Consumer", kafka_consumer_fe);
653654
ce_kafka_consumer = zend_register_internal_class_ex(&ce, ce_kafka, NULL TSRMLS_CC);

tests/conf.phpt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ $conf->setErrorCb(function () { });
3333
$dump = $conf->dump();
3434
var_dump(isset($dump["error_cb"]));
3535

36+
echo "Setting dr_msg callback\n";
37+
$conf->setDrMsgCb(function () { });
38+
$dump = $conf->dump();
39+
var_dump(isset($dump["dr_msg_cb"]));
40+
3641
echo "Dumping conf\n";
3742
var_dump(array_intersect_key($conf->dump(), array(
3843
"client.id" => true,
@@ -50,6 +55,8 @@ Setting an invalid property
5055
Caught a RdKafka\Exception: No such configuration property: "invalid"
5156
Setting error callback
5257
bool(true)
58+
Setting dr_msg callback
59+
bool(true)
5360
Dumping conf
5461
array(3) {
5562
["client.id"]=>

0 commit comments

Comments
 (0)