    Arnaud Le Blanc
    @arnaud-lb
    Hi @Nico_beBee_twitter
    Could you show the exact error messages?
    Nico
    @Nico_beBee_twitter
    There are no error messages. The error handler changes when a TopicPartition object is created, so warnings, deprecation notices and similar messages from other parts of the code then cause exceptions.
    Nico
    @Nico_beBee_twitter
    For example:
    // This call does not throw an exception
    @trigger_error('', E_USER_DEPRECATED);
    $partition = new TopicPartition($topic, $partition, $offset);
    // This identical call now throws an exception
    @trigger_error('', E_USER_DEPRECATED);
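    (A minimal reproduction sketch of what Nico describes; the error handler here is a hypothetical application-level one, not something php-rdkafka provides:)

    // Hypothetical handler: turn errors into exceptions, but respect the @ operator.
    set_error_handler(function ($severity, $message, $file, $line) {
        if (!(error_reporting() & $severity)) {
            return false; // error was suppressed with @
        }
        throw new \ErrorException($message, 0, $severity, $file, $line);
    });

    // Suppressed with @, so this does not throw.
    @trigger_error('first deprecation', E_USER_DEPRECATED);

    // According to the report, constructing a TopicPartition interferes with error handling...
    $partition = new RdKafka\TopicPartition('my-topic', 0, 0);

    // ...so this identical, @-suppressed call now ends up throwing an ErrorException.
    @trigger_error('second deprecation', E_USER_DEPRECATED);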
    Nastasia Saby
    @NastasiaSaby
    Hi everybody, can I ask you something? I'm trying to use the high-level consumer with the example given in the documentation, but I have a problem.
    The KafkaConsumer seems to commit my messages automatically when doing "consume", even if I set "auto.commit.enable" or "enable.auto.commit" to 0.
    Is there something I'm doing wrong? Could anyone help me?
    I am working with PHP 7 and rdkafka 2.0.1.
    Magnus Edenhill
    @edenhill
    @NastasiaSaby that's enable.auto.commit (for the high level consumer), and it really should work.
    what librdkafka version?
    Nastasia Saby
    @NastasiaSaby
    @edenhill Hi, I tried it but it still does not work.
    @edenhill My configuration is: rdkafka 2.0.1, librdkafka 0.9, kafka 0.10, PHP 7. I can send you my code if you want. It is the "high level consumer" example: https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html. I have only added this: $topicConf->set('enable.auto.commit', 0);
    Magnus Edenhill
    @edenhill
    @NastasiaSaby what exact librdkafka version?
    the high-level consumer uses global configuration properties ("enable.auto.commit"), not topic-level configuration ("auto.commit.enable")
    Nastasia Saby
    @NastasiaSaby

    rdkafka

    rdkafka support => enabled
    version => 2.0.1
    build date => Dec 12 2016 10:13:42
    librdkafka version (runtime) => 0.9.2-112-g7f12b7
    librdkafka version (build) => 0.9.3.0

    Magnus Edenhill
    @edenhill
    $topicConf->set("enable.auto.commit", ..) should actually fail since that is a global property
    Nastasia Saby
    @NastasiaSaby
    Ok, so do I have to change my Kafka config to add an "enable.auto.commit" option in the server.properties?
    Thank you for your quick answer by the way
    Magnus Edenhill
    @edenhill
    no, that's a global client property
    librdkafka config is split into Global properties and per-topic properties. The high-level consumer uses the Global properties (since you typically don't instantiate topic objects when using it)
    That doc has two sections, one for Global and one for Topic
    Nastasia Saby
    @NastasiaSaby
    Ok but where can I change this option?
    On the config of the KafkaConsumer?
    Magnus Edenhill
    @edenhill
    yeah
    $conf->set("enable.auto.commit", "false")
    Nastasia Saby
    @NastasiaSaby
    It seems to work \O/ \O/. Thank you so much for your help.
    Magnus Edenhill
    @edenhill
    :+1:
    nknighter
    @nknighter
    Hello, guys! Have a question: is php-rdkafka async when working via php-fpm?
    Aaron Gong
    @ais-one
    Does it work on php5.6, 64bit linux?

    is there any code that can handle high volume on PHP? I tried ab -n 20000 -c 500 and the producer seems to choke on PHP, but nodejs is fine...

    the broker and consumer are all ok; on the producer side, the code is...

    $my_topic = "test0";
    $brokers="192.168.48.79,192.168.48.80,192.168.48.86";
    $conf = new RdKafka\Conf();
    $rk = new RdKafka\Producer($conf);
    $rk->addBrokers($brokers);
    $topic = $rk->newTopic($my_topic);
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "$i Record Has Been Entered Here");
    while ($rk->getOutQLen() > 0) {
        $rk->poll(0);
    }
    $conf->set('internal.termination.signal', SIGIO);
    Magnus Edenhill
    @edenhill
    @ais-one you typically want to persist the producer and reuse it across multiple produce() calls. otherwise things become really slow.
    also use a proper timeout rather than zero in $rk->poll(0) or it'll busy-loop
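    A sketch of that pattern for a long-running CLI worker; broker list and topic name are taken from the snippet above, and the message loop is illustrative:

    $conf = new RdKafka\Conf();
    $rk = new RdKafka\Producer($conf);
    $rk->addBrokers("192.168.48.79,192.168.48.80,192.168.48.86");
    $topic = $rk->newTopic("test0");

    // Create the producer once and reuse it for every message.
    for ($i = 0; $i < 20000; $i++) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "$i Record Has Been Entered Here");
        $rk->poll(0); // serve delivery callbacks without blocking the hot loop
    }

    // Drain the outbound queue before exiting; a non-zero timeout avoids busy-looping.
    while ($rk->getOutQLen() > 0) {
        $rk->poll(50);
    }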
    Aaron Gong
    @ais-one
    thanks for the advice, i will try it out. now finding out how to persist the producer in a php web application; in nodejs it's easy because i know how to do it, but this time i have to use php for everything
    Aaron Gong
    @ais-one
    i can't persist the producer in a php web app as these are stateless.
    Aaron Gong
    @ais-one
    @nknighter I am also interested to know whether php-rdkafka is async, but in the php cli
    al-razi
    @allyraza
    hello everyone
    just installed php-rdkafka on my macbook; when I run my producer script I get a segfault
    the producer script is from examples
    al-razi
    @allyraza
    php70, librdkafka 0.9.3, php70-rdkafka, kafka 0.10
    Craig Patrick
    @cpats007
    Hi - anyone know how I perform a simple check to see if rdkafka is connected or not?
    al-razi
    @allyraza
    channel is dead
    Aaron Gong
    @ais-one
    it's holiday season
    Craig Patrick
    @cpats007
    anyone around?
    Aaron Gong
    @ais-one
    @patrick try the API to get topics, as a producer or as a consumer
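    One way to do that check, assuming a producer handle and a short timeout (the broker address is a placeholder); getMetadata() only succeeds if at least one broker is reachable:

    $conf = new RdKafka\Conf();
    $rk = new RdKafka\Producer($conf);
    $rk->addBrokers("127.0.0.1:9092");

    try {
        // Request cluster metadata for all topics, with a 2 second timeout.
        $metadata = $rk->getMetadata(true, null, 2000);
        printf("connected: %d broker(s), %d topic(s)\n",
            count($metadata->getBrokers()),
            count($metadata->getTopics()));
    } catch (RdKafka\Exception $e) {
        printf("not connected: %s\n", $e->getMessage());
    }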
    Efimov Evgenij
    @edefimov

    Hi everyone! Does anybody know how to interrupt librdkafka's internal threads using a signal and let the application process the signal? I have a daemon+worker application and Kafka is used inside the worker. When the daemon receives SIGTERM it sends one to all workers. The problem is that when the application is consuming a topic, the signal handler inside the worker is not called. However, using strace I can see that the process actually receives SIGTERM and hangs inside system calls like this:

    --- SIGTERM {si_signo=SIGTERM, si_code=SI_USER, si_pid=30489, si_uid=1000} ---
    rt_sigreturn({mask=~[KILL TERM STOP RTMIN RT_1]}) = -1 EINTR (Interrupted system call)
    futex(0x5602a96ca6bc, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 17, {1484151715, 231482000}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
    futex(0x5602a96ca690, FUTEX_WAKE_PRIVATE, 1) = 0
    rt_sigprocmask(SIG_BLOCK, ~[RTMIN RT_1], ~[KILL TERM STOP RTMIN RT_1], 8) = 0
    rt_sigprocmask(SIG_SETMASK, ~[KILL TERM STOP RTMIN RT_1], NULL, 8) = 0
    rt_sigprocmask(SIG_BLOCK, ~[RTMIN RT_1], ~[KILL TERM STOP RTMIN RT_1], 8) = 0
    rt_sigprocmask(SIG_SETMASK, ~[KILL TERM STOP RTMIN RT_1], NULL, 8) = 0

    The lines after rt_sigreturn are repeated until the worker is killed after a timeout.
    I'm using asynchronous signal handling from PHP 7.1, and when I replace the Kafka consuming with a sleep() call everything works fine.

    Environment: php 7.1, php-rdkafka 3.0.0, librdkafka 0.9.2-237-gd3dcc0 (d3dcc0198517160b9c8e374da2e963f563eb2c6f)
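    For what it's worth, a sketch of the pattern that usually helps here: PHP 7.1 async signals, internal.termination.signal so librdkafka's threads wake up quickly, and a short consume() timeout so control returns to PHP often enough to run the handler (group id, broker and topic are placeholders):

    pcntl_async_signals(true); // PHP 7.1+ asynchronous signal handling

    $running = true;
    pcntl_signal(SIGTERM, function () use (&$running) {
        $running = false; // only flag shutdown; the consume loop checks it
    });

    $conf = new RdKafka\Conf();
    $conf->set('group.id', 'worker-group');
    $conf->set('metadata.broker.list', '127.0.0.1');
    $conf->set('internal.termination.signal', (string) SIGIO);

    $consumer = new RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['my-topic']);

    while ($running) {
        // A short timeout means we come back to PHP regularly and notice SIGTERM.
        $message = $consumer->consume(500);
        if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
            // process $message->payload here
        }
    }

    $consumer->close();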

    apocello2008
    @apocello2008
    Hi! How can I use a correlation id in this lib? I need to process sign-in over Kafka, how can I do this?
    Magnus Edenhill
    @edenhill
    @apocello2008 what do you mean by process signin? client authentication?
    apocello2008
    @apocello2008
    @edenhill Yes. The client sends a login and password, I process this information and make a unique token, and after this I need to send the token back to that user. But if I send this data to a "consumer group", all consumers in this group get this info. Maybe I can create a personal "consumer group" for every user, but I think 100 000+ groups is a bad idea..
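    Since php-rdkafka of this era has no message headers, one common workaround (sketched below with purely hypothetical topic names) is to carry the correlation id inside the payload and let each gateway instance consume its own reply topic, instead of one consumer group per user:

    $conf = new RdKafka\Conf();
    $rk = new RdKafka\Producer($conf);
    $rk->addBrokers("127.0.0.1:9092");

    $correlationId = bin2hex(random_bytes(16));
    $request = json_encode([
        'correlation_id' => $correlationId,
        'reply_topic'    => 'auth-replies-instance-1', // one reply topic per gateway instance
        'login'          => 'alice',
        'password_hash'  => '...',
    ]);

    // Use the correlation id as the message key as well.
    $rk->newTopic('auth-requests')
        ->produce(RD_KAFKA_PARTITION_UA, 0, $request, $correlationId);

    while ($rk->getOutQLen() > 0) {
        $rk->poll(50);
    }

    // The auth service consumes 'auth-requests', issues the token, and produces the reply
    // (carrying the same correlation_id) to the reply_topic named in the request, so only
    // the originating instance sees it.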
    Craig Patrick
    @cpats007
    anyone have any ideas on how to improve performance? on php-fpm my app's response times more than double when writing to Kafka
    Magnus Edenhill
    @edenhill
    @cpats007 measure and profile to find out what is taking time. Possibly the produce+ack. Then see queue.buffering.max.ms and socket.blocking.max.ms
    Craig Patrick
    @cpats007
    thanks @edenhill - so I'm not doing any ack at this stage - this is simply a "fire and forget" function - I just want to dump stuff into Kafka from PHP. I have altered socket.blocking.max.ms down to 100 and it's still the same, plus we can't use the SIGIO stuff because it's php-fpm. I'll look at queue.buffering.max.ms now
    it's literally logging the request information into Kafka from the app, one big JSON string, nothing complicated
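    A sketch of the two knobs mentioned above, tuned for low-latency fire-and-forget producing from php-fpm (the values and broker are illustrative, not recommendations):

    $conf = new RdKafka\Conf();
    $conf->set('queue.buffering.max.ms', '1');  // don't wait to batch; hand messages over almost immediately
    $conf->set('socket.blocking.max.ms', '50'); // cap how long internal socket operations may block

    $rk = new RdKafka\Producer($conf);
    $rk->addBrokers("127.0.0.1:9092");

    $rk->newTopic("request-log")
        ->produce(RD_KAFKA_PARTITION_UA, 0, json_encode(['path' => '/example']));

    // Give librdkafka a bounded chance to ship the message before the fpm worker
    // finishes the request; tune the bound to your latency budget.
    $start = microtime(true);
    while ($rk->getOutQLen() > 0 && (microtime(true) - $start) < 0.2) {
        $rk->poll(10);
    }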