These are chat archives for akkadotnet/AkkaStreams

10th Jul 2016
Boban
@bobanco
Jul 10 2016 01:46
Hello guys, can someone help me translate this Scala DSL to .NET?
val graph = FlowGraph.closed() { implicit b =>
  import FlowGraph.Implicits._

  val broadcast = b.add(Broadcast[ByteString](2))

  val logWindowFlow = Flow[ByteString]
    .groupedWithin(10000, 1.seconds)
    .map(group => group.map(_.size).foldLeft(0)(_ + _))
    .map(groupSize => logger.info(s"Sent $groupSize bytes"))

  linesSource ~> broadcast ~> serverConnection  ~> logCompleteSink
                 broadcast ~> logWindowFlow     ~> Sink.ignore
}
Boban
@bobanco
Jul 10 2016 02:53
This is how I have translated it:
var system = ActorSystem.Create("reactive-akka");

var mat = system.Materializer();

var serverConnection = new Tcp().Apply(system).OutgoingConnection("localhost", 9181);

Func<string[]> getLines = () => File.ReadAllLines("2008.csv");

var linesSource = Source.From(getLines()).Select(line => ByteString.FromString(line + "\n"));

var logCompletedSink = Sink.OnComplete<ByteString>(() => Console.WriteLine("Completed with "),
    exception => Console.WriteLine(exception));

var mySink = Sink.Ignore<int>();

var graph = RunnableGraph.FromGraph(GraphDsl.Create(b =>
{
    var broadcast = b.Add(new Broadcast<ByteString>(2, false));

    var logWindowFlow = Flow.Create<ByteString>()
        .GroupedWithin(10000, TimeSpan.FromSeconds(1))
        .Select(group => group.Select(_ => _.Count).Aggregate(0, (a, c) => a + c));

    b.From(linesSource).Via(broadcast).Via(serverConnection.Shape).To(logCompletedSink);
    b.From(linesSource)
        .Via(broadcast)
        .Via(logWindowFlow)
        .To(Sink.ForEach<int>(groupSize => Console.WriteLine("Sent {0} bytes", groupSize)).Shape);

    return ClosedShape.Instance;
}));

graph.Run(mat);
Marc Piechura
@marcpiechura
Jul 10 2016 08:13
@bobanco Don't use connection.Shape, only use connection. The second From is also incorrect: you need to use broadcast there as the From parameter.
Also keep in mind that this will not work yet, since the TCP part is not working in Akka Streams.
I assume you used .Shape because you got a type conflict for the materialized value. If so, use .MapMaterializedValue(_ => NotUsed.Instance) to change it.
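For illustration, the advice above could be applied roughly as in the following sketch. It is untested against any specific Akka.NET version and rests on several assumptions not in the original: a pass-through flow stands in for the TCP connection (which was reported as not yet working), two in-memory lines replace the contents of 2008.csv, and the names logSink and Program are hypothetical.

```csharp
using System;
using System.Linq;
using Akka;
using Akka.Actor;
using Akka.IO;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class Program
{
    public static void Main()
    {
        var system = ActorSystem.Create("reactive-akka");
        var mat = system.Materializer();

        // Assumption: in-memory lines stand in for File.ReadAllLines("2008.csv").
        var linesSource = Source.From(new[] { "a,b,c", "1,2,3" })
            .Select(line => ByteString.FromString(line + "\n"));

        // Assumption: a pass-through flow stands in for the TCP connection.
        // A real connection flow would likely need
        // .MapMaterializedValue(_ => NotUsed.Instance) to fit into the builder.
        var serverConnection = Flow.Create<ByteString>();

        var logCompletedSink = Sink.OnComplete<ByteString>(
            () => Console.WriteLine("Completed"),
            exception => Console.WriteLine(exception));

        // Sink.ForEach materializes a Task, so its materialized value is
        // mapped to NotUsed instead of reaching for .Shape.
        var logSink = Sink.ForEach<int>(size => Console.WriteLine("Sent {0} bytes", size))
            .MapMaterializedValue(_ => NotUsed.Instance);

        var graph = RunnableGraph.FromGraph(GraphDsl.Create(b =>
        {
            var broadcast = b.Add(new Broadcast<ByteString>(2));

            // Sum the byte counts of everything seen in each window.
            var logWindowFlow = Flow.Create<ByteString>()
                .GroupedWithin(10000, TimeSpan.FromSeconds(1))
                .Select(group => group.Sum(bytes => bytes.Count));

            // First branch: source -> broadcast -> connection -> completion sink.
            b.From(linesSource).Via(broadcast).Via(serverConnection).To(logCompletedSink);

            // Second branch starts from the broadcast, not from the source again.
            b.From(broadcast).Via(logWindowFlow).To(logSink);

            return ClosedShape.Instance;
        }));

        graph.Run(mat);

        // Keep the process alive while the stream runs.
        Console.ReadLine();
    }
}
```

The key differences from the posted translation are that the second branch begins at broadcast, and that graphs are passed directly (with their materialized values mapped to NotUsed where needed) rather than through .Shape.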
Marc Piechura
@marcpiechura
Jul 10 2016 08:24
We need to wait for akkadotnet/akka.net#2108 before we continue on TCP support in Akka streams
Marc Piechura
@marcpiechura
Jul 10 2016 21:02
@cconstantin @Horusiath have you thought about updating streams to the latest JVM version? 2.4.2 is already 5 months old