Nikhil Benesch
There is also the open question of fede1024/rust-rdkafka#192, though it's not blocking for 0.23
Mohammed Abubakar
make the push
Oron Sharabi
Hey, how can I use a regex in order to consume from * topics, please?
Thanks :)
Nate Mara
@benesch congrats on the release of 0.23! I noticed that the docs are not building https://docs.rs/crate/rdkafka/0.23.0, could you look into that?
Nikhil Benesch
Thanks, @nate-onesignal! We've finally got things sorted with the new docs.rs environment for 0.23.1: https://docs.rs/rdkafka/0.23.1/rdkafka/
@oronsh you'll need to implement such a feature yourself. You can get a list of all topics with https://docs.rs/rdkafka/0.23.1/rdkafka/client/struct.Client.html#method.fetch_metadata, and then filter for topics that match your regex, and then subscribe to those topics.
Calvin Brown
Hey all! I have a quick question: when using the MessageStream, is there a way to get a batch of records per poll, instead of iterating over them one by one as they come in?
Nikhil Benesch
Afraid not @calvinbrown085. You’d have to implement the batching in your own code.
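For reference, that batching can be as small as a generic helper like this (a sketch, not an rdkafka API; `take_batch` is a made-up name):

```rust
/// Pull up to `max` items from an iterator into one batch. With rdkafka
/// this could wrap a message source like `consumer.iter()`; it is kept
/// generic so it works over any iterator.
fn take_batch<I: Iterator>(iter: &mut I, max: usize) -> Vec<I::Item> {
    // `by_ref` keeps the iterator usable for the next batch.
    iter.by_ref().take(max).collect()
}
```

The same shape works for the async MessageStream by pushing `stream.next().await` results into a Vec in a loop until it reaches `max`.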
vijay sali

Hi folks,

I am building an application with rdkafka-rust, version pinned to 0.22. I am compiling it to a Windows binary from a Linux box.
I have installed the full toolchain for Windows.

Following this example

I am getting the following errors during the build.

error: linking with `/usr/bin/x86_64-w64-mingw32-gcc` failed: exit code: 1

= note: /home/vagrant/code/demo-example/target/x86_64-pc-windows-gnu/release/deps/demo_example-feb82719e9d890fc.demo_example.h10zmb5r-cgu.12.rcgu.o:demo_example.h:(.text+0x2bf): undefined reference to `rd_kafka_opaque'
          /home/vagrant/code/demo-example/target/x86_64-pc-windows-gnu/release/deps/demo_example-feb82719e9d890fc.demo_example.h10zmb5r-cgu.12.rcgu.o:demo_example.h:(.text+0x3d8): undefined reference to `rd_kafka_queue_get_consumer'
          /home/vagrant/code/demo-example/target/x86_64-pc-windows-gnu/release/deps/demo_example-feb82719e9d890fc.demo_example.h10zmb5r-cgu.12.rcgu.o:demo_example.h:(.text+0x43b): undefined reference to `rd_kafka_conf_set_opaque'
          /home/vagrant/code/demo-example/target/x86_64-pc-windows-gnu/release/deps/demo_example-feb82719e9d890fc.demo_example.h10zmb5r-cgu.12.rcgu.o:demo_example.h:(.text+0x454): undefined reference to `rd_kafka_conf_set_log_cb'
          /home/vagrant/code/demo-example/target/x86_64-pc-windows-gnu/release/deps/demo_example-feb82719e9d890fc.demo_example.h10zmb5r-cgu.12.rcgu.o:demo_example.h:(.text+0x46d): undefined reference to `rd_kafka_conf_set_stats_cb'

I found nothing on the web; any help is appreciated :)

Nikhil Benesch
You’re mostly on your own if you’re trying to cross-compile, I’m afraid; there’s just so much voodoo involved
At the very least, make sure you're using the cmake build; cross-compilation with the default build system is extra unsupported
And here’s the list of environment variables that I needed to set to xcompile from macOS to Linux: https://github.com/MaterializeInc/materialize/blob/7696c8cdaba6b5d0d9225e82322d5c33fa8a09fb/bin/xcompile#L45-L58
Note that those are the variables required for a large project (Materialize) that depends on rdkafka in addition to a few other *-sys crates; not all of them are necessarily applicable to rdkafka
vijay sali
Thanks @benesch , I shall try out these.
Daniel Samson
Hey guys, I am working on a migration tool with rdkafka-rust. Is there a way to mix rdkafka-rust with rayon instead of tokio?
I can't seem to get the await to work. Any ideas?
  metrics.into_par_iter().for_each(|metric: Metric| {
      let json = serde_json::to_string(&metric);
      if let Err(e) = &json {
          // TODO: IMPORTANT: FIX the deserialization issue where everything comes back as null
          log::error!("{}", e);
      }
      if let Ok(j) = &json {
          log::info!("{}", j);
          producer.send(FutureRecord::to("METRICS").payload(j).key(""), 0);
      }
  });
Nikhil Benesch
Afraid not. Rayon is for parallelizing CPU-intensive tasks; tokio is for parallelizing IO-intensive tasks. (Give or take.)
If you need to mix the two worlds, you’ll probably want to use a channel. Do your CPU intensive stuff in parallel with Rayon, and then send the records to produce to a Tokio runtime over a channel.
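A std-only sketch of that channel pattern (all names made up; the worker threads stand in for rayon's parallel iterator, and the receiving thread stands in for a tokio task driving the FutureProducer):

```rust
use std::sync::mpsc;
use std::thread;

/// CPU work happens on worker threads (rayon's role); results flow over a
/// channel to a single receiver (tokio's role), which would drive the
/// producer. Returns how many records the producer side received.
fn run_pipeline(n_workers: usize) -> usize {
    let (tx, rx) = mpsc::channel::<String>();

    // Producer side: in real code, a tokio task awaiting producer.send(...)
    // for each record pulled off the channel.
    let producer_side = thread::spawn(move || rx.into_iter().count());

    // Worker side: in real code, rayon's into_par_iter().for_each(...),
    // with each worker sending its finished record down the channel.
    let workers: Vec<_> = (0..n_workers)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                let record = format!("record-{}", i); // pretend this was expensive
                tx.send(record).unwrap();
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    drop(tx); // close the channel so the receiving side sees the end

    producer_side.join().unwrap()
}
```

With tokio specifically, a bounded `tokio::sync::mpsc` channel and `blocking_send` from the rayon side would give backpressure as well.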
Michal 'vorner' Vaner
I think someone experimented with a futures-enabled rayon thing: submitting something to run on rayon gives you a future, plus you can await inside rayon and switch tasks there. But I think it didn't get over the finish line.
John Halbert
Hi all! I'm running into a linking issue with lz4. Multiple functions are being redefined: once from rocksdb and once from rdkafka. Is there a way to disable the inclusion of lz4 in rdkafka?
Gerard Klijs
Hi, since the default of rdkafka is now async, I guess that's how it's typically used? Asking because I'm considering making async the default for schema_registry_converter.
Adam Chalmers
Hi all, I'm a real Kafka noob. My FuturesProducer has connected to the broker, but when I try to send a message I get this:
KafkaError (Message production error: MessageTimedOut (Local: Message timed out)), OwnedMessage {
  payload: Some([214, 29, 23]), key: Some([]), 
  topic: 'auditlog.json', 
  timestamp: NotAvailable, 
  partition: -1, 
  offset: -1001, 
  headers: Some(OwnedHeaders { ptr: 0x7f84c4000b90 })
}
Two questions:
  1. Is there anything in the error message that tells me what I've done wrong?
  2. What exactly is the "key" of a FutureRecord? The docs say it's optional, but don't give any more detail.
aloha folks. Quick question. The API for send: https://docs.rs/rdkafka/0.24.0/rdkafka/producer/future_producer/struct.FutureProducer.html#method.send shows a return type of Result<(i32, i64), (KafkaError, OwnedMessage)>. I'm trying to work out what the two values returned on success are. I think the i32 is the partition? Can someone help me with what the i64 is?
Siddartha Guthikonda
Has anyone tried StreamConsumer with kafka-
It dies with this error:
INFO [librdkafka] librdkafka: MAXPOLL [thrd:main]: GroupCoordinator/83: Broker does not support KIP-62 (requires Apache Kafka >= v0.10.1.0): consumer configuration max.poll.interval.ms (300000) is effectively limited by session.timeout.ms (6000) with this broker version
Konstantin Kirillov

Hi. I am trying to use the dynamic_linking feature. The readme states that the current librdkafka dependency for it is 1.4.2, but it looks like the build tries to find version 1.5.0.

--- stderr
  librdkafka will be linked dynamically
  librdkafka 1.5.0 cannot be found on the system
  Dynamic linking failed. Exiting.

Does anyone know why this is the case?

Avinash Sharma V
Hi, does anyone know the latest Kafka broker version supported by the released version (not master)?
Abdelmonem Mostafa
Hello everyone, is there a crate for creating ActiveMQ and KudeMq clients, like rust-rdkafka for Apache Kafka?
Thank you in advance.

Hi, I've opened an issue about sending a FutureRecord without a key, because it's not working (fede1024/rust-rdkafka#370). In the documentation for the Kafka clients (Java, Python), it's possible to do this, because the key is only used for fine-grained control over partitioning. In my case, I'd prefer to let Kafka handle that.

Anyone with the same issue, or a workaround? Adding a key is not a solution.

Robert Philipp
does anyone know whether/how to have a (rust rdkafka v0.2.6) ThreadedProducer reconnect after a network partition that exceeds 10 or 30 seconds?
hello, on mac, rust-rdkafka 0.26.0 fails to build with features ["cmake-build", "gssapi-vendored", "ssl-vendored"]:
  Undefined symbols for architecture x86_64:
    "_gssrpc_auth_debug_gssapi", referenced from:
        _main in client.o
    "_gssrpc_auth_gssapi_create_default", referenced from:
        _main in client.o
    "_gssrpc_clnt_pcreateerror", referenced from:
        _main in client.o
    "_gssrpc_clnt_perror", referenced from:
        _main in client.o
    "_gssrpc_clnt_sperror", referenced from:
        _main in client.o
    "_gssrpc_clnttcp_create", referenced from:
        _main in client.o
    "_gssrpc_clntudp_create", referenced from:
        _main in client.o
    "_gssrpc_misc_debug_gssapi", referenced from:
        _main in client.o
    "_gssrpc_svc_debug_gssapi", referenced from:
        _main in client.o
    "_gssrpc_xdr_free", referenced from:
        _main in client.o
    "_gssrpc_xdr_wrapstring", referenced from:
        _main in client.o
        _rpc_test_echo_1 in rpc_test_clnt.o
    "_krb5_gss_dbg_client_expcreds", referenced from:
        _main in client.o
  ld: symbol(s) not found for architecture x86_64
  clang: error: linker command failed with exit code 1 (use -v to see invocation)
  clang: error: linker command failed with exit code 1 (use -v to see invocation)
  make[2]: *** [client] Error 1
  make[2]: *** Waiting for unfinished jobs....
  make[2]: *** [server] Error 1
  make[1]: *** [all-recurse] Error 1
  make: *** [all-recurse] Error 1
we had to vendor in the dependencies on mac because the crate previously wasn't building with only cmake-build enabled (on mac). is there a nice solution to this, or are mac users SOL?
Karsten Gebbert
Hi all! I have a question around the StreamConsumer and a particular use case I have. I would like to process messages in small batches, rather than one at a time, preferably with manual commit control. I can imagine several ways to go about it, but they all seem a bit complex. Is there an idiomatic way to achieve this?
Karsten Gebbert
In the end, it is actually quite simple to build with select! and sleep :)
Christian Pérez-Llamas
Hi, I have a StreamConsumer with a custom ConsumerContext where I need to create a future (invoke some async function) and wait for it to finish before returning from the post_rebalance callback. If I understand correctly, that callback is invoked while a poll is being executed from the Runtime context, which means the thread handling it has a Runtime associated with it; but ConsumerContext::post_rebalance is not an async function. So I'm a bit confused about how to deal with this. Any ideas on where to look?
Would this lib work with Redpanda?
Thijs Cadier
Hi all, long time no see! We're upgrading the rdkafka crate we're using and I'm back with a bug fix: fede1024/rust-rdkafka#407
Thijs Cadier
We're seeing quite a large memory leak in our data processors since upgrading rdkafka; debugging that now. Has anybody seen anything in that direction? Maybe you, @benesch?
Thijs Cadier
Ah I think we're seeing this one: fede1024/rust-rdkafka#220
Thijs Cadier
hey, I am using BaseConsumer with a simple consumer.iter(), but instead of going through the messages one by one, is it possible to handle a batch of messages, say 500, do the related logic, and after it's all done, consume another group of 500 messages? Thanks!
my idea is to use rayon to process the whole batch at once; consuming one by one is really slow ...
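One possible shape for that (a sketch; `poll_batch` is a made-up helper, and with rdkafka the closure would be `|t| consumer.poll(t)` on a BaseConsumer): keep polling until the batch is full or a deadline passes, then hand the whole Vec to rayon.

```rust
use std::time::{Duration, Instant};

/// Poll up to `max` messages, waiting at most `window` in total.
/// `poll` is any function with BaseConsumer::poll's shape: it takes a
/// timeout and returns Some(Ok(msg)), Some(Err(e)), or None on timeout.
fn poll_batch<M, E>(
    mut poll: impl FnMut(Duration) -> Option<Result<M, E>>,
    max: usize,
    window: Duration,
) -> Vec<M> {
    let deadline = Instant::now() + window;
    let mut batch = Vec::with_capacity(max);
    while batch.len() < max {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break; // out of time: return what we have
        }
        match poll(remaining) {
            Some(Ok(msg)) => batch.push(msg),
            Some(Err(_)) => continue, // transient error: keep polling
            None => break,            // poll timed out with nothing new
        }
    }
    batch
}
```

After processing a batch (e.g. with rayon), you'd commit the last message's offset before polling the next 500.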
Spyros Roum
Hey people!
I have a question. I inherited a project which uses rdkafka and I'm a bit confused. We use BaseProducer to send things to Kafka, but we never poll or flush it. Still, messages arrive in Kafka normally. I checked whether BaseProducer implements Drop and calls flush there, but as far as I can tell it doesn't, so I'm confused as to why this works.
Mykhailo Osypov
Hi guys, is it possible to add AWS_MSK_IAM support just by extending/forking this library? Or should I go through librdkafka itself?