These are chat archives for akkadotnet/akka.net

20th
Feb 2018
Bartosz Sypytkowski
@Horusiath
Feb 20 2018 07:30
@mmisztal1980 PersistentFSM stores events as part of the Applying definition. Recovery/state-update happens as defined in your ApplyEvent override.
Ondrej Pialek
@ondrejpialek
Feb 20 2018 08:46
So I wonder - if I debug my app do all the timeouts still... timeout? :) I see my cluster breaks if I debug for too long and then goes up again after debugger is resumed. What about message timeout in inbox, asks etc?
What is the development scenario here - have a dev config with the timeouts in minutes?
Onur Gumus
@OnurGumus
Feb 20 2018 10:02
In event sourcing, I am seeing my states are also being persisted besides the events. Is this expected ?
And how does journal reader treat them in that case ?
Bartosz Sypytkowski
@Horusiath
Feb 20 2018 10:04
@OnurGumus how are you storing your actor's state?\
Onur Gumus
@OnurGumus
Feb 20 2018 10:04
I don't do anything special. I use apply
Standard FSM pattern
I see my states persisted with a different manifest name
whenever they occur.
Oh and it is PersistentFSM
Bartosz Sypytkowski
@Horusiath
Feb 20 2018 10:08
I guess that PersistentFSM is snapshotting the state on successful application (I'm not sure if it should happen on every event application or not)
Onur Gumus
@OnurGumus
Feb 20 2018 10:09
To the EventJournal Table?
Bartosz Sypytkowski
@Horusiath
Feb 20 2018 10:09
snapshots are usually stored separately from events
Onur Gumus
@OnurGumus
Feb 20 2018 10:09
It is in EventJournal Table, I have not configured snapshots
Bartosz Sypytkowski
@Horusiath
Feb 20 2018 10:15
in what event is this state stored?
Onur Gumus
@OnurGumus
Feb 20 2018 10:15
It's not an event.
I see each event is logged to one row. Then in between whenever the state changes I see one dedicated row for that new state as well in eventjournal table
Bartosz Sypytkowski
@Horusiath
Feb 20 2018 10:16
ah, you're talking about StateChangeEvent?
Onur Gumus
@OnurGumus
Feb 20 2018 10:18
yes thatis correct
that's what is written in the manifest column
Bartosz Sypytkowski
@Horusiath
Feb 20 2018 10:18
without storing the state changes the persistent FSM wouldn't be able to restore the correct state of state machine during recovery
Onur Gumus
@OnurGumus
Feb 20 2018 10:18
hmm Is that so
I thought replaying events would suffice
but perhaps you are right since the state change can be based on custom logic rather than events
I never thought about it
And readjournal just gives me those states too eh?
Bartosz Sypytkowski
@Horusiath
Feb 20 2018 10:23
only information about the changes
state itself may be too massive to keep it as event
afaik it's replayed based on snapshots and events
Maciek Misztal
@mmisztal1980
Feb 20 2018 11:50
@Horusiath is the CurrentState<T>a special type of an event?
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:11

morning. I need to exec a stored proc in a database periodically, it returns a bunch of rows. I'd like to construct a stream from the entire thing, like

Source.formDbQuery("exec Get", TimeSpan.FromSeconds(60)).Select(...)...

Maybe there is a combinator that repeats a source periodically infinitely or something like that?

Onur Gumus
@OnurGumus
Feb 20 2018 12:12
@vasily-kirichenko perhaps you should start from a timer and execute the query in selec
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:13
:) this is not the hard part
the hard part is to push the records into a single stream, with backpressure
Onur Gumus
@OnurGumus
Feb 20 2018 12:15
You can use Source.Queue
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:15
I can create an actor which exec the sp and push into a Source.Queue... Does it sound ok?
:)
Onur Gumus
@OnurGumus
Feb 20 2018 12:15
Note that Source.Queue documentation is a bit tricky.
So good luck, (evil smile)
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:17
I know
Onur Gumus
@OnurGumus
Feb 20 2018 12:19
And it is not thread safe
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:20
I know ;)
Marc Piechura
@marcpiechura
Feb 20 2018 12:21
Would Source.Tick().SelectFromDb().SelectMany() work? in theory ticks from the source would be ignored until select many has flatten all elements and sends demand upstream
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:21
I think MergeHub could be used. let me a second.
Boban
@bobanco
Feb 20 2018 12:21
@vasily-kirichenko you can create your own Timer Graph Stage
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:22
@Silv3rcircl3 ohhhhhhhhhh. great
Boban
@bobanco
Feb 20 2018 12:22
or what @Silv3rcircl3 has suggected
Onur Gumus
@OnurGumus
Feb 20 2018 12:23
But that won't give you backpressure
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:23
@bobanco I see. But stages are tricky to write. However, if the logic is already available...
Onur Gumus
@OnurGumus
Feb 20 2018 12:23
I thought you wanted to get individual rows passed as back pressure no?
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:23
@NaagaGame_twitter Marc said it will.
@OnurGumus yes
in theory ticks from the source would be ignored until select many has flatten all elements and sends demand upstream
Onur Gumus
@OnurGumus
Feb 20 2018 12:24
yes but you will be loading all the data from db
Ah you just didn't want overlapping sequences
but again this was my initial suggestion :)
anyway
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:25
why? as I understand, the next SelectFromDb will not be called until the previous records are passed downstream
Onur Gumus
@OnurGumus
Feb 20 2018 12:26
yes yes, but I actually suggested the same with @Silv3rcircl3 , and you opposed that
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:26
I need an infinite stream of records
Onur Gumus
@OnurGumus
Feb 20 2018 12:26
so I thought you did not want to load all items at once
anyway never mind me
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 12:26
ah, I see
sorry
I didn't realize you ment Source.Tick by "start with a timer"
Marc Piechura
@marcpiechura
Feb 20 2018 12:27
You may need to set the inputbuffer Attribute since by default we put a buffer of size 16 between async stages, not 100% sure for SelectAsync though
Bartosz Sypytkowski
@Horusiath
Feb 20 2018 12:44
@mmisztal1980 in what context special?
Maciek Misztal
@mmisztal1980
Feb 20 2018 13:20
@Horusiath just reviewing the test case that you've linked
it's used there - I'm trying to understand what is it
Aaron Stannard
@Aaronontheweb
Feb 20 2018 13:57
just a quick FYI for our Akka.Remote users: https://twitter.com/AkkaDotNET/status/965947058095091712
@/all ran into some reports of this issue with users upgrading to DotNetty v0.4.7 over the past couple of weeks. If you upgrade to that version of DotNetty, you'll see some deserialization errors occur. This is the result of a bug which DotNetty has already patched and they'll be doing a release to DotNetty v0.5.0 in the next week, which should fix it but will include some breaking API changes. Once DotNetty v0.5.0 comes out we'll need to compile a version of Akka.NET against it and release that NuGet package before you upgrade. We'll keep you updated here or on our official Twitter.
In the short run, you can avoid the issue altogether by just not upgrading to DotNetty 0.4.7
Akka.Remote depends on DotNetty 0.4.6 and runs just fine on there
just wanted to spare you all the trouble of discovering this on your own time
<3
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 14:11
It seems MergeHub is the simplest way to solve my task:
def infoSource() = Source.fromPublisher(DB.readOnlyStream(sql"exec dbo.GetInfo".map(Info.apply).iterator))
  val sink = MergeHub.source[Info].to(Sink.foreach(println)).run()

  Source.tick(Duration.Zero, 5.seconds, ())
    .runForeach(_ => infoSource().runWith(sink))
attacking dynamically created sources to single sink.
cool :)
Marc Piechura
@marcpiechura
Feb 20 2018 14:14
But that will not backpressure the tick Source
Kris Schepers
@schepersk
Feb 20 2018 15:21
Hi, are there any decent examples on how PersistenceQuery can be used? Or better, can it be used for just simply creating a deserialized list of events (to use in a read model rebuild scenario)?
Or am I better of doing this myself?
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 17:04
@Silv3rcircl3 how so?...
let me try to check this
Marc Piechura
@marcpiechura
Feb 20 2018 17:28
Because inside the foreach you’re only creating the stream which connects to the hub, but you’re not waiting until the stream has finished. So you need mapAsync after Tick and in mapAsync you create the stream and wait for the materialzed future
Martin Clode
@MClode
Feb 20 2018 17:31
Anyone have a working example of using Akka.Persistence.Cassandra? I have tried to write an F# script but having trouble and can't find anything after Googling for a while :)
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 18:10
@Silv3rcircl3 I'm stuck trying to add a Future so that it completes when infoSource() completes :(
it's materialized to NotUsed.
How to change it to Future?
Marc Piechura
@marcpiechura
Feb 20 2018 18:38
@vasily-kirichenko I‘m not 100% sure but foreach should materialize into a future and runWith should materialize into a Tuple[matLeft, matRight] or whatever tuple is in scala ;) so theoretically you should get a Tuple[NotUsed, Future] from runWith and use Tuple.Item2
But I can try it myself later
goti2
@goti2
Feb 20 2018 18:39
hi everyone, I need help with akka.net and xamarin
Marc Piechura
@marcpiechura
Feb 20 2018 18:40
Otherwise you need to use some ob the xxxMat overloads, toMat with Keep.Right for example
goti2
@goti2
Feb 20 2018 18:40
I can not create a actor system in the xamarin application
Somebody had experience with the Xamarin
Onur Gumus
@OnurGumus
Feb 20 2018 18:42
@goti2 perhaps if you can mention more detail about the error someone might help. (P.S I have never used xamarin myself)
goti2
@goti2
Feb 20 2018 18:43
@OnurGumus
I just can not create an actor system in the application, the application just hangs
white screen and ...
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 18:56
@Silv3rcircl3 I managed to make it working
def source() =
    Source.fromIterator(() => Iterator.range(0, 10))
      .throttle(1, 1.second, 0, ThrottleMode.Shaping)

  val sink = MergeHub.source[Int].to(Sink.foreach(println)).run()

  Source.tick(Duration.Zero, 1.seconds, ())
    .mapAsync(1) { _ =>
      source()
        .watchTermination()(Keep.right)
        .to(sink)
        .run()
    }.runWith(Sink.ignore)
Marc Piechura
@marcpiechura
Feb 20 2018 19:01
yeah watchTermination would work too :+1:
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 19:01
what's the other way?
just curious
Marc Piechura
@marcpiechura
Feb 20 2018 19:02
I think you could use toMat(sink, Keep.right) in both places
In order to get the future from Sink.foreach
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 19:03
but sink is long living
waiting it's completion inside mapAsync would block the stream forever
so we need a Future which completes when source() completes. I don't see a way but watchTermination
do you?
Marc Piechura
@marcpiechura
Feb 20 2018 19:07
Give me a sec, I need to have the code in front of me ;-)
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 19:25
sorry for scala, btw
Marc Piechura
@MarcPiechura_twitter
Feb 20 2018 19:32
no problem ;) and you're right, it's not working with toMat
I thought MergeHub.Source would materialize into Sink<T, Future<NotUsed>> or something
then it would have worked because every new materialization in mapAsync would have created a new task
Vasily Kirichenko
@vasily-kirichenko
Feb 20 2018 19:36
Ah, I see.
Thanks.
Marc Piechura
@MarcPiechura_twitter
Feb 20 2018 19:48

but one question, what's wrong with SelectAsync ? I tried it and it worked as I would expect

            var source = Source.From(Enumerable.Range(0, 10))
                .Throttle(1, TimeSpan.FromSeconds(1), 0, ThrottleMode.Shaping)
                .ToMaterialized(Sink.Seq<int>(), Keep.Right);


            var tick = Source.Tick(TimeSpan.Zero, TimeSpan.FromSeconds(1), "")
                .Select(_ =>
                {
                    Console.WriteLine("TICK");
                    return _;
                })
                .SelectAsync(1, _ => source.Run(materializer))
                .SelectMany(x => x)
                .RunWith(Sink.ForEach<int>(Console.WriteLine), materializer);

Prints TICK then 10 seconds nothing (your db query) and then 1,2,3,4... and a new TICK