These are chat archives for akkadotnet/AkkaStreams

25th
Oct 2016
Sean Farrow
@SeanFarrow
Oct 25 2016 04:56
@silv3rcircl3 I'm using an IFlux to convert currently. In terms of the Redis source, the only issue I have is the fact that multiple channels can be subscribed to with one connection. How would you handle the fact that a channel can be subscribed/unsubscribed mid-flow? I currently have my own ISubscriptionManager interface that has an IObservable property--not the cleanest I know which is why I want to convert fully to stream sources!
Marc Piechura
@marcpiechura
Oct 25 2016 05:50
@SeanFarrow I haven't worked with Redis so I don't know the semantics. Could you give me an example which code you want to convert into a source ?
Sean Farrow
@SeanFarrow
Oct 25 2016 07:44
@silv3rcircl3 StackExchange.Redis has an ISubscribe interface, with a method that has the following signature: void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None); Given that this can be called multiple times, how would you turn this in to a stream, Alternatively, should I abstract this in to my own interface that returns an IObservable and use the IFlux stuff we were discussing yesterday to convert to an Akka stream, or should I just use a plain actor and have subscribe/unsubscribe messages? The advantage of the latter approach means I can use this with remoting/clustering!
Marc Piechura
@marcpiechura
Oct 25 2016 07:46
I would assume you create a Source per call to Subscribe right?
Like Redis.CreateSource(channel, flags) this returns a Source<RedisValue,NotUsed>
Which you could use to work with the messages which this subscription received
Or do you need to implement the ISubscribe interface in your source and give this to the Redis client?
Sean Farrow
@SeanFarrow
Oct 25 2016 07:55
@silv3rcircl3 No, you just give the implementation of Subscribe an action to call when a message on the requested channel(s) is published and received.
Marc Piechura
@marcpiechura
Oct 25 2016 07:58
And the Subscribe is implemented by the Redis client ?
Sean Farrow
@SeanFarrow
Oct 25 2016 08:07
Yes, that is correct! My main concern is having a stream per channel or channels!
Marc Piechura
@marcpiechura
Oct 25 2016 08:10
So you want a source that calls Subscribe on the Redis client for a single channel or multiple channels and emits the values
Correct?
Sean Farrow
@SeanFarrow
Oct 25 2016 08:12
I dearly, yes, as I say I can always wrap my own flat round this interface and the class that implemented if needed!
Marc Piechura
@marcpiechura
Oct 25 2016 08:12
Ok got it :)
Marc Piechura
@marcpiechura
Oct 25 2016 08:25
Another question, can you backpressure the Redis client ? So that the action is only called if you say you're ready ?
Sean Farrow
@SeanFarrow
Oct 25 2016 08:27
No, the organist call regardless! But I can do whatever I want within the action
Marc Piechura
@marcpiechura
Oct 25 2016 08:32
MH ok so that's a problem, the same problem you have with RX. You need to drop messages if the stream can't handle more, if that's a no go you can't use streams for that
You could use a buffer but that only moves the problem to a later point in time, because if the buffer is full you also need to drop some messages
Sean Farrow
@SeanFarrow
Oct 25 2016 08:38
The question is, how can I determine whether a stream can take messages? Is there an IPO to do that
Marc Piechura
@marcpiechura
Oct 25 2016 08:40
If you would build your own source this information is provided via GraphStage API
But since you can't backpressure the client, you can simply use the default Source.Queue and set the buffer size and backpressure strategy and you're good to go
Sean Farrow
@SeanFarrow
Oct 25 2016 10:27
Are there any examples of doing this?