Hi everyone! Does anybody know how to interrupt librdkafka internal threads using a signal and let the application process some signal? I have daemon+worker application and kafka is used inside the worker. When daemon receives SIGTERM it sends one to all workers. The problem is when the application is consuming a topic, the signal handler inside worker is not called. However using strace I can see, that process actually receives SIGTERM and hangs inside system calls like this:
--- SIGTERM {si_signo=SIGTERM, si_code=SI_USER, si_pid=30489, si_uid=1000} ---
rt_sigreturn({mask=~[KILL TERM STOP RTMIN RT_1]}) = -1 EINTR (Interrupted system call)
futex(0x5602a96ca6bc, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 17, {1484151715, 231482000}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x5602a96ca690, FUTEX_WAKE_PRIVATE, 1) = 0
rt_sigprocmask(SIG_BLOCK, ~[RTMIN RT_1], ~[KILL TERM STOP RTMIN RT_1], 8) = 0
rt_sigprocmask(SIG_SETMASK, ~[KILL TERM STOP RTMIN RT_1], NULL, 8) = 0
rt_sigprocmask(SIG_BLOCK, ~[RTMIN RT_1], ~[KILL TERM STOP RTMIN RT_1], 8) = 0
rt_sigprocmask(SIG_SETMASK, ~[KILL TERM STOP RTMIN RT_1], NULL, 8) = 0
Lines after rt_sigreturn are repeated until the worker is killed after timeout.
I'm using asynchronous signal processing from php 7.1 and when I replace kafka consuming with sleep
function everything works well.
Environment: php 7.1
, php-rdkafka 3.0.0
, librdkafka 0.9.2-237-gd3dcc0 (d3dcc0198517160b9c8e374da2e963f563eb2c6f)
..................FFF........S.SS..[New Thread 0x7fffdfccf700 (LWP 20884)]
[Thread 0x7fffdfccf700 (LWP 20884) exited]
[New Thread 0x7fffdfccf700 (LWP 20886)]
[Thread 0x7fffdfccf700 (LWP 20886) exited]
.......[New Thread 0x7fffdfccf700 (LWP 20887)]
[Thread 0x7fffdfccf700 (LWP 20887) exited]
[New Thread 0x7fffdfccf700 (LWP 20888)]
[Thread 0x7fffdfccf700 (LWP 20888) exited]
...........
Program received signal SIGSEGV, Segmentation fault.
0x00007fffec678a3b in rd_kafka_new () from /lib64/librdkafka.so.1
(gdb) quit
bt
so we know what issue we ran into though. To close this off :)
// Global config settings - see: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#global-configuration-properties
$conf = new ConsumerConf();
$conf->set("enable.auto.commit", "false");
$conf->set("socket.blocking.max.ms", 1);
$conf->set("fetch.wait.max.ms", 10);
$conf->set('metadata.broker.list', implode(',', $brokers));
$conf->set('group.id', $this->generateGroupId($topicName, $partition));
$conf->setErrorCb(function ($kafka, $err, $reason) {
printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});
$topicConf = new TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf->setDefaultTopicConf($topicConf);
$consumer = new HighLevelConsumer($conf);
$consumer->assign([
new TopicPartition($topicName, $partition, RD_KAFKA_OFFSET_STORED)
]);