<?php
echo "started----------------------<br> ";
$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
$conf->set('internal.termination.signal', SIGIO);
} else {
$conf->set('queue.buffering.max.ms', 1);
}
/////////////Producer produces data
$producer = new \RdKafka\Producer($conf);
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers("127.0.0.2");
$topic = $producer->newTopic("dbtest");
for ($i = 0; $i < 10; $i++) {
$data[$i] = array(
'name'=>$i,
'email'=>"sonali".$i
);
echo "produced $i----------------------/n";
$payload = json_encode($data[$i]);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
$producer->poll(0);
}
while ($producer->getOutQLen() > 0) {
$producer->poll(0);
}
/////////////////////////Consumer consumes data
$conf->set('group.id', 'myConsumerGroup');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.2');
$topicConf = new RdKafka\TopicConf();
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
//////////////////First Consumer
$consumer1 = new RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'test'
$consumer1->subscribe(['dbtest']);
while (true) {
$j= 0;
$message = $consumer1->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$dataEx = json_decode($message->payload,true);
var_dump($data);
$sql = "INSERT INTO emp (name, email) VALUES ('".$dataEx['name']."','".$dataEx['email']."')";
$servername = "localhost";
$username = "a";
$password = "a";
$dbname = "test";
// Create connection
$conn = new mysqli($servername, $username, $password, $dbname);
// Check connection
if ($conn->connect_error) {
die("Connection failed: " . $conn->connect_error);
}
if ($conn->query($sql) === TRUE) {
echo "New record created successfully to datbase ".$dbname."/n";
} else {
echo "Error: " . $sql . "<br>" . $conn->error;
}
$conn->close();
echo "produced $j----------------------<br> ";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
$j++;
}
echo "here";
/////////////////Second Consumer
$conf->set('group.id', 'myConsumerGroup1');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '127.0.0.2');
$topicConf = new RdKafka\TopicConf();
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
$consumer2 = new RdKafka\KafkaConsumer($conf);
echo "sonali";
// Subscribe to topic 'test'
$consumer2->subscribe(['dbtest']);
while (true) {
$message2 = $consumer2->consume(1000);
switch ($message2->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$dataEx = json_decode($message2->payload,true);
var_dump($data);
$sql = "INSERT INTO emp (name, email) VALUES ('".$dataEx['name']."','".$dataEx['email']."')";
$servername1 = "localhost";
$username1 = "a";
$password1 = "a";
$dbname1 = "test1";
// Create connection
$conn1 = new mysqli($servername1, $username1, $password1, $dbname1);
// Ch