These are chat archives for akkadotnet/akka.net

21st
Feb 2018
Vasily Kirichenko
@vasily-kirichenko
Feb 21 2018 05:21
@MarcPiechura_twitter ah! I forget that its possible to materialize a source to seq. Yeah, your code is simpler.
Vasily Kirichenko
@vasily-kirichenko
Feb 21 2018 09:32
@MarcPiechura_twitter however, Sink.Seq accumulates all elements of the stream, then push it downstream :( AFAIKMergeHub.Source does not do that.
Marc Piechura
@marcpiechura
Feb 21 2018 09:37
But in reality that whould be your dB query which produces a list, no?
Vasily Kirichenko
@vasily-kirichenko
Feb 21 2018 09:50
yes
I can just do something like db.query("...").toList instead of creating a stream.
Marc Piechura
@MarcPiechura_twitter
Feb 21 2018 14:04
@vasily-kirichenko I found the one operation we missed ;)
var source = Source.From(Enumerable.Range(0, 10))
                .Throttle(1, TimeSpan.FromSeconds(0.5), 0, ThrottleMode.Shaping);

            var tick = Source.Tick(TimeSpan.Zero, TimeSpan.FromSeconds(1), "")
                .Select(_ =>
                {
                    Console.WriteLine("TICK");
                    return _;
                })
                .MergeMany(1, _=> source.MapMaterializedValue(__ => default(ICancelable)))
                .Throttle(1, TimeSpan.FromSeconds(1), 0, ThrottleMode.Shaping)
                .RunWith(Sink.ForEach<int>(Console.WriteLine), materializer);
so MergeMany does the same as the mergehub but directly in the stream ;)
I think it's flatMapMerge in scala
Vasily Kirichenko
@vasily-kirichenko
Feb 21 2018 17:05
@MarcPiechura_twitter ah, yes :)
Source.tick(Duration.Zero, 1 seconds, ())
    .flatMapMerge(1, _ => source().mapMaterializedValue(_ => ()))
    .runWith(Sink.foreach(println))
streams seem to have an answer to any problem :)
cool
BTW, it works as well:
 Source.tick(Duration.Zero, 1 seconds, ())
    .flatMapMerge(1, _ => source())
    .runWith(Sink.foreach(println))
why did you add MapMaterializedValue?
and even
  Source.tick(Duration.Zero, 1 seconds, ())
    .flatMapConcat(_ => source())
    .runWith(Sink.foreach(println))
:)
wow. it's to elegant to be true
Vasily Kirichenko
@vasily-kirichenko
Feb 21 2018 17:42
ha. Interesting observation:
def source() = Source(1 to 10)

  Source.tick(Duration.Zero, 1 seconds, ())
    .flatMapConcat(_ => {
      println("New source!")
      source()
    })
    .throttle(1, 1 seconds, 0, ThrottleMode.Shaping)
    .runWith(Sink.foreach(println))
New source!
1
2
3
4
5
6
7
8
9
New source!
10
1
2
3
4
5
6
7
8
9
New source!
10
1
2
3
4
5
6
...
ah, not so much interesting, the last element in a source is just delayed by the throttle
Marc Piechura
@marcpiechura
Feb 21 2018 17:49
@vasily-kirichenko in .net both sources need to have the same type for materialization. Could be a bug in the API because it doesn’t make much sense since the inner source is materialzed multiple times while the outer only once
I assume we simply missed a TMat2 back in the days ;-)
Or it’s some scala magic which does it automatically if the inner source has NotUsed 😜
Vasily Kirichenko
@vasily-kirichenko
Feb 21 2018 17:52
yeah, Scala's signature is def flatMapConcat[T, M](f: Out ⇒ Graph[SourceShape[T], M]): Repr[T], no idea what Repr[T] is :)
type Repr[+O] <: FlowOps[O, Mat] {
    type Repr[+OO] = FlowOps.this.Repr[OO]
    type Closed = FlowOps.this.Closed
}
omg how can I unsee it
Marc Piechura
@marcpiechura
Feb 21 2018 18:01
Doesn’t look too bad, try to take a look at some of the akka-http internals, that’s crazy shit ;-)
Bartosz Sypytkowski
@Horusiath
Feb 21 2018 21:09
@vasily-kirichenko scala DSL for akka streams is all packed with scala-specific language features - one of the harder places to port