Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Magnus Edenhill
    @edenhill
    can you show me?
    jstean
    @jstean
    @edenhill topic with 10 partitions, some consumers ( in unique groups using high level consumer ) have "unknown" for current offset and lag
    @edenhill as a result they dont get all the messages on the topic
    Magnus Edenhill
    @edenhill
    Depending on Kafka version you might nee to specify --new-consumer to that script
    If it still shows up as unknown it means that consumer has not committed offsets for the given partition, or that the committed offsets have been purged by the offset topic retention policy. (1w by default, iirc)
    jstean
    @jstean
    @edenhill kafka_2.10-0.10.1.1
    @edenhill I'm using the default conf with [enable.auto.offset.store] => true and enable.auto.commit] => true
    Magnus Edenhill
    @edenhill
    If a librdkafka consumer reaches the end of the partition before seeing any real messages it will not commit the offset of the "end offset" of the partition. This differs from the Java client.
    jstean
    @jstean
    does that mean it just then stops consuming further partitions?
    @edenhill its very inconsistent, some times cannot get the "issue" to happen with identical code/properties
    Magnus Edenhill
    @edenhill
    no, it will keep consuming.
    If you can reproduce with debug=cgrp,topic,fetch,broker,protocol then open an issue on librdkafka
    jstean
    @jstean
    @edenhill ok ty will do that :+1:
    ChenwayWong
    @ChenwayWong
    hi
    anyone here ?
    jianzhiunique
    @jianzhiunique
    hello?
    when I connect to a invalide ip:port (means no kafka), producer will block(php ), I use strace and got this message :futex(0x161290c, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 42837, {1507802562, 768738000}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
    futex(0x16128e0, FUTEX_WAKE_PRIVATE, 1) = 0
    gettimeofday({1507802562, 768946}, NULL) = 0
    futex(0x161290c, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 42839, {1507802562, 769946000}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
    futex(0x16128e0, FUTEX_WAKE_PRIVATE, 1) = 0
    gettimeofday({1507802562, 770097}, NULL) = 0
    how to close connection and stop php script?
    Keyur Shah
    @keyurshah070
    Hi
    How to acknowledge consume message in kafka using php-rdkafka?
    after producing new message I am always getting old message as well as new messages
    is there any way to receive only new messages ?
    Any suggestions or hints ?
    Magnus Edenhill
    @edenhill
    @keyurshah070 offset commits should take care of that. Make sure you close() the consumer before terminating so that final offsets are committed.
    Keyur Shah
    @keyurshah070
    Thanks a lot @edenhill for response
    Could you please give me some example ?
    Keyur Shah
    @keyurshah070
    can someone have some example on this ?
    Magnus Edenhill
    @edenhill
    consoumer->close()
    Keyur Shah
    @keyurshah070
    I downloaded rdkafka stub for editor and I can not find close() method
    you can see my code below

    `kafkaConsumer = new RdKafka\Consumer();
    $kafkaConsumer->addBrokers("127.0.0.1:9292");
    $topic = $kafkaConsumer->newTopic("test");
    $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

    while (true) {
    $msg = $topic->consume(0, 1000);
    if($msg){
    if ($msg->err) {
    echo $msg->errstr(), "\n";
    break;
    } else {
    echo $msg->payload, "\n";
    }
    }
    }`

    Magnus Edenhill
    @edenhill
    oh, right, they made it implicit in the destructor.
    But that's the low-level consumer and you tell it to start at the beginning. If you want to continue where you last left off you should use the high-level consumer. See the docs for an example.
    Keyur Shah
    @keyurshah070

    $kafkaConsumer = new RdKafka\Consumer();
    $kafkaConsumer->addBrokers("127.0.0.1:9292");
    $topic = $kafkaConsumer->newTopic("test");
    $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

    while (true) {
    $msg = $topic->consume(0, 1000);
    if($msg){
    if ($msg->err) {
    echo $msg->errstr(), "\n";
    break;
    } else {
    echo $msg->payload, "\n";
    }
    }
    }`

    Keyur Shah
    @keyurshah070
    Thanks for the response
    but I am not getting
    alexander-thomas
    @alexander-thomas
    Hi, can I somehow get timestamp of kafka event?
    Robert Slootjes
    @slootjes
    I'm just messing around with kafka now and this extension (thanks a lot for it!) but I run into something I don't understand. When I have a topic with 5 partitions and 2 consumers on the same group I only get 40% (2/5) of the messages and the rest is "lost"
    what should I do to get all messages? When using 2 partitions and 2 consumers everything goes well btw.
    jstean
    @jstean
    @slootjes are you sure they are in the same group? you can use kafka-consumer-groups.sh in your kafka install to inspect
    Robert Slootjes
    @slootjes
    they are in the same group, but 2 consumers consume 2 partitions, not all 5 of them
    I'm using the free Cloudkarafka plan to test with so I don't have access to server side tools right now
    Robert Slootjes
    @slootjes
    just tried it again and now it works fine...
    Robert Slootjes
    @slootjes
    I think I know what it is... I stopped 1 of the 2 consumers and there was no repartioning
    so the 1 consumer that I have left just gets 2 out of 5 topics and I lose the rest
    for some reason no repartioning is taking place
    Robert Slootjes
    @slootjes
    When killing a process (ctrl+c) somehow kafka doesn't seem to take it out of the pool directly
    so when I kill it and rejoin I sometimes end up with 3 out of 5 partitions not being asigned to a consumer...how can I prevent this?
    ubimsk
    @ubimsk
    Hi is there a way to catch an exception when adding a broker fails?