These are chat archives for akkadotnet/akka.net

16th
Apr 2016
Boban
@bobanco
Apr 16 2016 14:11

@Horusiath do you have any idea why Sink.FromSubscriber gives this error?

[INFO][16-Apr-16 14:10:51][Thread 0008][akka://ReactiveRabbit/user/StreamSupervisor-0/Flow-0-0-unknown-operation] Message RequestMore from NoSender to akka://ReactiveRabbit/user/StreamSupervisor-0/Flow-0-0-unknown-operation was not delivered. 1 dead letters encountered.

here is how it's called

          // Sample payloads to push through the stream.
          var payloads = new List<string> { "message1", "message2", "message3" };

          // Wrap each payload's UTF-8 bytes in a Message.
          var encoded = Source.From(payloads)
              .Map(text => new Message(Encoding.UTF8.GetBytes(text)));

          // Run the stream with `mat`, draining it into the subscriber obtained from connection.Publish.
          encoded.RunWith(Sink.FromSubscriber(connection.Publish(inboundExchange.Name, "")), mat);
btw, I am porting ReactiveRabbit, which is based on Akka Streams, to .NET
Boban
@bobanco
Apr 16 2016 14:16
here is the Subscriber implementation
       /// <summary>
       /// Reactive Streams subscriber that publishes every received <see cref="Routed"/> element
       /// to a single exchange through the supplied channel.
       /// </summary>
       /// <remarks>
       /// Elements are requested one at a time: the next element is requested only after the
       /// previous one has been handed to <c>BasicPublish</c>. Publishing happens on a background
       /// task so that <see cref="OnNext(Routed)"/> never blocks the caller. At most one such task
       /// drains the buffer at any time; <c>_publishingThreadRunning</c> acts as the ownership flag.
       /// </remarks>
       internal class ExchangeSubscriber : ISubscriber<Routed>
    {
        private readonly IModel _channel;
        private readonly string _exchange;

        // Holds the one accepted subscription; stays null until OnSubscribe succeeds.
        private readonly AtomicReference<ISubscription> _active = new AtomicReference<ISubscription>();

        // 0 = no drain task owns the buffer, 1 = a drain task is running.
        private int _publishingThreadRunning = 0;

        // Elements received via OnNext that have not been published yet.
        private readonly ConcurrentQueue<Routed> _buffer = new ConcurrentQueue<Routed>();

        // 0 = close not requested, 1 = close already requested (makes CloseChannel run at most once).
        private int _closeRequested = 0;

        /// <summary>
        /// Creates a subscriber that publishes to <paramref name="exchange"/> on <paramref name="channel"/>.
        /// </summary>
        /// <param name="channel">Channel used for publishing and closed when the stream terminates.</param>
        /// <param name="exchange">Name of the target exchange; must be shorter than 255 characters.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="channel"/> or <paramref name="exchange"/> is <c>null</c>.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="exchange"/> is 255 characters or longer.
        /// </exception>
        public ExchangeSubscriber(IModel channel, string exchange)
        {
            if (channel == null)
                throw new ArgumentNullException(nameof(channel));
            if (exchange == null)
                throw new ArgumentNullException(nameof(exchange));
            if (exchange.Length >= 255)
                throw new ArgumentOutOfRangeException(nameof(exchange), exchange.Length, "exchange.Length>=255");

            _channel = channel;
            _exchange = exchange;
        }

        /// <summary>
        /// Buffers <paramref name="element"/> and makes sure a background task is draining the buffer.
        /// </summary>
        /// <param name="element">The element to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="element"/> is <c>null</c>.</exception>
        public void OnNext(Routed element)
        {
            if (element == null)
                throw new ArgumentNullException(nameof(element));

            _buffer.Enqueue(element);

            // Only the caller that flips the flag 0 -> 1 starts a drain task.
            if (Interlocked.CompareExchange(ref _publishingThreadRunning, 1, 0) == 0)
            {
                // Task.Run instead of StartNew(..., AttachedToParent): the drain must not be tied
                // to the lifetime or scheduler of whatever task happens to deliver the signal.
                Task.Run((Action)PublishFromBuffer);
            }
        }

        /// <summary>
        /// Drains the buffer, publishing every queued element, then gives up ownership of the
        /// drain flag. Runs on a background task started by <see cref="OnNext(Routed)"/>.
        /// </summary>
        private void PublishFromBuffer()
        {
            do
            {
                try
                {
                    Routed element;
                    while (_buffer.TryDequeue(out element))
                    {
                        Publish(element);
                    }
                }
                finally
                {
                    // Always release ownership, even if something unexpected escapes,
                    // so that shutdown is never left waiting on a flag nobody will clear.
                    Interlocked.Exchange(ref _publishingThreadRunning, 0);
                }

                // An element may have been enqueued after the last TryDequeue but before the
                // flag was cleared; its OnNext saw the flag as 1 and started no task. Re-check
                // and try to re-acquire ownership so that element is not stranded.
            }
            while (!_buffer.IsEmpty && Interlocked.CompareExchange(ref _publishingThreadRunning, 1, 0) == 0);
        }

        /// <summary>
        /// Publishes one element and requests the next one. On any failure the subscription is
        /// cancelled and the channel is closed.
        /// </summary>
        /// <param name="routed">The element to publish.</param>
        private void Publish(Routed routed)
        {
            // Read once; null only if OnNext was signalled without a prior successful OnSubscribe.
            var subscription = _active.Value;
            try
            {
                _channel.BasicPublish(_exchange, routed.RoutingKey,
                    Conversions.ToBasicProperties(routed.Message),
                    routed.Message.Body);
                subscription?.Request(1);
            }
            catch (Exception)
            {
                // Deliberate best effort: a failed publish ends the stream instead of crashing
                // the background task. No logger is available in this class.
                subscription?.Cancel();
                CloseChannel();
            }
        }

        /// <summary>
        /// Accepts the first subscription and requests one element; any later subscription is cancelled.
        /// </summary>
        /// <param name="subscription">The subscription offered by the publisher.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is <c>null</c>.</exception>
        public void OnSubscribe(ISubscription subscription)
        {
            if (subscription == null)
                throw new ArgumentNullException(nameof(subscription));

            if (_active.CompareAndSet(null, subscription))
            {
                subscription.Request(1);
            }
            else
            {
                subscription.Cancel();
            }
        }

        /// <summary>
        /// Upstream failed: close the channel once everything already buffered has been published.
        /// </summary>
        /// <param name="cause">The upstream failure.</param>
        /// <exception cref="ArgumentNullException"><paramref name="cause"/> is <c>null</c>.</exception>
        public void OnError(Exception cause)
        {
            if (cause == null)
                throw new ArgumentNullException(nameof(cause));

            ShutdownWhenFinished();
        }

        /// <summary>
        /// Upstream completed: close the channel once everything already buffered has been published.
        /// </summary>
        public void OnComplete()
        {
            ShutdownWhenFinished();
        }

        /// <summary>
        /// Untyped entry point; casts <paramref name="element"/> to <see cref="Routed"/> and
        /// forwards to <see cref="OnNext(Routed)"/>.
        /// </summary>
        /// <param name="element">The element to publish; must be a <see cref="Routed"/>.</param>
        public void OnNext(object element)
        {
            OnNext((Routed)element);
        }

        /// <summary>
        /// Closes the channel if it is still open. Only the first call does anything.
        /// </summary>
        private void CloseChannel()
        {
            if (Interlocked.Exchange(ref _closeRequested, 1) != 0)
                return;

            // NOTE(review): locks on the channel instance itself, as the original did, in case
            // other code synchronizes on the same object - confirm before switching to a private gate.
            lock (_channel)
            {
                if (_channel.IsOpen)
                    _channel.Close();
            }
        }

        /// <summary>
        /// Closes the channel in the background as soon as the buffer is empty and no drain task
        /// is running. Returns immediately, so the thread delivering the terminal signal is never
        /// blocked (the commented-out Scala original did the same inside a <c>Future</c>).
        /// </summary>
        private void ShutdownWhenFinished()
        {
            Task.Run(() =>
            {
                // Check the buffer before the flag: no OnNext is expected after a terminal signal,
                // so once the buffer is seen empty it stays empty, and a flag of 0 seen afterwards
                // means the last dequeued element has already been handed to the channel.
                SpinWait.SpinUntil(() =>
                    _buffer.IsEmpty && Volatile.Read(ref _publishingThreadRunning) == 0);
                CloseChannel();
            });
        }
    }
Bartosz Sypytkowski
@Horusiath
Apr 16 2016 15:44
@bobanco it's not necessarily an error - it points out that the actor on which your flow materializes is probably dead. Maybe you've closed the actor system before the flow has finished? (Another factor is that streams are WIP and, if I remember correctly, there were some problems with closing streams, detected by the testkit.)