Updated Kafka PHP client library

In the past couple of months, I've put quite some work into the PHP client library for Apache Kafka, a persistent, distributed, high-throughput publish-subscribe messaging system. Over a year ago I reverse-engineered the protocol and published a first, rudimentary library to produce and consume messages with version 0.05. Since then, the Kafka project has evolved a lot, moving from the LinkedIn repositories to the Apache Incubator and gaining traction, committers and features. The protocol has changed slightly too, so the old library no longer works with the newer versions (0.6+).

New PHP client library

The new library fixes many of the shortcomings of the previous version, and adds tons of functionality:

  • added support for GZIP compression (both in the producer and the consumer);
  • implemented stream-based iteration of messages (rather than loading the entire response in memory);
  • added base Kafka_Exception class (extending RuntimeException) and some child classes to catch specific errors;
  • added validation and error checking in several parts of the code (like CRC validation of messages, stream connection/EOF checks, response code verification);
  • added example producer for compressed messages;
  • completely refactored socket handling, to be more robust, with better error checking and handling of edge-cases;
  • added support for 64-bit offsets;
  • better checks for responses from Kafka (fixed connection hanging);
  • added Zookeeper-based consumer, with support for multiple consumer groups, and for manual offset commit action (so it's possible to wait for an ACK from the message processor before advancing the offsets), and example code;
  • added support for OffsetRequest and getOffsetsBefore() in the SimpleConsumer class to query the state of the queue and recover consumption after old offsets become invalid because of expired data;
  • support for connection timeouts in microseconds;
  • vastly improved test suite.
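As an example of the first item, switching the producer to GZIP compression should only require passing a different encoder flag at construction time. A minimal sketch, where the `Kafka_Encoder::COMPRESSION_GZIP` constant is assumed by analogy with the `Kafka_Encoder::COMPRESSION_NONE` constant used in the producer example in this post:

```php
<?php
// Sketch: producing a GZIP-compressed batch of messages.
// Kafka_Encoder::COMPRESSION_GZIP is an assumption here, mirroring
// the Kafka_Encoder::COMPRESSION_NONE flag used elsewhere in this post.
$producer = new Kafka_Producer('localhost', 9092, Kafka_Encoder::COMPRESSION_GZIP);
$bytes = $producer->send(array('message 1', 'message 2'), 'test');
printf("Sent compressed batch: %d bytes\n", $bytes);
```

The messages are compressed as a single batch before being written to the socket, so larger batches generally compress better.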


Overall, this is a pretty solid release: we've been using it in production for over a month without any problems.

Get it while it's hot

To use the library, you can either apply this patch to the official repository, or check out the code from this github repo.
Update: check out the code from the official Kafka-PHP git repository.

Example code for Producer

<?php
$host = 'localhost';
$port = 9092;
$topic = 'test';

$producer = new Kafka_Producer($host, $port, Kafka_Encoder::COMPRESSION_NONE);
$in = fopen('php://stdin', 'r');
while (true) {
	echo "\nEnter comma separated messages:\n";
	// split the input line into individual messages, trimming the trailing newline
	$messages = array_map('trim', explode(',', fgets($in)));
	$bytes = $producer->send($messages, $topic);
	printf("\nSuccessfully sent %d messages (%d bytes)\n\n", count($messages), $bytes);
}
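Since all library errors derive from the new Kafka_Exception base class (which extends RuntimeException), the producer loop above can be hardened with a couple of catch blocks; a minimal sketch, using the Kafka_Exception_Socket_Connection subclass that also appears in the Zookeeper-based consumer example:

```php
<?php
// Sketch: hardening the producer with the library's exception hierarchy.
$producer = new Kafka_Producer('localhost', 9092, Kafka_Encoder::COMPRESSION_NONE);
try {
	$bytes = $producer->send(array('hello', 'world'), 'test');
	printf("Sent %d bytes\n", $bytes);
} catch (Kafka_Exception_Socket_Connection $e) {
	// broker unreachable or connection dropped: retry or fail over
	die('Connection error: ' . $e->getMessage());
} catch (Kafka_Exception $e) {
	// any other library error (Kafka_Exception extends RuntimeException)
	die($e->getMessage());
}
```

Catching the base Kafka_Exception last acts as a safety net for any library error not handled by a more specific subclass.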

Example code for Simple Consumer

<?php
$host = 'localhost';
$port = 9092; // kafka server
$topic = 'test';
$maxSize = 1000000;
$socketTimeout = 5;

$offset    = 0;
$partition = 0;

$consumer = new Kafka_SimpleConsumer($host, $port, $socketTimeout, $maxSize);
while (true) {
	//create a fetch request for topic "test", partition 0, current offset and fetch size of 1MB
	$fetchRequest = new Kafka_FetchRequest($topic, $partition, $offset, $maxSize);
	//get the message set from the consumer and print them out
	$partialOffset = 0;
	$messages = $consumer->fetch($fetchRequest);
	foreach ($messages as $msg) {
		echo "\nconsumed[$offset][$partialOffset]: " . $msg->payload();
		$partialOffset = $messages->validBytes();
	}
	// advance the offset by the total number of valid bytes read in this fetch
	$offset += $messages->validBytes();
	unset($fetchRequest);
}

Example code for Zookeeper-based Consumer

<?php
// zookeeper address (one or more, separated by commas)
$zkaddress = 'localhost:8121';

// kafka topic to consume from
$topic = 'testtopic';

// kafka consumer group
$group = 'testgroup';

// socket buffer size: must be greater than the largest message in the queue
$socketBufferSize = 10485760; //10 MB

// approximate max number of bytes to get in a batch
$maxBatchSize = 20971520; //20 MB

$zookeeper = new Zookeeper($zkaddress);

$zkconsumer = new Kafka_ZookeeperConsumer(
	new Kafka_Registry_Topic($zookeeper),
	new Kafka_Registry_Broker($zookeeper),
	new Kafka_Registry_Offset($zookeeper, $group),
	$topic,
	$socketBufferSize
);

$messages  = array();
$exception = null; // initialised so the check after the try/catch is safe
try {
	foreach ($zkconsumer as $message) {
		// either process each message one by one, or collect them and process them in batches
		$messages[] = $message;
		if ($zkconsumer->getReadBytes() >= $maxBatchSize) {
			break;
		}
	}
} catch (Kafka_Exception_OffsetOutOfRange $exception) {
	// if we haven't received any messages, resync the offsets for the next time, then bomb out
	if ($zkconsumer->getReadBytes() == 0) {
		$zkconsumer->resyncOffsets();
		die($exception->getMessage());
	}
	// if we did receive some messages before the exception, carry on.
} catch (Kafka_Exception_Socket_Connection $exception) {
	// deal with it below
} catch (Kafka_Exception $exception) {
	// deal with it below
}

if (null !== $exception) {
	// if we haven't received any messages, bomb out
	if ($zkconsumer->getReadBytes() == 0) {
		die($exception->getMessage());
	}
	// otherwise log the error, commit the offsets for the messages read so far and return the data
}

// process the data in batches, wait for ACK
// (doSomethingWithTheMessages() is a placeholder for your own processing
// logic; it should return true on success)
$success = doSomethingWithTheMessages($messages);

// Once the data is processed successfully, commit the byte offsets.
if ($success) {
	$zkconsumer->commitOffsets();
}

// get an approximate figure on the size of the queue
try {
	echo "\nRemaining bytes in queue: " . $zkconsumer->getRemainingSize();
} catch (Kafka_Exception_Socket_Connection $exception) {
	die($exception->getMessage());
} catch (Kafka_Exception $exception) {
	die($exception->getMessage());
}

Please note that in the Zookeeper-based consumer, the offsets are committed manually. This differs from the off-the-shelf Scala ZookeeperConsumer: we needed to control when the offsets were committed, i.e. we wanted to commit an offset only after receiving an ACK that the messages were correctly processed.
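When a stored offset points at data that has already expired, the consumer throws a Kafka_Exception_OffsetOutOfRange; with the plain SimpleConsumer you can recover by asking the broker for a valid offset via getOffsetsBefore(). A hedged sketch, where the signature (topic, partition, reference time, max number of offsets) and the -2 "earliest" sentinel time mirror the Scala SimpleConsumer and are assumptions about this library:

```php
<?php
// Sketch: recovering from an out-of-range offset with getOffsetsBefore().
// The signature and the -2 ("earliest available offset") sentinel value
// are assumed here, by analogy with the Scala SimpleConsumer API.
$consumer = new Kafka_SimpleConsumer('localhost', 9092, 5, 1000000);
$offset   = 0; // stand-in for a previously stored offset
try {
	$messages = $consumer->fetch(new Kafka_FetchRequest('test', 0, $offset, 1000000));
} catch (Kafka_Exception_OffsetOutOfRange $e) {
	// the old offset points at expired data: restart from the earliest valid one
	$offsets = $consumer->getOffsetsBefore('test', 0, -2, 1);
	$offset  = $offsets[0];
}
```

This is the same recovery that resyncOffsets() performs for the Zookeeper-based consumer, done by hand.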

Have fun

If you play with this code, please let me know about your setup and use-case, I'd love to get feedback and suggestions.





Lorenzo Alberton is a Zend Certified Engineer who has worked with large enterprise UK companies and is now Chief Technical Architect at DataSift. He's an international conference speaker and a long-time contributor to many open source projects.
