These are chat archives for arnaud-lb/php-rdkafka

8th
Mar 2016
axiao
@joostshao
Mar 08 2016 07:19
hi, anyone?
Arnaud Le Blanc
@arnaud-lb
Mar 08 2016 09:09
Hi @joostshao
rd_kafka_errno2err() converts the system’s errno variable (obtained by calling rd_kafka_errno()) to a librdkafka err code
axiao
@joostshao
Mar 08 2016 09:46
hi, but i always get an error code: 15
25 => Local: Communication failure with broker
the message is : Local: Communication failure with broker
Arnaud Le Blanc
@arnaud-lb
Mar 08 2016 09:47
How do you get the $errno?
axiao
@joostshao
Mar 08 2016 09:47
the code is 25 (sorry )
$errno =  rd_kafka_errno();
$message = rd_kafka_err2str(rd_kafka_errno2err($errno));
but the Kafka cluster has written the payloads
i am sure, kafkacat can read the payloads
Arnaud Le Blanc
@arnaud-lb
Mar 08 2016 09:48
What are you calling just before rd_kafka_errno() ?
axiao
@joostshao
Mar 08 2016 09:49
$this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode(['device'=>$device]));
just produce the payload
<?php

class ReceiverModel extends Model
{

    private $topicConf = null;
    private $producer = null;
    private $topic = null;
    private $kafkaConfig = null;

    public function __construct()
    {
        $this->kafkaConfig = Yaf_Application::app()->getConfig()->kafka->toArray();
        $this->topicConf = new RdKafka\TopicConf();
        $this->topicConf->set("auto.commit.interval.ms", 1e3);
        $this->topicConf->set("offset.store.sync.interval.ms", 60e3);
        $this->producer = new RdKafka\Producer();
        $this->producer->setLogLevel(LOG_DEBUG);
        $this->producer->addBrokers($this->kafkaConfig['brokers']);
        $this->topic = $this->producer->newTopic($this->kafkaConfig['topic'], $this->topicConf);
        // dump($this->producer->getMetadata(true, NULL, 60e3)->getTopics()->count()); exit();
    }

    public function syncProducer(array $device,  array $session,  array $event){
        // dump($device, $session, $event); exit();
        if(!empty($device)){
            $result = $this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode(['device'=>$device]));
        }
        if(!empty($session)){
            $this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode(['session'=>$session]));
        }    
        if(!empty($event)){
            $this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode(['event'=>$event]));
        }
        return ['status'=>'success', 'message'=>'Sending complete!'];
        // $errno =  rd_kafka_errno();
        // rd_kafka_resp_err_t
        // 25 => Local: Communication failure with broker
        // if(empty($errno)){
        //     return ['status'=>'success', 'message'=>'Sending complete!'];
        // } else {
        //     $message = rd_kafka_err2str(rd_kafka_errno2err($errno));
        //     dump($errno, $message);
        // }
    }

}
can you help me ?
Arnaud Le Blanc
@arnaud-lb
Mar 08 2016 09:51
thanks
you don’t have to check errno() after calling ->produce(). This method throws an exception when an error occurs
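A minimal sketch of that advice, assuming `$topic` is the `RdKafka\ProducerTopic` created in the model above and that the exception thrown is `RdKafka\Exception`. The helper name `sendPayload()` and its return array are hypothetical, chosen only to mirror the status array used in the pasted class.

```php
<?php
// Hypothetical helper: wraps produce() in try/catch instead of reading rd_kafka_errno().
function sendPayload($topic, array $payload): array
{
    try {
        // produce() returns nothing on success; an error surfaces as an exception.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($payload));
        return ['status' => 'success', 'message' => 'Message handed to the producer'];
    } catch (\RdKafka\Exception $e) {
        // Assumption: php-rdkafka reports produce() failures with this exception class.
        return ['status' => 'error', 'message' => $e->getMessage()];
    }
}
```

Inside `syncProducer()` this could be called as `sendPayload($this->topic, ['device' => $device])`, with no `rd_kafka_errno()` check afterwards.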
axiao
@joostshao
Mar 08 2016 09:57
but there are always errors
hey, man!
if I don't check the error number, I cannot get the status of the Kafka cluster
Arnaud Le Blanc
@arnaud-lb
Mar 08 2016 10:01
The value returned by rd_kafka_errno() is irrelevant after a call to produce(), you can safely ignore it. ProducerTopic::produce() already checks the return value of rd_kafka_produce(), and throws an exception if it’s -1 (which indicates an error).

Return Values

Returns no value.
i get your opinion.
i am a little dejected.
Arnaud Le Blanc
@arnaud-lb
Mar 08 2016 10:07
I mean that internally the PHP method ProducerTopic::produce() calls the C function rd_kafka_produce(). The C function returns either 0 (success) or -1 (failure). If it returns -1, the PHP method throws an exception. Otherwise, it returns nothing.
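The mapping described here can be illustrated with a small stand-alone simulation. Both functions below are hypothetical stand-ins written for this illustration, not the extension's actual source: `fakeRdKafkaProduce()` plays the role of the C function and `fakeProduce()` the role of the PHP method.

```php
<?php
// Hypothetical stand-in for the C function: 0 on success, -1 on failure.
function fakeRdKafkaProduce(bool $shouldFail): int
{
    return $shouldFail ? -1 : 0;
}

// Hypothetical stand-in for the PHP method: turns -1 into an exception.
function fakeProduce(bool $shouldFail): void
{
    if (fakeRdKafkaProduce($shouldFail) === -1) {
        throw new RuntimeException('produce failed');
    }
    // Success path: nothing is returned.
}
```

A caller therefore has only two outcomes to handle: the call returns nothing, or it throws.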
axiao
@joostshao
Mar 08 2016 10:08
yes, i know it.
if failed it will throw a PHP exception
thank you !