    alexandra-default
    @alexandra-default
    Maybe there is no problem with this library, but with the behaviour of librdkafka itself?
    Nick
    @nick-zh
    first of all it would be helpful to know:
    • version of rdkafka (the PHP extension)
    • version of librdkafka
      (see the sketch below for a quick way to print both)
      it's also hard for me to tell, since it could be related to either the producer or the consumer. the default log level that is in place should be enough to see broker errors.
      a timeout really isn't an error, it just means there were no more messages to read
      if you see any other errors, please let me know
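
    a quick way to print both from PHP (a sketch; RD_KAFKA_VERSION is assumed to be the hex-packed librdkafka version constant the extension exposes):

    // sketch: extension version plus the librdkafka version it was built against
    echo 'php-rdkafka: ' . phpversion('rdkafka') . "\n";
    printf("librdkafka:  0x%08x\n", RD_KAFKA_VERSION); // e.g. 0x010100ff for 1.1.0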
    alexandra-default
    @alexandra-default
    We use
    librdkafka version (runtime) => 1.1.0
    librdkafka version (build) => 1.1.0.255
    
    ...
    
    echo phpversion('rdkafka');
    4.0.0-dev
    Nick
    @nick-zh
    ok, so if you set a delivery report callback via setDrMsgCb on your producer, you can check whether the message was truly sent, to rule this out as the source of the delay
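
    a minimal sketch of wiring that up (broker address assumed):

    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', 'kafka:9092'); // assumed broker address
    // delivery report callback: invoked (during poll) once per produced message
    $conf->setDrMsgCb(function ($kafka, $message) {
        if ($message->err) {
            echo 'Delivery failed: ' . $message->errstr() . "\n";
        } else {
            echo "Delivered to partition {$message->partition} at offset {$message->offset}\n";
        }
    });
    $producer = new RdKafka\Producer($conf);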

    if you need more debugging on the consumer side, add this:

    $conf->set('debug', 'consumer,cgrp,topic,fetch');

    additionally you could also up the log level (default: 6):

    $conf->set('log_level', '7');
    alexandra-default
    @alexandra-default
    That is so nice of you to provide settings, thanks!
    So there are errors with the offsets:
    %7|1575541711.847|JOIN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": join with 1 (1) subscribed topic(s)
    %7|1575541711.847|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (1991ms old)
    %7|1575541711.847|JOIN|rdkafka#consumer-1| [thrd:main]: kafka:9092/1001: Joining group "kafka_consumer_group_test" with 1 subscribed topic(s)
    %7|1575541711.847|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" changed join state init -> wait-join (v1, state up)
    %7|1575541713.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575541713.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575541713.847|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
    %7|1575541718.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575541718.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575541718.847|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
    [13:28:38] Timed out
    Nick
    @nick-zh
    is there any consumer code you can provide so i can check this out?
    alexandra-default
    @alexandra-default
    Sure!
    $topic = 'test';
    
    $this->config = new Conf();
    $this->config->set('debug', 'consumer,cgrp,topic,fetch');
    $this->config->set('metadata.broker.list', env('KAFKA_BROKER_LIST'));
    $this->config->set('auto.offset.reset', 'largest');
    $this->config->set('enable.partition.eof', 'true');
    $this->config->set('group.id', 'kafka_consumer_group_' . $topic);
    
    ...
    
    $consumer = new KafkaConsumer($this->config);
    $consumer->subscribe((array) $topic);
    while (true) {
        $message = $consumer->consume(4*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo '[' . date('H:i:s') . "][partition {$message->partition}] {$message->payload} [key: '{$message->key}' offset: {$message->offset}]\n";
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo '[' . date('H:i:s') . "][partition {$message->partition}] No more messages; will wait for more [key: '{$message->key}' offset: {$message->offset}]\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo '[' . date('H:i:s') . "] Timed out \n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }
    Nick
    @nick-zh
    which php version are you using?
    alexandra-default
    @alexandra-default
    php:7.2-fpm
    Nick
    @nick-zh
    kk, will quickly set this up and try to reproduce with your code snippet
    alexandra-default
    @alexandra-default
    That would be awesome, thanks! Do you also need the producer settings? Maybe there is a problem with the partition count (8)...
    Nick
    @nick-zh
    yeah sure if you have a producer snippet, that would help, so i can reproduce it more accurately
    otherwise i would just use something like
    // assumed setup for this sketch: broker address and topic name are illustrative
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', 'kafka:9092');
    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic('test');

    while (true) {
        for ($i = 0; $i < 8; $i++) {
            echo 'Writing message ' . $i . "\n";
            // produce($partition, $msgflags, $payload, $key)
            $topic->produce($i, 0, 'test message', 'x');
        }
        // drain the internal producer queue before the next batch
        while ($producer->getOutQLen() > 0) {
            $producer->poll(1);
        }
        usleep(100000); // 100ms pause between batches
    }

    ok so i cannot reproduce this with:
    system: alpine
    php: 7.2.25 (fpm image)
    rdkafka: 4.0.0 (pecl)
    librdkafka (build): 1.1.0.255

    i am using a producer to put a message (every 100ms) into a partition between 0-7, and the consumer (same as your code snippet) works perfectly :)
    therefore my assumption is: either there is a problem with your producer or with the broker

    Paweł Niedzielski
    @steveb_gitlab
    @nick-zh https://travis-ci.org/arnaud-lb/php-rdkafka/jobs/621026799 still fails on librdkafka 0.11.6 :D
    steveb_gitlab @steveb_gitlab bashes head into wall
    Nick
    @nick-zh

    @alexandra-default the log you provided gives me the impression that you subscribe and immediately unsubscribe; you could check with this:

    $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;
    
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;
    
            default:
                throw new \Exception($err);
        }
    });

    to see if there is truly an unassign / revoke, which would explain a lot :D

    alexandra-default
    @alexandra-default
    @nick-zh Good news =) I will look further into it, thanks a lot for your time!
    Paweł Niedzielski
    @steveb_gitlab
    If php-fpm is used, then at times Kafka might not realize that a consumer from the consumer group has left and still considers it active. Then a new process comes up and asks Kafka for topics/partitions to listen to, and it cannot get any, since the old consumer is still considered "subscribed"
    But my bet would be on the largest offset as the starting point
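
    for reference, the broker only drops such a lingering member once its session times out; lowering session.timeout.ms shortens that window (a sketch, value illustrative):

    $conf->set('session.timeout.ms', '6000'); // evict silent group members after ~6s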
    Nick
    @nick-zh
    yeah you are right Paweł, if there is a lingering dead client, it will take some time until it is killed, so a new one can register, but i would assume with multiple partitions, a rebalance would be proposed, which should kill the hopefully non-responding "dead" client
    alex said this is intended, they only want new messages :)
    alexandra-default
    @alexandra-default

    you could check with this

    I have the impression rebalancing is not working as expected, given:

    %7|1575544204.115|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]:  Topic test with 8 partition(s)
    ...
    Assign: array(0) {
    }
    %7|1575544212.536|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v1 vs 0)
    %7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": new assignment of 0 partition(s) in join state wait-assign-rebalance_cb
    %7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_assign:2363: new version barrier v2
    %7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": assigning 0 partition(s) in join state wait-assign-rebalance_cb
    %7|1575544212.536|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
    %7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_partitions_fetch_start0:1676: new version barrier v3
    %7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": starting fetchers for 0 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2408)
    %7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
    %7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 0 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575544213.106|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state assigned): with new assignment: OffsetCommit done (__NO_OFFSET)
    [14:10:13] Timed out
    Nick
    @nick-zh
    yeah, this is not good :D
    Nick
    @nick-zh
    so this could potentially happen if you have 8 partitions, 8 active consumers, and you start a 9th consumer; that one would get exactly what you get :D
    @steveb_gitlab probably we (and by we i mean you XD) should change this to:
    while (true) {
        $msg = $queue->consume(15000);
        // librdkafka before 1.0 returns message with RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching topic end.
        if (!$msg) {
            continue;
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }
    ah fuck, hang on, not this
    alexandra-default
    @alexandra-default
    I have 8 partitions and yet only 1 consumer =)
    Nick
    @nick-zh
    @alexandra-default is it a local kafka that you are running?
    Paweł Niedzielski
    @steveb_gitlab
    @alexandra-default do you launch that consumer via some web URL request?
    alexandra-default
    @alexandra-default
    I use Docker container for Apache Kafka.
    I launch consumer "locally" from PHP container.
    Nick
    @nick-zh
    alright, if you change the consumer group to something different, do you get the same result?

    @steveb_gitlab next try:

    //replace the consumer loop
    while (true) {
        $msg = $queue->consume(15000);
        // librdkafka before 1.0 returns message with RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching topic end.
        if (!$msg) {
            continue;
        }
    
        if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            break;
        } elseif (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }

    this should do it

    also at the beginning:
    $conf->set('enable.partition.eof', 'true');
    alexandra-default
    @alexandra-default

    Oh, I've changed the group, and then got this endless stream of logs:

    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)

    This is better, I assume, but the switch still never reaches the case that echoes a message :D

    Nick
    @nick-zh
    since you are on latest, maybe there are simply no new messages. seeing this though, i think you have some dead consumers in the original group (as many as, or more than, the partition count), which is why it works properly now with a new group.
    just to check that consumption works, you could switch to another group again and set the offset reset to earliest :) (sketch below)
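
    a minimal sketch of that check (group name hypothetical):

    $conf->set('group.id', 'kafka_consumer_group_test_v2'); // fresh group, no stored offsets
    $conf->set('auto.offset.reset', 'earliest'); // replay from the oldest offset instead of 'largest'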
    alexandra-default
    @alexandra-default

    since you are on latest, maybe there are simply no new messages

    Okay, this time it was a problem with producer lag, soooo...
    Yaaaay, success!

    crying with tears of joy
    Nick
    @nick-zh
    great to hear, happy producing / consuming
    alexandra-default
    @alexandra-default
    Thanks a lot for all your time, you're the best! =)
    Nick
    @nick-zh
    @steveb_gitlab also added it as a comment in the PR, let me know when we can watch travis do the work XD
    @alexandra-default no worries, happy to help ;)
    Paweł Niedzielski
    @steveb_gitlab
    @nick-zh won't it put it in a potentially infinite loop?
    Nick
    @nick-zh
    i hope not: with the eof setting set to true, we always get RD_KAFKA_RESP_ERR__PARTITION_EOF at the end of a partition (instead of null), on which we break :)
    Paweł Niedzielski
    @steveb_gitlab
    Pushed. It theoretically should do the trick, provided we receive EOF only once BOTH topics are exhausted
    Nick
    @nick-zh
    we have two topics? sry, i didn't check that well, then probably we also need the old count back in again :D (a sketch of that is below)
    ay :D
    Paweł Niedzielski
    @steveb_gitlab
    two topics or partitions
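
    a sketch of that counting idea, based on the loop above (EOF arrives once per topic-partition, so tally them before breaking; $expected is an assumed, known count):

    $eofs = [];
    $expected = 2; // assumed: total number of subscribed topic-partitions
    while (true) {
        $msg = $queue->consume(15000);
        if (!$msg) {
            continue;
        }
        if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            // record which topic-partition hit end-of-log; stop once all have
            $eofs[$msg->topic_name . ':' . $msg->partition] = true;
            if (count($eofs) >= $expected) {
                break;
            }
            continue;
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }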