New PHP client library for the Apache Kafka project: full support for Kafka 0.7+, robust socket handling, a complete test suite, a Zookeeper-based consumer and many other improvements.
In the past couple of months, I've put quite a lot of work into the PHP client library for Apache Kafka, a persistent, distributed, high-throughput publish-subscribe messaging system. Over a year ago I more or less reverse-engineered the protocol and published a first, rudimentary library to produce and consume messages with version 0.05. Since then, the Kafka project has evolved a lot, moving from the LinkedIn repositories to the Apache Incubator and gaining traction, committers and features. The protocol has changed slightly too, so the old library no longer works with the newer versions (0.6+).
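To make the protocol discussion concrete, here is a minimal sketch of how a single message is framed in the Kafka 0.7 wire format: a 4-byte big-endian length, a 1-byte "magic" version identifier, a 1-byte attributes field (the compression codec, present only when magic is 1), a 4-byte CRC32 of the payload, and then the payload itself. Messages with magic byte 0 omit the attributes byte, and the magic byte is what lets a client tell the two layouts apart. In 0.7, offsets are byte positions, so each message's offset is the starting offset plus the size of all preceding messages, length prefix included. The helpers `encodeMessage()` and `decodeMessageSet()` are hypothetical and are not the library's `Kafka_Encoder` API; the sketch also assumes 64-bit PHP, where `crc32()` returns an unsigned value.

```php
<?php
// Hypothetical helpers sketching Kafka 0.7 message framing (not the library's API).
// Assumes 64-bit PHP, so crc32() and unpack('N') both yield unsigned integers.

const MAGIC_WITH_ATTRIBUTES = 1; // magic 1: an attributes byte follows
const CODEC_NONE = 0;            // attributes value for an uncompressed payload

/**
 * Frame one message: [length:int32][magic:int8][attributes:int8][crc32:int32][payload]
 * "N" packs an unsigned big-endian 32-bit int, "C" an unsigned byte.
 */
function encodeMessage(string $payload): string
{
    $body = pack('CC', MAGIC_WITH_ATTRIBUTES, CODEC_NONE)
          . pack('N', crc32($payload))
          . $payload;
    return pack('N', strlen($body)) . $body;
}

/**
 * Walk a message set and return [byteOffset => payload].
 * Handles both the magic 0 layout (no attributes byte) and the magic 1 layout,
 * and verifies each payload's CRC32.
 */
function decodeMessageSet(string $data, int $startOffset = 0): array
{
    $messages = [];
    $pos = 0;
    $len = strlen($data);
    while ($pos + 4 <= $len) {
        $size = unpack('N', substr($data, $pos, 4))[1];
        if ($size < 5) {
            throw new UnexpectedValueException("Invalid message size $size at byte $pos");
        }
        if ($pos + 4 + $size > $len) {
            break; // truncated message at the end of a fetch: leave it for the next request
        }
        $body = substr($data, $pos + 4, $size);
        $magic = ord($body[0]);
        $headerSize = (0 === $magic) ? 5 : 6; // magic 1 adds the attributes byte
        $crc = unpack('N', substr($body, $headerSize - 4, 4))[1];
        $payload = (string) substr($body, $headerSize);
        if ($crc !== crc32($payload)) {
            throw new UnexpectedValueException("CRC mismatch for message at byte $pos");
        }
        $messages[$startOffset + $pos] = $payload;
        $pos += 4 + $size; // byte offset of the next message
    }
    return $messages;
}

// Usage: two messages framed back to back, as in a message set
$set = encodeMessage('hello') . encodeMessage('world');
print_r(decodeMessageSet($set)); // keys 0 and 15: each 5-byte payload takes 4 + 11 bytes
```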
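The new library fixes many of the shortcomings of the previous version and adds a lot of new functionality, including full support for Kafka 0.7+, robust socket handling, a complete test suite and a Zookeeper-based consumer.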
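One ingredient of robust socket handling is not trusting a single read: on network streams PHP's `fread()` returns as soon as a packet is available, so a response can arrive in several chunks. Below is a minimal sketch of one common technique, looping until the expected number of bytes has been read and failing loudly on EOF or timeout. `readExactly()` is a hypothetical helper, not a function from the library, and it assumes a blocking stream (on a non-blocking one the retry loop would spin).

```php
<?php
/**
 * Hypothetical helper: read exactly $length bytes from $stream, looping over short reads.
 * Throws RuntimeException on a read error, on EOF, or when the timeout set with
 * stream_set_timeout() expires. Assumes a blocking stream.
 */
function readExactly($stream, int $length): string
{
    $data = '';
    while (strlen($data) < $length) {
        $chunk = fread($stream, $length - strlen($data));
        if (false === $chunk) {
            throw new RuntimeException('Error reading from stream');
        }
        if ('' === $chunk) {
            $meta = stream_get_meta_data($stream);
            if ($meta['timed_out']) {
                throw new RuntimeException('Timed out reading from stream');
            }
            if (feof($stream)) {
                throw new RuntimeException(
                    sprintf('Connection closed after %d of %d bytes', strlen($data), $length)
                );
            }
            continue; // nothing read yet, but the stream is still open: try again
        }
        $data .= $chunk;
    }
    return $data;
}

// Usage: read a 4-byte big-endian size header, then the body it announces
$stream = fopen('php://memory', 'r+');
fwrite($stream, pack('N', 5) . 'hello');
rewind($stream);
$size = unpack('N', readExactly($stream, 4))[1];
echo readExactly($stream, $size), "\n"; // prints "hello"
```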
Overall, this is a pretty solid release: we've been using it in production for over a month without any problems.
To use the library, you can either apply this patch to the official repository or check out the code from this GitHub repository.
Update: check out the code from the official Kafka-PHP git repository.
A simple producer, which reads comma-separated messages from standard input and sends them to the "test" topic:

```php
<?php
// Assumes the Kafka-PHP library classes are already loaded (e.g. via its autoloader)
$host  = 'localhost';
$port  = 9092; // Kafka broker
$topic = 'test';

// send the messages uncompressed
$producer = new Kafka_Producer($host, $port, Kafka_Encoder::COMPRESSION_NONE);

$in = fopen('php://stdin', 'r');
while (true) {
    echo "\nEnter comma separated messages:\n";
    $line = fgets($in);
    if (false === $line) {
        break; // end of input
    }
    // trim the trailing newline so it doesn't end up in the last message
    $messages = explode(',', trim($line));
    $bytes = $producer->send($messages, $topic);
    printf("\nSuccessfully sent %d messages (%d bytes)\n\n", count($messages), $bytes);
}
fclose($in);
```

A simple consumer, which fetches from a single partition on a single broker and keeps track of the offset itself:

```php
<?php
$host = 'localhost';
$port = 9092; // Kafka broker
$topic = 'test';
$maxSize = 1000000; // fetch size in bytes (~1MB)
$socketTimeout = 5;
$offset = 0;
$partition = 0;

$consumer = new Kafka_SimpleConsumer($host, $port, $socketTimeout, $maxSize);
while (true) {
    // create a fetch request for topic "test", partition 0, current offset and fetch size of ~1MB
    $fetchRequest = new Kafka_FetchRequest($topic, $partition, $offset, $maxSize);
    // get the message set from the consumer and print the messages out
    $partialOffset = 0;
    $messages = $consumer->fetch($fetchRequest);
    foreach ($messages as $msg) {
        echo "\nconsumed[$offset][$partialOffset]: " . $msg->payload();
        $partialOffset = $messages->validBytes();
    }
    // advance the offset by the number of bytes consumed in this fetch
    $offset += $messages->validBytes();
    unset($fetchRequest);
}
```

A Zookeeper-based consumer, which uses the topic, broker and offset registries stored in Zookeeper, reads messages in batches and commits the offsets explicitly:

```php
<?php
// zookeeper address (one or more, separated by commas); 2181 is ZooKeeper's default client port
$zkaddress = 'localhost:2181';
// kafka topic to consume from
$topic = 'testtopic';
// kafka consumer group
$group = 'testgroup';
// socket buffer size: must be greater than the largest message in the queue
$socketBufferSize = 10485760; // 10 MB
// approximate max number of bytes to get in a batch
$maxBatchSize = 20971520; // 20 MB

$zookeeper = new Zookeeper($zkaddress);
$zkconsumer = new Kafka_ZookeeperConsumer(
    new Kafka_Registry_Topic($zookeeper),
    new Kafka_Registry_Broker($zookeeper),
    new Kafka_Registry_Offset($zookeeper, $group),
    $topic,
    $socketBufferSize
);

$messages = array();
$exception = null; // set by the catch blocks below if anything goes wrong
try {
    foreach ($zkconsumer as $message) {
        // either process each message one by one, or collect them and process them in batches
        $messages[] = $message;
        if ($zkconsumer->getReadBytes() >= $maxBatchSize) {
            break;
        }
    }
} catch (Kafka_Exception_OffsetOutOfRange $exception) {
    // if we haven't received any messages, resync the offsets for the next time, then bomb out
    if ($zkconsumer->getReadBytes() == 0) {
        $zkconsumer->resyncOffsets();
        die($exception->getMessage());
    }
    // if we did receive some messages before the exception, carry on
} catch (Kafka_Exception_Socket_Connection $exception) {
    // deal with it below
} catch (Kafka_Exception $exception) {
    // deal with it below
}

if (null !== $exception) {
    // if we haven't received any messages, bomb out
    if ($zkconsumer->getReadBytes() == 0) {
        die($exception->getMessage());
    }
    // otherwise log the error, commit the offsets for the messages read so far and return the data
}

// process the data in batches, wait for ACK
// (doSomethingWithTheMessages() is a placeholder for your own processing logic)
$success = doSomethingWithTheMessages($messages);
// once the data has been processed successfully, commit the byte offsets
if ($success) {
    $zkconsumer->commitOffsets();
}

// get an approximate figure on the size of the queue
try {
    echo "\nRemaining bytes in queue: " . $zkconsumer->getRemainingSize();
} catch (Kafka_Exception_Socket_Connection $exception) {
    die($exception->getMessage());
} catch (Kafka_Exception $exception) {
    die($exception->getMessage());
}
```

Please note that in the Zookeeper-based consumer the offsets are committed manually. This differs from the off-the-shelf Scala ZookeeperConsumer: we needed to control when the offsets were committed, so that an offset is committed only after receiving an ACK that the messages were processed correctly.
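The trade-off of committing only after the ACK is at-least-once delivery: if processing fails (or the process dies) before `commitOffsets()` is called, the next run starts again from the last committed offset and sees the same messages, so processing should ideally be idempotent. The sketch below illustrates that control flow with a hypothetical in-memory `ManualCommitConsumer` and `consumeOnce()` (neither is part of the library); for simplicity it uses message indexes as offsets, whereas Kafka 0.7 offsets are byte positions.

```php
<?php
/**
 * Hypothetical in-memory stand-in for a consumer with manually committed offsets.
 * Not the library's Kafka_ZookeeperConsumer: it only illustrates the control flow.
 */
final class ManualCommitConsumer
{
    private $log;           // list of messages; the array index plays the role of the offset
    private $committed = 0; // offset of the next message to read, as last committed
    private $pending = 0;   // offset reached by the current, not yet committed, batch

    public function __construct(array $log)
    {
        $this->log = array_values($log);
    }

    /** Read up to $max messages, always starting from the last committed offset. */
    public function fetchBatch(int $max): array
    {
        $batch = array_slice($this->log, $this->committed, $max);
        $this->pending = $this->committed + count($batch);
        return $batch;
    }

    /** Call only once the batch has been ACKed by the downstream processor. */
    public function commitOffsets(): void
    {
        $this->committed = $this->pending;
    }

    public function committedOffset(): int
    {
        return $this->committed;
    }
}

/** Fetch one batch and commit its offsets only if $process ACKs it (returns true). */
function consumeOnce(ManualCommitConsumer $consumer, callable $process): array
{
    $batch = $consumer->fetchBatch(2);
    if ($process($batch)) {
        $consumer->commitOffsets();
    }
    return $batch;
}

// Usage: the first attempt fails, so the retry re-reads the same messages
$consumer = new ManualCommitConsumer(['a', 'b', 'c']);
consumeOnce($consumer, function (array $batch) { return false; }); // no ACK: nothing committed
$retry = consumeOnce($consumer, function (array $batch) { return true; }); // 'a' and 'b' again
```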
If you play with this code, please let me know about your setup and use case; I'd love to get feedback and suggestions.
Lorenzo
1 response to "Updated Kafka PHP client library"
Why don't you use namespaces?