rscipien
@rscipien
in the consume method:
try {
    $subscriptionConsumer = $this->interopContext->createSubscriptionConsumer();
} catch (SubscriptionConsumerNotSupportedException $e) {
    $subscriptionConsumer = $this->fallbackSubscriptionConsumer;
}
but in the STOMP context:
public function createSubscriptionConsumer(): SubscriptionConsumer
{
    throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
}
so I always get the fallback
I run bin/console enqueue:consume name-of-the-queue
rscipien
@rscipien
I can see that the consumer is attached to that queue
but the message is not passed to the class that should handle it
the class implements Processor, TopicSubscriberInterface
I don't know whether the fallback mode and the message not being consumed are connected
rscipien
@rscipien
I suspect this code because my message is taken from name-of-the-queue but then passed to another queue, default
Doqnach
@Doqnach
dunno about ActiveMQ, but RabbitMQ has exchanges and queues that are linked by bindings
a consumer opens a channel to a queue and reads everything in it
a publisher pushes to an exchange, and based on the routingKey (topic) that is defined by the bindings it ends up in a specific queue
so you don't filter when reading from the queue: you filter when putting it into the queue, is how I think it works
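To make the exchange/binding idea above concrete, here is a minimal in-memory sketch. It is a toy model only: `ToyExchange`, the queue names and the routing keys are hypothetical and are not part of RabbitMQ or enqueue. It assumes a direct-style exchange where a message is copied into every queue bound with a matching routing key, and where consuming simply reads everything in the queue.

```php
<?php
// Toy in-memory model of a direct exchange; not the RabbitMQ or enqueue API.
final class ToyExchange
{
    /** @var array<string, string[]> routing key => names of bound queues */
    private array $bindings = [];

    /** @var array<string, string[]> queue name => message bodies */
    private array $queues = [];

    // Link a queue to the exchange under a routing key.
    public function bind(string $queue, string $routingKey): void
    {
        $this->queues[$queue] ??= [];
        $this->bindings[$routingKey][] = $queue;
    }

    // Filtering happens here: only queues bound with this key receive the body.
    public function publish(string $routingKey, string $body): void
    {
        foreach ($this->bindings[$routingKey] ?? [] as $queue) {
            $this->queues[$queue][] = $body;
        }
    }

    // A consumer reads everything in the queue, with no filtering, and empties it.
    public function consume(string $queue): array
    {
        $messages = $this->queues[$queue] ?? [];
        $this->queues[$queue] = [];

        return $messages;
    }
}

$exchange = new ToyExchange();
$exchange->bind('orders', 'order.created');
$exchange->bind('emails', 'user.registered');

$exchange->publish('order.created', 'order #1');
$exchange->publish('user.registered', 'welcome mail');
$exchange->publish('unbound.key', 'dropped'); // no binding, so no queue receives it

$orders = $exchange->consume('orders');
$emails = $exchange->consume('emails');
```

In this sketch the consumer never inspects the routing key; the decision about which queue a message lands in is made entirely at publish time.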
rscipien
@rscipien
so in RabbitMQ, how do you handle consumed messages? Not like this?
<?php
use Interop\Queue\Message;
use Interop\Queue\Context;
use Interop\Queue\Processor;
use Enqueue\Client\TopicSubscriberInterface;

class FooProcessor implements Processor, TopicSubscriberInterface
{
    public function process(Message $message, Context $session)
    {
        echo $message->getBody();

        return self::ACK;
        // return self::REJECT; // when the message is broken
        // return self::REQUEUE; // the message is fine but you want to postpone processing
    }

    public static function getSubscribedTopics()
    {
        return ['aFooTopic'];
    }
}
I can see that the message was taken from the queue but not passed to the process method
rscipien
@rscipien
could someone check which processor you get in Enqueue\Consumption\QueueConsumer::consume(), inside $callback:
$messageReceived = new MessageReceived($this->interopContext, $consumer, $message, $processor, $receivedAt, $this->logger);
$extension->onMessageReceived($messageReceived);
$result = $messageReceived->getResult();
$processor = $messageReceived->getProcessor();
if (null === $result) {
    try {
        $result = $processor->process($message, $this->interopContext);
    } catch (\Exception $e) {
        $result = $this->onProcessorException($extension, $consumer, $message, $e, $receivedAt);
    }
}
when I step into its process method, I land in DelegateProcessor:
public function process(InteropMessage $message, Context $context)
{
    $processorName = $message->getProperty(Config::PROCESSOR);
    if (false == $processorName) {
        throw new \LogicException(sprintf(
            'Got message without required parameter: "%s"',
            Config::PROCESSOR
        ));
    }

    return $this->registry->get($processorName)->process($message, $context);
}
what processor name should I get from
$processorName = $message->getProperty(Config::PROCESSOR);
should I get the class that implements Processor, TopicSubscriberInterface (the message handler)?
because right now I get
Enqueue\Client\RouterProcessor
which resends the message to the default queue
rscipien
@rscipien
Hello all, is it by design that consumed messages go to default_queue? I consume messages from queueA. The message is taken from queueA and moved to default with the right processor name set in the header.
Can I use the same queue as both router_queue and default_queue?
rscipien
@rscipien
this is related to the CLI command
bin/console enqueue:consume
Poltynik
@Poltynik
hi, I'm facing a problem handling errors when the broker name can't be resolved: 1573035921.906|FAIL|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Failed to resolve 'kafka:9092':Name does not resolve. I've solved this problem, but the question is: how can I log this? The logger extension is useless in this case. Any ideas?
Paweł Niedzielski
@steveb_gitlab
@Poltynik could you open an issue in the repository? I'd like to document this error handling properly with comments from @nick-zh preferably from phprdkafka
Bram Gerritsen
@bramstroker
Hi, I am integrating enqueue AMQP in my project. It looks very promising and I got a simple implementation up and running in 10 minutes. However, I'm stuck trying to implement worker queues (as described here: https://www.rabbitmq.com/tutorials/tutorial-two-python.html). Does anyone know whether I can configure this pattern in the Symfony bundle, and how to do it? I need to distribute work among workers, but the standard enqueue setup has a single consumer, which is not feasible for my use case.
Freek Gruntjes
@Fgruntjes

Hello, I have a \Interop\Queue\Processor that will handle messages in a non-blocking manner. Is there a way to send ACK, REJECT and REQUEUE other than through the return value of \Interop\Queue\Processor::process?

So something like this:

<?php

use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Symfony\Component\Process\Process;

class AsyncProcessor implements Processor
{
    private $messages = [];
    private $processes = [];

    public function process(Message $message, Context $context)
    {
        $this->start($message);
        $this->cleanOldMessages();
    }

    private function start(Message $message): void
    {
        // do some async action like a symfony process
        $index = count($this->messages);
        $process = Process::fromShellCommandline('some sleepy command');
        $process->start();
        $process->isRunning();

        $this->messages[$index] = $message;
        $this->processes[$index] = $process;
    }

    private function cleanOldMessages(): void
    {
        foreach ($this->processes as $index => $process) {
            if (!$process->isRunning()) {
                $this->ackMessage($this->messages[$index], $process);
                unset($this->processes[$index], $this->messages[$index]);
            }
        }
    }

    private function ackMessage(Message $message, Process $process): void
    {
        if ($process->isSuccessful()) {
            // send Processor::ACK;
        } else {
            // send Processor::REJECT;
        }
    }
}
Freek Gruntjes
@Fgruntjes
Sorry guys, I found an issue covering my problem: php-enqueue/enqueue-dev#746
Rein Baarsma
@rbaarsma

Hi,

I've looked through the docs, the code, issues and pull requests, but cannot find how to set the (default) visibility timeout when using enqueue/enqueue-bundle (Symfony).

I have found that the SqsDestination.php has several attributes, one of which is the visibilityTimeout. It seems to me that this sets the Default Visibility Timeout (since this default is indeed normally 30 seconds in SQS).

But how do I set this attribute?

Rein Baarsma
@rbaarsma

Looking into the SqsDriver, it seems that with the standard enqueue:setup-broker, it's impossible to add attributes

            /** @var SqsDestination $queue */
            $queue = $this->createRouteQueue($route);
            if (array_key_exists($queue->getQueueName(), $declaredQueues)) {
                continue;
            }

            $log('Declare processor queue: %s', $queue->getQueueName());
            $this->getContext()->declareQueue($queue);

I would need to add $queue->setVisibilityTimeout(60*15) here, before declareQueue is called.

Am I correct that the only way to modify this is to extend the SqsDriver?
Seems to me that there is probably an easier way here... but I can't really find it

Poltynik
@Poltynik
@steveb_gitlab, can php-enqueue/enqueue-dev#989 be fixed and merged?
Andrew Zaplitnyak
@zaplitnyak
Hi,
Andrew Zaplitnyak
@zaplitnyak
I wonder whether I can configure enqueue for RabbitMQ to work through 1 exchange for 1 queue (due to the legacy architecture of the project), bypassing the router queue, or whether I need to override AmqpDriver
Paweł Niedzielski
@steveb_gitlab
@Poltynik I'll work on enqueue on Friday evening
Max Kotliar
@makasim
@zaplitnyak that's possible. You need a command subscriber with some properly set flags: excluded, prefixed, something like that. Once set, you'd be able to send messages directly to that queue without any special enqueue headers or properties. They would go to that command subscriber's processor