These are chat archives for akkadotnet/AkkaStreams

16th
Nov 2016
Arjen Smits
@Danthar
Nov 16 2016 12:56
Do we have an Akka Streams adapter/source definition for RabbitMQ?
Marc Piechura
@marcpiechura
Nov 16 2016 13:01
@Danthar we have one for AMQP, would that also work?
Should be added to the new Alpakka repo shortly
Arjen Smits
@Danthar
Nov 16 2016 13:02
uuhmmm. It should be. Depends on how configurable it is. I'd like to be able to integrate it with an existing environment.
I'll take a look. Thx
Marc Piechura
@marcpiechura
Nov 16 2016 13:03
If not, we always accept PRs 😜
Arjen Smits
@Danthar
Nov 16 2016 13:03
hahah touche :D
I'm looking to replace some of the plumbing code that glues RabbitMQ/EasyNetQ to my Akka code with Akka Streams
built-in backpressure (insert angelic "aah" here)
Arjen Smits
@Danthar
Nov 16 2016 13:20
@Silv3rcircl3 the AMQP repo is not on NuGet?
oh, I only just noticed the whole repo is 3 days old :+
Marc Piechura
@marcpiechura
Nov 16 2016 13:24
It will be part of https://github.com/akkadotnet/Alpakka in the future
Will move my Azure connectors too
Arjen Smits
@Danthar
Nov 16 2016 13:24
nice
I'm looking through the source now. It's very basic, but that's not a bad thing per se
Marc Piechura
@marcpiechura
Nov 16 2016 13:26
Yup it's not so hard to write your own Source stage
Arjen Smits
@Danthar
Nov 16 2016 13:28
What's the timeframe for the move to akkadotnet/Alpakka?
Marc Piechura
@marcpiechura
Nov 16 2016 14:09
It depends. We could release a first version with the AMQP connector from @bobanco and my Azure connectors, or we wait until we have ported the other connectors from the JVM
But we don't need testing support, only a NuGet release pipeline, so it should be easy to set up
Arjen Smits
@Danthar
Nov 16 2016 15:08
How familiar are you with the JVM AMQP connector, @Silv3rcircl3?
Marc Piechura
@marcpiechura
Nov 16 2016 15:10
Not a bit :) haven't even looked into @bobanco's port
Arjen Smits
@Danthar
Nov 16 2016 15:11
I'm looking around in both codebases now. The AMQP proxy code from Akka is a bit harder to grok
Marc Piechura
@marcpiechura
Nov 16 2016 15:11
Something in particular?
Arjen Smits
@Danthar
Nov 16 2016 15:12
but some remarks on the Akka.NET port. Obviously it's not done yet: registering exchanges is not implemented, and bindings are probably also not registered.
so still some work there
but the main issue is, imho
this:
private void PushAndAckMessage(IncomingMessage message)
{
    Push(_outlet, message);
    // ack it as soon as we have passed it downstream
    // TODO ack less often and do batch acks with multiple = true would probably be more performant
    Channel.BasicAck(message.Envelope.DeliveryTag, false); // just this single message
}
the source receives a message and immediately acks it
I can understand why you would do this for an initial implementation
however, acking a message should be part of your flow definition
because now, when processing a message fails downstream
the message is already gone from the AMQP queue
so no retries
ideally what you would want is a configurable prefetch param, which controls how many unacked messages are fetched by the source at any given time
and a way for the consumer to configure how a message should be acked
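The two wishes above (a prefetch limit on unacked messages, and acks that happen only after processing) can be illustrated with a small, dependency-free sketch. `FakeChannel` and all of its members are hypothetical names invented for this illustration; it is an in-memory stand-in, not the RabbitMQ client or the connector being discussed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for an AMQP channel (assumption, not a real client API).
public sealed class FakeChannel
{
    private readonly Queue<string> _queue;
    private readonly Dictionary<ulong, string> _unacked = new Dictionary<ulong, string>();
    private readonly int _prefetch;
    private ulong _nextTag;

    public FakeChannel(IEnumerable<string> messages, int prefetch)
    {
        _queue = new Queue<string>(messages);
        _prefetch = prefetch;
    }

    // Number of deliveries handed out but not yet acked or nacked.
    public int UnackedCount => _unacked.Count;

    // Hands out the next message only while fewer than `prefetch` deliveries are unacked.
    public bool TryDeliver(out ulong deliveryTag, out string body)
    {
        deliveryTag = 0;
        body = string.Empty;
        if (_unacked.Count >= _prefetch || _queue.Count == 0) return false;

        deliveryTag = ++_nextTag;
        body = _queue.Dequeue();
        _unacked[deliveryTag] = body;
        return true;
    }

    // Called by the consumer once processing has succeeded.
    public void Ack(ulong deliveryTag) => _unacked.Remove(deliveryTag);

    // Called when processing failed: the message goes back on the queue for a retry.
    public void Nack(ulong deliveryTag)
    {
        if (_unacked.TryGetValue(deliveryTag, out var body))
        {
            _unacked.Remove(deliveryTag);
            _queue.Enqueue(body);
        }
    }
}
```

With a prefetch of 2, a third `TryDeliver` call returns false until one of the first two deliveries is acked or nacked, and a nacked message is delivered again later instead of being lost.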
Vagif Abilov
@object
Nov 16 2016 15:15
@Danthar You are absolutely right. In our scenario (and I believe this is a common pattern) acking/nacking can't be done immediately, it's a part of the outer workflow.
Arjen Smits
@Danthar
Nov 16 2016 15:15
or am I reading this code incorrectly?
could be, I'm not 100% into Akka Streams internals
:nods:
Vagif Abilov
@object
Nov 16 2016 15:16
I'd say this is essential for AMQP
Marc Piechura
@marcpiechura
Nov 16 2016 15:16
No, you're right. Had the same problem with my Azure connectors
Arjen Smits
@Danthar
Nov 16 2016 15:16
yeah it's a huge pain if you can't explicitly ack your messages
Marc Piechura
@marcpiechura
Nov 16 2016 15:17
I solved it by using different overloads for the source creation: one auto-acks the message, the other passes the message as well as the partition downstream so that you can ack the message later in the stream
Arjen Smits
@Danthar
Nov 16 2016 15:17
yeah Azure messagebus has an abstraction for that
you can say message.ack() i believe
ideally we would like something similar here
But passing an Ack-strategy in the source/sink creation would be a good start
Marc Piechura
@marcpiechura
Nov 16 2016 15:19
Should be possible
Vagif Abilov
@object
Nov 16 2016 15:19
Perhaps it's possible to look at Azure implementation and add something similar to Rabbit?
Arjen Smits
@Danthar
Nov 16 2016 15:19
What the azure implementation does is essentially generate a proxy for your message
so that it has an Ack function you can call on your message that notifies the channel
that would be really fancy if we could do that.
But a simpler and easier to implement solution is to use a wrapper
something like MessageContext<MyMessageType>
which would allow you to do ctx.Message to get the actual message and/or ctx.Ack()
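A minimal sketch of such a wrapper, assuming the source hands it callbacks that ack or nack the underlying delivery. Only the names `MessageContext`, `Message` and `Ack` come from the suggestion above; `Nack`, the constructor shape and the "settle once" guard are assumptions added for illustration.

```csharp
using System;

// Hypothetical wrapper type; not part of any existing connector.
public sealed class MessageContext<T>
{
    private readonly Action _ack;
    private readonly Action<bool> _nack;
    private bool _settled;

    public MessageContext(T message, Action ack, Action<bool> nack)
    {
        Message = message;
        _ack = ack ?? throw new ArgumentNullException(nameof(ack));
        _nack = nack ?? throw new ArgumentNullException(nameof(nack));
    }

    // The actual payload, as in ctx.Message.
    public T Message { get; }

    // Acknowledges the delivery; later Ack/Nack calls are ignored.
    public void Ack()
    {
        if (_settled) return;
        _settled = true;
        _ack();
    }

    // Rejects the delivery, optionally asking the broker to requeue it.
    public void Nack(bool requeue)
    {
        if (_settled) return;
        _settled = true;
        _nack(requeue);
    }
}
```

This sketch is not thread-safe. A real source stage would probably also need to route the callbacks back to whatever owns the channel rather than touching the channel from an arbitrary downstream thread.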
Vagif Abilov
@object
Nov 16 2016 15:21
Yeah, this is how we are currently doing this in a non-stream implementation that uses Akka with RabbitMQ.
Arjen Smits
@Danthar
Nov 16 2016 15:21
this also allows you to expose request/response and other mechanisms like correlation IDs through the AMQP system
I'm using EasyNetQ. It also allows some form of explicit acks. But it's a pain tbh.
Vagif Abilov
@object
Nov 16 2016 15:24
I started with EasyNetQ, then switched to plain RabbitMQ client just to remove any opinionated code.
Marc Piechura
@marcpiechura
Nov 16 2016 15:25
you can take a look at https://github.com/Silv3rcircl3/Akka.Streams.Azure/blob/master/src/Akka.Streams.Azure.ServiceBus/ServiceBusSource.cs
you can opt for automatic body extraction and auto ack or you get the BrokeredMessage
and https://github.com/Silv3rcircl3/Akka.Streams.Azure/blob/master/src/Akka.Streams.Azure.EventHub/EventHubSource.cs
where I pass the EventData as well as the PartitionContext downstream for every message
So you can say "auto ack every received batch" or you do it manually later
should work the same way for AMQP
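The "auto ack or ack manually later" choice can be sketched without any Akka or Azure dependency. In the sketch below a plain `IEnumerable` stands in for a stream source, and `Delivery`, `SketchSource`, `AutoAck` and `ManualAck` are hypothetical names invented for illustration; they are not the API of the linked connectors.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical delivery record: a broker-assigned tag plus the message body.
public sealed class Delivery
{
    public ulong Tag;
    public string Body = string.Empty;
}

public static class SketchSource
{
    // Variant 1: acks each delivery as it is emitted and passes only the body downstream.
    public static IEnumerable<string> AutoAck(IEnumerable<Delivery> deliveries, Action<ulong> ack)
    {
        foreach (var d in deliveries)
        {
            ack(d.Tag);
            yield return d.Body;
        }
    }

    // Variant 2: passes the body plus a deferred ack callback downstream,
    // so a later step decides when (or whether) to acknowledge.
    public static IEnumerable<(string Body, Action Ack)> ManualAck(
        IEnumerable<Delivery> deliveries, Action<ulong> ack)
    {
        foreach (var d in deliveries)
        {
            var tag = d.Tag;
            yield return (d.Body, () => ack(tag));
        }
    }
}
```

The first variant is simpler to consume but loses the message if a later step fails; the second keeps the message unacked until the consumer invokes the callback.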
Arjen Smits
@Danthar
Nov 16 2016 15:30
@object I know what you mean. But I needed to get started quickly, and I really didn't want to implement my own routing strategy, serialization layer, and other things.
I now have over 2k message types. Subscriber registrations are automated based on convention and such.
Vagif Abilov
@object
Nov 16 2016 15:30
I need some time to understand both code bases, Marc, so I may be slow. But if we don't get anyone better than me to check this out, I will give it a try.
Arjen Smits
@Danthar
Nov 16 2016 15:32
@Silv3rcircl3 yup. Sounds good. I'll look into it and I'll start playing with @bobanco's port. I'll make a PR if I have anything substantial :)
For me, I've got 2 options. Either I provide some way to inject a strategy that uses the same naming and serialization strategy that EasyNetQ uses out of the box, or I adopt a new way, and then I'll have to rewrite my message infrastructure layer.
I'm guessing the first one is less work ^^
should be interesting though
Boban
@bobanco
Nov 16 2016 18:44
@Danthar my repo is a one-to-one port of the Alpakka AMQP connector, and it works well with RabbitMQ. Soon I will share my port of op-rabbit; I'm currently using the op-rabbit port and it works well, it has a recovery strategy etc.
regarding the Alpakka AMQP connector
you can define the prefetch size as well
Marc Piechura
@marcpiechura
Nov 16 2016 19:47
Moved my Azure connectors to Alpakka, now porting the file connectors