Hey everyone!
I am working on a very old legacy PHP project and don't have much room to improve things. We recently implemented sending messages to Kafka, and the issue now is that we have quite a lot of connections from the PHP project to Kafka in TIME_WAIT.
Unfortunately, we open a new connection for each request, which is far from optimal. My question is: is there some way to close the TCP connection after flush()?
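A minimal sketch of the produce, flush, teardown sequence, assuming php-rdkafka 4.x (where Producer::flush() exists). In this sketch the broker connections are closed by destroying the producer handle: librdkafka closes its broker sockets when the handle is destroyed, which happens once the last PHP reference to the producer goes away. produceAndClose() is a hypothetical helper name, not part of the library. Keep in mind that TIME_WAIT is held by whichever side closes the connection first, so closing from the client right after flush() does not by itself reduce the number of TIME_WAIT sockets; it only makes the teardown happen at a predictable point.

```php
<?php
// Sketch, assuming php-rdkafka >= 4.0 (Producer::flush()).
// produceAndClose() is a hypothetical helper, not part of the library.
function produceAndClose(string $brokers, string $topicName, string $payload, int $timeoutMs = 10000): int
{
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', $brokers);

    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic($topicName);

    // Let the configured partitioner choose the partition.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);

    // Wait until queued messages are delivered or the timeout expires.
    // Returns RD_KAFKA_RESP_ERR_NO_ERROR on success.
    $result = $producer->flush($timeoutMs);

    // Drop the last references so the librdkafka handle is destroyed
    // and its broker connections are closed now rather than at script end.
    unset($topic, $producer);

    return $result;
}
```

Calling this once per request still opens fresh connections every time, which is what causes the TIME_WAIT buildup in the first place; reusing one producer for all messages sent within a request (or in a long-running worker) is the more effective lever.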
Good day everyone!
I am trying out this library, but I see strange behaviour in the high-level consumer, specifically in the default automatic rebalancing. I suspect there are mistakes in my Conf() settings, but I am having trouble debugging this on my own. Can you help me out?
We use Docker for local tests. The topic has eight partitions. While consuming messages with a 4-second timeout, the consumer prints this:
[17:39:16][partition 0] Timed out [key: '' offset: 0]
[17:39:20][partition 0] Timed out [key: '' offset: 0]
[17:39:20][partition 4] No more messages; will wait for more [key: '' offset: 14]
[17:39:24][partition 0] Timed out [key: '' offset: 0]
[17:39:28][partition 0] Timed out [key: '' offset: 0]
[17:39:32][partition 0] Timed out [key: '' offset: 0]
[17:39:36][partition 0] Timed out [key: '' offset: 0]
[17:39:40][partition 0] Timed out [key: '' offset: 0]
[17:39:44][partition 0] Timed out [key: '' offset: 0]
[17:39:48][partition 0] Timed out [key: '' offset: 0]
[17:39:52][partition 0] Timed out [key: '' offset: 0]
[17:39:56][partition 0] Timed out [key: '' offset: 0]
[17:40:00][partition 0] Timed out [key: '' offset: 0]
[17:40:04][partition 0] Timed out [key: '' offset: 0]
[17:40:08][partition 0] Timed out [key: '' offset: 0]
[17:40:12][partition 0] Timed out [key: '' offset: 0]
[17:40:16][partition 0] Timed out [key: '' offset: 0]
[17:40:20][partition 0] Timed out [key: '' offset: 0]
[17:40:24][partition 0] Timed out [key: '' offset: 0]
[17:40:28][partition 0] Timed out [key: '' offset: 0]
[17:40:32][partition 0] Timed out [key: '' offset: 0]
[17:40:36][partition 0] Timed out [key: '' offset: 0]
Why does the consumer stick to partition 0 with a timeout error, even though there are seven other partitions and consuming from them sometimes succeeds?
Conf()
$this->config = new Conf();
$this->config->set('topic.metadata.refresh.interval.ms', env('KAFKA_REFRESH_INTERVAL_MSEC')); // 10000
$this->config->set('metadata.broker.list', env('KAFKA_BROKER_LIST'));
$this->config->set('auto.offset.reset', 'largest');
$this->config->set('enable.partition.eof', 'true');
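The high-level KafkaConsumer also needs a group.id (added further down in this thread), and auto.offset.reset is only consulted when the group has no committed offset for a partition (or the committed offset is out of range). In that case 'largest' starts at the end of the partition, so only messages produced after the assignment are seen, while 'smallest' starts from the beginning. A small sketch that collects these settings in one place; consumerSettings() and applySettings() are hypothetical helper names:

```php
<?php
// Hypothetical helper: the consumer settings used in this thread, in one place.
// auto.offset.reset only applies when the group has no valid committed offset:
// 'largest' = start at the end (new messages only), 'smallest' = start at the beginning.
function consumerSettings(string $brokers, string $groupId, bool $fromBeginning): array
{
    return [
        'metadata.broker.list' => $brokers,
        'group.id' => $groupId,
        'auto.offset.reset' => $fromBeginning ? 'smallest' : 'largest',
        'enable.partition.eof' => 'true',
    ];
}

// Hypothetical helper: applies the settings to any object with a
// set(string, string) method, e.g. an RdKafka\Conf instance.
function applySettings($conf, array $settings)
{
    foreach ($settings as $name => $value) {
        $conf->set($name, $value);
    }
    return $conf;
}

// Usage (Conf import and env() helper as in the snippet above):
// $this->config = applySettings(new Conf(), consumerSettings(
//     env('KAFKA_BROKER_LIST'), 'kafka_consumer_group_test', false));
```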
Oh, thanks a lot! You are awesome for responding so fast =)
So are messages actually not being consumed?
Yes, that's the case. I am 100% sure there are messages in every partition of the topic (I checked with Kafdrop/Kafkamanager), but the consumer skips most of them with an error and only very rarely consumes a message successfully. There is a single consumer for the topic within the group:
$this->config->set('group.id', 'kafka_consumer_group_' . $topic);
I added a rebalance callback, as in the documented example, to log partition assignments, and got this:
[10:46:41] Timed out
[10:46:45] Timed out
Assign: array(1) {
  [0]=>
  object(RdKafka\TopicPartition)#483 (3) {
    ["topic"]=>
    string(9) "test"
    ["partition"]=>
    int(0)
    ["offset"]=>
    int(-1001)
  }
}
[10:46:46][partition 0] No more messages; will wait for more [key: '' offset: 22]
[10:46:50] Timed out
[10:46:54] Timed out
[10:46:58] Timed out
[10:47:02] Timed out
[10:47:06] Timed out
I wonder if there are problems with offsets (I use auto.offset.reset = largest).
So in your case you are only consuming new messages.
Yes, this is intended. New messages are published to the topic after the consumer starts, but the consumer doesn't consume them at all except in rare cases, even after a few minutes, so I don't think that explains it. I don't understand why messages are sometimes consumed successfully (which means the Kafka settings are correct, right?), while most of the time there are only errors.
How can I debug what is behind 'Timed out' (e.g. is there a problem with the connection or with the default group settings, or do I need to switch to the low-level consumer)? Is there any data I can provide?
If you need more debugging output on the consumer side, add this:
$conf->set('debug', 'consumer,cgrp,topic,fetch');
Additionally, you could raise the log level (default: 6):
$conf->set('log_level', '7');
%7|1575541711.847|JOIN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": join with 1 (1) subscribed topic(s)
%7|1575541711.847|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (1991ms old)
%7|1575541711.847|JOIN|rdkafka#consumer-1| [thrd:main]: kafka:9092/1001: Joining group "kafka_consumer_group_test" with 1 subscribed topic(s)
%7|1575541711.847|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" changed join state init -> wait-join (v1, state up)
%7|1575541713.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
%7|1575541713.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1575541713.847|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1575541718.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
%7|1575541718.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1575541718.847|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
[13:28:38] Timed out
$topic = 'test';
$this->config = new Conf();
$this->config->set('debug', 'consumer,cgrp,topic,fetch');
$this->config->set('metadata.broker.list', env('KAFKA_BROKER_LIST'));
$this->config->set('auto.offset.reset', 'largest');
$this->config->set('enable.partition.eof', 'true');
$this->config->set('group.id', 'kafka_consumer_group_' . $topic);
...
$consumer = new KafkaConsumer($this->config);
$consumer->subscribe((array) $topic);
while (true) {
    $message = $consumer->consume(4*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo '[' . date('H:i:s') . "][partition {$message->partition}] {$message->payload} [key: '{$message->key}' offset: {$message->offset}]\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo '[' . date('H:i:s') . "][partition {$message->partition}] No more messages; will wait for more [key: '{$message->key}' offset: {$message->offset}]\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo '[' . date('H:i:s') . "] Timed out \n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
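'Timed out' (RD_KAFKA_RESP_ERR__TIMED_OUT) only means that no message arrived within the 4-second window; it is not by itself a connection error. One quick way to tell an idle consumer that owns partitions from one that owns none is to log KafkaConsumer::getAssignment() whenever a timeout occurs. A minimal sketch; formatAssignment() is a hypothetical helper name:

```php
<?php
// Hypothetical helper: formats the RdKafka\TopicPartition objects returned by
// KafkaConsumer::getAssignment() (anything with getTopic()/getPartition()).
function formatAssignment(array $partitions): string
{
    if (count($partitions) === 0) {
        return 'no partitions assigned';
    }

    $labels = [];
    foreach ($partitions as $tp) {
        $labels[] = $tp->getTopic() . '[' . $tp->getPartition() . ']';
    }

    return implode(', ', $labels);
}

// Possible use in the timeout branch of the consume loop above
// ($consumer is the RdKafka\KafkaConsumer):
//
// case RD_KAFKA_RESP_ERR__TIMED_OUT:
//     echo '[' . date('H:i:s') . '] Timed out; assignment: '
//         . formatAssignment($consumer->getAssignment()) . "\n";
//     break;
```

If this prints "no partitions assigned" while the topic has eight partitions, the problem lies in group membership and rebalancing rather than in fetching.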
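// Producer setup (assumed; broker list as in the consumer config)
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', env('KAFKA_BROKER_LIST'));
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('test');
while (true) {
    for ($i = 0; $i < 8; $i++) {
        echo 'Writing message ' . $i . "\n";
        // Explicit partition $i, no flags, payload, key 'x'
        $topic->produce($i, 0, 'test message', 'x');
    }
    // Serve delivery reports until the outbound queue is drained
    while ($producer->getOutQLen() > 0) {
        $producer->poll(1);
    }
    usleep(100000); // 100 ms
}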
OK, so I cannot reproduce this with:
system: alpine
php: 7.2.25 (fpm image)
rdkafka: 4.0.0 (pecl)
librdkafka (build): 1.1.0.255
I am using a producer that puts a message into each of the partitions 0-7 every 100 ms, and the consumer (same as your code snippet) works perfectly :)
Therefore my assumption is that there is a problem either with your producer or with the broker.
@alexandra-default The log you provided gives me the impression that you subscribe and immediately unsubscribe. You could check with this:
$conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;
        default:
            throw new \Exception($err);
    }
});
to see whether there truly is an unassign/revoke, which would explain a lot :D
largest offset as starting point
you could check with this
I have the impression that rebalancing is not working as expected:
%7|1575544204.115|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Topic test with 8 partition(s)
...
Assign: array(0) {
}
%7|1575544212.536|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v1 vs 0)
%7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": new assignment of 0 partition(s) in join state wait-assign-rebalance_cb
%7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_assign:2363: new version barrier v2
%7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": assigning 0 partition(s) in join state wait-assign-rebalance_cb
%7|1575544212.536|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
%7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_partitions_fetch_start0:1676: new version barrier v3
%7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": starting fetchers for 0 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2408)
%7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
%7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 0 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1575544213.106|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state assigned): with new assignment: OffsetCommit done (__NO_OFFSET)
[14:10:13] Timed out
while (true) {
    $msg = $queue->consume(15000);
    // A timeout returns NULL. librdkafka before 1.0 (and later versions with
    // enable.partition.eof=true) also returns a message with
    // RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching the end of a partition.
    if (null === $msg || RD_KAFKA_RESP_ERR__PARTITION_EOF === $msg->err) {
        continue;
    }
    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
        throw new \Exception($msg->errstr(), $msg->err);
    }
    $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
}
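On the question of switching to the low-level consumer: the loop above needs a $queue that all partitions feed into. A minimal sketch of that setup with RdKafka\Consumer, assuming php-rdkafka 4.x and a topic whose partitions are numbered 0 to $partitionCount - 1; startLowLevelQueue() is a hypothetical helper name. This bypasses consumer-group rebalancing entirely, so no group offsets are committed here and each process decides for itself which partitions it reads.

```php
<?php
// Sketch, assuming php-rdkafka >= 4.0 and partitions 0..$partitionCount-1.
// startLowLevelQueue() is a hypothetical helper, not part of the library.
// Fetches every partition of one topic into a single queue using the
// low-level RdKafka\Consumer (no consumer group, no rebalancing).
function startLowLevelQueue(string $brokers, string $topicName, int $partitionCount): array
{
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', $brokers);

    $consumer = new RdKafka\Consumer($conf);
    $queue = $consumer->newQueue();
    $topic = $consumer->newTopic($topicName);

    for ($partition = 0; $partition < $partitionCount; $partition++) {
        // RD_KAFKA_OFFSET_END: only messages produced after this call,
        // comparable to auto.offset.reset=largest for a fresh group.
        $topic->consumeQueueStart($partition, RD_KAFKA_OFFSET_END, $queue);
    }

    // Return all three so the caller keeps them alive while consuming.
    return [$consumer, $topic, $queue];
}

// Usage (env() helper as in the thread), followed by the consume loop above:
// [$consumer, $topic, $queue] = startLowLevelQueue(env('KAFKA_BROKER_LIST'), 'test', 8);
```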