Aaronontheweb on dev
modified PingPong / RemotePingP… (compare)
IScheduler.Advanced
to call a method
AwaitAssert
functionality in TestKit, having a bit of trouble undestanding if I am using it right. The strange thing is that when I run just one single test using AwaitAssert
the test passes, however, if I run that test with multiple other tests (like in a suite) the test fails after the specified duration. Nothing is shared between tests and I am using Akka.TestKit.Xunit2
I have a question about running Akka streams in transactional manner, when success or failure can trigger some acknowedgement process. Let's say I have a simple stream:
var source = Source.From(Enumerable.Range(1, 10));
var flow = Flow.FromFunction(new Func<int, int>(x =>
{
if (x == 5)
throw new ArgumentException();
return x*2;
})).WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider));
var writeSink = Sink.ForEach<int>(Console.WriteLine);
var runnable = source.Via(flow).To(writeSink);
runnable.Run(materializer);
As you can see, when the flow encounters an element 5, it will raise an exception. How can this exception trigger some negative acknowledgemen (i.e. nack a queue message with certain tag)? And if the message flow through the whole stream, how can messaging processing completing trigger a positive acknowledgement? Of course I would like to run such acknowledgement outside individual stream elements.
Recover*
extension methods. However if you need to ack/nack messages back to source, there are few different options for that. I think, that the most stream-idiomatic is to use BidiFlows: they can be used to create a duplex request-response stream. For example, on the JVM a whole akka-http module is based on bidi flows.