These are chat archives for arnaud-lb/php-rdkafka

13th
Jun 2018
karavzeka
@karavzeka
Jun 13 2018 12:50

Hello everyone! Is there a guru here who can help me? I'll describe the logic of my app, and I'd like to get some advice.

I have a script that spawns a certain number of child processes. Each child uses php-rdkafka to consume messages from a topic and process them. A child has a fixed lifetime, after which it dies. Then the parent script spawns a new child, and so on.
When a child is born, it calls the initChild() method, where php-rdkafka is initialized.

// Child class

protected function initChild(): void
{
    $this->childBornTime = time();

    $brokerList = [];
    foreach ($this->brokerAddresses as $brokerAddress) {
        $brokerList[] = $brokerAddress['host'] . ':' . $brokerAddress['port'];
    }
    if (empty($brokerList)) {
        throw new \ErrorException('No broker is assigned');
    }

    $topicConf = new TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');

    $conf = new Conf();
    $conf->set('client.id', __CLASS__);
    $conf->set('group.id', 'group_' . $this->queueName);
    $conf->set('metadata.broker.list', implode(',', $brokerList));
    $conf->setDefaultTopicConf($topicConf);

    $this->kafkaConsumer = new KafkaConsumer($conf);
    $this->kafkaConsumer->subscribe([$this->queueName]);
}
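The broker-list construction at the top of initChild() can be pulled out into a pure function, which makes it testable without a running Kafka cluster. This is a minimal sketch: the function name `buildBrokerList` is hypothetical, and the input shape (a list of `['host' => ..., 'port' => ...]` arrays) is assumed from the `$this->brokerAddresses` loop above.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: turns a list of ['host' => ..., 'port' => ...] arrays
 * into the comma-separated string used for 'metadata.broker.list'.
 */
function buildBrokerList(array $brokerAddresses): string
{
    $brokers = [];
    foreach ($brokerAddresses as $address) {
        $brokers[] = $address['host'] . ':' . $address['port'];
    }
    if ($brokers === []) {
        // Same guard as in initChild(): refuse to start without brokers.
        throw new \InvalidArgumentException('No broker is assigned');
    }
    return implode(',', $brokers);
}

// Example with made-up addresses:
echo buildBrokerList([
    ['host' => 'kafka1', 'port' => 9092],
    ['host' => 'kafka2', 'port' => 9092],
]), "\n"; // kafka1:9092,kafka2:9092
```

initChild() could then call `$conf->set('metadata.broker.list', buildBrokerList($this->brokerAddresses));`.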

Then, in a while loop, I call the processMsg() method:

protected function processMsg(): void
{
    $msg = $this->kafkaConsumer->consume(self::WAIT_MSG_TIMEOUT_MS);

    if ($msg !== null) {
        switch ($msg->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                // Simulate processing time before and after handling the message
                sleep(rand(1, 4));
                echo '[' . date('H:i:s') . "][part {$msg->partition}] {$msg->payload} [key: '{$msg->key}' offset: {$msg->offset}]\n";
                sleep(8);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                $this->consoleLog('Got PARTITION_EOF');
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                $this->consoleLog('Timeout');
                break;
            default:
                // Any other error is fatal for this child
                throw new \ErrorException($msg->errstr(), $msg->err);
        }
    } else {
        $this->consoleLog('Timeout');
    }
}
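The while loop that drives processMsg() and ends the child after its lifetime is not shown above. A minimal sketch of the lifetime check, assuming a hypothetical `hasChildExpired()` helper and a hypothetical `CHILD_LIFETIME_SEC` constant; neither appears in the original code, and the loop in the comment is only a guess at its shape.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: true once a child born at $bornTime (a Unix timestamp,
 * like $this->childBornTime in initChild()) has lived for $lifetimeSec seconds.
 */
function hasChildExpired(int $bornTime, int $lifetimeSec, int $now): bool
{
    return ($now - $bornTime) >= $lifetimeSec;
}

// Assumed shape of the child's main loop (CHILD_LIFETIME_SEC is hypothetical):
//
// $this->initChild();
// while (!hasChildExpired($this->childBornTime, self::CHILD_LIFETIME_SEC, time())) {
//     $this->processMsg();
// }

var_dump(hasChildExpired(1000, 120, 1119)); // bool(false)
var_dump(hasChildExpired(1000, 120, 1120)); // bool(true)
```

Passing `$now` in explicitly, instead of calling time() inside the helper, keeps the check deterministic and easy to test.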

My problem: when a child process dies and a new child is born and connects to the Kafka topic, the new child gets timeouts for some period of time (30-50 seconds). I don't understand why. Can I do something to avoid these timeouts after a new connection?

I created a topic with 2 partitions. Then I ran the script, which spawns 2 child processes. This log shows how they work:

[15:15:25][26378] Process forked, child pid 26379
[15:15:25][26378] Process forked, child pid 26384
[15:15:27][26384] Timeout
[15:15:27][26379] Timeout
[15:15:30][part 0] Hello Kafka 2 [key: '' offset: 0]
[15:15:31][part 1] Hello Kafka 1 [key: '' offset: 0]
[15:15:41][part 0] Hello Kafka 5 [key: '' offset: 1]
[15:15:42][part 1] Hello Kafka 3 [key: '' offset: 1]
...
[15:17:04][part 0] Hello Kafka 19 [key: '' offset: 9]
[15:17:04][part 1] Hello Kafka 22 [key: '' offset: 9]
[15:17:13][part 1] Hello Kafka 23 [key: '' offset: 10]
[15:17:15][part 0] Hello Kafka 20 [key: '' offset: 10]
[15:17:23][part 1] Hello Kafka 24 [key: '' offset: 11]
[15:17:24][part 0] Hello Kafka 21 [key: '' offset: 11]
[15:17:31][26379] Child process die
[15:17:31][26378] Process forked, child pid 26402
[15:17:32][26384] Child process die
[15:17:32][26378] Process forked, child pid 26407
[15:17:33][26402] Timeout
[15:17:34][26407] Timeout
[15:17:35][26402] Timeout
[15:17:36][26407] Timeout
[15:17:37][26402] Timeout
...
[15:17:56][26407] Timeout
[15:17:57][26402] Timeout
[15:17:58][26407] Timeout
[15:17:59][26402] Timeout
[15:18:00][26407] Timeout
[15:18:02][part 0] Hello Kafka 26 [key: '' offset: 12]
[15:18:02][part 1] Hello Kafka 25 [key: '' offset: 12]
[15:18:12][part 0] Hello Kafka 27 [key: '' offset: 13]
[15:18:14][part 1] Hello Kafka 29 [key: '' offset: 13]

As you can see, I got timeouts from 15:17:33 to 15:18:00.
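To put a number on the stall, here is a small sketch that computes elapsed seconds between two same-day `H:i:s` timestamps copied from the log above. The helper names `logTimeToSeconds` and `secondsBetween` are hypothetical.

```php
<?php
declare(strict_types=1);

/** Converts an "H:i:s" log timestamp into seconds since midnight. */
function logTimeToSeconds(string $hms): int
{
    [$h, $m, $s] = array_map('intval', explode(':', $hms));
    return $h * 3600 + $m * 60 + $s;
}

/** Seconds elapsed between two "H:i:s" timestamps on the same day. */
function secondsBetween(string $from, string $to): int
{
    return logTimeToSeconds($to) - logTimeToSeconds($from);
}

// First child dies -> first message consumed again
echo secondsBetween('15:17:31', '15:18:02'), "\n"; // 31
// Span of the reported timeouts
echo secondsBetween('15:17:33', '15:18:00'), "\n"; // 27
```

A gap of roughly 31 seconds is in the same range as the consumer-group session timeout (`session.timeout.ms`, 30000 ms by default in librdkafka releases of that time). One plausible, unconfirmed explanation is that a child exiting without cleanly leaving the group makes the broker wait for that session to expire before it rebalances the partitions onto the new child.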