These are chat archives for arnaud-lb/php-rdkafka

7th
Sep 2018
sonalithakkar
@sonalithakkar
Sep 07 2018 12:11
Hi

<?php
echo "started----------------------<br> ";
$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
$conf->set('internal.termination.signal', SIGIO);
} else {
$conf->set('queue.buffering.max.ms', 1);
}

/////////////Producer produces data

$producer = new \RdKafka\Producer($conf);
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers("127.0.0.2");
$topic = $producer->newTopic("dbtest");
for ($i = 0; $i < 10; $i++) {
$data[$i] = array(
'name'=>$i,
'email'=>"sonali".$i
);
echo "produced $i----------------------/n";
$payload = json_encode($data[$i]);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
$producer->poll(0);
}
while ($producer->getOutQLen() > 0) {
$producer->poll(0);
}
/////////////////////////Consumer consumes data

$conf->set('group.id', 'myConsumerGroup');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.2');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

//////////////////First Consumer
$consumer1 = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'test'
$consumer1->subscribe(['dbtest']);

while (true) {
$j= 0;
$message = $consumer1->consume(120*1000);

switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$dataEx = json_decode($message->payload,true);
var_dump($data);
$sql = "INSERT INTO emp (name, email) VALUES ('".$dataEx['name']."','".$dataEx['email']."')";

    $servername = "localhost";
    $username = "a";
    $password = "a";
    $dbname = "test";

    // Create connection
    $conn = new mysqli($servername, $username, $password, $dbname);
    // Check connection
    if ($conn->connect_error) {
        die("Connection failed: " . $conn->connect_error);
    }

    if ($conn->query($sql) === TRUE) {
        echo "New record created successfully to datbase ".$dbname."/n";
    } else {
        echo "Error: " . $sql . "<br>" . $conn->error;
    }
    $conn->close();

    echo "produced $j----------------------<br> ";
    break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more\n";
    break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out\n";
    break;
default:
    throw new \Exception($message->errstr(), $message->err);
    break;
}
$j++;

}

echo "here";
/////////////////Second Consumer

$conf->set('group.id', 'myConsumerGroup1');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.2');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer2 = new RdKafka\KafkaConsumer($conf);
echo "sonali";
// Subscribe to topic 'test'
$consumer2->subscribe(['dbtest']);

while (true) {

$message2 = $consumer2->consume(1000);
switch ($message2->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$dataEx = json_decode($message2->payload,true);
var_dump($data);
$sql = "INSERT INTO emp (name, email) VALUES ('".$dataEx['name']."','".$dataEx['email']."')";
$servername1 = "localhost";
$username1 = "a";
$password1 = "a";
$dbname1 = "test1";

    // Create connection
    $conn1 = new mysqli($servername1, $username1, $password1, $dbname1);
    // Ch
this is my code with different groupid but still it no working
first consumer consumes all data
so second consumer not getting it
please help
me
sonalithakkar
@sonalithakkar
Sep 07 2018 12:17
$message2 = $consumer2->consume(120*1000);
its by mistake $message2 = $consumer2->consume(1000);