Jack Wilson
@wils3005
consumer group @generation_id is not set in my console session. it being nil seems to break a lot of methods
had to run the join_group method
Maciej Mensfeld
@mensfeld
I wonder why it's not being set though
If you could come up with a repro I could patch it
Jack Wilson
@wils3005

$seed_brokers = ENV.fetch('KAFKA_BROKERS')
$client = Kafka.new($seed_brokers, ssl_ca_certs_from_system: true)
# => Kafka::Client
$cluster = $client.instance_variable_get(:@cluster)
# => Kafka::Cluster
$group_id = "MY_GROUP_ID"
$consumer = $client.consumer(group_id: $group_id)
# => Kafka::Consumer
$consumer_group = $consumer.instance_variable_get(:@group)
# => Kafka::ConsumerGroup
$topic = "MY_TOPIC"
$consumer.subscribe($topic)

yikes that didn't paste well
ok i don't know how to gitter
Maciej Mensfeld
@mensfeld
is it still the pre 1.0 ruby-kafka?
Jack Wilson
@wils3005
yea. i can't change that on the fly like this
this is a production deployment that i'm investigating via irb or pry
Maciej Mensfeld
@mensfeld
yes, production is serious stuff
Jack Wilson
@wils3005
it looks like ruby-kafka doesn't have any dependencies holding it back
so maybe i can actually upgrade that
the greater application is a rat's nest of out-of-date dependencies
that is for someone else to deal with, currently
so i'll have to investigate that
but in the meantime, using the above client, cluster, consumer, consumer_group objects, i'm trying to mash my fingers into my keyboard until something works
even after i join group, when i look at which consumer groups have members, the offending consumer group has no members
Jack Wilson
@wils3005
which means i have a consumer object in a group object, the group object has claimed to join its group (has a generation_id), yet the group claims that consumer object is not a member
unless i'm misinterpreting their relationships
ok i see, consumer.join_group calls consumer_group.join_group among other things
Maciej Mensfeld
@mensfeld
it's 11pm here, going to sleep now. Unless you fix it, drop me a line at maciej@mensfeld.pl and we can cooperate
Jack Wilson
@wils3005
ok! thanks for your help
Maciej Mensfeld
@mensfeld
anytime :) sorry for experiencing trouble with "my" software
Jack Wilson
@wils3005
not at all; we are almost certainly the ones at fault
by we, i mean those other guys i work with and definitely not me
chris
@_suung__twitter
thanks @mensfeld
Jack Wilson
@wils3005
@mensfeld did a production deploy today, unrelated to the problem i was having. part of the deploy involved the karafka instance moving between clusters (we happen to be amid a migration of kubernetes versions right now). for whatever reason that i definitely cannot explain, the new karafka instance in the new cluster is able to consume messages from the topic just fine
a bit scary that i don't know why it's working, but at least it's working
i get the feeling some error down the stack was being swallowed before making it to my eyes
Maciej Mensfeld
@mensfeld
@wils3005 that's why I asked in the first place if your cluster and everything are in the same place. It may be that part of the cluster is misconfigured and there is some sort of network (or other) issue
Jack Wilson
@wils3005
certainly a possibility but that older cluster was working prior. in any case, glad it's fixed, but sad i didn't find out why. mystery for the ages. :(
Maciej Mensfeld
@mensfeld
@wils3005 I've seen something similar years ago. Things worked until they stopped (that is until the process was stopped)
Maciej Mensfeld
@mensfeld
If anyone is interested: https://github.com/karafka/karafka/pull/668/files feel free to review and comment :)
Nawab Iqbal
@niqbal

Hi @mensfeld (This is a cross post from Slack channel, you can respond there, so that the activity gradually shifts to slack from here)

karafka/karafka#564
You say that it is not threadsafe. Can we make it threadsafe by adding a Mutex (https://ruby-doc.org/core-2.6/Mutex.html)?
Like this (see the lines which end with ## My addition):

$stats = {}

semaphore = Mutex.new ## My addition

Karafka.monitor.subscribe("backends.inline.process") do |event|
  semaphore.synchronize do ## My addition
    $stats[event[:caller].topic.name] ||= 0
    $stats[event[:caller].topic.name] += event[:caller].params_batch.size
  end ## My addition
end
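
For anyone who wants to try the locking idea in isolation, here is a minimal plain-Ruby sketch that does not touch Karafka at all. The class name TopicStats and its record and snapshot methods are made up for illustration, and the threads only simulate concurrent callers. It shows that the counter itself is guarded by a Mutex, not that the monitor subscription is safe to use this way.

```ruby
# Thread-safe per-topic counter in plain Ruby (no Karafka required).
# TopicStats, record and snapshot are hypothetical names for this sketch.
class TopicStats
  def initialize
    @mutex = Mutex.new
    @counts = Hash.new(0)
  end

  # Adds batch_size to the running total for the given topic.
  def record(topic, batch_size)
    @mutex.synchronize { @counts[topic] += batch_size }
  end

  # Returns a copy so callers never read the hash while it is being mutated.
  def snapshot
    @mutex.synchronize { @counts.dup }
  end
end

STATS = TopicStats.new

# Simulate several threads reporting batches at the same time.
threads = 4.times.map do
  Thread.new do
    1_000.times { STATS.record("events", 5) }
  end
end
threads.each(&:join)

puts STATS.snapshot.inspect
```

Wrapping the hash in a small object keeps the lock and the data together, so no caller can update the counts without going through the Mutex.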
Maciej Mensfeld
@mensfeld
Answered here: http://karafka-io.slack.com/ :)
I did not start promoting it yet but with 2.0 we will be slowly migrating to Slack
much easier to manage and work with
AleksanderSzyszka
@AleksanderSzyszka
Hi, I have a question regarding waterdrop 2.0. There is no option to set partition_key like in earlier versions. Is it a final decision? Will it not be possible to set a partition key in waterdrop 2.0?
20 replies
Prateek Baheti
@prateekbaheti-ms

Hello Everyone,
I am new to the karafka framework and am trying to set it up on our existing rails applications that have sidekiq workers.
I have been successful in setting up consumers and running the karafka server locally (bundle exec karafka s).
Now I am trying to use sidekiq background workers to consume messages using the sidekiq-backend.

What I need clarification on is whether we still need to run a sidekiq server along with the background sidekiq worker to consume messages.
I have the sidekiq worker (bundle exec karafka worker) and the karafka server (bundle exec karafka s) both running locally and it works fine.

Is it possible to do both in just one sidekiq worker and not have to run both?

11 replies
Prateek Baheti
@prateekbaheti-ms
Hi folks, I have a question about deploying the Karafka server.
Is there a way to health check it, or do a liveness probe, that can tell whether the server is running fine? Our use case: when a new code change or config change is deployed, the health check in our deployments is configured to roll back to the previous commit in case of failures.
Looking for a consistent way to determine if the karafka server is healthy.
5 replies
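
One generic way to approach a liveness check like this is a heartbeat file: the consumer process touches a file whenever it makes progress, and the probe fails if the file is missing or too old. The sketch below is plain Ruby and is not a Karafka API. The file path, the method names touch_heartbeat and healthy?, and the 30 second threshold are all assumptions, and where to call touch_heartbeat from inside a Karafka process is left open.

```ruby
require "fileutils"
require "tmpdir"

# Hypothetical heartbeat location; any path both the process and the probe can reach works.
HEARTBEAT_PATH = File.join(Dir.tmpdir, "karafka_heartbeat")

# Called by the consumer process whenever it makes progress.
def touch_heartbeat(path = HEARTBEAT_PATH)
  FileUtils.touch(path)
end

# Called by the probe: healthy only if the file exists and was touched recently.
# max_age is in seconds; 30 is an arbitrary example value.
def healthy?(path = HEARTBEAT_PATH, max_age: 30, now: Time.now)
  File.exist?(path) && (now - File.mtime(path)) <= max_age
end
```

A probe script could call healthy? and exit with a non-zero status when it returns false, which is the form most deployment health checks expect.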
Jérôme Prudent
@jprudent
Hello! I have a question. Is it possible to define a consumer whose constructor takes extra parameters? The routing DSL seems to only allow providing a class name, with no way to specify extra arguments for instantiation (unless I missed it).
Many thanks for your help
3 replies
Maciej Mensfeld
@mensfeld
@/all While I will still reply and support everyone via Gitter for a long time, if you want to reach me much faster, feel free to join our Slack channel: https://slack.karafka.io - I will be more active there. With 2.0 I also plan to do a monthly Karafka live-coding :) (or something similar)
potasiewicz
@potasiewicz
Hi, I want to ask about the situation where a partition/topic is blocked because a message with bad data keeps being retried by the application. That's expected behaviour, because we want to block when something is wrong, but how do I unblock it once I've decided the message can be skipped? Should I just bump the partition offset, or is there some more convenient way in this case?
Thanks in advance for all help :)
Maciej Mensfeld
@mensfeld
@potasiewicz please move to https://slack.karafka.io - this gitter is no longer in active use
1 reply
Gabriela Guamán
@gabygm
Hi everyone, I would like to ask something. In my project we have a business requirement to consume messages from a topic every hour. Is it possible to consume at a fixed interval in karafka? I would like to hear your opinions.
1 reply
Phan Nghia
@phannghia-eh
Hi guys, has anyone faced an issue like this? After the karafka server has been running for a while, it raises the error Timed out while trying to connect {brokers}. Connection timed out. But if I start it manually, still on that pod, it runs normally.
1 reply
Maciej Mensfeld
@mensfeld
ANY QUESTIONS OR ANYTHING ELSE: please move to https://slack.karafka.io - this gitter is no longer in active use
korrawee
@korrawee
Hi there, after installing the karafka gem and running bundle exec karafka install, I get this error: bundler: failed to load command: karafka (/Users/****/.rbenv/versions/3.1.0/bin/karafka)
2 replies
Maciej Mensfeld
@mensfeld
ANY QUESTIONS OR ANYTHING ELSE: please move to https://slack.karafka.io - this gitter is no longer in active use