    alexandra-default
    @alexandra-default
    Thanks a lot for all your time, you're the best! =)
    Nick
    @nick-zh
    @steveb_gitlab I also added it as a comment in the PR. Let me know when we can watch Travis do the work XD
    @alexandra-default no worries, happy to help ;)
    Paweł Niedzielski
    @steveb_gitlab
    @nick-zh won't that potentially put it in an infinite loop?
    Nick
    @nick-zh
    i hope not: with the eof setting set to true instead of null, we will always get RD_KAFKA_RESP_ERR__PARTITION_EOF, on which we break :)
    Paweł Niedzielski
    @steveb_gitlab
    Pushed. It theoretically should do the trick if we receive EOF only if BOTH topics are exhausted
    Nick
    @nick-zh
    we have two topics? sry i didn't check that well, then probably we also need the old count in again :D
    ay :D
    Paweł Niedzielski
    @steveb_gitlab
    two topics or partitions
    that's the whole point of that test :P
    two topics
    • test_rdkafka0<UNIQID>
    • test_rdkafka1<UNIQID>
    oh crap, now I understand what that $eof was for... :D
    Nick
    @nick-zh
    no worries, just re-add it, afterwards i think it should be fine
    Paweł Niedzielski
    @steveb_gitlab
    isn't there something in the message that would allow us to detect which topic a message arrived from?
    public $topic_name ;
    should be better than just incrementing ;)
    Nick
    @nick-zh
    the problem is, the eof event doesn't have a topic, the message on the other hand does :)
    Paweł Niedzielski
    @steveb_gitlab
    wait, we receive EOF for a topic, but we don't know which one? :D
    Nick
    @nick-zh
    i can test, but i would assume no :D
    Paweł Niedzielski
    @steveb_gitlab
    Ok, we'll leave it for some other time
    my normal work has to be done :)
    I'll just push that $eof change
    Nick
    @nick-zh
    i was wrong, we do get the topic, yay
    no worries, same here :D
    this should do then
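The idea discussed above (stop only once BOTH topics have reported EOF, using the topic name carried by the EOF event) could be sketched roughly as follows. This is only an illustration, not the code from the PR: `EofTracker` is a hypothetical helper, it assumes one partition per topic, and the fallback constant value is defined only so the sketch runs without the rdkafka extension loaded.

```php
<?php
// Fallback so the sketch runs without the rdkafka extension (assumption:
// the extension normally provides this constant).
if (!defined('RD_KAFKA_RESP_ERR__PARTITION_EOF')) {
    define('RD_KAFKA_RESP_ERR__PARTITION_EOF', -191);
}

/**
 * Hypothetical helper: remembers which topics still have to report EOF.
 * Assumes each topic has a single partition, so one EOF per topic is enough.
 */
final class EofTracker
{
    /** @var array<string, bool> topics that have not reached EOF yet */
    private $pending;

    public function __construct(array $topicNames)
    {
        $this->pending = array_fill_keys($topicNames, true);
    }

    // $err and $topicName mirror the err and topic_name properties of a message.
    public function observe(int $err, ?string $topicName): void
    {
        if ($err === RD_KAFKA_RESP_ERR__PARTITION_EOF && $topicName !== null) {
            unset($this->pending[$topicName]);
        }
    }

    // True once every expected topic has reported EOF.
    public function allExhausted(): bool
    {
        return $this->pending === [];
    }
}
```

In a consume loop, each received message would be passed to `observe()` and the loop would break once `allExhausted()` returns true, instead of breaking on the first EOF or counting EOFs blindly.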
    Andrew Bashuk
    @96andlgrac_twitter
    Hello all. Please help me with one question: how can I check that only one consumer listens to a partition? If I start 2 processes with the same consumer listening to replica 0, for example, both consumers receive messages.
    So there is no guarantee that messages will not be processed twice if a separate process somehow gets started.
    Paweł Niedzielski
    @steveb_gitlab
    @96andlgrac_twitter in this case you should use Consumer Groups. For php-rdkafka it's a setting that you're required to set - group.id. Only one consumer from the same group is allowed to listen to a particular topic partition.
    I'm assuming high-level consumers of course.
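A minimal sketch of the consumer-group setup described above, assuming the high-level `RdKafka\KafkaConsumer` with `subscribe()`. The function names, broker address, group name and topic name are placeholders invented for illustration. Every process started with the same `group.id` joins the same group, so a given partition is assigned to only one of them at a time.

```php
<?php
/**
 * Hypothetical helper: the settings every member of the group shares.
 */
function groupConsumerSettings(string $brokers, string $groupId): array
{
    return [
        'metadata.broker.list' => $brokers,
        // All processes using the same group.id share the partitions between them.
        'group.id' => $groupId,
    ];
}

/**
 * Hypothetical helper: builds a high-level consumer from those settings.
 * Requires the rdkafka extension when actually called.
 */
function createGroupConsumer(array $settings, array $topics)
{
    $conf = new RdKafka\Conf();
    foreach ($settings as $name => $value) {
        $conf->set($name, $value);
    }

    $consumer = new RdKafka\KafkaConsumer($conf);
    // subscribe() lets the group coordinator assign partitions to this process.
    $consumer->subscribe($topics);

    return $consumer;
}
```

Usage would look like `createGroupConsumer(groupConsumerSettings('xx.xxx.xxx.xx:9092', 'my-group'), ['test'])`, run unchanged in each process. Note that this applies to subscribing through the group; manually assigning the same partition in two processes would still deliver messages to both.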
    Arnaud Tiérant
    @atierant
    Hi everyone, do you know if it is possible to get information about partition offsets (min offset, max offset) and the current consumer group offset per partition from a KafkaConsumer? I'm on 4.0.2 and I see queryWatermarkOffsets and getOffsetPositions, but I didn't manage to connect... Thanks
    Nick
    @nick-zh
    heya, yeah, as you figured out this is possible with https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka-kafkaconsumer.querywatermakoffsets.html to get the topic's offsets, and since you have a consumer (high or low, possible with both) you also know where you are with your offsets.
    the README on GitHub and the docs should help with how to create a consumer :)
    abdo1
    @abdo1
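    How can I find out the length of the queue in this example?

    // Low-level consumer with a group.id set.
    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'test-consumer-group');

    $rk = new RdKafka\Consumer($conf);
    $rk->addBrokers("xx.xxx.xxx.xx:9092");
    // One queue shared by all partitions started below.
    $queue = $rk->newQueue();

    $topicConf = new RdKafka\TopicConf();
    // Config values are passed as strings.
    $topicConf->set('auto.commit.interval.ms', '100');
    // Start from the oldest message when no stored offset exists.
    $topicConf->set('auto.offset.reset', 'smallest');

    $Topic1 = $rk->newTopic("test", $topicConf);

    // Consume partitions 0 and 1 into the same queue, resuming from stored offsets.
    $Topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $queue);
    $Topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_STORED, $queue);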

    abdo1
    @abdo1
    @steveb_gitlab
    Paweł Niedzielski
    @steveb_gitlab
    @abdo1 see the comment from nick above - watermark offsets are what you're looking for.
    Arnaud Tiérant
    @atierant
    @nick-zh Thanks, so I'm on the right track? I have to get all the metadata, then the topics, then the ones matching mine, then the partitions of each topic, and then call queryWatermarkOffsets on each partition?
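The steps listed in this question (metadata, then topics, then matching topics, then partitions, then `queryWatermarkOffsets` per partition) might look roughly like the sketch below. `collectWatermarks` is a hypothetical helper name, and `$consumer` is assumed to be a consumer object exposing `getMetadata()` and `queryWatermarkOffsets()` as in the php-rdkafka docs linked above; it is left untyped here so the sketch can be exercised with a stub.

```php
<?php
/**
 * Hypothetical helper: low/high watermark offsets for every partition of
 * the wanted topics.
 *
 * @return array<string, array<int, array{low: int, high: int}>>
 */
function collectWatermarks($consumer, array $wantedTopics, int $timeoutMs = 1000): array
{
    $result = [];

    // true = fetch metadata for all topics known to the cluster.
    $metadata = $consumer->getMetadata(true, null, $timeoutMs);

    foreach ($metadata->getTopics() as $topic) {
        $name = $topic->getTopic();
        if (!in_array($name, $wantedTopics, true)) {
            continue; // not one of ours
        }

        foreach ($topic->getPartitions() as $partition) {
            $id = $partition->getId();
            $low = 0;
            $high = 0;
            // $low and $high are filled in by reference.
            $consumer->queryWatermarkOffsets($name, $id, $low, $high, $timeoutMs);
            $result[$name][$id] = ['low' => $low, 'high' => $high];
        }
    }

    return $result;
}
```

The difference `high - low` per partition gives the number of offsets currently retained, which is one way to read "length of the queue"; how far a particular consumer group has progressed is a separate question.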
    Arnaud Tiérant
    @atierant

    Okay, I got it: I retrieved the min & max for each of my topics with queryWatermarkOffsets. But you told me "you also know where you are with your offsets". What about when offsets are stored server-side for a specific consumer group?

    Paweł Niedzielski
    @steveb_gitlab
    @atierant the offset is part of the message, so you will get that info from each message. When eof is enabled you will probably also get it when reaching topic EOF (no more messages to read); I haven't checked that, but I assume that will be the case.
    Arnaud Tiérant
    @atierant
    @steveb_gitlab Thanks. But I'd like to request the current offset kept by the broker without consuming; I'm trying to implement a tiny monitoring tool
    Paweł Niedzielski
    @steveb_gitlab
    I'm not sure but it might not be possible with current phprdkafka.
    but it's not integrated to phprdkafka as far as I'm aware
    Arnaud Tiérant
    @atierant
    I've seen the same request in kafka-python :) dpkp/kafka-python#1501
    Paweł Niedzielski
    @steveb_gitlab
    Open an issue on github, and maybe nick will pick it up :) he's the main contributor atm. However, he's also busy with his own things in his company.
    Or you can give it a try yourself
    I'm currently busy with a completely different piece of tech, so can't work on it :(
    Arnaud Tiérant
    @atierant
    I understand, don't worry :) I haven't developed any PHP extension yet, but I'll open the issue and give it a try in parallel :)
    Arnaud Tiérant
    @atierant
    Isn't it the same as arnaud-lb/php-rdkafka#336 ?
    Paweł Niedzielski
    @steveb_gitlab
    Looks similar, yes, but I'm not sure. It seems like you can get the committed offset that way.
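For the monitoring use case discussed above, once both the watermark offsets and the group's committed offsets are available by whatever means, the remaining step is plain arithmetic. The sketch below is a hypothetical helper, not part of php-rdkafka: `computeLag` and its array shapes are invented for illustration, and it assumes a negative committed offset means "nothing committed yet".

```php
<?php
/**
 * Hypothetical helper: consumer lag per partition.
 *
 * $watermarks: [partitionId => ['low' => int, 'high' => int]]
 * $committed:  [partitionId => int], negative meaning "no committed offset"
 *
 * @return array<int, int> number of messages not yet consumed per partition
 */
function computeLag(array $watermarks, array $committed): array
{
    $lag = [];

    foreach ($watermarks as $partition => $marks) {
        $offset = $committed[$partition] ?? -1;

        // Without a committed offset, treat the whole retained range as unread.
        // A committed offset below the low watermark is clamped to it, since
        // older messages no longer exist.
        $position = $offset < 0 ? $marks['low'] : max($offset, $marks['low']);

        $lag[$partition] = max(0, $marks['high'] - $position);
    }

    return $lag;
}
```

For example, with a high watermark of 20 and a committed offset of 12, the group still has 8 messages to read on that partition.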