These are chat archives for akkadotnet/akka.net

27th
Feb 2018
Maciek Misztal
@mmisztal1980
Feb 27 2018 00:36
Hey all, can anyone elaborate on how SetReceiveTimeout should be handled in PersistentFSM ?
Maciek Misztal
@mmisztal1980
Feb 27 2018 01:02

I'm asking because:

     SetStateTimeout(Uninitialized.Instance, 10.Seconds());
     SetStateTimeout(Inactive.Instance, 10.Seconds());
     SetStateTimeout(Active.Instance, 10.Seconds());

does not seem to be firing in any of my states when using handlers such as:

    When(Uninitialized.Instance, (evt, state) =>
    {
        switch (evt.FsmEvent)
        {
            case FSMBase.StateTimeout timeout:
                return Stay().AndThen((s) => OnTimeOut(timeout));
            default:
                return Stay();
        }
    });
so I'm wondering, how should this be handled properly?
Ulimo
@Ulimo
Feb 27 2018 09:49
@Horusiath thanks for the response :) I ended up just using the network layer with TCP to solve the issue so I could have more control, thank you!
Arsene T. Gandote
@Tochemey
Feb 27 2018 14:09
Hello geeks, I would like to know whether there is a library for pub/sub built using Akka.NET. Also, any advice on how to use Akka.NET with Kafka?
I would also like to know whether Akka.NET has an implementation of Akka Streams Kafka.
Vasily Kirichenko
@vasily-kirichenko
Feb 27 2018 15:28
firstly, there is no good .NET client for Kafka (at all). secondly, Kafka Streams is a Java library and I strongly doubt anybody is gonna port it to .NET (and why?)
Bartosz Sypytkowski
@Horusiath
Feb 27 2018 16:32
@mmisztal1980 tbh it looks good
do you have any example to reproduce?
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 19:19
@vasily-kirichenko What's wrong with the Confluent Kafka? It is a wrapper for librdkafka, but is quite good. It should be possible to create a Kafka Source based on Confluent Kafka.
Does anyone know how to break free from a stream RunForeach?

    var mat = ActorMaterializer.Create(Context);

    Source<EventEnvelope, NotUsed> source = readJournal.EventsByTag("IEvent", null);
    source.RunForeach(envelope =>
    {
        if (subscribers.Contains(sender)) sender.Tell(envelope);
    }, mat);

I am using a read journal as the basis for a subscription, but an actor can die; at that moment I would like to stop this loop.
Is dispose enough?
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 19:35
No disposable found, I guess I can throw some exception and catch that :(
Marc Piechura
@marcpiechura
Feb 27 2018 19:57
@AndreSteenbergen would a kill switch help?
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 19:58
Don't know. This is what I have now; I am trying to implement the Context.Watch that you suggested last week:
public class PublishEventsActor : ReceiveActor
{
    private readonly ActorSystem system;

    public Dictionary<IActorRef, ActorMaterializer> subscribers = new Dictionary<IActorRef, ActorMaterializer> ();


    public PublishEventsActor(ActorSystem system)
    {
        Receive<string>(msg => Handle(msg));
        Receive<Terminated>(msg => Handle(msg));

        this.system = system;
    }

    private void Handle(Terminated msg)
    {
        Console.WriteLine("Subscriber died");
        subscribers[msg.ActorRef].Shutdown();
        subscribers.Remove(msg.ActorRef);
    }

    private void Handle(string msg)
    {
        if (msg.Equals("start") && !subscribers.ContainsKey(Sender))
        {
            var sender = Sender;

            Context.Watch(sender);

            //create a journal query, sending all messages to the actor in the wrapper
            // obtain read journal by plugin id
            var readJournal = PersistenceQuery.Get(system).ReadJournalFor<RedisReadJournal>("akka.persistence.query.journal.redis");
            var mat = ActorMaterializer.Create(Context);

            Source<EventEnvelope, NotUsed> source = readJournal.EventsByTag("IEvent", null);
            subscribers.Add(sender, mat);

            source.RunForeach(envelope =>
            {
                if (subscribers.ContainsKey(sender)) sender.Tell(envelope);
            }, mat);                
        }
    }
}
I am a bit worried about whether the way I close it off now is safe. I have an API listening for events, creating a read model based on those events, and I am worried I will end up in strange situations with the source's RunForeach. Is Materializer.Shutdown wise? That part is still hard for me to grasp, actually.
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 20:03
Normally I would expect to be able to pass a CancellationToken when starting the RunForeach task, but I can't.
Marc Piechura
@marcpiechura
Feb 27 2018 20:50
So you could use a kill switch, it’s an actual Akka streams stage ;) and “dispose” this when you receive the termination message
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 20:51
I'll check it out right now thx
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 20:57
Like this? var killswitch = source.ViaMaterialized(KillSwitches.Single<EventEnvelope>(), Keep.Right);
Making the dictionary a Dictionary<IActorRef, Source<EventEnvelope, UniqueKillSwitch>>
Marc Piechura
@marcpiechura
Feb 27 2018 21:01
ViaMaterialized is correct, but you need to store the materialized value in the dict and not the source itself
So take a look at what RunForeach returns
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 21:01
Task
Marc Piechura
@marcpiechura
Feb 27 2018 21:02
Then you need to change the foreach call
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 21:02
Source<EventEnvelope,NotUsed>
Marc Piechura
@marcpiechura
Feb 27 2018 21:02
ToMaterialized(Sink.ForEach<EventEnvelope>(...), Keep.Both)
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 21:02
Ah, ok
Marc Piechura
@marcpiechura
Feb 27 2018 21:02
After the KillSwitch, instead of RunForeach
And then .Run(mat)
That should give you a Tuple<UniqueKillSwitch, Task>
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 21:03
I'll try right now thx
Working
            var mat = ActorMaterializer.Create(Context);

            subscribers.Add(sender, null);

            Source<EventEnvelope, NotUsed> source = readJournal.EventsByTag("IEvent", null);
            var (killswitch, task) = source
                .ViaMaterialized(KillSwitches.Single<EventEnvelope>(), Keep.Right)
                .ToMaterialized(Sink.ForEach<EventEnvelope>((envelope) =>
                {
                    if (subscribers.ContainsKey(sender)) sender.Tell(envelope);
                }), Keep.Both)
                .Run(mat);

            subscribers[sender] = killswitch; 
Thx Marc, the whole streams part is still a bit magic to me, but slowly I am getting it
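For reference, the kill-switch pattern worked out above can be reduced to a small standalone sketch. It assumes the Akka and Akka.Streams NuGet packages; the class and method names (KillSwitchSketch, RunAndStopAsync) are hypothetical, and a Source.Tick stands in for readJournal.EventsByTag so that no journal plugin is needed.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

// Hypothetical illustration, not part of the original actor.
public static class KillSwitchSketch
{
    // Starts a never-ending stream, stops it through the kill switch,
    // and returns true once the sink's completion task has finished.
    public static async Task<bool> RunAndStopAsync()
    {
        var system = ActorSystem.Create("killswitch-sketch");
        var mat = ActorMaterializer.Create(system);

        // Assumption: a tick source replaces the read journal query here.
        var (killSwitch, done) = Source
            .Tick(TimeSpan.Zero, TimeSpan.FromMilliseconds(100), "tick")
            // Keep.Right keeps the UniqueKillSwitch instead of the source's own value.
            .ViaMaterialized(KillSwitches.Single<string>(), Keep.Right)
            // Keep.Both pairs the kill switch with the sink's completion task.
            .ToMaterialized(Sink.ForEach<string>(s => Console.WriteLine(s)), Keep.Both)
            .Run(mat);

        await Task.Delay(TimeSpan.FromMilliseconds(350));

        // Completes the stream downstream and cancels upstream.
        killSwitch.Shutdown();
        await done;

        await system.Terminate();
        return done.IsCompleted;
    }
}
```

In the actor above, the same idea would presumably mean storing the kill switch per subscriber and calling Shutdown() on it in the Terminated handler, rather than shutting down a whole materializer.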
Marc Piechura
@marcpiechura
Feb 27 2018 21:08
👍
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 21:09
Would have loved to see something like this in the documentation about readjournals.
But again, thanks a lot.
Marc Piechura
@marcpiechura
Feb 27 2018 21:11
You’re welcome, and we ofc always accept PRs 😜
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 21:12
Found it. https://github.com/akkadotnet/getakka.net/tree/master/src, I will make a PR (tomorrow)
getting late
Bartosz Sypytkowski
@Horusiath
Feb 27 2018 21:14
it's a good idea to add some basic/advanced tutorials for akka.net site, for FAQs like:
  • how to create saga pattern (i.e. by using PersistentFSM)
  • how to make rolling incremental cluster upgrades
Marc Piechura
@marcpiechura
Feb 27 2018 21:14
That’s the wrong location
Bartosz Sypytkowski
@Horusiath
Feb 27 2018 21:14
@AndreSteenbergen right now articles are generated directly from the official akka.net repo
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 21:14
Thx, I'll bookmark them right now
AndreSteenbergen
@AndreSteenbergen
Feb 27 2018 21:20
Sorry for all the newbie questions. I started with Akka.NET about 4 months ago, and I am venturing into the slightly harder bits.
The examples only go so far in explaining what is happening. This channel, or rather the people on this channel, are of great help.