    Nick
    @nick-zh

    @alexandra-default the log you provided gives me the impression you subscribe and immediately unsubscribe, you could check with this:

    $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;
    
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(NULL);
                break;
    
            default:
                throw new \Exception($err);
        }
    });

    to see if there is truly an unassign / revoke, which would explain a lot :D

    alexandra-default
    @alexandra-default
    @nick-zh Good news =) I will look further into it, thanks a lot for your time!
    Paweł Niedzielski
    @steveb_gitlab
    If php-fpm is used, then at times Kafka might not realize that a consumer from the consumer group has left and still considers it active. Then a new process comes up and asks Kafka for topics/partitions to listen to, and Kafka cannot give it one since the old consumer is still considered "subscribed"
    But my bet would be on the largest offset being used as the starting point
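
    A minimal sketch of the two config knobs involved here (assumed, not from the chat; the property names are standard librdkafka configuration keys, and on older php-rdkafka/librdkafka versions auto.offset.reset may need to be set on a TopicConf instead):

    $conf = new RdKafka\Conf();
    // How long the broker waits before declaring a silent consumer dead and
    // rebalancing its partitions away (relevant when php-fpm kills workers abruptly).
    $conf->set('session.timeout.ms', '10000');
    // Where a group without committed offsets starts reading:
    // 'largest'/'latest' (default) only sees new messages,
    // 'smallest'/'earliest' replays the topic from the beginning.
    $conf->set('auto.offset.reset', 'largest');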
    Nick
    @nick-zh
    yeah you are right Pawel, if there is a lingering dead client, it will take some time until it is killed, so a new one can register, but i would assume with multiple partitions, a rebalance would be proposed, which should kill the hopefully non-responding "dead" client
    alex said, this is intended, they only want new messages :)
    alexandra-default
    @alexandra-default

    you could check with this

    I have the impression rebalancing is not working as expected; this is what I get:

    %7|1575544204.115|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]:  Topic test with 8 partition(s)
    ...
    Assign: array(0) {
    }
    %7|1575544212.536|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v1 vs 0)
    %7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": new assignment of 0 partition(s) in join state wait-assign-rebalance_cb
    %7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_assign:2363: new version barrier v2
    %7|1575544212.536|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": assigning 0 partition(s) in join state wait-assign-rebalance_cb
    %7|1575544212.536|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
    %7|1575544212.536|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": rd_kafka_cgrp_partitions_fetch_start0:1676: new version barrier v3
    %7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": starting fetchers for 0 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2408)
    %7|1575544212.536|FETCHSTART|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
    %7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
    %7|1575544213.106|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 0 partition(s): cgrp auto commit timer: returned: Local: No offset stored
    %7|1575544213.106|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka_consumer_group_test": unassign done in state up (join state assigned): with new assignment: OffsetCommit done (__NO_OFFSET)
    [14:10:13] Timed out
    Nick
    @nick-zh
    yeah, this is not good :D
    Nick
    @nick-zh
    so this could potentially happen if you have 8 partitions and 8 active consumers and you start a 9th consumer, that one would get exactly what you get :D
    @steveb_gitlab probably we (and by we i mean you XD ) should change this to:
    while (true) {
        $msg = $queue->consume(15000);
        // librdkafka before 1.0 returns message with RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching topic end.
        if (!$msg) {
            continue;
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }
    ah fuck, hang on, not this
    alexandra-default
    @alexandra-default
    I have 8 partitions and yet only 1 consumer =)
    Nick
    @nick-zh
    @alexandra-default is it a local kafka that you are running?
    Paweł Niedzielski
    @steveb_gitlab
    @alexandra-default do you launch that consumer via some web URL request?
    alexandra-default
    @alexandra-default
    I use Docker container for Apache Kafka.
    I launch consumer "locally" from PHP container.
    Nick
    @nick-zh
    alright, if you change the consumer group to something different, do you get the same result?

    @steveb_gitlab next try:

    //replace the consumer loop
    while (true) {
        $msg = $queue->consume(15000);
        // librdkafka before 1.0 returns message with RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching topic end.
        if (!$msg) {
            continue;
        }
    
        if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            break;
        } elseif (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }

    this should do it

    also at the beginning:
    $conf->set('enable.partition.eof', 'true');
    alexandra-default
    @alexandra-default

    Oh, I've changed the group, and then got this endless stream of output:

    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)

    This is better, I assume, but I still didn't get to the switch branch that echoes a message :D

    Nick
    @nick-zh
    since you are on latest, maybe there are no new messages. seeing this though, i think you have some dead consumers in the original group (as many as, or more than, the partitions), which is why it works properly now with a new group.
    just to check if consumption works, you could switch to another group again and set to earliest :)
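
    A minimal sketch of what that could look like (assumed, not from the chat; the group name is made up, and on older php-rdkafka/librdkafka versions auto.offset.reset may belong on a TopicConf):

    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', 'kafka:9092');
    // A brand new group has no committed offsets, so auto.offset.reset applies.
    $conf->set('group.id', 'kafka_consumer_group_test_fresh');
    // Start from the beginning of each partition instead of only new messages.
    $conf->set('auto.offset.reset', 'earliest');

    $consumer = new RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['test']);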
    alexandra-default
    @alexandra-default

    since you are on latest, maybe there are no new messages

    Okay, this time it was a problem with producer lags, soooo...
    Yaaaay, success!

    crying with tears of joy
    Nick
    @nick-zh
    great to hear, happy producing / consuming
    alexandra-default
    @alexandra-default
    Thanks a lot for all your time, you're the best! =)
    Nick
    @nick-zh
    @steveb_gitlab also added it as a comment in the PR, let me know when we can watch travis do the work XD
    @alexandra-default no worries, happy to help ;)
    Paweł Niedzielski
    @steveb_gitlab
    @nick-zh won't it put it in a potentially infinite loop?
    Nick
    @nick-zh
    i hope not, with the eof setting set to true, instead of null we will always get RD_KAFKA_RESP_ERR__PARTITION_EOF, on which we break :)
    Paweł Niedzielski
    @steveb_gitlab
    Pushed. It theoretically should do the trick, provided we receive EOF only when BOTH topics are exhausted
    Nick
    @nick-zh
    we have two topics? sry i didn't check that well, then probably we also need the old counter back in again :D
    ay :D
    Paweł Niedzielski
    @steveb_gitlab
    two topics or partitions
    that's the whole point of that test :P
    two topics
    • test_rdkafka0<UNIQID>
    • test_rdkafka1<UNIQID>
    oh crap, now I understand what that $eof was for... :D
    Nick
    @nick-zh
    no worries, just re-add it, afterwards i think it should be fine
    Paweł Niedzielski
    @steveb_gitlab
    isn't there something in the message that would allow us to detect which topic the message arrived from?
    public $topic_name;
    should be better than just incrementing ;)
    Nick
    @nick-zh
    the problem is, the eof event doesn't have a topic, the message on the other hand does :)
    Paweł Niedzielski
    @steveb_gitlab
    wait, we receive EOF for a topic, but we don't know which one? :D
    Nick
    @nick-zh
    i can test, but i would assume no :D
    Paweł Niedzielski
    @steveb_gitlab
    Ok, we'll leave it for some other time
    my normal work has to be done :)
    I'll just push that $eof change
    Nick
    @nick-zh
    i was wrong, we do get the topic, yay
    no worries, same here :D
    this should do then
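
    A rough sketch of the idea discussed above (assumed, not the actual test code: the topic names are the placeholders from the test, $queue comes from the existing setup, and it assumes enable.partition.eof=true, one partition per test topic, and that the EOF message carries topic_name as confirmed above):

    // Stop consuming once an EOF has been seen for every topic under test.
    $pendingTopics = ['test_rdkafka0<UNIQID>' => true, 'test_rdkafka1<UNIQID>' => true];
    $messages = [];

    while (!empty($pendingTopics)) {
        $msg = $queue->consume(15000);
        if (!$msg) {
            continue;
        }

        if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            // Mark this topic as exhausted instead of just counting EOFs.
            unset($pendingTopics[$msg->topic_name]);
            continue;
        }

        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }

        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }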