These are chat archives for arnaud-lb/php-rdkafka

8th
Nov 2018
jg111010
@jg101110
Nov 08 2018 08:54
@karavzeka thank you .but the php-fpm process bloking when connect a offline kafka service, I code like below :'''
'''$conf=new \RdKafka\Conf();
$conf->set('socket.timeout.ms',50);
$conf->set('socket.blocking.max.ms',50);
    if (function_exists('pcntl_sigprocmask')) {
        pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
        $conf->set('internal.termination.signal', SIGIO);
    } else {
        $conf->set('queue.buffering.max.ms', 1);
    }
    $conf->setErrorCb(function($kafka,$err,$reason){
            #echo("Kafka error: %s (reason: %s)\n".rd_kafka_err2str($err)."\n".$reason);
            throw new Exception($reason, $err);
    });
    $conf->setDrMsgCb(function($kafka,$message){
        if($message->err){
           # echo($message->err."\n");
           throw new Exception($message->errstr(), $message->err);
        }
    });

    $rk = new \RdKafka\Producer($conf);
    $rk->setLogLevel(LOG_DEBUG);

    $rk->addBrokers(self::$hostList[0]);
    $rkTopic = $rk->newTopic('test');
    $rkTopic->produce(RD_KAFKA_PARTITION_UA, 0, $msg);//json_encode($log)

    while ($rk->getOutQLen() > 0) {
        $rk->poll(50);
    }'''