    Nick
    @nick-zh
    @Kirzilla yes, totally. After a certain amount of time, when a consumer process doesn't "check in", it will be kicked out of the group and trigger a rebalance :D
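    The windows involved can be tuned on the consumer config; a rough sketch with the standard librdkafka properties (values are only illustrative):

    use RdKafka\Conf;
    use RdKafka\KafkaConsumer;

    $conf = new Conf();
    $conf->set('metadata.broker.list', 'kafka:9092');
    $conf->set('group.id', 'my-group');
    // if no heartbeat arrives within this window, the broker drops the member
    // from the group and a rebalance is triggered
    $conf->set('session.timeout.ms', '10000');
    // if consume() is not called again within this window, the consumer is
    // also considered failed and leaves the group
    $conf->set('max.poll.interval.ms', '300000');

    $consumer = new KafkaConsumer($conf);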
    Kirzilla
    @Kirzilla
    @nick-zh Thank you a lot! It's a pity that HL features are so poorly documented. :(
    Nick
    @nick-zh
    well, this is actually not specific to our extension but to Kafka. Confluent offers a lot of resources (videos, free books, etc.)
    I am in the process, though, of adding more resources (general and PHP-relevant), and I am also planning a video series to help people get into the matter faster :D
    so keep an eye out ;)
    Nikita
    @NikDevPHP
    Hi there, can someone look into this: arnaud-lb/php-rdkafka#395
    Is this issue related to this repository, or have I missed something?
    Paweł Niedzielski
    @steveb_gitlab
    @NikDevPHP responded
    Grégoire Marchal
    @Gregoire-M
    Hello, I'm working with your lib to consume messages from a Kafka topic, using the low-level consumer. I have a question about acknowledgement that I can't find the answer to in your docs. Is there a way in a consumer to mark a message as "not acknowledged" so I can retry it later?
    11 replies
    Vladyslav
    @chelsEg

    Hi there, I have a question.
    I have a project on Laravel and use rdkafka to push messages into Kafka.
    The project is something like an API, and one of the API methods pushes data into Kafka.
    To use rdkafka I created a ServiceProvider and bound a Producer class.
    But in my Grafana monitoring dashboard I see many process forks on the php-fpm nodes. When I drop the connection to Kafka, the number of process forks decreases.
    Why does this happen?

    PHP version: 7.4.11
    librdkafka version: 1.6.0.0
    php-rdkafka version: 4.0.4
    kafka version: 2.6.0

    8 replies
    Marios Kamperis
    @marioskamperis

    Hello there!

    I just wanted to ask if there is a way to provide the SSL Kafka config:
    ssl.ca.location
    ssl.certificate.location
    ssl.key.location
    with the actual keys rather than a path to them (I am retrieving them at runtime from AWS Secrets Manager).
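    Depending on the librdkafka version there are also ssl.key.pem / ssl.certificate.pem (and, in newer releases, ssl.ca.pem) properties that take the PEM contents directly instead of a file path. A minimal sketch, assuming your librdkafka build supports them:

    use RdKafka\Conf;

    // $caPem, $certPem and $keyPem would be the PEM strings fetched from
    // AWS Secrets Manager at runtime (hypothetical variables)
    $conf = new Conf();
    $conf->set('security.protocol', 'ssl');
    $conf->set('ssl.ca.pem', $caPem);            // needs a librdkafka version that ships ssl.ca.pem
    $conf->set('ssl.certificate.pem', $certPem);
    $conf->set('ssl.key.pem', $keyPem);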

    1 reply
    Adam Tester
    @adamtester

    Hey, has anyone got this working on Alpine?

    FROM php:7.4-alpine
    RUN apk --update add autoconf gcc make g++ zlib-dev librdkafka
    RUN pecl install rdkafka

    I get

    checking for rdkafka support... yes, shared
    checking for librdkafka/rdkafka.h" in default path... not found
    1 reply
    Adam Tester
    @adamtester
    Ignore me! Not enough coffee!
    The missing package was librdkafka-dev.
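    For reference, a sketch of the corrected Dockerfile with the dev package added (package names as in the Alpine repositories, adjust as needed):

    FROM php:7.4-alpine
    # librdkafka-dev provides the headers the PECL build was looking for
    RUN apk --update add autoconf gcc make g++ zlib-dev librdkafka librdkafka-dev
    RUN pecl install rdkafka \
        && docker-php-ext-enable rdkafka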
    Geoffrey Bachelet
    @ubermuda

    Hello there! I'm having trouble setting up php-rdkafka to connect to a managed Kafka with SSL. Producing and consuming seem to work fine with kafkacat, but using the same configuration with php-rdkafka I get this:

    %7|1605336751.447|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: leader query
    %7|1605336751.447|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: leader query
    %7|1605336751.447|METADATA|rdkafka#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers

    It doesn't even look like it's specifically an SSL problem, though I did manage to get it working against a local Kafka without SSL.
    I'm using php-rdkafka 4.0.4 with librdkafka 1.4.4 on Alpine.
    (Note: I'm completely new to Kafka, so I'm not sure what info I can provide to help.)

    1 reply
    prateekj18
    @prateekj18
    Hi, how can I connect to the MSK Kafka service with SSL? I am using the weiboad/kafka-php PHP library.
    4 replies
    Marc
    @mjunyent

    Hello!
    I'm trying to produce messages on a topic with SSL authentication, without luck.
    I currently have it working in Python with this code:

    producer = KafkaProducer(
        bootstrap_servers=['server1:9094', 'server2:9094', 'server3:9094'],
        security_protocol='SSL',
        ssl_check_hostname=True,
        ssl_certfile='dev.crt',
        ssl_keyfile='dev.key',
        value_serializer=lambda m: json.dumps(m).encode('utf-8'),
        key_serializer=lambda m: str.encode(m))

    future = producer.send(topic, key=key, value=data)

    I translated this into the following PHP code:

    use RdKafka\Conf;
    use RdKafka\Producer;
    
    
    $conf = new Conf();
    $conf->set('log_level', LOG_DEBUG);
    $conf->set('debug', 'all');
    $conf->set('metadata.broker.list', 'server1:9094,server2:9094,server3:9094');
    $conf->set('security.protocol', 'ssl');
    $conf->set('ssl.key.location', 'dev.key');
    $conf->set('ssl.certificate.location', 'dev.crt');
    
    $producer = new Producer($conf);
    $topic = $producer->newTopic("topic");
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $key);
    $producer->poll(0);
    $result = $producer->flush(10000);

    but I always get a -185 error code.
    I tried changing several parameters (setting ssl.endpoint.identification.algorithm to https, using api.version.request true, and others), but the result is always the same.
    Whenever I call newTopic I get a message like
    Skipping metadata refresh of 1 topic(s): no usable brokers
    in the log.

    I've also tried several installs of rdkafka in case SSL was not enabled.

    The code in Python works and I'm out of ideas about what configuration might be different between the two setups. Any idea?

    6 replies
    prateekj18
    @prateekj18
    Hi, I am getting repeated data when I consume messages from a Kafka topic. Please help me.
    Paweł Niedzielski
    @steveb_gitlab
    @prateekj18 in 99% of cases like this you're not setting a consumer group, which means consumers start from the beginning of the topic each time they start.
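    A minimal sketch of what that config could look like (group name, broker and topic are placeholders):

    use RdKafka\Conf;
    use RdKafka\KafkaConsumer;

    $conf = new Conf();
    $conf->set('metadata.broker.list', 'kafka:9092');
    // with a stable group.id the committed offsets are remembered between runs,
    // so the consumer resumes where it left off instead of re-reading the topic
    $conf->set('group.id', 'my-consumer-group');
    // only used when the group has no committed offset yet
    $conf->set('auto.offset.reset', 'earliest');

    $consumer = new KafkaConsumer($conf);
    $consumer->subscribe(['my-topic']);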
    Serghei Luchianenco
    @luchianenco

    Hi everyone,
    I have found these two errors in the logs of a PHP application; they happen quite often and always go together. But we still get the messages in the Kafka topics and the messages land in the DB.

    "%3|1607515969.026|ERROR|rdkafka#producer-47| [thrd:sasl_ssl://xxxxxxxx]: sasl_ssl://xxxxxxx: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker handle destroyed (after 0ms in state DOWN)"
    "%3|1607515969.026|FAIL|rdkafka#producer-47| [thrd:sasl_ssl://xxxxxxxxx]: sasl_ssl://xxxxxxx: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker handle destroyed (after 0ms in state DOWN)"

    The only thing that changes is the producer number. Can anyone give a hint as to why this happens? Thanks.

    3 replies
    Nick Georgiadis
    @nsgeorgi
    Hi guys, does any one of you know which config variable maps to SSL_TRUSTSTORE_LOCATION?
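    If I'm not mistaken, the closest librdkafka equivalent is ssl.ca.location, which takes a PEM CA file (or directory) rather than a JKS truststore; something like:

    $conf->set('security.protocol', 'ssl');
    // path to the CA certificate(s) in PEM format, i.e. the truststore equivalent
    $conf->set('ssl.ca.location', '/etc/ssl/certs/kafka-ca.pem');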
    2 replies
    yang
    @andresribeiro1996
    Hello, I have been getting a segmentation fault when I instantiate a producer and assign it to a variable, anyone with the same problem?
    8 replies
    yang
    @andresribeiro1996
    image.png
    in app.php it starts a producer
    yang
    @andresribeiro1996
    image.png
    Harleyxu
    @Harleyxu
    Hello,
    please, what caused this: {"err":-195,"topic_name":null,"timestamp":-1,"partition":0,"payload":"GroupCoordinator response error: Local: Broker transport failure","len":64,"key":null,"offset":0,"headers":null}?
    1 reply
    Jordan Dobrev
    @jordandobrev
    Hey guys, I'm struggling to figure out how to avoid message duplication. If something happens to the final DB::commit() at that point, my event messages will be pushed to Kafka again:
    
    
    $config = new Conf();
    
    $config->set('log_level', (string) LOG_DEBUG);
    $config->set('debug', 'all');
    $config->set('enable.idempotence', 'true'); // not sure I need that when using txn
    $config->set('metadata.broker.list', 'kafka:9092');
    $config->set('transactional.id', 'myApp');
    
    // This will set Event's reference in DB ( topic::offset )
    $config->setDrMsgCb([$this->messageDeliveryCallback, 'handle']);
    
    $this->producer = new Producer($config);
    
    $producerTopic = $this->producer->newTopic('my-topic');
    
    $this->eventProducer = new EventProducer(
        $producerTopic,
        new EventSerializer()
    );
    
    $timeout = 10000; // 10 seconds
    $producerTxnActive = false;
    
    $this->producer->initTransactions($timeout);
    
    while (true) {
        try {
            DB::beginTransaction();
    
            $this->producer->beginTransaction();
    
            $producerTxnActive = true;
    
            Event::whereNull('reference')
                ->orderBy('id')
                ->take(100)
                ->each(
                    function ($event) {
                        $this->eventProducer->produce($event);
                        $this->producer->poll(0);
                    }
                );
    
            $result = $this->producer->flush($timeout);
    
            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
                throw new RuntimeException('Was unable to flush, messages might be lost!');
            }
    
            $this->producer->commitTransaction($timeout);
    
            $producerTxnActive = false;
    
            throw new Error('Something happens here!');
    
            DB::commit();
        } catch (Throwable $e) {
            echo $e->getMessage();
            report($e);
            DB::rollBack();
    
            if ($producerTxnActive) {
                $this->producer->abortTransaction(10000);
            }
    
            throw $e;
        }
    
        sleep(5);
    }
    32 replies
    Nick Georgiadis
    @nsgeorgi
    Hi guys, I'm getting the error below when trying to produce a message to Kafka (using SSL): "ssl.ca.location failed: by_file.c:253: error:0B084002:x509 certificate routines:X509_load_cert_crl_file:system lib:"
    Any ideas what's wrong?
    1 reply
    Nick Georgiadis
    @nsgeorgi
    I fixed the previous error, but now I'm getting a new one: "client authentication might be required"
    I'm getting this because on my Kafka broker I configured "ssl_client_authentication=required"
    What do I have to change on the client side?
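    From what I understand, when the broker requires client authentication the client also has to present its own certificate and key (signed by a CA the broker trusts); roughly:

    $conf->set('security.protocol', 'ssl');
    $conf->set('ssl.ca.location', '/path/to/ca.pem');
    // client certificate + key used to authenticate against the broker
    $conf->set('ssl.certificate.location', '/path/to/client.crt');
    $conf->set('ssl.key.location', '/path/to/client.key');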
    2 replies
    Nick Georgiadis
    @nsgeorgi
    Also, I keep getting this error every time I produce a message: " [error] 83#83: *1 FastCGI sent in stderr: "PHP message: PHP Deprecated: Function RdKafka\Conf::setDefaultTopicConf() is deprecated"
    Any workaround?
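    If it helps, my understanding is that with php-rdkafka 4+ topic-level properties can be set directly on the main Conf, so the separate TopicConf + setDefaultTopicConf() call is no longer needed; a sketch:

    use RdKafka\Conf;

    $conf = new Conf();
    $conf->set('metadata.broker.list', 'kafka:9092');
    $conf->set('group.id', 'my-group');
    // topic defaults go straight onto the main Conf instead of a TopicConf
    $conf->set('auto.offset.reset', 'earliest');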
    3 replies
    joeprintfffffff
    @joeprintfffffff
    Hi everyone, I went through the documentation. However, I was not able to fetch more than 1 message from a partition. All I want is to be able to fetch a specific number of messages,
    for example fetch 100 messages and then do something in the DB.
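    A rough sketch of batching with the high-level consumer (broker, topic, group and batch size are placeholders):

    use RdKafka\Conf;
    use RdKafka\KafkaConsumer;

    $conf = new Conf();
    $conf->set('metadata.broker.list', 'kafka:9092');
    $conf->set('group.id', 'batch-consumer');
    $conf->set('enable.auto.commit', 'false');

    $consumer = new KafkaConsumer($conf);
    $consumer->subscribe(['my-topic']);

    $batch = [];
    while (count($batch) < 100) {
        $message = $consumer->consume(1000); // returns at most one message per call
        if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
            $batch[] = $message;
        } elseif ($message->err === RD_KAFKA_RESP_ERR__TIMED_OUT
            || $message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            break; // nothing more to read right now, process what we have
        } else {
            throw new \RuntimeException($message->errstr());
        }
    }

    // ... write $batch to the DB here, then commit the offsets
    $consumer->commit();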
    JIMITY
    @JIMITY
    1 reply
    abdo1
    @abdo1
    How do I retry messages that failed to be produced?
    1 reply
    Andrew
    @andrewmy

    Hey there, I got this in my container:

    # php -i | grep kafka
    rdkafka
    rdkafka support => enabled
    librdkafka version (runtime) => 0.11.6
    librdkafka version (build) => 1.5.0.255

    How is this even possible? It's a problem because enqueue wants v1+ at runtime.

    Installing as:

    RUN apt-get update \
        && apt-get install -y \
            gnupg \
            wget \
        && echo "deb http://security.debian.org/debian-security jessie/updates main" >> /etc/apt/sources.list \
        && echo "deb [arch=amd64] https://packages.confluent.io/deb/6.0 stable main" >> /etc/apt/sources.list \
        && wget -qO - https://packages.confluent.io/deb/6.0/archive.key | apt-key add - \
        && apt-get update \
        && apt-get install -y \
            build-essential \
            libtool \
            autoconf \
            unzip \
            libcurl4-openssl-dev \
            pkg-config \
            libssl-dev \
            librdkafka-dev \
        && pecl install rdkafka
    14 replies
    Arun Kolhapur
    @arrowak

    Before the producer pushes the new set of messages to a topic, I need to keep a reference to the last offset of each partition. Is there a way to get the last offset of a partition?

    For some reason getOffsetPositions() is giving me -1001 as the result, whereas the actual offset is 289736.
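    One thing that might be worth trying is queryWatermarkOffsets(), which asks the broker for the low/high watermarks of a partition; a sketch (topic and partition are placeholders, $producer is an existing RdKafka\Producer or consumer instance):

    $low = 0;
    $high = 0;
    $producer->queryWatermarkOffsets('my-topic', 0, $low, $high, 10000);
    // $high is the offset the next produced message will get,
    // so the last written message sits at $high - 1
    $lastOffset = $high - 1;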

    6 replies
    Arnaud Le Blanc
    @arnaud-lb

    Released php-rdkafka 5.0.0. This version adds support for PHP 8, and has a single breaking change: PHP 5 support has been removed.

    https://github.com/arnaud-lb/php-rdkafka

    Andrew
    @andrewmy
    [wrote stuff here, all wrong, nvm]
    Andrew
    @andrewmy

    At first I thought it was a regression, and only now I realized I had a version between 3.0 and 3.1.

    Anyway, the message headers are all deserialized to strings when they can also be ints and nulls. E.g.:

    RdKafka\Message {#485
      +err: 0
      +topic_name: "messages"
      +timestamp: 1610657820673
      +partition: 0
      +payload: "{"body":"some_body","properties":[],"headers":{"Content-Type":"application\/json","name":"some_name","id":"8d2977e3-695f-45e9-a09f-f1541db273da","timestamp":1610657820,"retry_count":null}}"
      +len: 188
      +key: null
      +offset: 2
      +headers: array:5 [
        "Content-Type" => "application/json"
        "name" => "some_name"
        "id" => "8d2977e3-695f-45e9-a09f-f1541db273da"
        "timestamp" => "1610657820"
        "retry_count" => ""
      ]
      errstr: "Success"
    }

    See timestamp and retry_count

    Andrew
    @andrewmy
    Got you a failing test case: arnaud-lb/php-rdkafka#439
    aand18
    @aand18
    Hello! The v5.0.0 DLL is missing from the Windows PECL site at https://pecl.php.net/package/rdkafka
    Any idea if it's going to be published soon?
    aand18
    @aand18

    Or any pointer on how to compile from source?
    I did:

    C:\php-sdk\phpdev\vs16\x64\php-8.0.1-src
    $ configure --disable-all --enable-cli --with-rdkafka

    I've got this warning:

    ...
    Enabling extension ext\standard
    Checking for library librdkafka.lib ... <not found>
    WARNING: rdkafka not enabled; libraries and headers not found
    ...
    aand18
    @aand18
    I think there's something wrong with this release; I've opened issue #440.
    xxm404
    @xxm404
    I want to store the message offsets in a local array and manually trigger commits. When there is only one consumer, it runs normally. When a second consumer joins and triggers a rebalance, how can I clear the offset information stored locally?
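    A possible approach is a rebalance callback that clears the local array whenever partitions are revoked; a sketch along these lines:

    use RdKafka\Conf;
    use RdKafka\KafkaConsumer;

    $localOffsets = []; // the offsets you track locally

    $conf = new Conf();
    $conf->set('metadata.broker.list', 'kafka:9092');
    $conf->set('group.id', 'my-group');
    $conf->set('enable.auto.commit', 'false');

    $conf->setRebalanceCb(function (KafkaConsumer $consumer, $err, array $partitions = null) use (&$localOffsets) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                $consumer->assign($partitions);
                break;
            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                // partitions are being taken away (e.g. a second consumer joined):
                // commit or drop whatever is tracked locally before losing them
                $localOffsets = [];
                $consumer->assign(null);
                break;
            default:
                $consumer->assign(null);
                break;
        }
    });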
    Nekrasov Ilya
    @arrilot
    Hi all, a little question please. If I plan to produce several messages to one (or several different) topics during one php-fpm request, which option should I pick?
    • register a KafkaProducer in the service container as a singleton.
    • register a bunch of KafkaProducers in the service container as singletons, one producer for each topic.
    • create a new KafkaProducer instance each time I produce a new batch of events/messages, without messing with singletons at all.
    Paweł Niedzielski
    @steveb_gitlab
    @arrilot unless you need to create producers dynamically, declare each producer as a separate service. That would be the general rule of thumb.
    I'm unsure if they will re-use the broker connection; I'll need to look that up in the extension source code, actually. @nick-zh ?
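    For the "separate service" option, a sketch of what that could look like with, e.g., Laravel's container (binding name and broker are made up):

    use RdKafka\Conf;
    use RdKafka\Producer;

    // in a service provider's register() method
    $this->app->singleton('kafka.producer.orders', function () {
        $conf = new Conf();
        $conf->set('metadata.broker.list', 'kafka:9092');

        return new Producer($conf);
    });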
    4 replies