John Nicholas
@MrTortoise
am i going insane?
Boban
@bobanco

@Horusiath do you have any idea why Sink.FromSubscriber gives this error?

[INFO][16-Apr-16 14:10:51][Thread 0008][akka://ReactiveRabbit/user/StreamSupervisor-0/Flow-0-0-unknown-operation] Message RequestMore from NoSender to akka://ReactiveRabbit/user/StreamSupervisor-0/Flow-0-0-unknown-operation was not delivered. 1 dead letters encountered.

Here is how it's called:

    var trailMessages = new List<string> { "message1", "message2", "message3" };

    Source.From(trailMessages)
        .Map(msg => new Message(Encoding.UTF8.GetBytes(msg)))
        .RunWith(Sink.FromSubscriber(connection.Publish(inboundExchange.Name, "")), mat);
Btw, I'm porting ReactiveRabbit, which is based on Akka Streams, to .NET.
Boban
@bobanco
Here is the Subscriber implementation:
    internal class ExchangeSubscriber : ISubscriber<Routed>
    {
        private readonly IModel _channel;
        private readonly string _exchange;
        private readonly AtomicReference<ISubscription> _active = new AtomicReference<ISubscription>();
        private int _publishingThreadRunning = 0;//can have 2 values, 0 means not running, 1 means running.
        private readonly ConcurrentQueue<Routed> _buffer = new ConcurrentQueue<Routed>();
        private int _closeRequested = 0;//can have 2 values, 0 means not requested, 1 means requested.

        public ExchangeSubscriber(IModel channel, string exchange)
        {
            if (exchange.Length >= 255)
                throw new ArgumentOutOfRangeException(nameof(exchange), exchange.Length, "exchange.Length>=255");
            _channel = channel;
            _exchange = exchange;
        }
        public void OnNext(Routed element)
        {
            if(element==null)
                throw new ArgumentNullException(nameof(element));
            _buffer.Enqueue(element);
            var running = Interlocked.Exchange(ref _publishingThreadRunning, 1);
            if (running == 0)
            {

                Task.Factory.StartNew(PublishFromBuffer, TaskCreationOptions.AttachedToParent);//revisit this
            }

        }

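        private void PublishFromBuffer()
        {
            // Drain in a loop rather than recursing, so a long backlog cannot overflow the stack.
            while (true)
            {
                Routed element;
                while (_buffer.TryDequeue(out element))
                    Publish(element);
                Interlocked.Exchange(ref _publishingThreadRunning, 0);
                // Re-check: an element enqueued after the drain but before the flag reset
                // would otherwise be stranded until the next OnNext.
                if (_buffer.IsEmpty || Interlocked.Exchange(ref _publishingThreadRunning, 1) == 1)
                    return;
            }
        }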

        private void Publish(Routed routed)
        {
            try
            {
                _channel.BasicPublish(_exchange, routed.RoutingKey,
                    Conversions.ToBasicProperties(routed.Message),
                    routed.Message.Body);
                _active.Value.Request(1);
            }
            catch (Exception) // any publish failure cancels the subscription and closes the channel
            {
                _active.Value.Cancel();
                CloseChannel();
            }
        }

        public void OnSubscribe(ISubscription subscription)
        {
            var sub = _active.CompareAndSet(null, subscription);
            if(sub)
                subscription.Request(1);
            else
            {
                subscription.Cancel();
            }
        }

        public void OnError(Exception cause)
        {
            ShutdownWhenFinished();
        }

        public void OnComplete()
        {
            ShutdownWhenFinished();
        }

        public void OnNext(object element)
        {
           OnNext((Routed)element);
        }

        private void CloseChannel()
        {
            var closedRequested = Interlocked.Exchange(ref _closeRequested, 1) == 0;
            lock (_channel)
            {
                if (closedRequested && _channel.IsOpen)
                    _channel.Close();
            }
        }

        private void ShutdownWhenFinished()
        {
            //        Future {
            //            publishingThreadRunning.single.await(!_)
            //  closeChannel()
            //}
            // Wait until the publishing task has drained the buffer.
            // SpinWait yields the thread between checks instead of burning a core in a tight loop.
            SpinWait.SpinUntil(() => Volatile.Read(ref _publishingThreadRunning) == 0);
            CloseChannel();
        }
    }
Bartosz Sypytkowski
@Horusiath
@bobanco it's not necessarily an error. It points out that the actor on which your flow materializes is probably dead. Maybe you closed the actor system before the flow had finished? (Also, streams are WIP, and if I remember correctly there were some problems with closing streams, detected by the testkit.)
dilvishjohn
@dilvishjohn
Hi, all. I'm having trouble getting Akka to work on the latest ASP.NET 5, because there are no DLLs for the DNX 5.0 core framework. Is there a workaround for this?
Roger Johansson
@rogeralsing
No, there are a lot of parts that need to be ported before Akka.NET works with .NET Core ... there is some progress going on, but it's nowhere near complete.
Vagif Abilov
@object
Saw that the 1.0.7 PR has been accepted for Akka.Persistence.SqlServer. That's great news; I will start using it right away and hope that the NuGet package will also be released soon.
Ralf
@Ralf1108
Is it possible to connect to an actor system via named pipes?
Zetanova
@Zetanova
If a bigger actor needs to make some requests and wait for the responses, how can these round trips be written as some kind of well-formatted code?
    BecomeStacked(msg =>
    {
        return OldBecome(msg) || msg.Match()
            .With<Response>(m =>
            {
                UnbecomeStacked();
            })
            .WasHandled;
    });
Would it be plausible to inline the function, and how do I get hold of the OldBecome?
How could it be done for multiple requests?
Zetanova
@Zetanova
The way I'm currently doing things with requestIds is getting very fragmented, and I think that in 12 months I would have problems reading and navigating my own code.
Jeff Cyr
@JeffCyr
@Ralf1108 There's work in progress to support named pipes in v1.5.
Ralf
@Ralf1108
OK, cool :-)
It would be cool to support scenarios for multi-process Akka systems without opening a port and hassling with firewall rules.
Kris Schepers
@schepersk
indeed..
Aleksandrs
@sonicflare
Can somebody shed some light on what happened to AsyncBehavior.Suspend/AsyncBehavior.Reentrant on Receive? I need to do a bunch of operations, but I don't want to waste a thread on blocking, and Suspend is the thing I'm looking for.
Jeff Cyr
@JeffCyr
@sonicflare Are you looking for ReceiveAsync<T>(Func<T, Task>)?
AsyncBehavior got dropped a long time ago; only Suspend is supported now. There is an overload of Receive that accepts a Func<T, Task>; it has been renamed to ReceiveAsync in 1.0.7.
Aleksandrs
@sonicflare
@JeffCyr Looks like it. And it does the suspend by default? So while it is waiting for stuff to complete, no other messages for this actor are processed, right?
Jeff Cyr
@JeffCyr
That's right
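A minimal sketch of the ReceiveAsync overload discussed above, assuming Akka.NET 1.0.7 or later. The `Fetch` message and `FetchActor` class are hypothetical names used only for illustration, and `Task.Delay` stands in for real asynchronous work:

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical message type, used only for this illustration.
public sealed class Fetch
{
    public Fetch(string key) { Key = key; }
    public string Key { get; }
}

public class FetchActor : ReceiveActor
{
    public FetchActor()
    {
        // While the returned Task is pending, the actor's mailbox is suspended:
        // no other message for this actor is processed, and no thread is blocked.
        ReceiveAsync<Fetch>(async msg =>
        {
            var replyTo = Sender; // captured up front so the reply target is explicit
            await Task.Delay(TimeSpan.FromMilliseconds(100)); // placeholder for async I/O
            replyTo.Tell("done: " + msg.Key);
        });
    }
}
```

Because the mailbox stays suspended for the duration of the handler, a slow task delays every later message to this actor, so long-running work may be better handed to a child actor.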
Aleksandrs
@sonicflare
@JeffCyr thanks!
Jeff Cyr
@JeffCyr
np!
Grzegorz Bernas
@profesor79
Hi guys, just found this question on SO: is there a way to use app.json in .NET Core with HOCON?
eyantiful
@eyantiful
Hi,
Is there any sample of how to send an actor (the initiating actor in a choreography) inside a message in a cluster environment?
Thanks
Christian Sparre
@christiansparre
For a monitoring solution for our services (not using clustering), what would be the best way to present a "list/tree" of actors of a local actorsystem. Having each actor respond to a "getchildren" message or is there a way to ask the actorsystem this? Is there a way to traverse the actor tree?
Zetanova
@Zetanova
@christiansparre Getting the list/tree is the same as working with file system directories: you need to ask each folder for its child items. But I don't know if there is a system message for doing it, like Akka.Actor.Identify.
Arjen Smits
@Danthar
There is no officially supported way of doing this. But there is a hacky one.
Lejdholt
@Lejdholt
Hi, AroundReceive for UntypedActor always returns true, and ReceiveActor inherits from UntypedActor without changing this behavior. Shouldn't ReceiveActor return false from AroundReceive if for some reason the message wasn't handled by a Receive function defined in a ReceiveActor?
Arjen Smits
@Danthar
@christiansparre check out this repo by @rogeralsing https://github.com/rogeralsing/Beacon
The actor in there illustrates a way you can "explore" your actorsystem.
However, once again: this is not officially supported stuff. But it should give you a nice sample of how you can do this.
Basically, what it boils down to is that you recursively apply an Identify message on an ActorSelection with a wildcard.
The problem is, you never know when you are done. So running this could kill your cluster :P because you'd end up continually spamming it with Identify messages.
But if you could tweak it with clear end conditions, or restrict it to parts of your cluster only, it could be the basis of a nice tool.
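A rough sketch of the recursive Identify idea, not the code from the Beacon repo. It assumes a local actor system only, and `Explore`, `ActorTreeExplorer` and the depth limit are hypothetical additions that give the traversal an end condition:

```csharp
using System;
using Akka.Actor;

// Hypothetical request message: start exploring below the given path.
public sealed class Explore
{
    public Explore(string rootPath, int maxDepth) { RootPath = rootPath; MaxDepth = maxDepth; }
    public string RootPath { get; }
    public int MaxDepth { get; }
}

public class ActorTreeExplorer : ReceiveActor
{
    public ActorTreeExplorer()
    {
        Receive<Explore>(e =>
        {
            // The remaining depth travels as the Identify message id,
            // so every reply says how much further it may recurse.
            Context.ActorSelection(e.RootPath + "/*").Tell(new Identify(e.MaxDepth));
        });

        Receive<ActorIdentity>(id =>
        {
            if (id.Subject == null) return; // nothing matched for this reply

            Console.WriteLine(id.Subject.Path);

            var remaining = (int)id.MessageId - 1;
            if (remaining <= 0) return; // end condition: stop at the depth limit

            // Local-only assumption: the path is used without its address part.
            var children = id.Subject.Path.ToStringWithoutAddress() + "/*";
            Context.ActorSelection(children).Tell(new Identify(remaining));
        });
    }
}
```

Sending `new Explore("/user", 3)` to this actor would print the actors it finds up to three levels below `/user`. The depth limit bounds the number of Identify messages, but there is still no signal that the traversal has finished, since actors without children produce no reply.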
Bartosz Sypytkowski
@Horusiath
@Lejdholt that's a very good observation.
Could you file that as an issue on GitHub?
Lejdholt
@Lejdholt
Was hoping it was me who didn't understand ;P. But yes, I'll do that.
Bartosz Sypytkowski
@Horusiath
@eyantiful since you can send a message through a cluster in multiple ways, you need to be more precise. There are a few examples here, e.g. the cluster section from the core repo and the cluster web crawler.
Lejdholt
@Lejdholt
@Horusiath here is the new issue: akkadotnet/akka.net#1894
Jeff Cyr
@JeffCyr
@Lejdholt @Horusiath I think Unhandled(object msg) is called somewhere else in ReceiveActor
eyantiful
@eyantiful
@Horusiath sorry for the minimalism.
I have a custom serializer (ProtoBuf.Net) for the type Message.
Message has (among others) an IActorRef property for the initiator actor.
In the samples you mentioned, the default serializer is used as far as I can tell.
Might it be the ISurrogated interface that should be surrogated?
I guess the question is more: is passing IActorRef around good practice?
In a low-latency application, what is the overhead of resolving it for each message?
Lejdholt
@Lejdholt
@JeffCyr If I want to log handled messages (commands) to a message store in my ReceiveActor without doing it in every receive function, how would I do that? I thought I could use AroundReceive for this; guess not. Feels a bit inconsistent, but I think I'm missing something.
Jeff Cyr
@JeffCyr
@Lejdholt Do you expect an actor to receive a lot of unhandled messages? Why not log every message?
Lejdholt
@Lejdholt
@JeffCyr No, but I don't want to store commands that weren't handled. Storing commands is for follow-up: which command resulted in which event, through correlation IDs and causation IDs.
Artur Karbone
@ArturKarbone
Is there a more advanced way to control ITellScheduler? Cron-like, for instance. Currently ITellScheduler provides an initial delay and an interval.
I want something like once at 2 AM on workdays, stuff like that. Thanks
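One possible workaround with only the built-in scheduler, sketched under assumptions: compute the delay until the next matching time and reschedule after each run. `NextRun`, `RunJob` and `NightlyJobActor` are hypothetical names, and the time is taken from the local clock with no special handling of daylight-saving changes:

```csharp
using System;
using Akka.Actor;

public static class NextRun
{
    // Returns the next Monday-to-Friday occurrence of 02:00 strictly after 'now'.
    public static DateTime NextWeekdayAt2Am(DateTime now)
    {
        var candidate = now.Date.AddHours(2);
        if (candidate <= now)
            candidate = candidate.AddDays(1);
        while (candidate.DayOfWeek == DayOfWeek.Saturday ||
               candidate.DayOfWeek == DayOfWeek.Sunday)
            candidate = candidate.AddDays(1);
        return candidate;
    }
}

// Hypothetical trigger message.
public sealed class RunJob { }

public class NightlyJobActor : ReceiveActor
{
    public NightlyJobActor()
    {
        Receive<RunJob>(_ =>
        {
            // ... the nightly work would go here ...
            ScheduleNext(); // one-shot scheduling, renewed after every run
        });
    }

    protected override void PreStart()
    {
        ScheduleNext();
    }

    private void ScheduleNext()
    {
        var now = DateTime.Now;
        var delay = NextRun.NextWeekdayAt2Am(now) - now;
        Context.System.Scheduler.ScheduleTellOnce(delay, Self, new RunJob(), Self);
    }
}
```

Note that if the actor restarts, the default restart hooks call PreStart again, so a second RunJob could be scheduled alongside the pending one; a real implementation would probably want to track and cancel the outstanding schedule.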
Lejdholt
@Lejdholt
Logging commands to a "normal" log is already done in the parent actor.