These are chat archives for akkadotnet/AkkaStreams

16th
Aug 2016
Alex Valuyskiy
@alexvaluyskiy
Aug 16 2016 11:44

I'm trying to connect Streams to Redis, and I have this code:

            string channelName = "numbers";

            // Redis init
            IRedisClientsManager manager = new BasicRedisClientManager();
            var client = manager.GetClient();

            // Streams init
            var system = ActorSystem.Create("StreamsSystem");
            var materializer = system.Materializer();

            // Publish messages
            Source
                .From(Enumerable.Range(1, 100000000))
                .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
                .RunForeach(i => client.PublishMessage(channelName, i.ToString()), materializer);

            // Subscribe to messages
            var pubsub = manager.CreatePubSubServer(channelName);
            pubsub.Start();
            pubsub.OnMessage = (channel, msg) => Console.WriteLine($"Received '{msg}' from '{channel}'");

How can I use pubsub.OnMessage as a stream source?

Bartosz Sypytkowski
@Horusiath
Aug 16 2016 11:48
@alexvaluyskiy The usual approach is to create an actor (inheriting from ActorPublisher<T>) that is responsible for transforming third-party input into Akka Streams messages.
Alex Valuyskiy
@alexvaluyskiy
Aug 16 2016 12:03
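Yeah. I've added these lines:
// Create the publisher actor and expose it as a Reactive Streams publisher.
// This wiring was not shown in the original message and is assumed here.
var actorPublisher = system.ActorOf(Props.Create<RedisPublisher>());
var redisPublisher = ActorPublisher.Create<RedisMessage>(actorPublisher);

// Subscribe to messages and forward them (and any errors) to the publisher actor
var pubsub = manager.CreatePubSubServer(channelName);
pubsub.Start();
pubsub.OnMessage = (channel, msg) => actorPublisher.Tell(new RedisMessage(msg, channel));
pubsub.OnError = exception => actorPublisher.Tell(exception);

// Consume the publisher as a stream source
Source
    .FromPublisher(redisPublisher)
    .RunForeach(i => Console.WriteLine($"Received '{i.Message}' from '{i.Channel}'"), materializer);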
and these two classes
public class RedisMessage
{
    public RedisMessage(string message, string channel)
    {
        Message = message;
        Channel = channel;
    }

    public string Message { get; }

    public string Channel { get; }
}

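public class RedisPublisher : ActorPublisher<RedisMessage>
{
    protected override bool Receive(object message)
    {
        if (message is RedisMessage)
        {
            // Forward the Redis message downstream
            OnNext(message as RedisMessage);
            return true;
        }

        if (message is Exception)
        {
            // Fail the stream with the reported error
            OnError(message as Exception);
            return true;
        }

        // Not a message this publisher understands
        return false;
    }
}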
and it works
Marc Piechura
@marcpiechura
Aug 16 2016 12:07
@alexvaluyskiy You need to check whether demand is available from downstream before you call OnNext.
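
A minimal sketch of what checking demand could look like, under a few assumptions: the name BufferedRedisPublisher, the CreateSource helper, and the buffer limit of 1000 are illustrative and do not come from the chat. Messages are queued while there is no downstream demand and dropped once the queue is full. RedisMessage is repeated only so the block stands on its own.

```csharp
using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Streams.Actors;
using Akka.Streams.Dsl;

// Same shape as the RedisMessage class posted in the chat.
public class RedisMessage
{
    public RedisMessage(string message, string channel)
    {
        Message = message;
        Channel = channel;
    }

    public string Message { get; }
    public string Channel { get; }
}

// Hypothetical name: buffers while there is no demand, drops when the buffer is full.
public class BufferedRedisPublisher : ActorPublisher<RedisMessage>
{
    private const int MaxBufferSize = 1000; // assumed limit, tune as needed
    private readonly Queue<RedisMessage> _buffer = new Queue<RedisMessage>();

    // Convenience factory: the publisher actor is created when the stream is materialized.
    public static Source<RedisMessage, IActorRef> CreateSource() =>
        Source.ActorPublisher<RedisMessage>(Props.Create<BufferedRedisPublisher>());

    protected override bool Receive(object message)
    {
        var redisMessage = message as RedisMessage;
        if (redisMessage != null)
        {
            // Queue the message unless the buffer is full, in which case it is dropped.
            if (_buffer.Count < MaxBufferSize)
                _buffer.Enqueue(redisMessage);
            DeliverBuffered();
            return true;
        }

        if (message is Request)
        {
            // Downstream signalled new demand, so try to drain the buffer.
            DeliverBuffered();
            return true;
        }

        if (message is Cancel)
        {
            // Downstream cancelled; nothing left to publish to.
            Context.Stop(Self);
            return true;
        }

        var error = message as Exception;
        if (error != null)
        {
            OnErrorThenStop(error);
            return true;
        }

        return false;
    }

    // Emits buffered messages only while the stream is active and demand remains.
    private void DeliverBuffered()
    {
        while (IsActive && TotalDemand > 0 && _buffer.Count > 0)
            OnNext(_buffer.Dequeue());
    }
}
```

With this shape, the IActorRef obtained when running BufferedRedisPublisher.CreateSource() would be the target of pubsub.OnMessage, and a slow consumer leads to dropped messages rather than a failure in OnNext.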
Alex Valuyskiy
@alexvaluyskiy
Aug 16 2016 12:14
Why do we need to implement another buffer inside the actor?
I thought that Streams has its own buffers.
Marc Piechura
@marcpiechura
Aug 16 2016 12:17
You don't need to; you could also drop incoming messages if no demand is available. The problem is that the Redis client can't be backpressured, so you need to handle the case where the client is faster than the rest of the stream.
Maybe Source.ActorRef would be better in this case.
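
A rough sketch of the Source.ActorRef variant, for comparison. The helper name ActorRefSourceSketch, the string payload, the default buffer size of 1000, and the DropHead strategy are all assumptions chosen for illustration. The source keeps its own bounded buffer and applies the overflow strategy when the Redis callback outpaces the stream, so no custom actor is needed.

```csharp
using System;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class ActorRefSourceSketch
{
    // Starts a stream that is fed by telling messages to the returned IActorRef.
    // bufferSize and OverflowStrategy.DropHead are illustrative choices:
    // when the buffer is full, the oldest buffered element is discarded.
    public static IActorRef Start(ActorSystem system, Action<string> handle, int bufferSize = 1000)
    {
        var materializer = system.Materializer();

        return Source
            .ActorRef<string>(bufferSize, OverflowStrategy.DropHead)
            .To(Sink.ForEach<string>(handle))
            .Run(materializer);
    }
}

// Possible wiring with the pub/sub server from the chat (not compiled here):
//   var sourceRef = ActorRefSourceSketch.Start(system, Console.WriteLine);
//   pubsub.OnMessage = (channel, msg) => sourceRef.Tell(msg);
```

The trade-off is the same as with a hand-written buffer: because the Redis client cannot be slowed down, something has to be dropped once the buffer fills, and the overflow strategy decides what.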
Alex Valuyskiy
@alexvaluyskiy
Aug 16 2016 12:24

Should I send my own messages when my stream is complete?

pubsub.OnDispose = () => actorPublisher.Tell("complete");
pubsub.OnStop = () => actorPublisher.Tell("complete");

Or do we have some standard messages?

Ah, these:
The stream can be completed successfully by sending Akka.Actor.PoisonPill or Akka.Actor.Status.Success to the actor reference.

The stream can be completed with failure by sending Akka.Actor.Status.Failure to the actor reference.
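
A small sketch of how those standard messages might be used with a Source.ActorRef based stream. The class name CompletionSketch, the sample elements, and the buffer size of 16 are assumptions for illustration. Note that a hand-written ActorPublisher such as RedisPublisher presumably does not interpret Status messages on its own and would have to call OnComplete or OnError itself.

```csharp
using System;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class CompletionSketch
{
    // Feeds two elements through a Source.ActorRef stream, then completes it
    // with Status.Success. The returned task finishes once the stream has completed.
    public static Task<IImmutableList<string>> RunUntilCompleted(ActorSystem system)
    {
        var run = Source
            .ActorRef<string>(16, OverflowStrategy.DropHead)
            .ToMaterialized(Sink.Seq<string>(), Keep.Both)
            .Run(system.Materializer());

        IActorRef sourceRef = run.Item1;
        sourceRef.Tell("1");
        sourceRef.Tell("2");

        // Successful completion, e.g. from pubsub.OnStop or pubsub.OnDispose.
        sourceRef.Tell(new Status.Success("done"));

        // Failure would be signalled instead with, e.g. from pubsub.OnError:
        //   sourceRef.Tell(new Status.Failure(exception));

        return run.Item2;
    }
}
```

In the chat's setup this would replace the custom "complete" string: the OnStop and OnDispose callbacks could tell Status.Success to the source's actor reference, and OnError could wrap the exception in Status.Failure.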