How to get PHP and Kafka to Play Nicely (and not do it slowly!)


So, recently we decided to move over to a complete ELK stack setup to handle our Massive Data storage solutions. As part of this change, we introduced a streaming message system called Kafka into our setup. An easy task, some may think… alas not, but I got there!

Okay, so firstly… if you’re reading this you probably know what Kafka is and are looking for solutions to get it working with PHP, but for those of you who don’t… what is Kafka? Well, in a nutshell…

Kafka is a messaging system. That’s it!

(taken from here) – obviously it’s a little bit more than that, but I suggest you read the Kafka design documentation if you don’t know what it is, as there’s some really good stuff in there about how and why they built their concepts etc.

So what do I need?

I think now is a good time to mention that I’m not going to cover how to install Kafka, ZooKeeper etc., as there are plenty of tutorials out there already. If you haven’t done that yet, I’d suggest coming back to this once you’ve got a working Kafka setup. Also, this blog will only cover the “Producing” aspect of using PHP with Kafka, and not consuming (as we do that with Logstash).

You will need however:

librdkafka (v0.9.2+)
php-rdkafka (v1.0.0+) –  (if you click on releases, then click on the … you’ll see all the releases so you can manually install v1.0.0+)
also the kwn/php-rdkafka-stubs package for your IDE, which makes life easier (make sure you have version 1.0.0+). If using Composer, you can add:

"require-dev": {
  "kwn/php-rdkafka-stubs": "1.0.0"
},

Get those installed, and then we can look at how we use those to produce messages into the Kafka system.
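Before going any further, it can save some head-scratching to confirm which php-rdkafka version PHP has actually loaded. Here is a minimal sketch, assuming the extension reports its version through phpversion('rdkafka'); the function name rdkafkaIsRecentEnough() is made up for this example and is not part of any library:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns true when the reported version is at least $minimum.
 * $installed is whatever phpversion('rdkafka') returned (false if not loaded).
 */
function rdkafkaIsRecentEnough($installed, string $minimum = '1.0.0'): bool
{
    if (!is_string($installed) || $installed === '') {
        return false; // extension missing or version unknown
    }
    return version_compare($installed, $minimum, '>=');
}

$installed = phpversion('rdkafka'); // false when the extension is not loaded
if (!rdkafkaIsRecentEnough($installed)) {
    error_log('php-rdkafka 1.0.0+ does not appear to be loaded');
}
```

Running this once from the CLI and once through php-fpm is worthwhile, as the two can load different extension builds.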

Initialising / Setting up…

So, using php-rdkafka is fairly straightforward: you simply instantiate a Conf() object, set some settings, then get a Producer() up and running and you’re good to go… so let’s have a look at some very basic setup code:

use RdKafka\Conf;
use RdKafka\Producer;

$hosts = ['127.0.0.1']; // list your hosts however you want, but rdkafka expects them as one comma-separated string

// Set the configuration values for Kafka:
$conf = new Conf();
$conf->set('client.id', 'my-application');

// Set up the producer:
$producer = new Producer($conf);
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers(implode(',', $hosts)); // These are your Kafka hosts

That’s it, done. You now have a working producer which is ready to start spewing stuff into Kafka. At this stage, my main question was “so how do I know if it’s connected or not?” Well, you don’t. You can ask the server for metadata, but frankly that’s an expensive operation, so we’ll look at error handling etc. later when we cover optimising the setup.
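If your hosts come from config files or environment variables, the comma-separated string that addBrokers() expects can be built a little more defensively than a bare implode(). This is only a sketch: buildBrokerList() is a made-up helper, the second host name is a placeholder, and it assumes 9092 as the default Kafka port and handles hostnames and IPv4 addresses only:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: turns an array of hosts into a comma-separated
 * "host:port" string. Hosts that already carry a port are left alone;
 * 9092 is assumed as the default. IPv6 literals are not handled.
 */
function buildBrokerList(array $hosts, int $defaultPort = 9092): string
{
    $brokers = [];
    foreach ($hosts as $host) {
        $host = trim((string) $host);
        if ($host === '') {
            continue; // skip empty entries
        }
        // Append the default port only when none was given
        $brokers[] = strpos($host, ':') === false ? $host . ':' . $defaultPort : $host;
    }
    // Drop duplicates before joining
    return implode(',', array_unique($brokers));
}

// prints 127.0.0.1:9092,kafka-2.example.internal:9093
echo buildBrokerList(['127.0.0.1', 'kafka-2.example.internal:9093']), PHP_EOL;
```

The result can be passed straight to $producer->addBrokers().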

So, now let’s start throwing some stuff into Kafka… With rdkafka, you create a topic configuration, create the topic object and then send stuff into it. If the topic exists, great; if not, it’ll be created (as long as your brokers allow automatic topic creation), so you don’t need to worry about that either:

// Set the topic configuration (TopicConf is RdKafka\TopicConf; values are passed as strings):
$topicConfig = new TopicConf();
$topicConfig->set('message.timeout.ms', '1000'); // the first of our optimisations: if the message can't be written within 1000 ms, it times out

// Now the Topic:
$kafkaTopic = $producer->newTopic('my_amazing_topic', $topicConfig);

// Set the data:
$message = "I am great!";

// Let's write some data:
try {
  $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
  // We have to call the poll function so that queued events (such as delivery reports) get handled
  $producer->poll(0);
} catch (\Exception $exception) {
  // something went wrong, so handle your errors as you would...
  return false;
}

…and that’s about it – you can run this in your application and you should now see your data in the Kafka topic of your choice.

So… I did all that, tested, profiled etc., then checked New Relic, and blow me if it didn’t increase the response time of the application by over 200%. With that delay it was completely unusable, so I had to do a lot of digging to find out why, which is why I’m putting those tips here lol… so let’s start to optimise things…

Optimisations

The first point is in the config. There are a gazillion config properties in librdkafka (the library which php-rdkafka uses) which you can change. Here are just a few I found really help to speed things along (the php-rdkafka docs mention some others too, but some don’t work with php-fpm and another is deprecated, hence my pain!)

// We can shorten how long a blocking socket operation may last, which helps librdkafka release your application to continue
$conf->set('socket.blocking.max.ms', '1');
// We can also cut the buffering time, so we dispatch asap:
$conf->set('queue.buffering.max.ms', '1');
// Finally, we can cap how many messages may sit in the producer queue. In a lightweight / custom app, you may only be sending one message, so get it out there straight away
$conf->set('queue.buffering.max.messages', '10');

These few little tweaks will really help and should speed things along for you, ensuring your messages are sent as soon as possible so your app can carry on.
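Since Conf::set() takes its values as strings, it can be handy to keep the tuning values in one array and convert them in one go. A small sketch of that idea follows; normaliseSettings() is a made-up helper, and the values are simply the ones used above rather than recommendations:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: converts a settings array to string values.
 * Booleans become "true"/"false"; everything else is cast to string.
 */
function normaliseSettings(array $settings): array
{
    $normalised = [];
    foreach ($settings as $name => $value) {
        if (is_bool($value)) {
            $value = $value ? 'true' : 'false';
        }
        $normalised[(string) $name] = (string) $value;
    }
    return $normalised;
}

$settings = normaliseSettings([
    'socket.blocking.max.ms'       => 1,
    'queue.buffering.max.ms'       => 1,
    'queue.buffering.max.messages' => 10,
]);

// With the extension loaded, you would then apply them like this:
// foreach ($settings as $name => $value) {
//     $conf->set($name, $value);
// }
var_dump($settings);
```

Keeping the settings in an array also makes it easier to use different values per environment.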

How do I know it got there? What if it couldn’t connect?

This was another one that bugged me for a looong time, before I realised the php-rdkafka version I was using was < v1.0.0, hence I couldn't use the callback functions. In v1.0.0+ you can register a delivery report callback with setDrMsgCb(), which we use as follows:

// Set the delivery report callback so we find out about errors
// (Producer and Message here are RdKafka\Producer and RdKafka\Message).
// This is just a callable, so you can easily pass something like [$this, 'myFunctionName'] instead
$conf->setDrMsgCb(function (Producer $kafka, Message $message) {
  if ($message->err) {
    // message permanently failed to be delivered
    // Do something with this knowledge
  } else {
    // message successfully delivered
  }
});
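As for “do something with this knowledge”, one option is to turn each delivery report into a log line. The sketch below assumes the message object exposes err, topic_name and partition properties plus an errstr() method, as RdKafka\Message does; describeDelivery() itself is a made-up name:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: builds a log line from a delivery report.
 * $message is expected to look like RdKafka\Message.
 */
function describeDelivery(object $message): string
{
    if ($message->err) {
        // Non-zero err means the message was not delivered
        return sprintf('FAILED topic=%s error=%s', $message->topic_name, $message->errstr());
    }
    return sprintf('OK topic=%s partition=%d', $message->topic_name, $message->partition);
}

// Inside the callback you could then write:
// $conf->setDrMsgCb(function ($kafka, $message) {
//     error_log(describeDelivery($message));
// });
```

Keeping the formatting in a plain function means it can be tested without a broker.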

If we then make one more change and set the poll timeout to -1, it will wait indefinitely for a response (taking into account the other timeouts) and then process the delivery report. In reality, with a working server, it responds almost immediately. If the server goes down, it adheres to the timeouts and responds accordingly (around 2 seconds), which is fine for most applications and allows you to start tracking failures so you can fix them:

$producer->poll(-1);
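If waiting indefinitely makes you nervous, an alternative is to poll in a bounded loop until the producer’s outbound queue is empty. This is a sketch rather than what we run in production: drainQueue() is a made-up helper, and it assumes the producer object offers getOutQLen() and poll() the way RdKafka\Producer does:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: polls until the outbound queue is empty or
 * $maxWaitMs has elapsed. Returns true when everything was sent in time.
 */
function drainQueue(object $producer, int $pollMs = 50, int $maxWaitMs = 2000): bool
{
    $deadline = microtime(true) + ($maxWaitMs / 1000);
    while ($producer->getOutQLen() > 0) {
        if (microtime(true) >= $deadline) {
            return false; // gave up: messages are still queued
        }
        $producer->poll($pollMs); // lets delivery report callbacks run
    }
    return true;
}

// After producing, instead of $producer->poll(-1):
// if (!drainQueue($producer)) {
//     error_log('Kafka messages were still queued when we stopped waiting');
// }
```

The 2000 ms default simply mirrors the “around 2 seconds” mentioned above; pick whatever your application can tolerate.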

All this should mean you have something like this now…

use RdKafka\Conf;
use RdKafka\Message;
use RdKafka\Producer;
use RdKafka\TopicConf;

$hosts = ['127.0.0.1']; // list your hosts however you want, but rdkafka expects them as one comma-separated string

// Set the configuration values for Kafka (values are passed as strings):
$conf = new Conf();
$conf->set('client.id', 'my-application');

// We can shorten how long a blocking socket operation may last, which helps librdkafka release your application to continue
$conf->set('socket.blocking.max.ms', '1');
// We can also cut the buffering time, so we dispatch asap:
$conf->set('queue.buffering.max.ms', '1');
// Finally, we can cap how many messages may sit in the producer queue. In a lightweight / custom app, you may only be sending one message, so get it out there straight away
$conf->set('queue.buffering.max.messages', '10');

// Set the delivery report callback so we find out about errors:
// This is just a callable, so you can easily pass something like [$this, 'myFunctionName'] instead
$conf->setDrMsgCb(function (Producer $kafka, Message $message) {
  if ($message->err) {
    // message permanently failed to be delivered
    // Do something with this knowledge
  } else {
    // message successfully delivered
  }
});

// Set up the producer:
$producer = new Producer($conf);
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers(implode(',', $hosts)); // These are your Kafka hosts

// Set the topic configuration:
$topicConfig = new TopicConf();
$topicConfig->set('message.timeout.ms', '1000'); // if the message can't be written within 1000 ms, it times out

// Now the Topic:
$kafkaTopic = $producer->newTopic('my_amazing_topic', $topicConfig);

// Set the data:
$message = "I am great!";

// Let's write some data:
try {
  $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
  // Wait for the delivery report so the callback above gets called
  $producer->poll(-1);
} catch (\Exception $exception) {
  // something went wrong, so handle your errors as you would...
  return false;
}

So that’s about it. With this (now relatively) quick setup, you should have an application which can write data into Kafka, ready to be consumed by whichever option you decide on, be it a PHP lib, Logstash, Kafka Connect or any of the many options available.

I’ll be writing a full blog when I get time covering the whole ELK stack, Kafka, Zookeeper and how to basically build a fully redundant system powering immense data collection and analysis, but for now I thought this may help some people who may be struggling with Kafka and PHP as I was.

Let me know your thoughts…

edit: One thing you might want to play with is Kafka Manager – it’s a great GUI for managing your Kafka clusters and you can see throughput rate, consumption etc and really lets you get a handle on any bottlenecks or issues. Check it out!

6 thoughts on “How to get PHP and Kafka to Play Nicely (and not do it slowly!)”

  1. Hi, thanks for the article. I did as you described, but the producer didn’t send messages with message.timeout.ms = 1000, only when I set it to >= 2500. I tried different configs, but it all comes down to message.timeout.ms. Do you have any ideas why this is happening and how it can be fixed?


    • Hmm, not immediately. It sounds like it could be an issue with either Kafka or ZooKeeper depending on your setup, but if it writes when you increase the timeout and not otherwise, I would imagine it could be a network / writing issue. Sorry I can’t be more helpful


    • Ah this could be an issue with a newer version perhaps (or older) – I know there was some refactoring done for the Conf object, and so it may be a case that this has now changed how it is referenced. As I’ve not kept up to date too much with it I won’t really be able to advise but I would suggest asking on the respective repositories, they may point you to the new code etc, sorry


  2. Hi, thank you for your tutorial.
    I had no feedback when Kafka was down, and the timeout has helped a lot.

    On the other hand, the setDrMsgCb() callback never executes, no matter what (Kafka running or down, producing and consuming messages successfully). I have a try / catch and never got an error.
    Versions are the latest as of about two weeks ago. Do you have any idea about that?

    Thanks again for your helpful tutorial.

