    alexandra-default
    @alexandra-default
    So there are errors with offset:
    %7|1575541711.847|JOIN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": join with 1 (1) subscribed topic(s)
    %7|1575541711.847|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (1991ms old)
    %7|1575541711.847|JOIN|rdkafka#consumer-1| [thrd:main]: kafka:9092/1001: Joining group "kafka_consumer_group_test" with 1 subscribed topic(s)
    %7|1575541711.847|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" changed join state init -> wait-join (v1, state up)
    %7|1575541713.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575541713.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575541713.847|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
    %7|1575541718.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575541718.847|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575541718.847|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
    [13:28:38] Timed out
    Nick
    @nick-zh
    is there any consumer code you can provide so i can check this out?
    alexandra-default
    @alexandra-default
    Sure!
    $topic = 'test';
    
    $this->config = new Conf();
    $this->config->set('debug', 'consumer,cgrp,topic,fetch');
    $this->config->set('metadata.broker.list', env('KAFKA_BROKER_LIST'));
    $this->config->set('auto.offset.reset', 'largest');
    $this->config->set('enable.partition.eof', 'true');
    $this->config->set('group.id', 'kafka_consumer_group_' . $topic);
    
    ...
    
    $consumer = new KafkaConsumer($this->config);
    $consumer->subscribe((array) $topic);
    while (true) {
        $message = $consumer->consume(4*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo '[' . date('H:i:s') . "][partition {$message->partition}] {$message->payload} [key: '{$message->key}' offset: {$message->offset}]\n";
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo '[' . date('H:i:s') . "][partition {$message->partition}] No more messages; will wait for more [key: '{$message->key}' offset: {$message->offset}]\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo '[' . date('H:i:s') . "] Timed out \n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }
    Nick
    @nick-zh
    which php version are you using?
    alexandra-default
    @alexandra-default
    php:7.2-fpm
    Nick
    @nick-zh
    kk, will quickly set this up and try to reproduce with your code snippet
    alexandra-default
    @alexandra-default
    That would be awesome, thanks! Do you also need the producer settings? Maybe there is a problem with the partition count (8)...
    Nick
    @nick-zh
    yeah sure if you have a producer snippet, that would help, so i can reproduce it more accurately
    otherwise i would just use something like
    while (true) {
        for ($i = 0; $i < 8; $i++) {
            echo 'Writing message ' . $i . "\n";
            $topic->produce($i, 0, 'test message', 'x');
        }
        while ($producer->getOutQLen() > 0) {
            $producer->poll(1);
        }
        usleep(100000);
    }

    ok so i cannot reproduce this with:
    system: alpine
    php: 7.2.25 (fpm image)
    rdkafka: 4.0.0 (pecl)
    librdkafka (build): 1.1.0.255

    i am using a producer to put a message (every 100ms) into a partition between 0-7 and the consumer (same as your code snippet) works perfectly :)
    therefore my assumption is: either there is a problem with your producer or with the broker

    Paweł Niedzielski
    @steveb_gitlab
    @nick-zh https://travis-ci.org/arnaud-lb/php-rdkafka/jobs/621026799 still fails on librdkafka 0.11.6 :D
    steveb_gitlab @steveb_gitlab bashes head into wall
    Nick
    @nick-zh

    @alexandra-default the log you provided gives me the impression you subscribe and immediately unsubscribe, you could check with this:

    $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;
    
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;
    
            default:
                throw new \Exception($err);
        }
    });

    to see if there is truly an unassign / revoke, which would explain a lot :D

    alexandra-default
    @alexandra-default
    @nick-zh Good news =) I will look further into it, thanks a lot for your time!
    Paweł Niedzielski
    @steveb_gitlab
    If php-fpm is used, then at times Kafka might not realize that a consumer from the consumer group has left and still considers it active. Then a new process comes up and asks Kafka for topics/partitions to listen to, and it cannot get one, since the old consumer is still considered "subscribed"
    But my bet would be on that largest offset as the starting point
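    The broker-side timeout Paweł alludes to is controlled by a consumer setting; a minimal sketch, assuming the standard librdkafka property name (the value shown is illustrative, not a recommendation):

    ```php
    // Sketch only: how long the group coordinator waits without heartbeats
    // before it evicts a "dead" consumer and rebalances its partitions away.
    // 10000 ms is illustrative; pick a value that fits your fpm worker lifetime.
    $conf = new RdKafka\Conf();
    $conf->set('session.timeout.ms', '10000');
    ```

    Until that timeout expires, the dead fpm worker keeps its partitions, which matches the symptom described above.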
    Nick
    @nick-zh
    yeah you are right Pawel, if there is a lingering dead client, it will take some time until it is killed, so a new one can register, but i would assume with multiple partitions, a rebalance would be proposed, which should kill the hopefully non-responding "dead" client
    alex said this is intended, they only want new messages :)
    alexandra-default
    @alexandra-default

    you could check with this

    I have the impression rebalancing is not working as expected, given:

    %7|1575544204.115|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]:  Topic test with 8 partition(s)
    ...
    Assign: array(0) {
    }
    %7|1575544212.536|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v1 vs 0)
    %7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": new assignment of 0 partition(s) in join state wait-assign-rebalance_cb
    %7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_assign:2363: new version barrier v2
    %7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": assigning 0 partition(s) in join state wait-assign-rebalance_cb
    %7|1575544212.536|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
    %7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_partitions_fetch_start0:1676: new version barrier v3
    %7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": starting fetchers for 0 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2408)
    %7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
    %7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 0 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575544213.106|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state assigned): with new assignment: OffsetCommit done (__NO_OFFSET)
    [14:10:13] Timed out
    Nick
    @nick-zh
    yeah, this is not good :D
    Nick
    @nick-zh
    so this could potentially happen: if you have 8 partitions, 8 active consumers and you start a 9th consumer, that one would get exactly what you get :D
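    Roughly why a 9th consumer ends up empty-handed, sketched in plain PHP (this is only an illustration of round-robin partition assignment, not librdkafka's actual assignor code):

    ```php
    // Illustration only: spread $partitionCount partitions over $consumerCount
    // group members round-robin; with more consumers than partitions, the
    // surplus consumers get an empty assignment, i.e. "Assign: array(0) {}".
    function assignPartitions(int $partitionCount, int $consumerCount): array
    {
        $assignment = array_fill(0, $consumerCount, []);
        for ($p = 0; $p < $partitionCount; $p++) {
            $assignment[$p % $consumerCount][] = $p;
        }
        return $assignment;
    }

    $assignment = assignPartitions(8, 9);
    echo count($assignment[8]) . "\n"; // the 9th consumer holds 0 partitions
    ```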
    @steveb_gitlab probably we (and by we i mean you XD ) should change this to:
    while (true) {
        $msg = $queue->consume(15000);
        // librdkafka before 1.0 returns message with RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching topic end.
        if (!$msg) {
            continue;
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }
    ah fuck, hang on, not this
    alexandra-default
    @alexandra-default
    I have 8 partitions and yet only 1 consumer =)
    Nick
    @nick-zh
    @alexandra-default is it a local kafka that you are running?
    Paweł Niedzielski
    @steveb_gitlab
    @alexandra-default you launch that consumer by some web url request?
    alexandra-default
    @alexandra-default
    I use Docker container for Apache Kafka.
    I launch consumer "locally" from PHP container.
    Nick
    @nick-zh
    alright, if you change the consumer group to something different, do you get the same result?

    @steveb_gitlab next try:

    //replace the consumer loop
    while (true) {
        $msg = $queue->consume(15000);
        // librdkafka before 1.0 returns message with RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching topic end.
        if (!$msg) {
            continue;
        }
    
        if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            break;
        } elseif (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }

    this should do it

    also at the beginning:
    $conf->set('enable.partition.eof', 'true');
    alexandra-default
    @alexandra-default

    Oh, I've changed the group, and then got this endless sheet:

    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)

    This is better, I assume, but I still didn't hit the switch branch that echoes a message :D

    Nick
    @nick-zh
    since you are on latest, maybe there are no new messages. seeing this though, i think you have some dead consumers in the original group (as many as, or more than, the partition count), which is why it works properly now with a new group.
    just to check if consumption works, you could switch to another group again and set to earliest :)
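    Concretely, that check would look something like this on top of the earlier config (the group id is illustrative; librdkafka also accepts 'smallest' as an alias for 'earliest'):

    ```php
    // Sketch only: a fresh group id avoids any stale members/offsets of the
    // old group, and 'earliest' replays the topic from the oldest stored message.
    $this->config->set('group.id', 'kafka_consumer_group_test_fresh'); // illustrative name
    $this->config->set('auto.offset.reset', 'earliest');
    ```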
    alexandra-default
    @alexandra-default

    since you are on latest, maybe there are no new messages

    Okay, this time it was a problem with producer lags, soooo...
    Yaaaay, success!

    crying with tears of joy
    Nick
    @nick-zh
    great to hear, happy producing / consuming
    alexandra-default
    @alexandra-default
    Thanks a lot for all your time, you're the best! =)
    Nick
    @nick-zh
    @steveb_gitlab also added it as a comment in the PR, let me know when we can watch travis do the work XD
    @alexandra-default no worries, happy to help ;)
    Paweł Niedzielski
    @steveb_gitlab
    @nick-zh won't it put it in a potentially infinite loop?
    Nick
    @nick-zh
    i hope not, with the eof setting set to true we will always get RD_KAFKA_RESP_ERR__PARTITION_EOF instead of null, on which we break :)
    Paweł Niedzielski
    @steveb_gitlab
    Pushed. Theoretically it should do the trick, provided we break on EOF only once BOTH topics are exhausted
    Nick
    @nick-zh
    we have two topics? sorry i didn't check that well, then probably we also need the old count in again :D
    ay :D
    Paweł Niedzielski
    @steveb_gitlab
    two topics or partitions
    that's the whole point of that test :P
    two topics
    • test_rdkafka0<UNIQID>
    • test_rdkafka1<UNIQID>
    oh crap, now I understand what that $eof was for... :D
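    That $eof bookkeeping can be sketched in plain PHP: keep consuming until every subscribed topic has reported a partition EOF at least once, using the message's topic name to tell them apart (the array-shaped messages and the $consume callable are stand-ins for real rdkafka objects, not the test's actual code):

    ```php
    // Illustration only: drain multiple topics, stopping once each topic has
    // reported EOF at least once. With several partitions per topic you would
    // track topic+partition pairs instead of bare topic names.
    function drainTopics(callable $consume, array $topics): array
    {
        $payloads = [];
        $eofSeen  = array_fill_keys($topics, false);
        while (in_array(false, $eofSeen, true)) {
            $msg = $consume();          // stand-in for $queue->consume($timeout)
            if ($msg === null) {
                continue;               // timeout, keep waiting
            }
            if ($msg['eof']) {
                $eofSeen[$msg['topic_name']] = true;
                continue;
            }
            $payloads[] = $msg['payload'];
        }
        return $payloads;
    }
    ```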
    Nick
    @nick-zh
    no worries, just re-add it, afterwards i think it should be fine
    Paweł Niedzielski
    @steveb_gitlab
    isn't there something in the message that would allow us to detect which topic a message arrived from?
    public $topic_name;