sarkanyi
@sarkanyi
@edenhill now that 1.6 is out, any chance of merging edenhill/librdkafka#2405? I'd also most likely do an allocator PR later.
8 replies
dcdsc
@dcdsc
Hi @edenhill, I am trying to build librdkafka 1.5 on RHEL 8. I am seeing that the built librdkafka.so.1 is not linking to libssl, libcrypto, libz, etc. Can you please tell me whether I am missing anything?
Please find the attached configure output.
10 replies
dcdsc
@dcdsc
librdkafka_configure.PNG
Ken Chen
@chenziliang
Hi, may I ask how to incrementally subscribe/unsubscribe (assign/un-assign) new/deleted topics in a consumer group, manually and reliably (without duplicate events)? Thanks a lot
4 replies
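For reference, one way to sketch this with the plain C API (not a guaranteed no-duplicates recipe, since that still depends on offset management): with librdkafka >= 1.6 and partition.assignment.strategy=cooperative-sticky, re-issuing rd_kafka_subscribe() with the updated topic set only rebalances the delta instead of revoking everything. Topic names below are illustrative.

/* Sketch: update a group subscription incrementally. Assumes
 * partition.assignment.strategy=cooperative-sticky (librdkafka >= 1.6);
 * topic names are illustrative. */
rd_kafka_topic_partition_list_t *subs = rd_kafka_topic_partition_list_new(2);
rd_kafka_topic_partition_list_add(subs, "orders", RD_KAFKA_PARTITION_UA);
rd_kafka_topic_partition_list_add(subs, "payments", RD_KAFKA_PARTITION_UA);
rd_kafka_subscribe(rk, subs);                 /* initial subscription */
rd_kafka_topic_partition_list_destroy(subs);

/* Later: add a topic by re-subscribing with the new full set; with the
 * cooperative assignor only the delta ("refunds") is (un)assigned. */
subs = rd_kafka_topic_partition_list_new(3);
rd_kafka_topic_partition_list_add(subs, "orders", RD_KAFKA_PARTITION_UA);
rd_kafka_topic_partition_list_add(subs, "payments", RD_KAFKA_PARTITION_UA);
rd_kafka_topic_partition_list_add(subs, "refunds", RD_KAFKA_PARTITION_UA);
rd_kafka_subscribe(rk, subs);
rd_kafka_topic_partition_list_destroy(subs);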
Adriel Velazquez
@AdrielVelazquez
Trying to manually create a bundled librdkafka for v1.6.0; however, I seem to be hitting some roadblocks with credentials. Is this not something that someone can open a PR for in confluent-kafka-go?
1 reply
Markus Andersson
@markus8109
We are having an issue with the library crashing when it is time to refresh the OAuth token. The assert message is "cant handle op type". Have we forgotten to configure something?
1 reply
Ken Chen
@chenziliang
Hi @edenhill, I am using the (legacy) simple consumer interface to pull messages. I enabled auto.commit.enable=true, auto.commit.interval.ms=5000, enable.auto.offset.store=true, offset.store.method=broker, but I don't see the offsets being committed automatically while consuming: I have set up rd_kafka_conf_set_offset_commit_cb and the callback never gets called, and the consumer always starts from the very beginning across restarts. Could you please shed some light on where I can take a look? Thanks
22 replies
juan
@_danielejuan_twitter

@edenhill Hi, may I ask if there is a way to retrieve the consumer group lag for a specific topic without actually consuming from the topic?

We are using the high-level Kafka consumer and we are unable to receive the stats callback if we do not call consume.

4 replies
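One approach that avoids consuming (a sketch; the topic name, partition and timeouts are illustrative, and rd_kafka_committed() requires group.id to be configured on the handle): fetch the group's committed offset and the partition high watermark from the broker and take the difference.

/* Sketch: compute lag for one partition without consuming. */
int64_t lo, hi;
rd_kafka_query_watermark_offsets(rk, "mytopic", 0, &lo, &hi, 5000);

rd_kafka_topic_partition_list_t *parts = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(parts, "mytopic", 0);
rd_kafka_committed(rk, parts, 5000);          /* fills parts->elems[0].offset */

int64_t committed = parts->elems[0].offset;   /* RD_KAFKA_OFFSET_INVALID if none */
int64_t lag = (committed >= 0) ? hi - committed : hi - lo;
rd_kafka_topic_partition_list_destroy(parts);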
Sean Hanson
@shanson7

@edenhill - Is there a known issue with consumers being kicked from groups when the coordinator is restarted? We recently started scheduling reboots of our Kafka brokers and we see messages being double-consumed across instances due to offset commits failing. It goes something like:

broker3: Unloading group metadata for slidr with generation 2864 (kafka.coordinator.group.GroupCoordinator)
consumer1: KAFKA LOG: severity = 3 fac = FAIL event = [thrd:broker3:9092/3]: broker3:9092/3: Connect to ipv4#[redacted]:9092 failed: Connection refused
broker4: Loading group metadata for slidr with generation 2864 (kafka.coordinator.group.GroupCoordinator)
broker4:broker4:  Member slidr-input-consumer1 in group slidr has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
consumer1: LOG: severity = 4 fac = COMMITFAIL event = [thrd:main]: Offset commit (cgrp auto commit timer) failed for 20/20 partition(s) in join-state started: Local: Waiting for coordinator: logs[60]@17323721604(Local: Waiting for coordinator)... (repeats this for 2 minutes)

After 2 minutes, consumer1 reports a failed connection to broker4 and then rejoins the group, but by this point the offsets have been reconsumed by another instance.

4 replies
Nuwan Karunarathna
@nuwankaru91_twitter

Hi @edenhill ,

I have a producer application which connects to a cluster with 3 brokers, so I expect the library to create 4 threads. But it creates 4 additional threads named rdk:broker-1. Can you please give some insight on this? Note that I'm using Admin API methods as well, for topic creation and deletion.

10314 10384 pts/1 00:00:00 rdk:main
10314 10385 pts/1 00:00:00 rdk:broker-1
10314 10386 pts/1 00:00:00 rdk:broker-1
10314 10387 pts/1 00:00:00 rdk:broker-1
10314 10388 pts/1 00:00:00 rdk:broker-1
10314 10389 pts/1 00:00:00 rdk:broker0
10314 10390 pts/1 00:00:00 rdk:broker2
10314 10391 pts/1 00:00:00 rdk:broker1

Thanks!
Nuwan

5 replies
Mathieu Coussens
@MrKickkiller

Looking for help on using confluent-kafka-go (which is a Go client built on this client) to get at-least-once semantics. Currently I have disabled auto commit and I only call .CommitMessage(m) after successful processing of the received message. However, during testing I have found that it is possible for a consumer to first get an invalid message (third-party API down or broken), NOT commit, and then get a correct message (third party working), which commits the offsets. Therefore the broken message gets skipped entirely. This all feels similar to a previous GH issue (confluentinc/confluent-kafka-go#527) which remains unanswered afaik. I have written some logic for retrying messages at a potentially later date, but now I'm having issues testing that.

Has anyone successfully configured something like this, and are they willing to share some snippets of code?
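For what it's worth, the basic shape in the underlying C API (which confluent-kafka-go wraps) looks like the sketch below, assuming enable.auto.commit=false; process() is a hypothetical handler. Note it does not solve the ordering problem described above: committing a later offset on the same partition still advances past an earlier skipped message, so a retry/dead-letter path is still needed for that case.

/* Sketch: at-least-once consume loop, assuming enable.auto.commit=false
 * and an already-subscribed consumer; process() is hypothetical. */
while (running) {
    rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 100);
    if (!msg)
        continue;                              /* poll timeout */
    if (msg->err) {
        fprintf(stderr, "consume error: %s\n", rd_kafka_message_errstr(msg));
    } else if (process(msg) == 0) {
        /* Commit synchronously only after successful processing, so an
         * unprocessed message is redelivered after a restart. */
        rd_kafka_commit_message(rk, msg, 0 /* sync */);
    }
    rd_kafka_message_destroy(msg);
}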

Taylor Foxhall
@hallfox
Hi there, my team and I are working on implementing the AlterConsumerGroupOffsets feature from the Java admin API. We've been trying to implement it in terms of the currently existing rd_kafka_OffsetCommitRequest function, but this currently functions by being passed a cgrp. I'm not sure an admin client would necessarily have this initialized, and it looks a little bit heavy to be creating from a group ID. Based on the implementation, it seems like the offset commit request just needs basically the details of the consumer group metadata. Would it make sense to change the OffsetCommitRequest to use the metadata object, and then fix everything up? Or does this sound unfeasible from a compatibility perspective? Thanks.
1 reply
Nuwan Karunarathna
@nuwankaru91_twitter

Hi @edenhill,

We need to bind rdkafka-spawned threads to a specific set of cores from our application. Does the library provide any capability/callbacks to achieve this?

Thanks!
Nuwan

2 replies
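For reference, librdkafka's interceptor API includes an on_thread_start hook that can be used for this; a minimal sketch follows (verify the interceptor API against your rdkafka.h version; the CPU choices here are illustrative).

#define _GNU_SOURCE               /* for pthread_setaffinity_np / cpu_set_t */
#include <librdkafka/rdkafka.h>
#include <pthread.h>
#include <sched.h>

static rd_kafka_resp_err_t on_thread_start(rd_kafka_t *rk,
                                           rd_kafka_thread_type_t thread_type,
                                           const char *thread_name,
                                           void *ic_opaque) {
    cpu_set_t set;
    CPU_ZERO(&set);
    /* Pin broker threads to core 2, everything else to core 1 (illustrative). */
    CPU_SET(thread_type == RD_KAFKA_THREAD_BROKER ? 2 : 1, &set);
    pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
    return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/* Register on the conf before rd_kafka_new():
 *   rd_kafka_conf_interceptor_add_on_thread_start(conf, "affinity",
 *                                                 on_thread_start, NULL);
 */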
Jenishk56
@Jenishk56

Hello There,

For one of the topics I am trying to copy, delivery is failing with the error message % Delivery failed for message: Broker: Invalid message. I tried setting check.crcs=true on the producer side to verify the CRC32, but no luck. I tried enabling debug mode, but it wasn't much use. Can someone help with what the issue could be here?

1 reply
gargvicky
@gargvicky
Hi,
Any solution for the below error?
rdkafka-3855ec77-513c-4f3d-a5a1-4b6db1d8545d in group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
gargvicky
@gargvicky
Debug log shows
no broker available for coordinator query: intervaled in state query-coord
Aleh Danilovich
@MCPDanilovich
Hi all, maybe I can get an answer here. I have 1 topic (100 partitions) and 100 consumers in total (5 k8s pods with 20 consumer threads each, all with the same group.id). Consumers are divided evenly in general. But when I want to scale my app up to, for example, 10 pods, all the new consumers remain unassigned to any partition. Protocol = sticky. Is it possible to use some of the consumers from all the new pods? For example, 10 pods with 10 consumer threads per pod?
Victor Rosenberg
@harley84
Hi, I’ve got a couple of questions about producing. When producing we get messages similar to this in the error_cb callback:
“<server-hostname>:9092/2: 3 request(s) timed out: disconnect (after 265245ms in state UP). Error code: -185: Local: Timed out “
  1. What exactly are the requests that are counted here? How are they related to retries?
  2. How is the time in the message measured?
  3. How come the disconnect value is so large when we use much lower message.timeout.ms and request.timeout.ms values (30 seconds and 10 seconds respectively)?
    Thanks in advance
1 reply
jlazdw
@jlazdw
Hi @edenhill,
I have some questions regarding the performance of the different consumer APIs supported by librdkafka. The documentation states that rd_kafka_consume(), rd_kafka_consume_batch() and rd_kafka_consume_callback() are in ascending order of performance.
(1) How much difference in performance is there from rd_kafka_consume() to rd_kafka_consume_batch() to rd_kafka_consume_callback()?
jlazdw
@jlazdw
(2) I am using the C++ class KafkaConsumer to consume messages, and I found that the implementation of its consume() method actually invokes the C API rd_kafka_consume(). No API in the KafkaConsumer class invokes rd_kafka_consume_batch() or rd_kafka_consume_callback(). The so-called legacy class "Consumer" defined in rdkafkacpp.h does provide consume_callback(), but still does not call rd_kafka_consume_batch(), and that class lacks some other methods supported by the KafkaConsumer class. What would be the suggested way to invoke the C API rd_kafka_consume_batch() while still leveraging the C++ KafkaConsumer class? I am using librdkafka 1.5.2.
Thanks in advance.
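One possible route (a sketch; assumes a librdkafka version where RdKafka::Handle exposes c_ptr(), which releases since 1.0 do): grab the underlying C handle from the C++ KafkaConsumer, fetch its consumer queue, and batch-consume from that, keeping the C++ object for everything else.

/* Sketch: batch-consume on the high-level consumer's queue.
 * consumer is an RdKafka::KafkaConsumer*; error handling trimmed. */
rd_kafka_t *rk = consumer->c_ptr();
rd_kafka_queue_t *rkqu = rd_kafka_queue_get_consumer(rk);

rd_kafka_message_t *msgs[1000];
ssize_t n = rd_kafka_consume_batch_queue(rkqu, 500 /* timeout ms */, msgs, 1000);
for (ssize_t i = 0; i < n; i++) {
    /* ... handle msgs[i] (data or error event) ... */
    rd_kafka_message_destroy(msgs[i]);
}
rd_kafka_queue_destroy(rkqu);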
Sean Hanson
@shanson7

While debugging lagging (but fairly idle) consumers we found the existing issue edenhill/librdkafka#2879 and think this might be the issue (combined with the reduction of default queued.max.messages.kbytes to 64MB in 1.5.0). It seems to be a compounding problem. e.g. if the queue fills, then there is a long pause where no data is consumed (1 second) and then the queue is immediately filled again with the accumulated data. This means that queued.max.messages.kbytes needs to be generously allocated to have more than 1 second of room. In fact, it likely needs quite a lot more to avoid getting caught in this feedback loop. Does this sound like a correct analysis?

Also, while looking at debug logs, I see that this debug line seems to print rd_kafka_q_len(&msetr->msetr_rkq) 2 times. Should one of these be the len of the parent queue?

2 replies
Magnus Edenhill
@edenhill
Yeah, it should be msetr_par_rkq. Good find :+1:
Sean Hanson
@shanson7
Is anyone using the new incremental rebalance feature? I'm testing it out and I've adjusted the rebalance_cb we install to call incremental_(un)assign, but it still seems that when I add a new consumer to a group, partitions are shuffled between the existing members of the group.
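For comparison, a protocol-aware rebalance_cb sketch (error handling trimmed). One caveat worth checking: the group only rebalances incrementally if every member is configured with partition.assignment.strategy=cooperative-sticky; otherwise the group falls back to the eager protocol and partitions will still be shuffled.

/* Sketch: rebalance_cb handling both cooperative and eager protocols. */
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *parts, void *opaque) {
    if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
            rd_kafka_incremental_assign(rk, parts);
        else    /* RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS */
            rd_kafka_incremental_unassign(rk, parts);
    } else {    /* EAGER protocol: full assign/revoke */
        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
            rd_kafka_assign(rk, parts);
        else
            rd_kafka_assign(rk, NULL);
    }
}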
Sean Hanson
@shanson7
Also getting an Offset commit (unassigned partitions) failed for 2/5 partition(s) in join-state wait-incr-unassign-to-complete: Broker: Specified group generation id is not valid error during rebalance (even when using the built-in rebalance callback). I'll open an issue.
6 replies
Nuwan Karunarathna
@nuwankaru91_twitter

Hi @edenhill,

I have an application which keeps producing messages at a high rate. However, if there is a temporary disconnection between the producer and the Kafka cluster, the library rejects messages with the below error for a couple of seconds. After a while it continues to accept messages and writes to the cluster.

"Local: Unknown partition"

Could you please help me to rectify this issue?

Thanks!
NK

4 replies
Nouman Hassan Shahid
@NoumanShahid1258
Hi, I'm getting undefined symbol: _ZN7RdKafka13KafkaConsumer6createEPKNS_4ConfERNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE. I've compiled rdkafka from source on Ubuntu 20.04 and am using the libs in C++ code. I get the error when the code executes.
sarkanyi
@sarkanyi
@edenhill regarding a custom allocator API, which option would you like better? Option A: "return _rd_allocator.malloc(sz, _rd_allocator.opaque);", basically an allocator struct holding the malloc funcs. Or option B: "typeof (&rd_malloc) malloc_fn = rd_malloc_hook ? rd_malloc_hook : malloc; void *p = malloc_fn(sz);", basically just function hooks, set the same way the struct is set up in option A, via a setter function. I want to revise my old PR and I'm curious about the preference.
1 reply
HEBIN
@hoobean1996
Hi masters, what does "Broker not Coordinator for Group" mean? I searched a lot, but did not find a good explanation.
We encounter this type of error on commit.
Thanks!
3 replies
Amos Bird
@amosbird
Hello! I'm confused by the many different types of offsets in librdkafka: there are offset, stored_offset, committing_offset, and committed_offset. Could anyone give a simple illustration? Thanks!
2 replies
Why do we need an offset store when we can just commit offsets to Kafka?
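A rough mental model (my reading of the docs, so verify against rdkafka.h): offset is the consumer's current fetch position; stored_offset is what the local offset store will commit next; committing_offset and committed_offset are the in-flight and broker-acknowledged commit values. The store exists so the application can cheaply mark a message done locally and let the auto-committer batch the actual broker commits:

/* Sketch: mark a message "done" locally and let auto-commit push it later.
 * Assumes enable.auto.offset.store=false and enable.auto.commit=true;
 * process() is a hypothetical handler. */
rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 100);
if (msg && !msg->err) {
    process(msg);
    rd_kafka_offset_store(msg->rkt, msg->partition, msg->offset);
    /* stored_offset -> committing_offset -> committed_offset as the
     * auto-commit timer fires and the broker acks the commit */
}
if (msg)
    rd_kafka_message_destroy(msg);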
Vincent Janelle
@vjanelle
Are there any issues with librdkafka caching DNS lookups? If the IP address for a broker hostname changes, will it look up the IP address the next time it tries to connect?
1 reply
gianfrancog70
@gianfrancog70

Hello, I'm using librdkafka v1.5.0 (64-bit) with VS 2019 and facing a crash when calling rd_kafka_topic_conf_destroy and/or rd_kafka_conf_destroy.
Having these variables:

rd_kafka_conf_t *m_Configuration = nullptr;
rd_kafka_topic_conf_t *m_TopicConfiguration = nullptr;
rd_kafka_topic_t *m_Topic = nullptr;
rd_kafka_t *m_Producer = nullptr;

Initialized this way:

m_Configuration = rd_kafka_conf_new();

if (rd_kafka_conf_set(m_Configuration, "client.id", sClientID.c_str(),
                      m_cErrorString, sizeof(m_cErrorString)) != RD_KAFKA_CONF_OK)
{
    fprintf(stderr, "rd_kafka_conf_set error: %s\n", m_cErrorString);
}

rd_kafka_conf_set_dr_msg_cb(m_Configuration, MessagesDeliveryCallback);

rd_kafka_conf_set(m_Configuration, "queue.buffering.max.ms", "1",
                  m_cErrorString, sizeof(m_cErrorString));

if (rd_kafka_conf_set(m_Configuration, "bootstrap.servers", sServersList.c_str(),
                      m_cErrorString, sizeof(m_cErrorString)) != RD_KAFKA_CONF_OK)
{
    fprintf(stderr, "rd_kafka_conf_set error: %s\n", m_cErrorString);
}

m_TopicConfiguration = rd_kafka_topic_conf_new();

if (rd_kafka_topic_conf_set(m_TopicConfiguration, "acks", "all",
                            m_cErrorString, sizeof(m_cErrorString)) != RD_KAFKA_CONF_OK)
{
    fprintf(stderr, "rd_kafka_topic_conf_set error: %s\n", m_cErrorString);
}

m_Producer = rd_kafka_new(RD_KAFKA_PRODUCER, m_Configuration,
                          m_cErrorString, sizeof(m_cErrorString));

if (m_Producer == nullptr)
{
    fprintf(stderr, "rd_kafka_new error: %s\n", m_cErrorString);
}

m_Topic = rd_kafka_topic_new(m_Producer, sTopicName.c_str(), m_TopicConfiguration);

Then the objects are destroyed:

if (m_Topic != nullptr)
{
    rd_kafka_topic_destroy(m_Topic);
}

if (m_Producer != nullptr)
{
    rd_kafka_destroy(m_Producer);
}

if (m_TopicConfiguration != nullptr)
{
    rd_kafka_topic_conf_destroy(m_TopicConfiguration);
}

if (m_Configuration != nullptr)
{
    rd_kafka_conf_destroy(m_Configuration);
}

What could be the problem?

4 replies
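A likely culprit, based on librdkafka's documented ownership rules (worth confirming in rdkafka.h): rd_kafka_new() takes ownership of the conf on success, and rd_kafka_topic_new() takes ownership of the topic conf, so the later rd_kafka_conf_destroy()/rd_kafka_topic_conf_destroy() calls free memory the library already owns. A sketch of the adjusted flow:

/* rd_kafka_new()/rd_kafka_topic_new() own their conf objects on success,
 * so only destroy a conf when creation failed; null the pointers after a
 * successful create so the teardown code skips them. */
m_Producer = rd_kafka_new(RD_KAFKA_PRODUCER, m_Configuration,
                          m_cErrorString, sizeof(m_cErrorString));
if (m_Producer == nullptr)
    rd_kafka_conf_destroy(m_Configuration);    /* only on failure */
m_Configuration = nullptr;                     /* owned by m_Producer now */

m_Topic = rd_kafka_topic_new(m_Producer, sTopicName.c_str(), m_TopicConfiguration);
if (m_Topic == nullptr)
    rd_kafka_topic_conf_destroy(m_TopicConfiguration);
m_TopicConfiguration = nullptr;                /* owned by m_Topic now */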
mahipal0913
@mahipal0913_gitlab
Hi, we are trying to use the confluent-kafka 1.6.1 version. Based on the documentation it looks like we don't need to install librdkafka separately. Is that correct?
7 replies
juan
@_danielejuan_twitter

Hi, we are trying to use the KafkaConsumer in librdkafka. May we ask what happens if we do not call consume for a long period of time?

I assume that the consumer will be removed from the group, losing any partition assignment. If so, is there a function or steps we could use to check for any "stale" or "inactive" KafkaConsumer?

8 replies
Nick
@nick-zh
Just to be 100% sure, is my assumption correct that seek will only change the offset of the consumer (internally, by purging and re-fetching) and will not change the committed offset that is stored on the broker side?
2 replies
HEBIN
@hoobean1996
Hi master, can you please take a look at this issue? confluentinc/confluent-kafka-go#638
Generally, I found an issue where the consumer's FindCoordinator succeeds but the subsequent JoinGroup fails with a "Broker is not Coordinator" error.
Can you give me some ideas? Thanks so much
mahipal0913
@mahipal0913
@edenhill @hoobean1996
We have upgraded to confluent-kafka 1.6.1 to fix an issue where some of our consumers were getting stuck and not able to consume messages.
We were told that this issue would be fixed in the 1.6.1 version.
Please provide your input if you think otherwise, as we are planning to deploy this change soon.
Problem/Issue: Some of the partitions get stuck and the consumer never consumes messages from them; after a restart it resumes processing messages.
Please provide your feedback if anyone has had a similar issue.
Thanks for the help again
Pavel Alexeev aka Pahan-Hubbitus
@Hubbitus

Hello.
Could someone please help me with GSSAPI auth in the official kafka docker container?
I always get the error: "GSSAPI Error: Miscellaneous failure (see text) (Matching credential (kafka/ecsc00a09ead.epam.com@EPAM.COM) not found)"

Run command:

podman run --rm -it --name kafkacat-DEV \
    -v$(pwd)/conf/integration:/conf \
    -v$(pwd)/conf/integration/krb5.conf:/etc/krb5.conf \
    localhost/kafkacat_gssapi:1 \
    kafkacat \
        -b kafka-int.epm-eco.projects.epam.com:9095 \
        -Xssl.ca.location=/conf/epm-eco-int.ca.crt \
        -Xsecurity.protocol=SASL_SSL \
        -Xsasl.mechanisms=GSSAPI \
        '-Xsasl.kerberos.kinit.cmd=cat /conf/paswd | /usr/bin/kinit Pavel_Alexeev@PETERSBURG.EPAM.COM' \
        -m30 -L

Where localhost/kafkacat_gssapi:1 just built from Dockerfile:

FROM docker.io/edenhill/kafkacat:1.6.0
RUN apk add --no-cache cyrus-sasl cyrus-sasl-gssapiv2 krb5 openssl ca-certificates

But on my Fedora 33 box without container command:

kafkacat \
        -e -b kafka-int.epm-eco.projects.epam.com:9095 \
        -Xssl.ca.location=conf/integration/env_int_epm-eco-int.ca.crt \
        -Xsecurity.protocol=SASL_SSL \
        -Xsasl.mechanisms=GSSAPI \
        -Xsasl.kerberos.kinit.cmd="cat conf/paswd | /usr/bin/kinit Pavel_Alexeev@PETERSBURG.EPAM.COM" \
        -Xsasl.kerberos.service.name=kafka \
        -m 30 \
        -L

Works as expected.
Please help.

ˈt͡sɛːzaɐ̯
@julius:mtx.liftm.de [m]
Btw, is there any reason that there's a copy of queue.h in src/, but the tests still include it from the system as sys/queue.h? https://github.com/edenhill/librdkafka/blob/master/tests/sockem.c#L43