    Nick
    @nick-zh
    yeah, this is not good :D
    Nick
    @nick-zh
    so this could potentially happen if you have 8 partitions and 8 active consumers and you start a 9th consumer: that one would get exactly what you get :D
    @steveb_gitlab probably we (and by we i mean you XD ) should change this to:
    while (true) {
        $msg = $queue->consume(15000);
        // librdkafka before 1.0 returns message with RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching topic end.
        if (!$msg) {
            continue;
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }
    ah fuck, hang on, not this
    alexandra-default
    @alexandra-default
    I have 8 partitions and yet only 1 consumer =)
    Nick
    @nick-zh
    @alexandra-default is it a local kafka that you are running?
    Paweł Niedzielski
    @steveb_gitlab
    @alexandra-default you launch that consumer by some web url request?
    alexandra-default
    @alexandra-default
    I use Docker container for Apache Kafka.
    I launch consumer "locally" from PHP container.
    Nick
    @nick-zh
    alright, if you change the consumer group to something different, do you get the same result?

    @steveb_gitlab next try:

    //replace the consumer loop
    while (true) {
        $msg = $queue->consume(15000);
        // librdkafka before 1.0 returns message with RD_KAFKA_RESP_ERR__PARTITION_EOF when reaching topic end.
        if (!$msg) {
            continue;
        }
    
        if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            break;
        } elseif (RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }

    this should do it

    also at the beginning:
    $conf->set('enable.partition.eof', 'true');
    alexandra-default
    @alexandra-default

    Oh, I've changed the group, and then got this endless sheet:

    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.530|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.631|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [0] at offset 41 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [1] at offset 41 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [2] at offset 52 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch topic test [3] at offset 51 (v2)
    %7|1575545856.732|FETCH|rdkafka#consumer-1| [thrd:kafka:9092/1001]: kafka:9092/1001: Fetch 4/4/8 toppar(s)

    This is better, I assume, but I still didn't reach the switch case that echoes a message :D

    Nick
    @nick-zh
    since you are on latest, maybe there are no new messages. seeing this though, i think you have some dead consumers in the original group (as many as the partitions, or more), which is why it works properly now with a new group.
    just to check if consumption works, you could switch to another group again and set to earliest :)
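A minimal sketch of that check, assuming the settings are applied through RdKafka\Conf before the consumer is created. The group name is a made-up placeholder, and the broker address is the one that appears in the debug log above:

```php
<?php
$conf = new RdKafka\Conf();
// Hypothetical, previously unused group name, so no committed offsets exist for it yet.
$conf->set('group.id', 'debug-group-2');
$conf->set('metadata.broker.list', 'kafka:9092');
// With no committed offsets, start from the oldest retained message instead of the newest.
$conf->set('auto.offset.reset', 'earliest');
```

If messages show up with this configuration, consumption itself works and the original group's state is the likelier culprit.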
    alexandra-default
    @alexandra-default

    since you are latest, maybe there are no new messages

    Okay, this time it was a problem with producer lags, soooo...
    Yaaaay, success!

    crying with tears of joy
    Nick
    @nick-zh
    great to hear, happy producing / consuming
    alexandra-default
    @alexandra-default
    Thanks a lot for all your time, you're the best! =)
    Nick
    @nick-zh
    @steveb_gitlab i also added it as a comment in the PR, let me know when we can watch travis do the work XD
    @alexandra-default no worries, happy to help ;)
    Paweł Niedzielski
    @steveb_gitlab
    @nick-zh won't that potentially put it in an infinite loop?
    Nick
    @nick-zh
    i hope not, with the eof setting set to true, instead of getting null we will always get RD_KAFKA_RESP_ERR__PARTITION_EOF, on which we break :)
    Paweł Niedzielski
    @steveb_gitlab
    Pushed. It theoretically should do the trick if we receive EOF only if BOTH topics are exhausted
    Nick
    @nick-zh
    we have two topics? sry i didn't check that well, then probably we also need the old count in again :D
    ay :D
    Paweł Niedzielski
    @steveb_gitlab
    two topics or partitions
    that's the whole point of that test :P
    two topics
    • test_rdkafka0<UNIQID>
    • test_rdkafka1<UNIQID>
    oh crap, now I understand what that $eof was for... :D
    Nick
    @nick-zh
    no worries, just re-add it, afterwards i think it should be fine
    Paweł Niedzielski
    @steveb_gitlab
    isn't there something in message that would allow us to detect which topic message arrived from?
    public $topic_name;
    should be better than just incrementing ;)
    Nick
    @nick-zh
    the problem is, the eof event doesn't have a topic, the message on the other hand does :)
    Paweł Niedzielski
    @steveb_gitlab
    wait, we receive EOF for a topic, but we don't know which one? :D
    Nick
    @nick-zh
    i can test, but i would assume no :D
    Paweł Niedzielski
    @steveb_gitlab
    Ok, we'll leave it for some other time
    my normal work has to be done :)
    I'll just push that $eof change
    Nick
    @nick-zh
    i was wrong, we do get the topic, yay
    no worries, same here :D
    this should do then
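A rough sketch of the idea discussed above: since the EOF event does carry a topic name, the loop can tick off topics by name instead of counting EOFs. The helper name and the callable-based shape are hypothetical (not the code from the PR), and the two define() lines are only a fallback so the sketch runs without the extension loaded:

```php
<?php
// Fallback definitions so the sketch also runs without the rdkafka extension.
defined('RD_KAFKA_RESP_ERR_NO_ERROR') || define('RD_KAFKA_RESP_ERR_NO_ERROR', 0);
defined('RD_KAFKA_RESP_ERR__PARTITION_EOF') || define('RD_KAFKA_RESP_ERR__PARTITION_EOF', -191);

/**
 * Hypothetical helper: polls $consume until every topic in $topics has reported EOF.
 * $consume is any callable returning a message-like object, or null on timeout,
 * e.g. function () use ($queue) { return $queue->consume(15000); }
 */
function consumeUntilAllEof(callable $consume, array $topics): array
{
    $pending = array_fill_keys($topics, true); // topics that have not reported EOF yet
    $messages = [];

    while ($pending) {
        $msg = $consume();
        if (!$msg) {
            continue; // timeout, poll again
        }
        if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            unset($pending[$msg->topic_name]); // this topic is exhausted
            continue;
        }
        if ($msg->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw new Exception($msg->errstr(), $msg->err);
        }
        $messages[] = sprintf("Got message: %s from %s", $msg->payload, $msg->topic_name);
    }

    return $messages;
}
```

This assumes one partition per topic, as in a test with two single-partition topics. With several partitions per topic the key would need to combine topic name and partition, and a real loop would also want an overall deadline so it cannot spin forever on timeouts.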
    Andrew Bashuk
    @96andlgrac_twitter
    Hello all. Please help me with one question. How can I check that only one consumer listens to a partition? Because if I start 2 processes with the same consumer listening to replica 0, for example, both consumers receive messages.
    So there is no guarantee that, if a separate process gets started somehow, messages will not be processed twice.
    Paweł Niedzielski
    @steveb_gitlab
    @96andlgrac_twitter in this case you should use Consumer Groups. For php-rdkafka it's a setting that you're required to set - group.id. Only one consumer from the same group is allowed to listen to a particular topic partition.
    I'm assuming high-level consumers of course.
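For illustration, a sketch of a high-level consumer that joins a group, loosely following the pattern from the php-rdkafka README. The function name and its parameters are hypothetical, and it needs the rdkafka extension when actually called:

```php
<?php
/**
 * Hypothetical wrapper: runs one member of a consumer group.
 * Start several processes with the same $groupId and the broker
 * assigns each partition to only one of them.
 */
function runGroupConsumer(string $brokers, string $groupId, string $topic): void
{
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', $groupId);
    $conf->set('metadata.broker.list', $brokers);

    $consumer = new \RdKafka\KafkaConsumer($conf);
    $consumer->subscribe([$topic]); // partitions are assigned by the group, not picked manually

    while (true) {
        $message = $consumer->consume(120 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo $message->payload, PHP_EOL;
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                break; // nothing new yet, keep polling
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }
}
```

Note that this only helps with subscribe()-style consumers. A low-level consumer that starts a fixed partition by hand is not coordinated by the group, so two such processes would still both receive the messages.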
    Arnaud Tiérant
    @atierant
    Hi everyone, do you know if it is possible to get information about partition offsets (min offset, max offset) and the current consumer group offset per partition from a KafkaConsumer? I'm on 4.0.2 & I see queryWatermarkOffsets, getOffsetPositions, but I didn't manage to connect... Thanks
    Nick
    @nick-zh
    heya, yeah, as you figured out this is possible with https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka-kafkaconsumer.querywatermakoffsets.html to get the topic's offsets, and since you have a consumer (high or low level, it's possible with both) you also know where you are with your offsets.
    the readme in github and the doc should help how to create a consumer :)
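A sketch of how those pieces could fit together, assuming queryWatermarkOffsets and getCommittedOffsets are available on KafkaConsumer in the installed version (the question mentions 4.0.2). Both helper names are hypothetical, and the treatment of a negative committed offset as "nothing committed yet" is an assumption based on librdkafka's RD_KAFKA_OFFSET_INVALID (-1001):

```php
<?php
/**
 * Hypothetical helper: messages still to be consumed on one partition.
 * A negative $committed is treated as "no offset committed yet",
 * so the whole retained range counts as lag.
 */
function partitionLag(int $low, int $high, int $committed): int
{
    $position = $committed < 0 ? $low : max($committed, $low);
    return max(0, $high - $position);
}

/**
 * Sketch only: needs the rdkafka extension and an already configured KafkaConsumer.
 */
function describePartition(\RdKafka\KafkaConsumer $consumer, string $topic, int $partition): array
{
    $low = 0;
    $high = 0;
    // Asks the broker for the oldest and next-to-be-written offsets (timeout in ms).
    $consumer->queryWatermarkOffsets($topic, $partition, $low, $high, 10000);

    // Committed offset of this consumer's group for the partition.
    $committed = $consumer->getCommittedOffsets(
        [new \RdKafka\TopicPartition($topic, $partition)],
        10000
    )[0]->getOffset();

    return [
        'low' => $low,
        'high' => $high,
        'committed' => $committed,
        'lag' => partitionLag($low, $high, $committed),
    ];
}
```

Calling describePartition($consumer, 'test', 0) for each partition would give min offset, max offset, group offset and lag in one array.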
    abdo1
    @abdo1
    How can I find out the length of the queue in this example?

    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'test-consumer-group');

    $rk = new RdKafka\Consumer($conf);
    $rk->addBrokers("xx.xxx.xxx.xx:9092");
    $queue = $rk->newQueue();

    $topicConf = new RdKafka\TopicConf();
    $topicConf->set('auto.commit.interval.ms', '100'); // set() expects string values

    $topicConf->set('auto.offset.reset', 'smallest');

    $Topic1 = $rk->newTopic("test", $topicConf);

    $Topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $queue);
    $Topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_STORED, $queue);