jamasul
@jamasul
Kafka is running on Kubernetes, and if I run kafka-console-producer from the container it runs fine.
SusantaBiswal
@SusantaBiswal

@edenhill - We are using librdkafka [1.5.0] and Kafka Server [2.4.0].
What we have observed is that the produced messages (from a multi-threaded producer) are not stored in sequence, even with max.in.flight.requests.per.connection = 5, message.send.max.retries = INT32_MAX and request.required.acks = all set in the rdkafka configuration.

We have a multi-threaded producer producing messages to a single topic and a single partition.
The producer has 10 threads, each producing 100 batches of 10 messages, for a total of 10,000 messages.
Each message is uniquely identified by a sequence number, and messages are sent in that order.
While the application shows the messages being sent in ascending sequence order, the consumer shows that the order stored in Kafka is not the same as the order of sending.

Please note that:

  • The Kafka topic has 8 partitions, but messages are produced to one partition only.
  • The consumer thread subscribes to all partitions of the topic (in this case all 8 partitions) for consuming messages.

For example, here are the Kafka offsets and message sequence IDs for a particular consumer thread:

Kafka Offset  Sequence Id
303703        2
303701        1
303705        3
303706        4
303707        5
303716        6
303719        7
303726        8
303742        10
303769        9

For another thread,

Kafka Offset  Sequence Id
303735        3
303736        4
303737        5
303752        6
303753        7
303754        8
303755        9
303756        10
303764        1
303768        2

Most threads show a similar pattern. Is there any other configuration that could influence this behavior?
If yes, what configuration must be changed to address this issue?

SusantaBiswal
@SusantaBiswal

I have tried with a single-partition Kafka server, but I see the same problem with message ordering.

1 reply
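A minimal sketch of the configuration fix being discussed, assuming the librdkafka C API (the same properties exist in the wrappers): with max.in.flight > 1 and retries enabled, a failed batch can be retried after a later batch has already been written, which reorders messages within the partition. Since librdkafka v1.0, enable.idempotence=true makes retries order-safe. The broker address is a placeholder.

#include <librdkafka/rdkafka.h>
#include <stdio.h>

int main(void) {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                      errstr, sizeof(errstr));
    /* Idempotent producer: implies acks=all, bounds max.in.flight to 5,
     * and preserves per-partition ordering even across retries. */
    rd_kafka_conf_set(conf, "enable.idempotence", "true",
                      errstr, sizeof(errstr));

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                  errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "producer creation failed: %s\n", errstr);
        return 1;
    }
    /* ... produce as usual ... */
    rd_kafka_destroy(rk);
    return 0;
}

Note also that librdkafka preserves the order in which produce() is called on a partition, so strict ordering across 10 producing threads additionally requires the threads themselves to serialize their produce() calls.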
Eric Seidel
@gridaphobe
@edenhill https://github.com/edenhill/librdkafka/pull/2701#discussion_r510373487 seems like there might be a bug in your test framework? one of the TEST_FAIL_LATER assertions was swallowed in my DeleteRecords tests
Shiva Shankar D
@dshivashankar_twitter
When is the consumer connection to the Kafka cluster established? During the creation of the RdKafka consumer, or during the first subscription?
2 replies
Anders Smedegaard Pedersen
@smedegaard_gitlab

Hi,
I'm trying to produce to a version 0.8.2.1 broker with the following.

from confluent_kafka import Producer

def create_legacy_producer() -> Producer:
    """Creates a Kafka Producer object"""
    kafka_conf = {
        'bootstrap.servers': "my.broker:9092",
        'broker.version.fallback': '0.8.2.1',
        'api.version.request': False,
        'debug': 'all'
    }
    return Producer(kafka_conf)


if __name__ == '__main__':
    producer = create_legacy_producer()
    producer.poll(0.1)
    producer.produce("my-topic", "testing")
    # produce() only enqueues the message; flush() waits for delivery
    # (or failure) before the process exits.
    producer.flush()

The debug log seems to say that Produce is not supported by the broker:

%7|1604398713.323|APIVERSION|rdkafka#producer-1| [thrd:my.broker:9092/bootstrap]: my.broker:9092/bootstrap: Using (configuration fallback) 0.8.2.1 protocol features
%7|1604398713.323|APIVERSION|rdkafka#producer-1| [thrd:my.broker:9092/bootstrap]: my.broker:9092/bootstrap:  Feature MsgVer1: Produce (2..2) NOT supported by broker
%7|1604398713.323|APIVERSION|rdkafka#producer-1| [thrd:my.broker:9092/bootstrap]: my.broker:9092/bootstrap:  Feature MsgVer1: Fetch (2..2) NOT supported by broker
%7|1604398713.323|APIVERSION|rdkafka#producer-1| [thrd:my.broker:9092/bootstrap]: my.broker:9092/bootstrap: Disabling feature MsgVer1
%7|1604398713.323|APIVERSION|rdkafka#producer-1| [thrd:my.broker:9092/bootstrap]: my.broker:9092/bootstrap:  Feature MsgVer2: Produce (3..3) NOT supported by broker
%7|1604398713.323|APIVERSION|rdkafka#producer-1| [thrd:my.broker:9092/bootstrap]: my.broker:9092/bootstrap:  Feature MsgVer2: Fetch (4..4) NOT supported by broker
%7|1604398713.323|APIVERSION|rdkafka#producer-1| [thrd:my.broker:9092/bootstrap]: my.broker:9092/bootstrap: Disabling feature MsgVer2

But there is no send() method or anything else to replace Producer.produce(), as far as I can see.

The debug info also shows that the broker's topics are fetched as metadata.
%7|1604398713.325|METADATA|rdkafka#producer-1| [thrd:main]: Absolute update of metadata cache with 34 topic(s)

It then continues to queue my message, but says the broker is not up. But I'm certain it's up; I have a console consumer running to see if the messages arrive.
%7|1604398713.422|TOPPAR|rdkafka#producer-1| [thrd:my.broker:9092/0]: my.broker:9092/0: my-topic [0] 1 message(s) queued but broker not up

I got a consumer working before. Is there something I'm missing?

Any help is greatly appreciated.

5 replies
Paul Dragoonis
@dragoonis
Hey! I've raised this issue: edenhill/librdkafka#3133 - and I've also DM'd you @edenhill :)
Leonid Gureev
@GureevLeonid
Hi! We are trying to implement proper producer/consumer error handling, and we want to trigger librdkafka fatal errors to test our self-recovery mechanism around them. Does anybody know how to trigger/reproduce fatal errors?
2 replies
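For reference, a hedged sketch using the librdkafka C API: the library provides rd_kafka_test_fatal_error() specifically for injecting a fatal error in tests, and rd_kafka_fatal_error() for retrieving it afterwards (rk here is an existing handle):

#include <librdkafka/rdkafka.h>
#include <stdio.h>

static void simulate_fatal(rd_kafka_t *rk) {
    /* Inject a fatal error to exercise the application's recovery paths. */
    rd_kafka_test_fatal_error(rk, RD_KAFKA_RESP_ERR__FATAL,
                              "simulated fatal error");

    /* The application can then detect it the same way it would a real one. */
    char errstr[512];
    rd_kafka_resp_err_t err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
    if (err)
        fprintf(stderr, "fatal: %s: %s\n", rd_kafka_err2name(err), errstr);
}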
Michael03
@Michael03
Hi, I'm trying to build https://github.com/edenhill/librdkafka/blob/master/examples/producer.cpp and I'm getting the error
error: no matching function for call to 'RdKafka::Producer::produce(std::string&, const int32_t&, RdKafka::Producer::<unnamed enum>, char*, std::__cxx11::basic_string<char>::size_type, NULL, int, int, NULL, NULL)'
  177 |                         NULL);
Example Dockerfile to reproduce:
FROM gcc:latest

RUN apt update && apt install librdkafka-dev -y
COPY . /usr/src/myapp
WORKDIR /usr/src/myapp

RUN g++ -o myapp kafka.cpp

CMD ["./myapp"]

LABEL Name=cpp Version=0.0.1
Michael03
@Michael03
Sorted; the problem was the version of librdkafka-dev.
Michael03
@Michael03
Onto the next issue. We're using node-rdkafka in live; we've upgraded from 2.2.3 to 2.8.1, which is librdkafka 0.11.1 to 1.3.0.
Since doing that we're seeing Local: Broker transport failure a few times a day from the node-rdkafka error event.
I've read librdkafka does some retrying. Does that mean we should ignore this error and it's all fine?
Or does that error only get sent to consumers after retries have failed?
24 replies
Or any idea why we would only be seeing this now?
data-decode
@data-decode
Hello community, I am new to librdkafka. Could you help me understand how to use a key-value store with librdkafka? Basically I am looking for the C++ equivalent of https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/KeyValueStore.html
2 replies
data-decode
@data-decode
Could anyone please help me with this?
Mohammad Roohitavaf
@roohitavaf
I am seeing a weird behavior. Sometimes when I commit an offset+metadata to Kafka, I get this error: Broker: Coordinator not available
but the metadata eventually shows up on the __consumer_offsets topic.
What can I do about this Broker: Coordinator not available error?
12 replies
Adam Hunyadi
@hunyadi-dev
Hi @edenhill,
Using the C API of librdkafka as a consumer, I would like to poll multiple messages in a single batch. However, in rdkafka.h rd_kafka_consume_batch() is listed as part of a legacy API. Can you tell me if I have an alternative for batching if I currently use rd_kafka_poll_set_consumer() and rd_kafka_consumer_poll() to poll messages?
Also, why is the legacy API legacy? Will it be deprecated or stop getting updates soon?
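For what it's worth, a common application-side workaround (a sketch, not an official batch API): accumulate messages from repeated rd_kafka_consumer_poll() calls into your own batch.

#include <librdkafka/rdkafka.h>

/* Collect up to max_msgs messages, waiting at most timeout_ms per poll.
 * The caller destroys each message with rd_kafka_message_destroy(). */
static size_t poll_batch(rd_kafka_t *rk, rd_kafka_message_t **msgs,
                         size_t max_msgs, int timeout_ms) {
    size_t n = 0;
    while (n < max_msgs) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, timeout_ms);
        if (!msg)
            break;          /* poll timed out: return what we have */
        if (msg->err) {     /* errors are delivered as messages here */
            rd_kafka_message_destroy(msg);
            break;
        }
        msgs[n++] = msg;
    }
    return n;
}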
Yu-Ching Chen
@yuch7
Is there an API to check whether the client is able to connect to the Kafka server?
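There is no dedicated "ping" API that I know of; a hedged sketch of one common approach is to issue a metadata request and treat success within the timeout as connectivity:

#include <librdkafka/rdkafka.h>

/* Returns 1 if a metadata request succeeds within timeout_ms, else 0. */
static int can_reach_cluster(rd_kafka_t *rk, int timeout_ms) {
    const struct rd_kafka_metadata *md;
    rd_kafka_resp_err_t err =
            rd_kafka_metadata(rk, 0 /* known topics only */, NULL, &md,
                              timeout_ms);
    if (err)
        return 0;
    rd_kafka_metadata_destroy(md);
    return 1;
}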
Sean Hanson
@shanson7

We had an issue with one of our tasks where lag was growing for certain partitions, but we were still getting RdKafka::ERR__TIMED_OUT returned from consume() calls (poll timeout 200 ms). It began during partition rebalance and we needed to kill a couple of the consumers in the group to get the lag to go back down (each time, a different set of partitions got "slowed").

We want to add conditional debug logging at runtime so we can capture more logs during these rebalance times. It seems that debug can only be set at consumer create time, so is there an estimated runtime cost to having debug on all the time and just discarding the messages except when we want to log them?

1 reply
Also, any general advice on which debug categories would be useful? I assume fetch and consumer at a minimum?
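A sketch of the pattern being asked about, assuming the C API: enable debug at create time, install a log callback, and gate output on a runtime flag (the flag and the debug categories here are illustrative):

#include <librdkafka/rdkafka.h>
#include <stdatomic.h>
#include <stdio.h>

static atomic_bool verbose_logging; /* flip at runtime as needed */

static void log_cb(const rd_kafka_t *rk, int level,
                   const char *fac, const char *buf) {
    /* Debug lines arrive at syslog level 7; drop them unless enabled. */
    if (level >= 7 && !atomic_load(&verbose_logging))
        return;
    fprintf(stderr, "%d|%s|%s: %s\n", level, fac,
            rk ? rd_kafka_name(rk) : "-", buf);
}

/* At configuration time:
 *   rd_kafka_conf_set(conf, "debug", "cgrp,fetch", errstr, sizeof(errstr));
 *   rd_kafka_conf_set_log_cb(conf, log_cb);
 */

The discarded lines are still formatted by librdkafka before reaching the callback, so expect some CPU overhead from leaving debug enabled; the filter only saves the output I/O.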
Victor Rosenberg
@harley84
Hi everyone. I have a question regarding the enable.auto.commit configuration. The docs say "Setting this to false does not prevent the consumer from fetching previously committed start offsets". How can the described situation happen, assuming I'm not managing the offsets manually?
1 reply
sarkanyi
@sarkanyi
@edenhill some time ago I made a PR changing parts of the code to be consistent, so that rd_alloc/rd_free is not mixed with plain malloc/free. Actually I'd go even further and change the integrated stuff like lz4 to use rd_alloc/rd_free, but my question is whether you'd accept such a PR or whether I shouldn't bother? Basically everything under src and src-cpp.
18 replies
Ярослав
@ChipoDeil
Hi there!
We are using the .NET wrapper for librdkafka (the confluent-dotnet client), and we are facing issues with a huge number of local TCP connections being created for every new consumer.
For example, constructing one consumer against 6 brokers produces 28 strange local TCP connections, which can be retrieved with the help of netstat. Output of netstat -o in thread.
13 replies
lfranic
@lfranic

hi,
We are getting the error when consuming a topic:
%5|1608141709.340|REQTMOUT|rdkafka#consumer-1| [thrd:10.200.116.115:31092/2]: 10.20.117.115:31092/2: Timed out FetchRequest in flight (after 90354ms, timeout #0)
%4|1608141709.340|REQTMOUT|rdkafka#consumer-1| [thrd:10.200.116.115:31092/2]: 10.20.117.115:31092/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests

The thing is that the bootstrap.servers parameter we are sending is 10.200.116.115:31090, 10.200.116.116:31091, 10.200.116.117:31092.
We don't know why it is trying port 31092 on 10.200.116.115.
Tip: I have slightly changed the IP numbers.
Thanks a bunch!

2 replies
Luciano Carvalho
@lucianoacjunior_twitter

Hi folks!
We are facing issues when starting a new producer.

Failed to create thread: Resource temporarily unavailable (11)
Failed to create thread: No error information (0)

/proc/sys/kernel/threads-max is 123497
CPU and memory are available.
We are running with 9 brokers.

Is there any way to debug this better?

2 replies
Razvan Chitu
@razvanch
Hello!
For how long will a Kafka consumer retry in case of a broker disconnect?
2 replies
elizajeanwong
@elizajeanwong
is the legacy consumer API going to be dropped from librdkafka at some point?
Magnus Edenhill
@edenhill
@elizajeanwong Yep, but not any time soon.
elizajeanwong
@elizajeanwong
@edenhill thanks
Is there a way to "reset" the group's committed offset? e.g. if I decide I want to start from 'earliest' again?
1 reply
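One hedged way to do this from code, assuming the C API (the broker-side kafka-consumer-groups.sh --reset-offsets tool is another option): query the partition's low watermark and commit it for the group, so the next consumer starts from the earliest available message.

#include <librdkafka/rdkafka.h>

/* rk must be a consumer handle configured with the target group.id. */
static rd_kafka_resp_err_t reset_to_earliest(rd_kafka_t *rk,
                                             const char *topic,
                                             int32_t partition) {
    int64_t lo, hi;
    rd_kafka_resp_err_t err = rd_kafka_query_watermark_offsets(
            rk, topic, partition, &lo, &hi, 5000 /* timeout ms */);
    if (err)
        return err;

    rd_kafka_topic_partition_list_t *offsets =
            rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(offsets, topic, partition)->offset = lo;

    /* Synchronous commit of the explicit offset. */
    err = rd_kafka_commit(rk, offsets, 0);
    rd_kafka_topic_partition_list_destroy(offsets);
    return err;
}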
elizajeanwong
@elizajeanwong
This is probably a silly question, but let's say I first subscribe to a topic with auto.offset.reset = earliest and I read 5 or so messages (out of 100 total in the topic), then commit the offset and close up. The next time I subscribe with auto.offset.reset = latest, using the same group ID. Now when I try reading messages, will it resume at the 6th message? Or will it wait for the 101st message to be added to the topic?
1 reply
Steven Diviney
@divo
Hi there,
I have a small question about project history. I was wondering why librdkafka went with crc32 to determine the hash key for message partition assignment, while the Java Kafka client uses murmur2? I'm trying to decide on one to standardize on and was just looking for any context. Thanks!
2 replies
QishunW
@QishunW
Hello~ If I have more consumers than partitions, which means that there are always some idle consumers, will it cause frequent consumer rebalancing?
3 replies
ˈt͡sɛːzaɐ̯
@julius:mtx.liftm.de [m]
(Shouldn't but I haven't tried. Someone will give a more authoritative answer soon, I presume.)
ephisino
@ephisino
@edenhill can one use seek() on a KafkaConsumer after using subscribe()? I'd like to re-seek to different offsets during the consumer's lifetime (not necessarily at start).
1 reply
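A sketch of what this can look like with the C API; note the partition must currently be assigned to the consumer for the seek to take effect, and the topic and offset here are placeholders:

#include <librdkafka/rdkafka.h>
#include <stdio.h>

static void seek_to(rd_kafka_t *rk, const char *topic,
                    int32_t partition, int64_t offset) {
    rd_kafka_topic_partition_list_t *parts =
            rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(parts, topic, partition)->offset = offset;

    /* rd_kafka_seek_partitions() is available since librdkafka v1.5.0. */
    rd_kafka_error_t *err = rd_kafka_seek_partitions(rk, parts, 5000);
    if (err) {
        fprintf(stderr, "seek failed: %s\n", rd_kafka_error_string(err));
        rd_kafka_error_destroy(err);
    }
    rd_kafka_topic_partition_list_destroy(parts);
}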
juan
@_danielejuan_twitter
Hello. May I ask if there is any available documentation describing how TCP connections to Kafka are managed?
2 replies
gargvicky
@gargvicky
The consumer stops consuming after a broker transport failure.
Each time we need to change the consumer group name when the old consumer group stops consuming data. Any fix for this?
5 replies
sarkanyi
@sarkanyi
@edenhill now that 1.6 is out, any chance of merging edenhill/librdkafka#2405? I'd also likely do an allocator PR later.
3 replies
dcdsc
@dcdsc
Hi @edenhill, I am trying to build librdkafka 1.5 on RHEL 8. I am seeing that the built librdkafka.so.1 is not linking to libssl, libcrypto, libz, etc. Can you please tell me whether I am missing anything?
Please find the attached configure output.
10 replies
dcdsc
@dcdsc
librdkafka_configure.PNG
Ken Chen
@chenziliang
Hi, may I ask how to incrementally subscribe/unsubscribe (assign/unassign) new/deleted topics in a consumer group manually and reliably (without duplicate events)? Thanks a lot.
4 replies
Adriel Velazquez
@AdrielVelazquez
Trying to manually create a bundled librdkafka for v1.6.0; however, I seem to be hitting some roadblocks with credentials. Is this not something that someone can open a PR for in confluent-kafka-go?
1 reply
Markus Andersson
@markus8109
We are having an issue with the library crashing when it is time to refresh the OAuth token. The assert message is "cant handle op type"; have we forgotten to configure something?
1 reply
Ken Chen
@chenziliang
Hi @edenhill, I am using the (legacy) simple consumer interface to pull messages. I enabled the settings auto.commit.enable=true, auto.commit.interval.ms=5000, enable.auto.offset.store=true, and offset.store.method=broker, but I didn't see the offsets get committed automatically while consuming messages: I have set up rd_kafka_conf_set_offset_commit_cb, the callback never got called, and the consumer always started from the very beginning across restarts. Could you please shed some light on where I can take a look? Thanks.
22 replies
juan
@_danielejuan_twitter

@edenhill Hi, may I ask if there is a way to retrieve the consumer group lag for a specific topic without actually consuming the topic?

We are using the high-level Kafka consumer, and we are not able to receive the stats callback if we do not call consume.

4 replies
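A hedged sketch of one way to compute lag without consuming, using the C API: fetch the group's committed offsets with rd_kafka_committed() and compare them against the partition's high watermark. The handle must be configured with the group's group.id; topic and partition are placeholders.

#include <inttypes.h>
#include <librdkafka/rdkafka.h>
#include <stdio.h>

static void print_lag(rd_kafka_t *rk, const char *topic, int32_t partition) {
    rd_kafka_topic_partition_list_t *parts =
            rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(parts, topic, partition);

    /* Fetch the group's committed offset for this partition. */
    if (rd_kafka_committed(rk, parts, 5000) == RD_KAFKA_RESP_ERR_NO_ERROR) {
        int64_t lo, hi;
        if (rd_kafka_query_watermark_offsets(rk, topic, partition,
                                             &lo, &hi, 5000) ==
                    RD_KAFKA_RESP_ERR_NO_ERROR &&
            parts->elems[0].offset >= 0) {
            printf("lag on %s [%" PRId32 "]: %" PRId64 "\n",
                   topic, partition, hi - parts->elems[0].offset);
        }
    }
    rd_kafka_topic_partition_list_destroy(parts);
}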
Sean Hanson
@shanson7

@edenhill - Is there a known issue with consumers being kicked from groups when the coordinator is restarted? We recently started scheduling reboots of our Kafka brokers, and we see that messages are being double-consumed across instances due to offset commits failing. It does something like:

broker3: Unloading group metadata for slidr with generation 2864 (kafka.coordinator.group.GroupCoordinator)
consumer1: KAFKA LOG: severity = 3 fac = FAIL event = [thrd:broker3:9092/3]: broker3:9092/3: Connect to ipv4#[redacted]:9092 failed: Connection refused
broker4: Loading group metadata for slidr with generation 2864 (kafka.coordinator.group.GroupCoordinator)
broker4:broker4:  Member slidr-input-consumer1 in group slidr has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
consumer1: LOG: severity = 4 fac = COMMITFAIL event = [thrd:main]: Offset commit (cgrp auto commit timer) failed for 20/20 partition(s) in join-state started: Local: Waiting for coordinator: logs[60]@17323721604(Local: Waiting for coordinator)... (repeats this for 2 minutes)

After 2 minutes, consumer1 reports a failed connection to broker4 and then rejoins the group, but by this point the offsets have been reconsumed by another instance

4 replies
Nuwan Karunarathna
@nuwankaru91_twitter

Hi @edenhill ,

I have a producer application which connects to a cluster with 3 brokers, so I expect the library to create 4 threads. But it creates 4 additional threads named rdk:broker-1. Can you please give some insight into this? Note that I'm using Admin API methods as well, for topic creation and deletion.

10314 10384 pts/1 00:00:00 rdk:main
10314 10385 pts/1 00:00:00 rdk:broker-1
10314 10386 pts/1 00:00:00 rdk:broker-1
10314 10387 pts/1 00:00:00 rdk:broker-1
10314 10388 pts/1 00:00:00 rdk:broker-1
10314 10389 pts/1 00:00:00 rdk:broker0
10314 10390 pts/1 00:00:00 rdk:broker2
10314 10391 pts/1 00:00:00 rdk:broker1

Thanks!
Nuwan

5 replies