These are chat archives for akkadotnet/AkkaStreams

23rd
May 2016
Arjen Smits
@Danthar
May 23 2016 07:33
var statsFlow = Flow.Create<double>()
                    .ConflateWithSeed(_ => _, (agg, acc) => acc)
                    .Grouped(int.MaxValue)
                    .Select(s => {
                        var μ = s.Sum()/s.Count();

                        var se = s.Select(x => Math.Pow(x - μ, 2));
                        var σ = Math.Sqrt(se.Sum()/ se.Count());
                        return new { σ , μ , size=s.Count()};
                    });
@Silv3rcircl3 or @Horusiath, can either of you confirm whether the above sample translation is correct?
Bartosz Sypytkowski
@Horusiath
May 23 2016 07:35
@Danthar starting from what?
Arjen Smits
@Danthar
May 23 2016 07:43
the sample above that one
val statsFlow = Flow[Double]
      .conflateWithSeed(Seq(_))(_ :+ _)
      .map { s =>
        val μ = s.sum / s.size
        val se = s.map(x => pow(x - μ, 2))
        val σ = sqrt(se.sum / se.size)
        (σ, μ, s.size)
      }
that one
Bartosz Sypytkowski
@Horusiath
May 23 2016 07:46
conflate with seed should be more like: .ConflateWithSeed(x => ImmutableList.Create(x), (acc, x) => acc.Add(x))
Arjen Smits
@Danthar
May 23 2016 07:47
Oh, that might explain some things.
moment
Bartosz Sypytkowski
@Horusiath
May 23 2016 07:47
conflate is used to join elements into one collection when upstream produces elements faster than downstream can consume them
Arjen Smits
@Danthar
May 23 2016 07:47
moment
That's what I thought.
I already found it weird that I needed the Grouped call.
var statsFlow = Flow.Create<double>()
                    .ConflateWithSeed(_ => ImmutableList.Create(_), (agg, acc) => agg.Add(acc))
                    .Select(s => {
                        var μ = s.Sum()/s.Count();

                        var se = s.Select(x => Math.Pow(x - μ, 2));
                        var σ = Math.Sqrt(se.Sum()/ se.Count());
                        return new { σ , μ , size=s.Count()};
                    });
That's it now.
Bartosz Sypytkowski
@Horusiath
May 23 2016 07:49
it's ok now
Arjen Smits
@Danthar
May 23 2016 08:09
Do we by any chance have a built-in type that mimics this:
Iterator.continually(_)
It's basically a generator that takes a func as its value generator,
where _ in this case is the getNext func.
Marc Piechura
@marcpiechura
May 23 2016 08:48
I don't think so
Arjen Smits
@Danthar
May 23 2016 08:53
How so?
Not needed anywhere in the source?
Marc Piechura
@marcpiechura
May 23 2016 08:54
I mean I don't think we have a built-in equivalent, but we have an Iterator class in Akka.Util that we use in some places in the code.
You could maybe add it there
Arjen Smits
@Danthar
May 23 2016 08:56
akkadotnet/akka.net#1970
Marc Piechura
@marcpiechura
May 23 2016 09:27

@Danthar the code doesn't match the Scala version, it's more like this:

var lastFlow = Flow.Create<double>().Expand(_ => Enumerable.Repeat(_, int.MaxValue).GetEnumerator());

The only difference is that continually repeats forever, but since Expand creates a new enumerator every time upstream emits a new element, it should be fine.

Arjen Smits
@Danthar
May 23 2016 09:32
Somehow that does not fit with my mental model of how streams should work.
I would expect Expand to operate on the stream as a whole
instead of on each individual item.
Sure, there are operations where this does not apply.
Marc Piechura
@marcpiechura
May 23 2016 09:36
maybe the problem is that the example doesn't make any sense :)
Arjen Smits
@Danthar
May 23 2016 09:36
I've had that thought with more of the samples in the docs.
It's one of those things that trips me up, as someone who is translating from Scala to C# and discovering the streams API as I go.
Marc Piechura
@marcpiechura
May 23 2016 09:37
            var directoryImporter =
                Source.Tick(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3), "Tick")
                    .Expand(_ => Directory.GetFiles("Your dir").ToList().GetEnumerator())
                    .RunForeach(Console.WriteLine, materializer);
Does this make more sense to you?
Arjen Smits
@Danthar
May 23 2016 09:38
A lot of the time I'm like: WTF is this, why does it do what it does?
OK, let me take a stab at it.
Marc Piechura
@marcpiechura
May 23 2016 09:39
Upstream only emits an item every 3 seconds, but downstream can run faster, so you can use Expand to provide more elements to the downstream than the upstream is actually producing.
Arjen Smits
@Danthar
May 23 2016 09:39
Every 3 seconds, enumerate a local dir and output the file names in that dir.
Yeah, the difference in 'speed' between 'upstream' and 'downstream' is an interesting piece of behavior in the akka-streams lib.
Marc Piechura
@marcpiechura
May 23 2016 09:59
So you're good, or is it still unclear?
Arjen Smits
@Danthar
May 23 2016 11:32
No, I'm good :)
Marc Piechura
@marcpiechura
May 23 2016 13:08
Btw, there is another streams repository for add-ons that don't fit in the core repo: https://github.com/akka/akka-stream-contrib