These are chat archives for akkadotnet/AkkaStreams

7th
Nov 2017
Boris Kreminski
@krembf
Nov 07 2017 15:20 UTC

Hi guys, I would like to use Akka.Streams to implement file transfer between remote actors. I was originally following this SO question:
https://stackoverflow.com/questions/45359672/sending-stream-e-g-file-from-actor-to-actor
and I also tried to read the getakka.net docs regarding streaming, but I still need some guidance.
On the sender's side, I'm materializing the outgoing stream and sending the data to a child actor of the receiver, which is materialized to accept the incoming data. So as far as I understand the topology, there will be one child actor of the sender actor and one child actor of the receiver actor exchanging the chunks. I convert each ByteString to a byte array (ChunkData) for serialization.
Here is the code:

    public class SenderActor : ReceiveActor
    {
        private const string Filename = @"C:\temp\in.docx";

        private const int ChunkSize = 4096;

        public SenderActor()
        {
            Receive<StreamStart>(message =>
            {
                var stream = new FileStream(Filename, FileMode.Open);
                Console.WriteLine(
                    $"[sender] Received StreamStart message, local stream length {stream.Length}");
                var materializer = Context.Materializer();

                var source = StreamConverters.FromInputStream(() => stream, ChunkSize)
                    .Via(Flow.Create<ByteString>().Select(x => new ChunkData(x.ToArray())));
                var result = source.To(Sink.ActorRef<ChunkData>(message.Receiver, new StreamComplete())).Run(materializer);
            });

            Receive<StreamResult>(message =>
            {
                Console.WriteLine(
                    $"[sender] Received StreamResult message, result {message.Result}");
            });
        }
    }

    public class ReceiverActor : ReceiveActor
    {
        private const string Filename = @"C:\temp\out.docx";

        private readonly IActorRef _streamReceiver;
        private readonly FileStream _stream;
        private IActorRef _sender;
        private const int BufferSize = 4096;

        public ReceiverActor()
        {
            _stream = new FileStream(Filename, FileMode.Create);
            _streamReceiver = Source.ActorRef<ChunkData>(BufferSize, OverflowStrategy.Fail)
                .Via(Flow.Create<ChunkData>().Select(x => ByteString.CopyFrom(x.Chunk)))
                .To(StreamConverters.FromOutputStream(() => _stream, true)).Run(Context.Materializer());

            Context.Watch(_streamReceiver);

            Receive<StreamBegin>(message => OnStreamBegin(message));
            Receive<StreamComplete>((message) => ReceivedStreamComplete(message));
        }

        private void OnStreamBegin(StreamBegin message)
        {
            Console.WriteLine(
                "[receiver] Received StreamBegin message");

            _sender = message.Sender;
            _sender.Tell(new StreamStart(_streamReceiver));
        }

        private void ReceivedStreamComplete(StreamComplete message)
        {
            Console.WriteLine($"[receiver] got signaled that the stream completed, local stream length {_stream.Length}");

            _sender.Tell(new StreamResult(true));

            _stream.Close();
        }
    }

    public class ChunkData
    {
        public byte[] Chunk { get; set; }

        public ChunkData(byte[] chunk)
        {
            Chunk = chunk;
        }
    }

My confusion is: if this code makes any sense, then the StreamComplete message will be sent to the child actor of the receiver and not to the parent receiver actor:
source.To(Sink.ActorRef<ChunkData>(message.Receiver, new StreamComplete())).Run(materializer);
In this case, the parent receiver actor has no way of knowing whether the transfer has completed.
Otherwise, if I pass the receiver actor itself as the target for the chunks (as was suggested in the proposed solution on SO), then yes, it will receive the StreamComplete message, but then I'm not sure how the stream flow control (backpressure) would be done, since I would end up implementing the consumer side explicitly with Receive<ChunkData>.
Thanks for help!
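
One possible direction for the flow-control part, as a rough sketch only: Akka.Streams has Sink.ActorRefWithAck, which sends the next element only after the target actor has replied with an ack message. With that, the receiver actor itself can be the target, so it sees both the chunks and the completion message. StreamInit, StreamAck, AckingReceiverActor and AckedTransfer are hypothetical names made up for this sketch; ChunkData and StreamComplete are minimal stand-ins for the types from the code above. I have not tested this across a remoting boundary, so treat the remote behaviour as an assumption.

```csharp
using System;
using System.IO;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

// Stand-ins for the message types used in the original code.
public sealed class StreamComplete { }

public sealed class ChunkData
{
    public byte[] Chunk { get; }
    public ChunkData(byte[] chunk) { Chunk = chunk; }
}

// Hypothetical protocol messages, introduced only for this sketch.
public sealed class StreamInit { }

public sealed class StreamAck
{
    // Assumption: the sink matches the reply against the configured ack
    // message by equality, and a reply that has been serialized is a new
    // instance, so every StreamAck is made equal to every other one.
    public override bool Equals(object obj) => obj is StreamAck;
    public override int GetHashCode() => 1;
}

// Hypothetical receiver: writes each chunk, then acks so that the sink
// on the sender's side releases the next chunk.
public class AckingReceiverActor : ReceiveActor
{
    private readonly FileStream _stream =
        new FileStream(@"C:\temp\out.docx", FileMode.Create);

    public AckingReceiverActor()
    {
        // The sink sends the init message first and waits for an ack.
        Receive<StreamInit>(_ => Sender.Tell(new StreamAck()));

        Receive<ChunkData>(chunk =>
        {
            _stream.Write(chunk.Chunk, 0, chunk.Chunk.Length);
            Sender.Tell(new StreamAck()); // ask for the next chunk
        });

        // Completion now arrives at this actor, not at a child stream actor.
        Receive<StreamComplete>(_ =>
        {
            _stream.Flush();
            Console.WriteLine($"[receiver] stream completed, {_stream.Length} bytes written");
        });
    }

    protected override void PostStop() => _stream.Dispose();
}

// Hypothetical helper for the sender's side.
public static class AckedTransfer
{
    public static void Run(string path, IActorRef receiver, IMaterializer materializer)
    {
        var sink = Sink.ActorRefWithAck<ChunkData>(
            receiver,
            new StreamInit(),       // sent once before the first chunk
            new StreamAck(),        // the reply the sink waits for
            new StreamComplete());  // sent when the source is exhausted

        StreamConverters.FromInputStream(() => File.OpenRead(path), 4096)
            .Select(bytes => new ChunkData(bytes.ToArray()))
            .RunWith(sink, materializer);
    }
}
```

Inside SenderActor this would be called as AckedTransfer.Run(Filename, message.Receiver, Context.Materializer()), with message.Receiver pointing at the receiver actor itself instead of at a child created by Source.ActorRef. The trade-off is one ack round trip per chunk, so a larger chunk size may be worth trying over a remote link.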