    Nick
    @nick-zh
    yeah so totally agreeing with what Steve said.
    If you need global ordering, don't use multiple partitions.
    Let's say you broadcast an entity: you can still use multiple partitions, because messages with the same key always go to the same partition. This way updates related to each other stay ordered. In most cases that is enough.
    Paweł Niedzielski
    @steveb_gitlab
    I thought the key could be used for "selecting" a partition, but wasn't sure :)
    Nick
    @nick-zh
    Yeah so the default partitioner will use the key to decide partitioning (same key, same partition). Alternatively you can write your own partitioner, and you can also pass a target partition to producev.
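
    A minimal sketch of both variants with php-rdkafka (the broker address, topic name and key below are made-up examples):

    <?php
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', 'localhost:9092'); // assumed broker address

    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic('orders'); // hypothetical topic

    // default partitioner: the key decides the partition (same key, same partition)
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, '{"status":"created"}', 'order-42');

    // explicit target: bypass the partitioner and write to partition 2 directly
    $topic->producev(2, 0, '{"status":"shipped"}', 'order-42');

    // wait until the internal queue is drained
    while ($producer->getOutQLen() > 0) {
        $producer->poll(50);
    }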
    Paweł Niedzielski
    @steveb_gitlab
    The default partitioner is good enough in the majority of cases though, as it makes your application not reliant on a particular number of partitions.
    If someone runs into an issue with not having enough partitions, they can simply increase the count in Kafka without touching application code
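
    For reference, a rough example of raising the partition count with the stock Kafka CLI (broker address and topic name are assumptions; older clusters use --zookeeper instead of --bootstrap-server):

    $ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic orders --partitions 8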
    Nick
    @nick-zh
    adding a partition can result in new messages with the same key going into a different partition though, so this isn't as flexible as it seems ;)
    Paweł Niedzielski
    @steveb_gitlab
    doesn't it trigger a Kafka rebalance at the same time that will eventually move all the messages with that key to the new partition?
    oh, it doesn't :)
    Nick
    @nick-zh
    no, so basically if you change the partition count you need to migrate your topic manually if you care about order :)
    timur-bioiq
    @timur-bioiq
    @steveb_gitlab Thank you for the response!
    What if I use an order number as a key, with the intent for all modifications of a specific order to go to the same partition? This will result in a large number of keys, pretty much one key per order. Will this impact performance? How does Kafka remember where each key belongs? Does it index keys? Does it keep track of them somewhere else?
    timur-bioiq
    @timur-bioiq
    @nick-zh thank you as well
    Paweł Niedzielski
    @steveb_gitlab

    @timur-bioiq the default partitioner takes the key and uses it to determine which partition the message should go to (roughly randomly, which results in a roughly equal load on each and every partition). The number of keys overall doesn't really matter, because Kafka doesn't "index" by keys, so their count doesn't affect performance.

    This is similar to how hashing functions work: for the same input they will always produce the same result. Keys in the default partitioner work similarly. Consider the partitioner working roughly like this:

    <?php
    $partitionCount = 4;
    $key = '1';
    $hash = md5($key);
    // take the first 8 hex digits so the value fits in an integer
    $hashAsInt = hexdec(substr($hash, 0, 8));
    $partition = $hashAsInt % $partitionCount;

    The same key always produces the same result, so $partition will always be the same - in the sample, one of partitions 0 - 3.

    Going by this, using an order ID (whether it's a number or a string) would be perfectly fine. Messages related to the same order will arrive in the same partition, and will be stored in order of their arrival.
    timur-bioiq
    @timur-bioiq

    @steveb_gitlab Thank you so much for the comprehensive answer, it makes sense now. I thought it would go and look for that key in the whole partition. I am familiar with how an md5 hash works, so your answer makes total sense.

    That brings me to another question. I am working right now with orders that will be published. I do want to use more than one partition for scalability reasons, just in case we need to move them to another server. Let's say 4 partitions. One of the requirements I have is for other systems to be able to replay from the start and consume all orders. I am planning to publish all new order information into the queue, and all the modifications to it as well, like shipments. I don't know how to design it in a way that makes it possible to replay all the historical information, meaning what the consumer implementation of it would be. What are the best practices for this?

    Paweł Niedzielski
    @steveb_gitlab
    @timur-bioiq replay is actually quite simple - familiarize yourself with Consumer Groups.
    When a consumer process connects to a Kafka broker, it presents itself with a string that identifies which Consumer Group it belongs to. The broker assigns the partitions of a topic across all processes with the same consumer group (that's the scalability). So assuming you have 4 partitions, the first process to connect will be assigned all 4, until another process connects with the same Consumer Group name. Then some partitions will be assigned to the first, and some to the second.
    Now, what happens when a new Consumer Group name is presented? This depends on a configuration setting (I don't remember exactly, but it takes values like earliest, latest and similar) that you pass to the consumer (new Conf()). If it's set to earliest or beginning, then consumption will start from the very beginning. For latest, you will receive only messages that arrive after the new Consumer Group's consumer connected.
    This ONLY happens if it's a new Consumer Group. Existing consumer groups will use the offset that is already stored on the broker. How long an offset stays "alive" can be configured (by default 7 days, if I remember correctly).
    Do note that this makes the log "heavy", since you have practically every event stored in it. Kafka logs (which are what you're consuming from / producing to) can sometimes be compacted, but this is something you should read up on. Maybe Nick could share some of his experience with this, since I've mainly used Kafka for things I could easily discard, so I don't feel comfortable giving advice.
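
    A minimal sketch of the setting Paweł is referring to (in librdkafka it is called auto.offset.reset; the group name and broker below are made up):

    <?php
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', 'localhost:9092'); // assumed broker address
    $conf->set('group.id', 'order-replayer');             // a NEW group name has no stored offset
    $conf->set('auto.offset.reset', 'earliest');          // new groups then start from the very beginning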
    Nick
    @nick-zh
    So basically this is a document message (entity). For that use case we currently use a topic with compaction enabled and infinite retention. In short this means: if you have 10 updates for the same order, at some point only the newest state will reside in Kafka, the others will get compacted away.
    For us this is ok; if you need the full history, adjust accordingly.
    Regarding replay, as Steve elaborated, the same consumer group will continue where it left off, so I see two options to replay (see the sketch below):
    • use a new consumer group
    • use assign instead of subscribe, where you will be able to start from a specific point
      Hope this helps
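
    A rough sketch of the second option with php-rdkafka's assign, starting from the earliest offset (topic name, group name and broker are assumptions):

    <?php
    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'replay-orders');              // hypothetical group name
    $conf->set('metadata.broker.list', 'localhost:9092'); // assumed broker address

    $consumer = new RdKafka\KafkaConsumer($conf);

    // take partition 0 of the topic and start reading from the very first offset
    $consumer->assign([
        new RdKafka\TopicPartition('orders', 0, RD_KAFKA_OFFSET_BEGINNING),
    ]);

    while (true) {
        $message = $consumer->consume(1000);
        // ... handle $message, checking $message->err
    }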
    iahoorai
    @iahoorai
    anyone know why when I run the command pecl install rdkafka I get this error
    checking for librdkafka version... configure: error: librdkafka version 0.11.0 or greater required. ERROR: `/tmp/pear/temp/rdkafka/configure --with-php-config=/usr/bin/php-config' failed
    installing it for PHP 5.6
    Paweł Niedzielski
    @steveb_gitlab
    @iahoorai install rdkafka specifically at version 3.x
    or even better, update your system librdkafka library
    0.11.0 is OLD
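
    For example (the exact 3.x release below is illustrative; check pecl for the latest one):

    $ pecl install rdkafka-3.1.3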
    Paweł Niedzielski
    @steveb_gitlab
    since 0.11.6 librdkafka received a major rewrite, and while in terms of API it works the same (or is expanded), it definitely has a lot of internal bugs solved.
    Nick
    @nick-zh
    very sound advice from @steveb_gitlab as always. I suggest upgrading to librdkafka 1.5 to have the full set of features in the ext. I would also advise upgrading PHP itself, as support for PHP 5 will be dropped in the next major version
    iahoorai
    @iahoorai
    @steveb_gitlab how do I do that? All I'm doing is running apt-get install librdkafka1 librdkafka-dev, so I'm not specifically mentioning any version
    @nick-zh I'm getting the same issue when using PHP 7.1
    btw, ty for the responses
    Paweł Niedzielski
    @steveb_gitlab

    @iahoorai follow instructions here: https://github.com/edenhill/librdkafka#installation
    "On Debian and Ubuntu, install librdkafka from the Confluent APT repositories, see instructions here and then install librdkafka:

    $ apt install librdkafka-dev
    "

    Note that you have to use Confluent's APT repository (!)

    Ubuntu / Debian are wayyy out of date.

    You're getting the same error on PHP 7.1 because librdkafka (the Kafka client library written in C) is a system-wide library that this extension uses. That's why pecl complains while compiling it.

    iahoorai
    @iahoorai
    @steveb_gitlab thanks for your help. Yes, I had to do the Confluent repository installation before running pecl. It works now.
    Paweł Niedzielski
    @steveb_gitlab
    :thumbsup:
    Kirzilla
    @Kirzilla
    Hello everyone! Sometimes I get a strange error message Local: Invalid argument or configuration when calling $topic->consumeStop($partitionId). $partitionId is a valid partition number, and before calling consumeStop many messages are consumed successfully from this partition by this script. I've tried to look at the librdkafka sources to find out when this error message is thrown, but my C knowledge is too poor. Thanks in advance!
    Nick
    @nick-zh
    @Kirzilla heya, make sure you have an up-to-date librdkafka version; also, I advise against the low level consumer
    Kirzilla
    @Kirzilla

    @Kirzilla heya, make sure you have an up-to-date librdkafka version; also, I advise against the low level consumer

    Thank you for the advice. Our consuming strategy is the following: we have a master process that accepts a list of Kafka topics and the number of processes that should be spawned to consume all messages from those topics. For example, the master process receives a list of two topics. The master then determines that each topic has 10 partitions. Next, those twenty topic+partition pairs should be split among 4 consumer processes, so every consumer will consume 5 topic+partition pairs. Is it possible to implement this kind of consuming using the high level consumer?

    Paweł Niedzielski
    @steveb_gitlab
    @Kirzilla it's exactly the kind of workload that high level consumers are good for ;)
    Nick
    @nick-zh
    @Kirzilla so, what @steveb_gitlab said. With the high level consumer you can just spawn as many tasks as you want and it will balance itself (communicating with the broker) to distribute the load evenly among the tasks. Even if a new task joins or drops out, a rebalance will take place to adjust. This will save you a lot of work and make your code / application a lot more flexible
    Kirzilla
    @Kirzilla

    @steveb_gitlab @nick-zh Can you please explain a little more... You're saying that the HL consumer can "spawn" tasks, but what do you mean by "spawn tasks"?

    For example, in Kafka I have 2 topics, 10 partitions per topic. We want to consume it all using 3 consumer PHP processes. So, I'm running a master PHP process which splits pairs of topics and partitions (creates a "consuming plan"). Then my master process forks three children. For consuming, each child would run code like this:

    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'myConsumerGroup');
    $conf->set('metadata.broker.list', 'node1,node2,node3');
    $conf->set('auto.offset.reset', 'earliest');

    $consumer = new RdKafka\KafkaConsumer($conf);

    $topicPartitions = [];
    // $assignedTopicsPartitions is the list of topic+partition pairs assigned to this child by the master
    foreach ($assignedTopicsPartitions as $topic => $partitions) {
        foreach ($partitions as $partition) {
            $tp = new RdKafka\TopicPartition($topic, $partition);
            // $tp->setOffset() Should we call this method, or will the offset be set automatically
            //                  according to the consumer group offset when consuming starts?
            $topicPartitions[] = $tp;
        }
    }
    $consumer->assign($topicPartitions);

    while (true) {
        $message = $consumer->consume(500);
        // ...
    }

    Is this ok, or is there a more elegant way of doing it? How does "spawning tasks" relate to this?
    Thank you in advance!

    Nick
    @nick-zh
    I think there was a misunderstanding: you still need to start the desired number of tasks yourself, but you don't have to manually assign partitions or anything. The load (the partitions) will be balanced automatically, evenly distributed amongst your tasks
    don't use assign, just use subscribe with the topics you want to consume from (see the sketch below)
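
    A minimal sketch of the subscribe-based version of the earlier snippet (topic names are placeholders):

    <?php
    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'myConsumerGroup');
    $conf->set('metadata.broker.list', 'node1,node2,node3');
    $conf->set('auto.offset.reset', 'earliest');

    $consumer = new RdKafka\KafkaConsumer($conf);

    // subscribe to whole topics; the broker assigns partitions to the group's
    // members and rebalances automatically as consumers join or leave
    $consumer->subscribe(['topic-a', 'topic-b']);

    while (true) {
        $message = $consumer->consume(500);
        // ... handle $message
    }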
    Kirzilla
    @Kirzilla

    @nick-zh Just amazing! I didn't know that php-rdkafka could do that. Thank you! But there are still some questions (sorry for boring you)...

    • Where does rdkafka store the current consumer state/distribution for HL consuming? In Kafka, or locally?
    • If it stores the current state in Kafka, then it's possible to do multi-server consuming - e.g. ten consumers on one server, ten consumers on another, and all of them consuming different partitions of the same topic.
    • If it stores it locally, then how - file / shared memory, etc.?

    Thank you in advance.

    Nick
    @nick-zh
    @Kirzilla yes, partition assignment is stored on the Kafka side; the max number of consumer tasks per consumer group = the number of partitions. And yes, different servers will serve different partitions, this is how Kafka does it :D
    Kirzilla
    @Kirzilla
    @nick-zh Thank you. What if one of the consumer processes dies unexpectedly (like a SIGKILL, a server shutdown, or network lag)? How will another consumer process (maybe even on another server) understand that a rebalance is required? Thanks!
    Kirzilla
    @Kirzilla
    @nick-zh by "consumer process" I meant "HL consumer process"
    Nick
    @nick-zh
    @Kirzilla yes totally; after a certain amount of time, when a consumer process doesn't "check in", it will be kicked out of the group and trigger a rebalance :D
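
    The "check in" window is the consumer's session timeout, a standard librdkafka setting that can be tuned on the Conf from the earlier snippets, e.g.:

    // sketch: if the broker gets no heartbeat from this consumer within the
    // window below, it is evicted from the group and a rebalance is triggered
    $conf->set('session.timeout.ms', '10000');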
    Kirzilla
    @Kirzilla
    @nick-zh Thank you a lot! It's a pity that the HL features are so poorly documented. :(
    Nick
    @nick-zh
    well, this is actually not specific to our extension but to Kafka. Confluent offers a lot of resources (videos, free books, etc.)
    I am in the process of adding more resources (general and PHP-relevant), and I am also planning a video series to help people get into the matter faster :D
    so keep an eye out ;)
    Nikita
    @NikDevPHP
    Hi there, can someone look into arnaud-lb/php-rdkafka#395?
    Is this issue relevant to this repository, or did I miss something...
    Paweł Niedzielski
    @steveb_gitlab
    @NikDevPHP responded
    Grégoire Marchal
    @Gregoire-M
    Hello, I'm working with your lib to consume messages from a Kafka topic, using the low level consumer. I have a question about acknowledgement that I can't find the answer to in your docs. Is there a way in a consumer to mark a message as "not acknowledged" so I can retry it later?