These are chat archives for akkadotnet/AkkaStreams

2nd
Sep 2017
Alex Valuyskiy
@alexvaluyskiy
Sep 02 2017 11:13
@Silv3rcircl3 I need your help with my Kafka plugin, because right now I blocked
I have some questions
1) Should I execute each EventHandler inside AsyncCallback?
2) How often should I poll the data in TimerGraphStageLogic?
3) Maybe 3 seconds is not enough for tests, that's why they are falling
4) Reactive Scala uses assertAllStagesStopped inside all tests. Why we need it, and how I could use in on .NET?
Bartosz Sypytkowski
@Horusiath
Sep 02 2017 11:47
@alexvaluyskiy
  1. Essentially you pick asyncCallback = GetAsyncCallback<MyEvent>(yourHandlingLogic) and then invoke that action inside event handler like event += (sender, args) => asyncCallback(AsMyEvent(sender, args)). Async callbacks are used as wrappers around your logic, so that you action is not invoked immediately, but the invocation point is being managed by akka materializer instead. Think of it like GetAsyncCallback takes a message handler as a parameter and return a mailbox/actorRef Send method.
  2. Maybe go with reactive-kafka poll-interval default? 50ms
  3. dunno
  4. AssertAllStagesStopped tries to stop streaming at the end of each run, so you won't end up hogging resources/connections from test to test
Alex Valuyskiy
@alexvaluyskiy
Sep 02 2017 11:57
@Horusiath Yes, I can poll each 50ms, but what is happen if a downstream can consume only 50 msg/sec, but Kafka produces 500 msg per sec, I will get OutOfMemoryException
Bartosz Sypytkowski
@Horusiath
Sep 02 2017 11:58
tbh. I think that simple prefetching (no timer logic) is the way to go
i.e. if consumer ask for X messages, prefetch 4 * X from kafka. Once the buffer will go down as result of more and more of downstream requests (i.e. if it has only X messages left), than ask Kafka again to enough messages to fill 4 * X buffer
does that make sense?
Alex Valuyskiy
@alexvaluyskiy
Sep 02 2017 12:08
Confluent.Kafka nightly version has Pause/Resume methods
Maybe we could reuse them?