These are chat archives for akkadotnet/AkkaStreams

20th
May 2016
Marc Piechura
@marcpiechura
May 20 2016 08:55
@Danthar maybe you want to wait until akkadotnet/akka.net#1967 and akkadotnet/akka.net#1966
are merged before you push the next docs; they fix your API remarks.
Arjen Smits
@Danthar
May 20 2016 10:31
Ah cool! I saw the issues coming by on email and wondered just that.
Thanks for the update
Why aren't they merged anyway?
Marc Piechura
@marcpiechura
May 20 2016 10:34
The second one still needs a review. I don't know why @cconstantin didn't merge the first one; maybe the build wasn't finished after his review
But I'm trying to avoid merging my own PRs ;)
Arjen Smits
@Danthar
May 20 2016 10:34
yeah the #1966 one has some Windows unit test failures, but afaik they are unrelated
I just yolo'd it in ;)
Marc Piechura
@marcpiechura
May 20 2016 10:34
yep
👍
Arjen Smits
@Danthar
May 20 2016 10:35
the #1967. What's the impact? I would guess it's none, since streams isn't used anywhere yet
besides me/us having to recheck the code samples to make sure they are still current
Marc Piechura
@marcpiechura
May 20 2016 10:36
no impact at all, all code runs just as before, but you don't need to call Flow.FromGraph(someOtherGraph.Async()) anymore
now Async returns a Flow instead of an IGraph
and so do all the other methods like Named, WithAttributes ...
Arjen Smits
@Danthar
May 20 2016 10:37
yup that's what I thought.
Going to merge that in then.
ow wait. Merge conflicts
you have to resolve those first ;)
Marc Piechura
@marcpiechura
May 20 2016 10:38
ah yeah, from the other PR :)
Arjen Smits
@Danthar
May 20 2016 10:38
rebasing should fix it
Marc Piechura
@marcpiechura
May 20 2016 10:49
done
Arjen Smits
@Danthar
May 20 2016 11:08
build server is taking its sweet time today
Arjen Smits
@Danthar
May 20 2016 11:29
@Silv3rcircl3 GraphBalanceSpec.A_Balance_must_support_waiting_for_demand_from_all_non_cancelled_downstream_subscriptions is failing
Marc Piechura
@marcpiechura
May 20 2016 11:30
It's a racy one, you can ignore it for now
Marc Piechura
@marcpiechura
May 20 2016 12:46
@Danthar can you merge akkadotnet/getakka.net#165 in?
I don't have the rights ;)
Arjen Smits
@Danthar
May 20 2016 12:47
sheesh. Where do you get the time :P are you a student or something ?
Marc Piechura
@marcpiechura
May 20 2016 12:50
hehe nope, let's put it this way: I'm a little bit unchallenged at my current job and therefore have some time left ;-) that's also the reason why I'm searching for a new position
Arjen Smits
@Danthar
May 20 2016 12:51
ah i see. Well best of luck on your search then :)
Marc Piechura
@marcpiechura
May 20 2016 12:51
thx
Arjen Smits
@Danthar
May 20 2016 13:20
hah sweet
var source = Source.From(Enumerable.Range(1, 100))
    .Select(i => { Console.WriteLine($"A: {i}"); return i; }).Async()
    .Select(i => { Console.WriteLine($"B: {i}"); return i; }).Async()
    .Select(i => { Console.WriteLine($"C: {i}"); return i; }).Async()
    .RunWith(Sink.Ignore<int>(), materializer);
looks much better
Arjen Smits
@Danthar
May 20 2016 13:25
Got an issue porting this sample:
case class Tick()

RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  // this is the asynchronous stage in this graph
  val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count).async)

  Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0

  Source.tick(initialDelay = 1.second, interval = 1.second, "message!")
    .conflateWithSeed(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1

  zipper.out ~> Sink.foreach(println)
  ClosedShape
})
the way I understood it, the sources that are ~> into the zipper inlets need to be added to the builder beforehand
So that works
But the zipper.out ~> Sink.foreach does not match signatures
I now have b.From(zipper.Out).To<int,ClosedShape,NotUsed>(Sink.ForEach<int>(Console.WriteLine));
Marc Piechura
@marcpiechura
May 20 2016 13:27
The problem is that Sink.ForEach has a Task as its materialized value but NotUsed is expected, so you need to change that via sink.MapMaterializedValue(_ => NotUsed.Instance)
Arjen Smits
@Danthar
May 20 2016 13:29
b.From(zipper.Out).To<int,ClosedShape,NotUsed>(Sink.ForEach<int>(Console.WriteLine).MapMaterializedValue(_ => NotUsed.Instance));
Marc Piechura
@marcpiechura
May 20 2016 13:29
and remove the generic arguments from the To
Arjen Smits
@Danthar
May 20 2016 13:30
hmm yeah, I added them to force the resolve and to better understand what's going on. But I've since come to realise that those arguments are only there to satisfy some type constraints, hence the NotUsed. The only type that really matters here is the int
and the result given back
hm, it seems my sample is incorrect:
var graph = RunnableGraph.FromGraph(GraphDsl.Create(b => {
    // this is the asynchronous stage in this graph
    var zipper = b.Add(ZipWith.Apply<Tick,int,int>((tick, count) => count).Async());

    var s = b.Add(Source.Tick(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), new Tick()).Async());
    b.From(s).To(zipper.In0);

    var s2 = b.Add(Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), "message!")
        .ConflateWithSeed(seed => 1, (count, _) => count + 1));

    b.From(s2).To(zipper.In1);

    b.From(zipper.Out).To(Sink.ForEach<int>(i => Console.WriteLine($"test: {i}")).MapMaterializedValue(_ => NotUsed.Instance));

    return ClosedShape.Instance;
}));
graph.Run(materializer);
this produces no output
Marc Piechura
@marcpiechura
May 20 2016 13:36
Have you waited 3 seconds?
Arjen Smits
@Danthar
May 20 2016 13:37
ah that second async is wrong
but yeah i did. Nothing
Marc Piechura
@marcpiechura
May 20 2016 13:38
Are you running it inside a test? :)
Arjen Smits
@Danthar
May 20 2016 13:39
nope, simple console app. Wrapped in an actor system and a using (var materializer = system.Materializer())
oh, that might be it now that I think about it
the graph.Run is non-blocking, right?
Marc Piechura
@marcpiechura
May 20 2016 13:39
yeah right
Arjen Smits
@Danthar
May 20 2016 13:40
yup that was it
one Console.ReadLine(); after the graph.Run did the trick
Marc Piechura
@marcpiechura
May 20 2016 13:40
👍
Arjen Smits
@Danthar
May 20 2016 13:40
runnable graphs are not awaitable
returns NotUsed
Marc Piechura
@marcpiechura
May 20 2016 13:43
yep, you would need to add the sink via GraphDsl.Create(sink, (b,s)=>...); that creates a graph that materializes a Task that you can then await
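A minimal sketch of that suggestion, not taken from the conversation itself. It assumes the Akka.Streams API as discussed here: a GraphDsl.Create overload that accepts a sink, SinkShape<T>.Inlet, and Sink.ForEach materializing a Task. The finite source and the system name "sketch" are made up for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class Program
{
    public static async Task Main()
    {
        using (var system = ActorSystem.Create("sketch")) // hypothetical system name
        using (var materializer = system.Materializer())
        {
            // Sink.ForEach materializes a Task that completes when the stream is done.
            var sink = Sink.ForEach<int>(i => Console.WriteLine($"test: {i}"));

            // Passing the sink into Create makes its Task the materialized value
            // of the whole graph, instead of NotUsed.
            var graph = RunnableGraph.FromGraph(GraphDsl.Create(sink, (b, s) =>
            {
                // Illustrative finite source so that the stream actually completes.
                var source = b.Add(Source.From(Enumerable.Range(1, 10)));
                b.From(source).To(s.Inlet);
                return ClosedShape.Instance;
            }));

            // Run only materializes the stream and returns immediately;
            // awaiting the Task keeps the process alive until the sink finishes.
            Task done = graph.Run(materializer);
            await done;
        }
    }
}
```

With a never-ending source such as Source.Tick the Task would not complete on its own, so a Console.ReadLine() (or some other way of keeping the process alive) would still be needed.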
Arjen Smits
@Danthar
May 20 2016 13:44
it's interesting how the JVM samples never mention this stuff
Marc Piechura
@marcpiechura
May 20 2016 13:48
It's a general behavior, all Run methods are only materializing the stream and then return
Arjen Smits
@Danthar
May 20 2016 13:57
how does this translate? .conflateWithSeed(Seq(_))(_ :+ _)
Arjen Smits
@Danthar
May 20 2016 14:05
.ConflateWithSeed(_ => _, (agg, acc) => { return agg += acc; })
    val statsFlow = Flow[Double]
      .conflateWithSeed(Seq(_))(_ :+ _)
      .map { s =>
        val μ = s.sum / s.size
        val se = s.map(x => pow(x - μ, 2))
        val σ = sqrt(se.sum / se.size)
        (σ, μ, s.size)
      }
This sample is weird. We conflate on a flow of doubles, and have a dummy aggregate function
to follow that up with a map statement
which receives the entire collection, instead of a single element. So it does not map to a Select
Marc Piechura
@marcpiechura
May 20 2016 14:32
ConflateWithSeed adds the incoming elements to a list, and when the map stage can handle another element it emits the complete list
It's like a summary if your map stage is slower than the producer
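Following that explanation, the Scala statsFlow could be ported roughly as below. This is a sketch under assumptions rather than a confirmed translation: ImmutableList<double> stands in for Scala's Seq, Summarize is a hypothetical helper name, and the ConflateWithSeed overload is assumed to take a seed function followed by an aggregate function, as in the C# snippets above.

```csharp
using System;
using System.Collections.Immutable;
using System.Linq;
using Akka.Streams.Dsl;

public static class StatsSketch
{
    // Hypothetical helper: summarizes one conflated batch as
    // (standard deviation, mean, element count), like the Scala map stage.
    public static Tuple<double, double, int> Summarize(ImmutableList<double> s)
    {
        var mean = s.Sum() / s.Count;
        var squaredErrors = s.Select(x => Math.Pow(x - mean, 2)).ToList();
        var stdDev = Math.Sqrt(squaredErrors.Sum() / squaredErrors.Count);
        return Tuple.Create(stdDev, mean, s.Count);
    }

    public static void Build()
    {
        var statsFlow = Flow.Create<double>()
            // Seed: start a new batch from the first element (Seq(_) in Scala).
            // Aggregate: append every further element to the batch (_ :+ _).
            .ConflateWithSeed(
                d => ImmutableList.Create(d),
                (batch, d) => batch.Add(d))
            // Select receives the whole batch collected while downstream was busy,
            // not a single double.
            .Select(batch => Summarize(batch));
    }
}
```

If downstream keeps up with the producer, every batch holds a single element, so the standard deviation is 0 and the count is 1.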