Skip to content

Commit e596c88

Browse files
martynaszaliaduonisarnaud-lb
authored andcommitted
Message headers support (arnaud-lb#206)
Message headers support
1 parent 766a40b commit e596c88

File tree

5 files changed

+248
-0
lines changed

5 files changed

+248
-0
lines changed

config.m4

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ if test "$PHP_RDKAFKA" != "no"; then
6363
AC_MSG_WARN([no rd_kafka_message_timestamp, timestamp support will not be available])
6464
])
6565

66+
AC_CHECK_LIB($LIBNAME,[rd_kafka_message_headers],[
67+
AC_DEFINE(HAVE_RD_KAFKA_MESSAGE_HEADERS,1,[ ])
68+
],[
69+
AC_MSG_WARN([no rd_kafka_message_headers, headers support will not be available])
70+
])
71+
6672
AC_CHECK_LIB($LIBNAME,[rd_kafka_subscribe],[
6773
AC_DEFINE(HAVE_NEW_KAFKA_CONSUMER,1,[ ])
6874
SOURCES="$SOURCES kafka_consumer.c topic_partition.c"

message.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message TSR
4343
timestamp = rd_kafka_message_timestamp(message, &tstype);
4444
#endif /* HAVE_RD_KAFKA_MESSAGE_TIMESTAMP */
4545

46+
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
47+
rd_kafka_headers_t *message_headers = NULL;
48+
rd_kafka_resp_err_t header_response;
49+
const char *header_name = NULL;
50+
const void *header_value = NULL;
51+
size_t header_size = 0;
52+
zval headers_array;
53+
int i;
54+
#endif /* HAVE_RD_KAFKA_MESSAGE_HEADERS */
55+
4656
zend_update_property_long(NULL, return_value, ZEND_STRL("err"), message->err TSRMLS_CC);
4757

4858
if (message->rkt) {
@@ -60,6 +70,21 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message TSR
6070
zend_update_property_stringl(NULL, return_value, ZEND_STRL("key"), message->key, message->key_len TSRMLS_CC);
6171
}
6272
zend_update_property_long(NULL, return_value, ZEND_STRL("offset"), message->offset TSRMLS_CC);
73+
74+
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
75+
rd_kafka_message_headers(message, &message_headers);
76+
if (message_headers != NULL) {
77+
array_init(&headers_array);
78+
for (i = 0; i < rd_kafka_header_cnt(message_headers); i++) {
79+
header_response = rd_kafka_header_get_all(message_headers, i, &header_name, &header_value, &header_size);
80+
if (header_response != RD_KAFKA_RESP_ERR_NO_ERROR) {
81+
break;
82+
}
83+
rdkafka_add_assoc_string(&headers_array, header_name, (char*)header_value);
84+
}
85+
zend_update_property(NULL, return_value, ZEND_STRL("headers"), &headers_array TSRMLS_CC);
86+
}
87+
#endif
6388
}
6489

6590
/* {{{ proto string RdKafka\Message::errstr()
@@ -120,4 +145,7 @@ void kafka_message_minit(TSRMLS_D) { /* {{{ */
120145
zend_declare_property_null(ce_kafka_message, ZEND_STRL("len"), ZEND_ACC_PUBLIC TSRMLS_CC);
121146
zend_declare_property_null(ce_kafka_message, ZEND_STRL("key"), ZEND_ACC_PUBLIC TSRMLS_CC);
122147
zend_declare_property_null(ce_kafka_message, ZEND_STRL("offset"), ZEND_ACC_PUBLIC TSRMLS_CC);
148+
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
149+
zend_declare_property_null(ce_kafka_message, ZEND_STRL("headers"), ZEND_ACC_PUBLIC TSRMLS_CC);
150+
#endif
123151
} /* }}} */

php_rdkafka_priv.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,18 @@ static inline zval *rdkafka_hash_get_current_data_ex(HashTable *ht, HashPosition
9999
return zend_hash_get_current_data_ex(ht, pos);
100100
}
101101

102+
static inline char *rdkafka_hash_get_current_key_ex(HashTable *ht, HashPosition *pos)
103+
{
104+
zend_string* key;
105+
zend_ulong index;
106+
107+
if (zend_hash_get_current_key_ex(ht, &key, &index, pos) == HASH_KEY_IS_STRING) {
108+
return key->val;
109+
}
110+
111+
return NULL;
112+
}
113+
102114
#define rdkafka_add_assoc_string(arg, key, str) add_assoc_string(arg, key, str)
103115

104116
#define RDKAFKA_RETURN_STRING(str) RETURN_STRING(str)
@@ -214,6 +226,19 @@ static inline zval **rdkafka_hash_get_current_data_ex(HashTable *ht, HashPositio
214226
return NULL;
215227
}
216228

229+
static inline char **rdkafka_hash_get_current_key_ex(HashTable *ht, HashPosition *pos)
230+
{
231+
char *key = NULL;
232+
uint klen;
233+
ulong index;
234+
235+
if (zend_hash_get_current_key_ex(ht, &key, &klen, &index, 0, pos) == HASH_KEY_IS_STRING) {
236+
return key;
237+
}
238+
239+
return NULL;
240+
}
241+
217242
#define rdkafka_add_assoc_string(arg, key, str) add_assoc_string(arg, key, str, 1)
218243

219244
#define RDKAFKA_RETURN_STRING(str) RETURN_STRING(str, 1)

tests/message_headers.phpt

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
--TEST--
2+
Message headers
3+
--SKIPIF--
4+
<?php
5+
RD_KAFKA_VERSION >= 0x000b04ff || die("skip");
6+
file_exists(__DIR__."/test_env.php") || die("skip");
7+
--FILE--
8+
<?php
9+
10+
require __DIR__."/test_env.php";
11+
12+
$delivered = 0;
13+
14+
$conf = new RdKafka\Conf();
15+
$conf->setErrorCb(function ($producer, $err, $errstr) {
16+
printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
17+
exit;
18+
});
19+
$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
20+
if ($msg->err) {
21+
throw new Exception("Message delivery failed: " . $msg->errstr());
22+
}
23+
$delivered++;
24+
});
25+
26+
$producer = new RdKafka\Producer($conf);
27+
28+
if ($producer->addBrokers(TEST_KAFKA_BROKERS) < 1) {
29+
echo "Failed adding brokers\n";
30+
exit;
31+
}
32+
33+
$topicName = sprintf("test_rdkafka_%s", uniqid());
34+
35+
$topic = $producer->newTopic($topicName);
36+
37+
if (!$producer->getMetadata(false, $topic, 2*1000)) {
38+
echo "Failed to get metadata, is broker down?\n";
39+
}
40+
41+
$headers = [
42+
['key' => 'value'],
43+
[
44+
'key1' => 'value1',
45+
'key2' => 'value2',
46+
'key3' => 'value3',
47+
],
48+
[],
49+
null,
50+
['key'],
51+
];
52+
53+
foreach ($headers as $index => $header) {
54+
$topic->producev(0, 0, "message $index", null, $header);
55+
$producer->poll(0);
56+
}
57+
58+
while ($producer->getOutQLen()) {
59+
$producer->poll(50);
60+
}
61+
62+
printf("%d messages delivered\n", $delivered);
63+
64+
$consumer = new RdKafka\Consumer($conf);
65+
$consumer->addBrokers(TEST_KAFKA_BROKERS);
66+
67+
$topic = $consumer->newTopic($topicName);
68+
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
69+
70+
$messages = [];
71+
72+
while (true) {
73+
$msg = $topic->consume(0, 60*1000);
74+
if (!$msg) {
75+
continue;
76+
}
77+
switch ($msg->err) {
78+
case RD_KAFKA_RESP_ERR_NO_ERROR:
79+
$headersString = $msg->headers ?? [];
80+
array_walk($headersString, function(&$value, $key) {
81+
$value = "{$key}: {$value}";
82+
});
83+
if (empty($headersString)) {
84+
$headersString = "none";
85+
} else {
86+
$headersString = implode(", ", $headersString);
87+
}
88+
printf("Got message: %s | Headers: %s\n", $msg->payload, $headersString);
89+
break;
90+
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
91+
echo "EOF\n";
92+
break 2;
93+
default:
94+
throw new Exception($message->errstr());
95+
}
96+
}
97+
--EXPECT--
98+
5 messages delivered
99+
Got message: message 0 | Headers: key: value
100+
Got message: message 1 | Headers: key1: value1, key2: value2, key3: value3
101+
Got message: message 2 | Headers: none
102+
Got message: message 3 | Headers: none
103+
Got message: message 4 | Headers: none
104+
EOF

topic.c

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,9 +401,94 @@ PHP_METHOD(RdKafka__ProducerTopic, produce)
401401
}
402402
/* }}} */
403403

404+
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
405+
/* {{{ proto void RdKafka\ProducerTopic::producev(int $partition, int $msgflags[, string $payload, string $key, array $headers])
406+
Produce and send a single message to broker (with headers possibility). */
407+
408+
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_producev, 0, 0, 2)
409+
ZEND_ARG_INFO(0, partition)
410+
ZEND_ARG_INFO(0, msgflags)
411+
ZEND_ARG_INFO(0, payload)
412+
ZEND_ARG_INFO(0, key)
413+
ZEND_ARG_INFO(0, headers)
414+
ZEND_END_ARG_INFO()
415+
416+
PHP_METHOD(RdKafka__ProducerTopic, producev)
417+
{
418+
long partition;
419+
long msgflags;
420+
char *payload = NULL;
421+
arglen_t payload_len = 0;
422+
char *key = NULL;
423+
arglen_t key_len = 0;
424+
rd_kafka_resp_err_t err;
425+
kafka_topic_object *intern;
426+
HashTable *headersParam = NULL;
427+
HashPosition headersParamPos;
428+
char *header_key;
429+
zeval *header_value;
430+
rd_kafka_headers_t *headers;
431+
432+
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll|s!s!h!", &partition, &msgflags, &payload, &payload_len, &key, &key_len, &headersParam) == FAILURE) {
433+
return;
434+
}
435+
436+
if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
437+
zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Out of range value '%ld' for $partition", partition TSRMLS_CC);
438+
return;
439+
}
440+
441+
if (msgflags != 0) {
442+
zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0 TSRMLS_CC, "Invalid value '%ld' for $msgflags", msgflags TSRMLS_CC);
443+
return;
444+
}
445+
446+
intern = get_kafka_topic_object(getThis() TSRMLS_CC);
447+
448+
if (headersParam != NULL && zend_hash_num_elements(headersParam) > 0) {
449+
headers = rd_kafka_headers_new(zend_hash_num_elements(headersParam));
450+
for (zend_hash_internal_pointer_reset_ex(headersParam, &headersParamPos);
451+
(header_value = rdkafka_hash_get_current_data_ex(headersParam, &headersParamPos)) != NULL &&
452+
(header_key = rdkafka_hash_get_current_key_ex(headersParam, &headersParamPos)) != NULL;
453+
zend_hash_move_forward_ex(headersParam, &headersParamPos)) {
454+
convert_to_string_ex(header_value);
455+
rd_kafka_header_add(
456+
headers,
457+
header_key,
458+
-1, // Auto detect header title length
459+
Z_STRVAL_P(ZEVAL(header_value)),
460+
-1 // Auto detect header value length
461+
);
462+
}
463+
} else {
464+
headers = rd_kafka_headers_new(0);
465+
}
466+
467+
err = rd_kafka_producev(
468+
NULL,
469+
RD_KAFKA_V_RKT(intern->rkt),
470+
RD_KAFKA_V_PARTITION(partition),
471+
RD_KAFKA_V_MSGFLAGS(msgflags | RD_KAFKA_MSG_F_COPY),
472+
RD_KAFKA_V_VALUE(payload, payload_len),
473+
RD_KAFKA_V_KEY(key, key_len),
474+
RD_KAFKA_V_HEADERS(headers),
475+
RD_KAFKA_V_END
476+
);
477+
478+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
479+
zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err TSRMLS_CC);
480+
return;
481+
}
482+
}
483+
/* }}} */
484+
#endif
485+
404486
static const zend_function_entry kafka_producer_topic_fe[] = {
405487
PHP_ME(RdKafka, __construct, arginfo_kafka___private_construct, ZEND_ACC_PRIVATE)
406488
PHP_ME(RdKafka__ProducerTopic, produce, arginfo_kafka_produce, ZEND_ACC_PUBLIC)
489+
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
490+
PHP_ME(RdKafka__ProducerTopic, producev, arginfo_kafka_producev, ZEND_ACC_PUBLIC)
491+
#endif
407492
PHP_FE_END
408493
};
409494

0 commit comments

Comments
 (0)