    Paweł Niedzielski
    @steveb_gitlab
    and it's not really an issue with php-rdkafka (or librdkafka, actually)
    Zakhar Morozov
    @fortael
    I don't quite understand you. Our Java team uses it in that configuration, so I expected the PHP extension to be able to do the same. I tried to use a .pem file, but then I got "Java JAAS configuration is not supported" too.
    Paweł Niedzielski
    @steveb_gitlab
    @fortael the Java client is not the same as librdkafka (the C Kafka client library used by most other languages: Node, Python, or in this case PHP)
    librdkafka itself does not support Java truststores, as the exception is telling you
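For reference, librdkafka reads PEM-encoded certificates directly rather than a Java truststore. A hedged sketch of the relevant settings (all paths below are illustrative placeholders, not values from this chat):

```php
<?php
// Sketch only: librdkafka takes PEM files instead of a Java truststore.
// The file paths are illustrative, not from this conversation.
$conf = new RdKafka\Conf();
$conf->set('security.protocol', 'ssl');
$conf->set('ssl.ca.location', '/etc/kafka/ca.pem');              // CA certificate
$conf->set('ssl.certificate.location', '/etc/kafka/client.pem'); // client certificate
$conf->set('ssl.key.location', '/etc/kafka/client.key');         // client private key
```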
    Zakhar Morozov
    @fortael
    Got it
    Paweł Niedzielski
    @steveb_gitlab
    Judging from the fact that the librdkafka community didn't implement it, it's either proprietary and cannot be replicated in librdkafka, or it's complicated for some other reason
    but I'd say the former
    Paweł Niedzielski
    @steveb_gitlab
    btw @fortael
    see this discussion for a possible solution: arnaud-lb/php-rdkafka#225
    BOR4
    @BOR4

    hey everyone!

    I am working on a very old legacy PHP project and I don't have much room to improve things. We recently implemented sending to Kafka, and the issue now is that we have quite a lot of connections from the PHP project to Kafka in TIME_WAIT.
    Unfortunately we are opening a new connection for each request, which is far from optimal. My question is: is there some way to close the TCP connection after flush()?

    Another question: can I somehow utilize connections.max.idle.ms for producing messages?
    Nick
    @nick-zh
    heya
    I think freeing the producer (setting it to null) will probably do the trick. Also, if you are pre librdkafka 1.0, this might help too:
    https://github.com/arnaud-lb/php-rdkafka#socketblockingmaxms-librdkafka--100
    from 1.x onward this is not necessary anymore
    if destroying the producer object is not an option, connections.max.idle.ms can maybe be used to achieve this too, but I haven't tried it.
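A minimal sketch of the first suggestion (assuming ext-rdkafka and a broker reachable via KAFKA_BROKER_LIST): flush pending messages, then drop the last reference to the producer so its destructor closes the TCP connections rather than leaving them to the request lifecycle:

```php
<?php
// Sketch, not the project's actual code: flush, then release the producer.
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', getenv('KAFKA_BROKER_LIST'));

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('test');
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'payload');

$producer->flush(10000);  // wait up to 10s for delivery reports (rdkafka >= 4.0)
$producer = null;         // last reference gone: destructor closes the sockets
```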
    BOR4
    @BOR4
    thank you Nick I will try it!
    alexandra-default
    @alexandra-default

    Good day everyone!
    I am trying to test out this library, but I get strange behaviour from the high-level consumer, specifically with the default auto rebalancing. I suppose there is some mistake in my Conf() settings, but I'm having trouble debugging it on my own; can you help me out?
    We are using Docker for local tests. There are eight partitions in the topic. While consuming messages with a 4-second timeout, the consumer prints this:

    [17:39:16][partition 0] Timed out [key: '' offset: 0]
    [17:39:20][partition 0] Timed out [key: '' offset: 0]
    [17:39:20][partition 4] No more messages; will wait for more [key: '' offset: 14]
    [17:39:24][partition 0] Timed out [key: '' offset: 0]
    [17:39:28][partition 0] Timed out [key: '' offset: 0]
    [17:39:32][partition 0] Timed out [key: '' offset: 0]
    [17:39:36][partition 0] Timed out [key: '' offset: 0]
    [17:39:40][partition 0] Timed out [key: '' offset: 0]
    [17:39:44][partition 0] Timed out [key: '' offset: 0]
    [17:39:48][partition 0] Timed out [key: '' offset: 0]
    [17:39:52][partition 0] Timed out [key: '' offset: 0]
    [17:39:56][partition 0] Timed out [key: '' offset: 0]
    [17:40:00][partition 0] Timed out [key: '' offset: 0]
    [17:40:04][partition 0] Timed out [key: '' offset: 0]
    [17:40:08][partition 0] Timed out [key: '' offset: 0]
    [17:40:12][partition 0] Timed out [key: '' offset: 0]
    [17:40:16][partition 0] Timed out [key: '' offset: 0]
    [17:40:20][partition 0] Timed out [key: '' offset: 0]
    [17:40:24][partition 0] Timed out [key: '' offset: 0]
    [17:40:28][partition 0] Timed out [key: '' offset: 0]
    [17:40:32][partition 0] Timed out [key: '' offset: 0]
    [17:40:36][partition 0] Timed out [key: '' offset: 0]

    Why does the consumer stick only to partition 0 with a timeout error, although there are seven other partitions, even with successful consuming?

    Here is current Conf() state
    $this->config = new Conf();
    $this->config->set('topic.metadata.refresh.interval.ms', env('KAFKA_REFRESH_INTERVAL_MSEC')); // 10000
    $this->config->set('metadata.broker.list', env('KAFKA_BROKER_LIST'));
    $this->config->set('auto.offset.reset', 'largest');
    $this->config->set('enable.partition.eof', 'true');
    Nick
    @nick-zh
    heya
    can you elaborate a bit more: are messages actually not being consumed, or are you just wondering, after consumption, why it sticks to partition 0?
    Nick
    @nick-zh
    Ok, so I checked the code: librdkafka actually returns null on timeout, so the 0 is coming from us (the extension, 99% sure), and you can ignore the partition info on this "error" since we don't have that info. I will check if we can improve this so the partition is NULL. If your concern is that the consumer is not "jumping" around and picking up new messages from other partitions, no worries, it works like a charm ;)
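Following the explanation above, a small guard (a sketch, assuming `$consumer` is an already-subscribed consumer) that only reads partition/offset when there was no error:

```php
<?php
// Sketch: on RD_KAFKA_RESP_ERR__TIMED_OUT the message carries no meaningful
// partition/offset, so only read those fields on a real message.
// Assumes $consumer is an already-subscribed RdKafka\KafkaConsumer.
$message = $consumer->consume(4000);
if ($message->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
    echo "Timed out (ignore partition/offset here)\n";
} elseif ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
    echo "partition {$message->partition}, offset {$message->offset}\n";
}
```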
    Nick
    @nick-zh
    PR is created ;) should be fixed soon(ish) :D
    alexandra-default
    @alexandra-default

    Oh, thanks a lot! You are awesome with fast responding =)

    so are messages actually not being consumed

    Yeah, this is the case. I am 100% sure there are messages in the topic for each partition (checked with Kafdrop/Kafka Manager), but the consumer skips most of them with an error, only very rarely consuming a message successfully. There is a single consumer for the topic within a group:

    $this->config->set('group.id', 'kafka_consumer_group_' . $topic);

    I added a rebalance callback for logging partition assignment, as the documented example shows, and got this:

    [10:46:41] Timed out
    [10:46:45] Timed out
    Assign: array(1) {
      [0]=>
      object(RdKafka\TopicPartition)#483 (3) {
        ["topic"]=>
        string(9) "test"
        ["partition"]=>
        int(0)
        ["offset"]=>
        int(-1001)
      }
    }
    [10:46:46][partition 0] No more messages; will wait for more [key: '' offset: 22]
    [10:46:50] Timed out
    [10:46:54] Timed out
    [10:46:58] Timed out
    [10:47:02] Timed out
    [10:47:06] Timed out

    I wonder if there are problems with offsets (I use auto.offset.reset = largest).

    Nick
    @nick-zh
    most certainly. earliest means: for the very first time, start at the beginning; latest means: for the very first time, start at the end
    so in your case you are only consuming new messages.
    Nick
    @nick-zh
    if you have already committed offsets, you need to use a new consumer group to get all the messages
    and maybe for further explanation: timed out is not really an error (more of a soft error), it just means that within your timeout there was no message to consume
    high-level consumer balancing can take a few seconds in the beginning
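The two points above can be sketched as configuration (the group name is illustrative): a fresh group.id has no committed offsets, so auto.offset.reset decides the starting position on the first run:

```php
<?php
// Sketch: a new consumer group starts from auto.offset.reset, because no
// offsets have been committed for it yet. The group name is illustrative.
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', getenv('KAFKA_BROKER_LIST'));
$conf->set('group.id', 'kafka_consumer_group_test_v2'); // fresh group => no stored offsets
$conf->set('auto.offset.reset', 'earliest');            // first run: start at the beginning
```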
    alexandra-default
    @alexandra-default

    so in your case you are only consuming new messages

    Yes, this is intended. New messages are published to the topic (after consumer start), but the consumer doesn't consume them at all except in rare cases, even after a few minutes, so I don't think this explanation applies. I don't understand why messages are sometimes consumed successfully (which means the Kafka settings are correct, right?) while mostly there are only errors.

    How can I debug what the problem behind 'Timed out' is (like, is there a problem with the connection or the group's default settings, or maybe I need to switch to the low-level consumer)? Is there any data I can provide?

    Maybe there is no problem with this library, but with the behaviour of librdkafka itself?
    Nick
    @nick-zh
    first of all it would be helpful to know:
    • version of rdkafka
    • version of librdkafka
    it's also hard for me to tell, since it could be related to either the producer or the consumer. the default log level that is in place should be enough to see broker errors.
    a timeout really isn't an error, it just means that there were no more messages to read
    if you see any other errors, please let me know
    alexandra-default
    @alexandra-default
    We use
    librdkafka version (runtime) => 1.1.0
    librdkafka version (build) => 1.1.0.255
    
    ...
    
    echo phpversion('rdkafka');
    4.0.0-dev
    Nick
    @nick-zh
    ok, so if you set a delivery report callback via setDrMsgCb on your producer, you can check whether the message was truly sent, to rule that out as the source of the delay

    if you need more debugging on consumer side add this:

    $conf->set('debug', 'consumer,cgrp,topic,fetch');

    additionally you could also up the log level (default: 6):

    $conf->set('log_level', '7');
    alexandra-default
    @alexandra-default
    That is so nice of you to provide the settings, thanks!
    So there are errors with offsets:
    %7|1575541711.847|JOIN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": join with 1 (1) subscribed topic(s)
    %7|1575541711.847|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (1991ms old)
    %7|1575541711.847|JOIN|rdkafka#consumer-1| [thrd:main]: kafka:9092/1001: Joining group "kafka_consumer_group_test" with 1 subscribed topic(s)
    %7|1575541711.847|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" changed join state init -> wait-join (v1, state up)
    %7|1575541713.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575541713.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575541713.847|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
    %7|1575541718.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575541718.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575541718.847|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
    [13:28:38] Timed out
    Nick
    @nick-zh
    is there any consumer code you can provide so i can check this out?
    alexandra-default
    @alexandra-default
    Sure!
    $topic = 'test';
    
    $this->config = new Conf();
    $this->config->set('debug', 'consumer,cgrp,topic,fetch');
    $this->config->set('metadata.broker.list', env('KAFKA_BROKER_LIST'));
    $this->config->set('auto.offset.reset', 'largest');
    $this->config->set('enable.partition.eof', 'true');
    $this->config->set('group.id', 'kafka_consumer_group_' . $topic);
    
    ...
    
    $consumer = new KafkaConsumer($this->config);
    $consumer->subscribe((array) $topic);
    while (true) {
        $message = $consumer->consume(4*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo '[' . date('H:i:s') . "][partition {$message->partition}] {$message->payload} [key: '{$message->key}' offset: {$message->offset}]\n";
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo '[' . date('H:i:s') . "][partition {$message->partition}] No more messages; will wait for more [key: '{$message->key}' offset: {$message->offset}]\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo '[' . date('H:i:s') . "] Timed out \n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
                break;
        }
    }
    Nick
    @nick-zh
    which php version are you using?
    alexandra-default
    @alexandra-default
    php:7.2-fpm
    Nick
    @nick-zh
    kk, will quickly set this up and try to reproduce with your code snippet
    alexandra-default
    @alexandra-default
    That would be awesome, thanks! Do you also need the producer settings? Maybe there is a problem with the partition count (8)...
    Nick
    @nick-zh
    yeah sure, if you have a producer snippet that would help, so I can reproduce it more accurately
    otherwise i would just use something like
    // setup sketched for completeness; the original snippet assumed
    // $producer and $topic already existed
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', env('KAFKA_BROKER_LIST'));
    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic('test');
    
    while (true) {
        for ($i = 0; $i < 8; $i++) {
            echo 'Writing message ' . $i . "\n";
            $topic->produce($i, 0, 'test message', 'x');
        }
        while ($producer->getOutQLen() > 0) {
            $producer->poll(1);
        }
        usleep(100000);
    }

    ok so i cannot reproduce this with:
    system: alpine
    php: 7.2.25 (fpm image)
    rdkafka: 4.0.0 (pecl)
    librdkafka (build): 1.1.0.255

    i am using a producer to put a message (every 100ms) into a partition between 0-7, and the consumer (same as your code snippet) works perfectly :)
    therefore my assumption is: either there is a problem with your producer or with the broker

    Paweł Niedzielski
    @steveb_gitlab
    @nick-zh https://travis-ci.org/arnaud-lb/php-rdkafka/jobs/621026799 still fails on librdkafka 0.11.6 :D
    steveb_gitlab @steveb_gitlab bashes head into wall
    Nick
    @nick-zh

    @alexandra-default the log you provided gives me the impression that you subscribe and immediately unsubscribe; you could check with this:

    $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;
    
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;
    
            default:
                throw new \Exception($err);
        }
    });

    to see if there is truly an unassign / revoke, which would explain a lot :D

    alexandra-default
    @alexandra-default
    @nick-zh Good news =) I will look further into it, thanks for your time a lot!
    Paweł Niedzielski
    @steveb_gitlab
    If php-fpm is used, then at times Kafka might not realize that a consumer has left the consumer group and still considers it active. Then a new process comes up and asks Kafka for topics/partitions to listen to, and Kafka cannot give it any, since the old consumer is still considered "subscribed"
    But my bet would be on the largest offset as the starting point
    Nick
    @nick-zh
    yeah you are right Paweł, if there is a lingering dead client, it will take some time until it is killed so a new one can register, but I would assume that with multiple partitions a rebalance would be proposed, which should kill the hopefully non-responding "dead" client
    alex said this is intended, they only want new messages :)
    alexandra-default
    @alexandra-default

    you could check with this

    I have the impression rebalancing is not working as expected, given

    %7|1575544204.115|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]:  Topic test with 8 partition(s)
    ...
    Assign: array(0) {
    }
    %7|1575544212.536|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v1 vs 0)
    %7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": new assignment of 0 partition(s) in join state wait-assign-rebalance_cb
    %7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_assign:2363: new version barrier v2
    %7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": assigning 0 partition(s) in join state wait-assign-rebalance_cb
    %7|1575544212.536|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
    %7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_partitions_fetch_start0:1676: new version barrier v3
    %7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": starting fetchers for 0 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2408)
    %7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
    %7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 0 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575544213.106|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state assigned): with new assignment: OffsetCommit done (__NO_OFFSET)
    [14:10:13] Timed out
    Nick
    @nick-zh
    yeah, this is not good :D
    Nick
    @nick-zh
    so this could potentially happen if you have 8 partitions and 8 active consumers and you start a 9th consumer: that one would get exactly what you get :D