PHP-rdkafka is a thin librdkafka binding providing a working PHP 5 / PHP 7 Kafka 0.8 / 0.9 client.
It supports the consumer, producer, and metadata APIs.
The API resembles librdkafka's as closely as possible.
- Installation
- Examples
- Usage
- API
  - RdKafka\Consumer
  - RdKafka\Producer
  - RdKafka
  - RdKafka\Conf
  - RdKafka\TopicConf
  - RdKafka\Topic
  - RdKafka\ConsumerTopic
  - RdKafka\ProducerTopic
  - RdKafka\Message
  - RdKafka\Queue
  - RdKafka\Exception
  - RdKafka\Metadata
  - RdKafka\Metadata\Topic
  - RdKafka\Metadata\Broker
  - RdKafka\Metadata\Partition
  - RdKafka\Metadata\Collection
- Functions
- Constants
- Credits
- License
php-rdkafka depends on the stable version of librdkafka.
php-rdkafka is compatible with PHP 5 (master branch, PECL release) and has an experimental PHP 7 branch.
For PHP 7, installing from source is preferred.
Using PECL:
    sudo pecl install channel://pecl.php.net/rdkafka-alpha
Using Homebrew:
    brew tap homebrew/dupes
    brew tap homebrew/versions
    brew tap homebrew/homebrew-php
    brew install homebrew/php/php70-rdkafka
php70-rdkafka is the rdkafka package for PHP 7.0. Replace 70 with 53, 54, 55, or 56 for PHP 5.3, 5.4, 5.5, or 5.6, respectively.
Source can be downloaded from a stable release (recommended). Alternatively, clone the master branch for PHP 5 or the php7 branch for PHP 7.
The extension can be compiled and installed like this:
    phpize
    ./configure
    make
    sudo make install
    # Add extension=rdkafka.so to your php.ini:
    echo extension=rdkafka.so|sudo tee -a /path/to/php.ini
See examples
For producing, we first need to create a producer and add brokers (Kafka servers) to it:
    <?php
    $rk = new RdKafka\Producer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("10.0.0.1,10.0.0.2");
Next, we create a topic instance from the producer:
    <?php
    $topic = $rk->newTopic("test");
From there, we can produce as many messages as we want, using the produce method:
    <?php
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
The second argument is the message flags and should currently always be 0.
The message payload can be anything.
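Because produce() only queues the message, it can help to wait for the out queue to empty before the script ends. The following is a minimal sketch, not part of the extension: drainProducer() is a hypothetical helper, and the 50 ms poll interval and the cap of 200 polls are arbitrary assumptions. It relies only on the outqLen() and poll() methods documented in the API section.

```php
<?php
// Hypothetical helper. $producer is expected to be an RdKafka\Producer; the
// type hint is omitted so the sketch can also be exercised with a stand-in.
function drainProducer($producer, $pollMs = 50, $maxPolls = 200)
{
    // Serve producer events until the out queue is empty or we give up.
    for ($i = 0; $i < $maxPolls && $producer->outqLen() > 0; $i++) {
        $producer->poll($pollMs);
    }
    // True when nothing is left waiting to be sent or acknowledged.
    return $producer->outqLen() === 0;
}
```

After the produce() calls above, `drainProducer($rk)` would return false if messages were still queued once the polling budget ran out.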
For consuming, we first need to create a consumer and add brokers (Kafka servers) to it:
    <?php
    $rk = new RdKafka\Consumer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("10.0.0.1,10.0.0.2");
Next, create a topic instance by calling the newTopic() method, and start consuming on partition 0:
    <?php
    $topic = $rk->newTopic("test");
    // The first argument is the partition to consume from.
    // The second argument is the offset at which to start consumption. Valid values
    // are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
    $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
Next, retrieve the consumed messages:
    <?php
    while (true) {
        // The first argument is the partition (again).
        // The second argument is the timeout, in milliseconds.
        $msg = $topic->consume(0, 1000);
        if ($msg === null) {
            continue; // Timed out without receiving a message.
        }
        if ($msg->err) {
            echo $msg->errstr(), "\n";
            break;
        } else {
            echo $msg->payload, "\n";
        }
    }
Consuming from multiple topics and/or partitions can be done by telling librdkafka to forward all messages from these topics/partitions to an internal queue, and then consuming from this queue:
Creating the queue:
    <?php
    $queue = $rk->newQueue();
Adding topic+partitions to the queue:
    <?php
    $topic1 = $rk->newTopic("topic1");
    $topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
    $topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);
    $topic2 = $rk->newTopic("topic2");
    $topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
Next, retrieve the consumed messages from the queue:
    <?php
    while (true) {
        // The only argument is the timeout, in milliseconds.
        $msg = $queue->consume(1000);
        if ($msg === null) {
            continue; // Timed out without receiving a message.
        }
        if ($msg->err) {
            echo $msg->errstr(), "\n";
            break;
        } else {
            echo $msg->payload, "\n";
        }
    }
librdkafka can store offsets in a local file or on the broker. The default is a local file; as soon as you start consuming with RD_KAFKA_OFFSET_STORED as the offset, rdkafka starts storing the offset.
By default, the file is created in the current directory, with a name based on the topic and the partition. The directory can be changed by setting the offset.store.path configuration property.
Other interesting properties are: offset.store.sync.interval.ms, offset.store.method, auto.commit.interval.ms, auto.commit.enable, group.id.
    <?php
    $topicConf = new RdKafka\TopicConf();
    $topicConf->set("auto.commit.interval.ms", "1000");
    $topicConf->set("offset.store.sync.interval.ms", "60000");
    $topic = $rk->newTopic("test", $topicConf);
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
librdkafka will buffer up to 1GB of messages for each consumed partition by default. You can lower memory usage by reducing the value of the queued.max.messages.kbytes parameter on your consumers.
Each consumer and producer instance fetches topic metadata at an interval defined by the topic.metadata.refresh.interval.ms parameter. Depending on your librdkafka version, the parameter defaults to 10 seconds or 600 seconds.
librdkafka fetches the metadata for all topics of the cluster by default. Setting topic.metadata.refresh.sparse to the string "true" makes sure that librdkafka fetches metadata only for the topics it uses.
Setting topic.metadata.refresh.sparse to "true", and topic.metadata.refresh.interval.ms to 600 seconds (plus some jitter) can reduce the bandwidth a lot, depending on the number of consumers and topics.
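As a rough illustration of these two settings, the sketch below builds the property values and applies them to a RdKafka\Conf. The helper name sparseMetadataSettings() and the 10-second jitter range are assumptions made for this example, not recommendations from librdkafka.

```php
<?php
// Hypothetical helper: returns the two metadata properties as
// name => string value pairs. The jitter range is an illustrative choice.
function sparseMetadataSettings($baseSeconds = 600, $maxJitterSeconds = 10)
{
    // The property is expressed in milliseconds.
    $intervalMs = ($baseSeconds + mt_rand(0, $maxJitterSeconds)) * 1000;
    return array(
        "topic.metadata.refresh.sparse" => "true",
        "topic.metadata.refresh.interval.ms" => (string) $intervalMs,
    );
}

// Applying the settings requires the rdkafka extension.
if (extension_loaded("rdkafka")) {
    $conf = new RdKafka\Conf();
    foreach (sparseMetadataSettings() as $name => $value) {
        $conf->set($name, $value);
    }
}
```

The resulting $conf can then be passed to the RdKafka\Consumer or RdKafka\Producer constructor.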
Setting internal.termination.signal allows librdkafka threads to terminate as soon as librdkafka is done with them. This effectively allows your PHP processes / requests to terminate quickly.
When enabling this, you have to mask the signal like this:
    <?php
    // once
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    // any time
    $conf->set('internal.termination.signal', SIGIO);
    $producer = new RdKafka\Producer(RdKafka\Conf $conf = null);
Creates a new Kafka producer and starts its operation.
$conf is an optional RdKafka\Conf instance that will be used instead of the default configuration. The $conf object is copied, and changing $conf after that has no effect on the producer. See RdKafka\Conf for more information.
    $topic = $producer->newTopic(string $topic, RdKafka\TopicConf $conf = null);
Creates a new RdKafka\ProducerTopic instance for the topic named $topic.
$conf is an optional configuration for the topic that will be used instead of the default topic configuration. The $conf object is copied by this function, and changing $conf after that has no effect on the topic. See RdKafka\TopicConf for more information.
    $qlen = $producer->outqLen();
Returns the current out queue length: messages waiting to be sent to, or acknowledged by, the broker.
    $producer->poll(int $timeout_ms);
Polls the Producer handle for events.
    $consumer = new RdKafka\Consumer(RdKafka\Conf $conf = null);
Creates a new Kafka consumer and starts its operation.
$conf is an optional RdKafka\Conf instance that will be used instead of the default configuration. The $conf object is copied, and changing $conf after that has no effect on the consumer. See RdKafka\Conf for more information.
    $queue = $consumer->newQueue();
Returns a RdKafka\Queue instance.
    $topic = $consumer->newTopic(string $topic, RdKafka\TopicConf $conf = null);
Creates a new RdKafka\ConsumerTopic for the topic named $topic.
$conf is an optional configuration for the topic that will be used instead of the default topic configuration. The $conf object is copied by this function, and changing $conf after that has no effect on the topic. See RdKafka\TopicConf for more information.
RdKafka is the base class for RdKafka\Producer and RdKafka\Consumer.
    $rk->addBrokers(string $brokerList);
Adds one or more brokers to the instance's list of initial brokers. Additional brokers will be discovered automatically as soon as rdkafka connects to a broker by querying the broker metadata.
If a broker name resolves to multiple addresses (and possibly address families), all will be used for connection attempts in round-robin fashion.
$brokerList is a comma-separated list of brokers in the format: <host1>[:<port1>],<host2>[:<port2>]...
Example:
    $rk->addBrokers("10.0.0.1:9092,10.0.0.2");
Returns the number of brokers successfully added.
NOTE: Brokers may also be defined with the metadata.broker.list configuration property.
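When broker addresses come from configuration rather than a literal string, the list can be assembled programmatically. This is a small sketch of the <host>[:<port>] format only; buildBrokerList() is a hypothetical helper, not part of php-rdkafka.

```php
<?php
// Hypothetical helper: formats array(host, port) pairs as "<host>[:<port>],...".
// A null port omits the optional ":<port>" suffix.
function buildBrokerList(array $brokers)
{
    $parts = array();
    foreach ($brokers as $broker) {
        list($host, $port) = $broker;
        $parts[] = $port === null ? $host : $host . ":" . $port;
    }
    return implode(",", $parts);
}

echo buildBrokerList(array(array("10.0.0.1", 9092), array("10.0.0.2", null))), "\n";
// prints "10.0.0.1:9092,10.0.0.2"
```

The returned string can be passed to addBrokers() or used as the value of metadata.broker.list.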
    $rk->setLogLevel(int $level);
Specifies the maximum logging level produced by internal kafka logging and debugging. If the debug configuration property is set, the level is automatically adjusted to LOG_DEBUG.
Valid values for $level are any of the syslog LOG_* priorities: https://php.net/manual/en/function.syslog.php
    $metadata = $rk->metadata(bool $all_topics, RdKafka\Topic $only_topic = null, int $timeout_ms);
Requests metadata from the broker.
- all_topics - if true, request info about all topics in the cluster; if false, only request info about locally known topics.
- only_topic - only request info about this topic.
- timeout_ms - maximum response time before failing.
Returns a RdKafka\Metadata instance.
    $conf = new RdKafka\Conf();
Creates a new configuration. The list of available configuration properties is documented at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    $dump = $conf->dump();
Dumps the configuration properties and values to an array.
    $conf->set(string $name, string $value);
Sets a configuration property.
Throws a RdKafka\Exception on error.
    $conf = new RdKafka\TopicConf();
Creates a new topic configuration. See RdKafka\Conf.
    $conf->setPartitioner(int $partitioner);
Sets the partitioner callback.
Allowed values are RD_KAFKA_MSG_PARTITIONER_RANDOM, RD_KAFKA_MSG_PARTITIONER_CONSISTENT.
RdKafka\Topic is the base class for RdKafka\ConsumerTopic and RdKafka\ProducerTopic.
    $name = $topic->getName();
Returns the topic name.
New ConsumerTopic instances can be created by calling RdKafka\Consumer::newTopic().
    $topic->consumeStart(int $partition, int $offset);
Starts consuming messages for $partition at offset $offset, which may be either a proper offset (0..N) or one of the special offsets:
RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED, rd_kafka_offset_tail(..).
rdkafka will attempt to keep queued.min.messages (config property) messages in the local queue by repeatedly fetching batches of messages from the broker until the threshold is reached.
The application shall use the consume() method to consume messages from the local queue, each kafka message being represented as a RdKafka\Message object.
consumeStart() must not be called multiple times for the same topic and partition without stopping consumption first with consumeStop().
Throws a RdKafka\Exception on error.
    $topic->consumeStop(int $partition);
Stops consuming messages for $partition, purging all messages currently in the local queue.
Throws a RdKafka\Exception on error.
    $topic->consumeQueueStart(int $partition, int $offset, RdKafka\Queue $queue);
Same as consumeStart(), but re-routes incoming messages to the provided queue $queue. The application must use one of the RdKafka\Queue::consume*() functions to receive fetched messages.
consumeQueueStart() must not be called multiple times for the same topic and partition without stopping consumption first with consumeStop().
consumeStart() and consumeQueueStart() must not be combined for the same topic and partition.
Throws a RdKafka\Exception on error.
    $message = $topic->consume(int $partition, int $timeout_ms);
Consumes a single message from $partition.
$timeout_ms is the maximum amount of time to wait for a message to be received. The consumer must have been previously started with consumeStart().
Returns NULL on timeout.
Throws a RdKafka\Exception on error.
NOTE: The returned message's ..->err must be checked for errors.
NOTE: ..->err == RD_KAFKA_RESP_ERR__PARTITION_EOF signals that the end of the partition has been reached, which should typically not be considered an error. The application should handle this case (e.g., ignore).
See Topic::getName()
    $topic->offsetStore($message->partition, $message->offset+1);
Stores the given offset for the given partition of this topic. The offset will be committed (written) to the offset store according to auto.commit.interval.ms.
NOTE: auto.commit.enable must be set to "false" when using this API.
Throws a RdKafka\Exception on error.
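One way to use offsetStore() is to store an offset only after a message has been fully processed. The sketch below assumes auto.commit.enable was set to "false" on the topic configuration and that consumeStart() has already been called; processOne() and its $handler callback are hypothetical names introduced for illustration.

```php
<?php
// Hypothetical helper. $topic is expected to be an RdKafka\ConsumerTopic; the
// type hint is omitted so the sketch can also be exercised with a stand-in.
function processOne($topic, $partition, $handler, $timeoutMs = 1000)
{
    $msg = $topic->consume($partition, $timeoutMs);
    if ($msg === null || $msg->err) {
        // Timeout, end of partition, or error: nothing to store.
        return false;
    }
    $handler($msg->payload);
    // Store the offset of the next message to consume, as in the call above.
    $topic->offsetStore($msg->partition, $msg->offset + 1);
    return true;
}
```

A caller would typically invoke processOne() in a loop and inspect the message error separately when it needs to tell a timeout from a real failure.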
New ProducerTopic instances can be created by calling RdKafka\Producer::newTopic().
See Topic::getName()
    $topic->produce(int $partition, int $msgflags, string $payload, string $key = null);
Produces and sends a single message to the broker.
produce() is an asynchronous, non-blocking API.
$partition is the target partition, either:
- RD_KAFKA_PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function, or
- a fixed partition (0..N)
$msgflags must be 0.
$payload is the message payload.
$key is an optional message key; if non-NULL, it will be passed to the topic partitioner, sent with the message to the broker, and passed on to the consumer.
Throws a RdKafka\Exception on error.
A Kafka message as returned by the consuming methods.
This object has two purposes:
- provide the application with a consumed message (->err == 0)
- report per-topic+partition consumer errors (->err != 0)
The application must check err to decide what action to take.
$message->err: Non-zero for error signaling. Use errstr() for a string representation.
$message->topic_name: Topic name.
$message->partition: Partition.
$message->payload: When err == 0, the message payload.
$message->key: When err == 0, the optional message key.
$message->offset: When err == 0, the message offset.
    $errstr = $message->errstr();
When err != 0, returns the string representation of the error.
New Queue instances can be created by calling RdKafka\Consumer::newQueue().
Message queues allow the application to re-route consumed messages from multiple topic+partitions into one single queue point. This queue point, containing messages from a number of topic+partitions, may then be served by a single consume() call, rather than one per topic+partition combination.
See RdKafka\ConsumerTopic::consumeQueueStart(), RdKafka\Queue::consume().
    $message = $queue->consume(int $timeout_ms);
See RdKafka\ConsumerTopic::consume()
Exceptions thrown by php-rdkafka are of this type.
Metadata container.
See RdKafka::metadata().
    $id = $metadata->getOrigBrokerId();
Returns the id of the broker originating this metadata.
    $name = $metadata->getOrigBrokerName();
Returns the name of the originating broker.
    $brokers = $metadata->getBrokers();
    printf("There are %d brokers", count($brokers));
    foreach ($brokers as $broker) {
        ...
    }
Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Broker.
    $topics = $metadata->getTopics();
    printf("There are %d topics", count($topics));
    foreach ($topics as $topic) {
        ...
    }
Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Topic.
Metadata: Broker information.
    $id = $broker->getId();
Returns the broker id.
    $host = $broker->getHost();
Returns the broker hostname.
    $port = $broker->getPort();
Returns the broker port.
Metadata: Topic information.
    $name = $topic->getTopic();
Returns the topic name.
    $err = $topic->getErr();
Returns the topic error reported by the broker.
    $partitions = $topic->getPartitions();
    printf("There are %d partitions", count($partitions));
    foreach ($partitions as $partition) {
        ...
    }
Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Partition.
Metadata: Partition information.
    $id = $partition->getId();
Returns the partition id.
    $err = $partition->getErr();
Returns the partition error reported by the broker.
    $leader = $partition->getLeader();
Returns the leader broker id.
    $replicas = $partition->getReplicas();
    printf("There are %d replicas", count($replicas));
    foreach ($replicas as $replica) {
        ...
    }
Returns a RdKafka\Metadata\Collection of replica broker ids for this partition.
    $isrs = $partition->getIsrs();
    printf("There are %d In-Sync-Replicas", count($isrs));
    foreach ($isrs as $isr) {
        ...
    }
Returns a RdKafka\Metadata\Collection of In-Sync-Replica broker ids for this partition.
RdKafka\Metadata\Collection implements Iterator (can be used in foreach), and Countable (can be used in count()).
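The metadata accessors above can be combined to summarize a cluster. This is a sketch under assumptions: describeCluster() is a hypothetical helper, and the output wording is arbitrary. It uses only the getters documented in this section, plus count() and foreach on the returned collections.

```php
<?php
// Hypothetical helper. $metadata is expected to be an RdKafka\Metadata; the
// type hint is omitted so the sketch can also be exercised with a stand-in.
function describeCluster($metadata)
{
    $lines = array();
    foreach ($metadata->getBrokers() as $broker) {
        $lines[] = sprintf(
            "broker %d at %s:%d",
            $broker->getId(),
            $broker->getHost(),
            $broker->getPort()
        );
    }
    foreach ($metadata->getTopics() as $topic) {
        // Collections are Countable, so count() works on getPartitions().
        $lines[] = sprintf(
            "topic %s has %d partitions",
            $topic->getTopic(),
            count($topic->getPartitions())
        );
    }
    return $lines;
}
```

With a connected instance, the input could come from a call such as `$rk->metadata(true, null, 1000)`, where the 1000 ms timeout is an arbitrary choice.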
rd_kafka_err2str(): Returns a human-readable representation of a kafka error.
rd_kafka_errno2err(): Converts errno to a rd_kafka_resp_err_t error code.
rd_kafka_errno(): Returns errno.
    $offset = rd_kafka_offset_tail($cnt);
Returns a special offset to start consuming $cnt messages from the topic's current .._END offset. That is, if the current end offset is 12345 and $cnt is 200, it will start consuming from offset 12345-200 = 12145.
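The arithmetic in that example can be written out as below. This only illustrates the calculation: tailStartOffset() is a hypothetical function, whereas the real rd_kafka_offset_tail() returns a special offset value that librdkafka resolves itself.

```php
<?php
// Illustration of the arithmetic only, not a replacement for
// rd_kafka_offset_tail(): the start offset is the end offset minus $cnt.
function tailStartOffset($endOffset, $cnt)
{
    return $endOffset - $cnt;
}

echo tailStartOffset(12345, 200), "\n"; // prints 12145
```

In practice the special offset is passed straight to the consumer, for example `$topic->consumeStart(0, rd_kafka_offset_tail(200));`.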
RD_KAFKA_OFFSET_BEGINNING: Start consuming from the beginning of the kafka partition queue (oldest message).
RD_KAFKA_OFFSET_END: Start consuming from the end of the kafka partition queue (next message).
RD_KAFKA_OFFSET_STORED: Start consuming from the offset retrieved from the offset store.
RD_KAFKA_PARTITION_UA: Unassigned partition.
The unassigned partition is used by the producer API for messages that should be partitioned using the configured or default partitioner.
RD_KAFKA_VERSION: librdkafka version.
Interpreted as hex MM.mm.rr.xx:
- MM = Major
- mm = minor
- rr = revision
- xx = currently unused
E.g.: 0x00080100 = 0.8.1
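The hex layout described above can be decoded with bit shifts. A minimal sketch follows; decodeRdKafkaVersion() is a hypothetical helper name, and the unused xx byte is ignored.

```php
<?php
// Decode a version integer laid out as hex MM.mm.rr.xx.
function decodeRdKafkaVersion($version)
{
    $major = ($version >> 24) & 0xFF;    // MM
    $minor = ($version >> 16) & 0xFF;    // mm
    $revision = ($version >> 8) & 0xFF;  // rr
    return sprintf("%d.%d.%d", $major, $minor, $revision);
}

echo decodeRdKafkaVersion(0x00080100), "\n"; // prints "0.8.1"
```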
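RD_KAFKA_RESP_ERR__BEGIN: begin internal error codes
RD_KAFKA_RESP_ERR__BAD_MSG: Received message is incorrect
RD_KAFKA_RESP_ERR__BAD_COMPRESSION: Bad/unknown compression
RD_KAFKA_RESP_ERR__DESTROY: Broker is going away
RD_KAFKA_RESP_ERR__FAIL: Generic failure
RD_KAFKA_RESP_ERR__TRANSPORT: Broker transport error
RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE: Critical system resource failure
RD_KAFKA_RESP_ERR__RESOLVE: Failed to resolve broker
RD_KAFKA_RESP_ERR__MSG_TIMED_OUT: Produced message timed out
RD_KAFKA_RESP_ERR__PARTITION_EOF: Reached the end of the topic+partition queue on the broker. Not really an error.
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: Permanent: Partition does not exist in cluster.
RD_KAFKA_RESP_ERR__FS: File or filesystem error
RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC: Permanent: Topic does not exist in cluster.
RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN: All broker connections are down.
RD_KAFKA_RESP_ERR__INVALID_ARG: Invalid argument, or invalid configuration
RD_KAFKA_RESP_ERR__TIMED_OUT: Operation timed out
RD_KAFKA_RESP_ERR__QUEUE_FULL: Queue is full
RD_KAFKA_RESP_ERR__ISR_INSUFF: ISR count < required.acks
RD_KAFKA_RESP_ERR__END: end internal error codes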
RD_KAFKA_MSG_PARTITIONER_RANDOM: Random partitioner.
This is the default partitioner.
Returns a random partition between 0 and the number of partitions minus 1.
RD_KAFKA_MSG_PARTITIONER_CONSISTENT: Consistent partitioner.
Uses consistent hashing to map identical keys onto identical partitions.
Returns a partition between 0 and the number of partitions minus 1, based on the CRC value of the key.
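The idea behind the consistent partitioner can be sketched as a checksum of the key taken modulo the partition count. This is an illustration only: sketchConsistentPartition() is a hypothetical function, and PHP's crc32() stands in for whatever CRC librdkafka computes, so the resulting partition numbers are not guaranteed to match the real partitioner.

```php
<?php
// Sketch of the idea only. Assumes a 64-bit PHP build, where crc32()
// always returns a non-negative integer.
function sketchConsistentPartition($key, $partitionCount)
{
    return crc32($key) % $partitionCount;
}

// Identical keys always map to the same partition in the range 0..N-1.
var_dump(sketchConsistentPartition("user-42", 8) === sketchConsistentPartition("user-42", 8));
// prints bool(true)
```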
Documentation copied from librdkafka.h.
Authors: see contributors.
php-rdkafka is released under the MIT license.