Fabio Labella
@SystemFw
and the problem is this: given a bunch of shards, process the elements in each shard sequentially, but different shards concurrently
peterstorm
@peterstorm:matrix.org
[m]
Yeah, that's exactly the problem
Fabio Labella
@SystemFw
or more precisely, given a stream of `key -> element` pairs and a desired number of shards, shard the elements (making sure the same key ends up on the same shard), process the elements in each shard sequentially, but different shards concurrently
basically a mini kafka
but that's more complex than your use case, and it would require queues (and an approach roughly in line with that example, once fixed)
(I can write a working solution, if you need that and are struggling to do it yourself at this stage)
peterstorm
@peterstorm:matrix.org
[m]
I feel like I would be cheating then :D
But I am so interested in this stuff, hehe
The evalFilterAsync implementation is many times slower than my regular one, by the way. Is that weird?
Fabio Labella
@SystemFw
what's your regular one?
ah, you mean just computing primes?
peterstorm
@peterstorm:matrix.org
[m]
Yeah
Fabio Labella
@SystemFw
that's not necessarily weird, because concurrency has a cost
peterstorm
@peterstorm:matrix.org
[m]
Yeah, makes sense
Fabio Labella
@SystemFw
the tools in fs2 are more useful when you have actual concurrency, rather than data parallelism
peterstorm
@peterstorm:matrix.org
[m]
Yeah, I'm not even sure I need concurrency in my use case, but I wanted to try and implement it in case it was needed, maybe a weird way around it
But I'd be really interested in your solution, if you have time?
I'll just use evalFilterAsync in my code, but this stuff fascinates me
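(For reference, a minimal sketch of the evalFilterAsync approach being compared here; this is not from the chat, and the isPrime argument and the concurrency limit of 8 are placeholders:)
  import cats.effect.Concurrent
  import fs2.Stream

  // check up to 8 candidates concurrently, but keep the surviving
  // elements in their original order
  def primes[F[_]: Concurrent](n: Int, isPrime: Int => F[Boolean]): Stream[F, Int] =
    Stream.range(0, n).covary[F].evalFilterAsync(8)(isPrime)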
Fabio Labella
@SystemFw
yeah I can write one
I'm going to assume that the logic to process a single message is `(K, V) => F[Unit]`
you can have more complex patterns, but the API gets more complex too
and the implementation is interesting even in this case
(also I'm not actually going to run it, so you'll have to play with it to see if it actually works ;) )
peterstorm
@peterstorm:matrix.org
[m]
Of course, thank you :D
Fabio Labella
@SystemFw
should be something like this
  import cats.Hash
  import cats.effect.Concurrent
  import cats.syntax.all._
  import fs2.Stream
  import fs2.concurrent.Queue

  def sharded[F[_]: Concurrent, K: Hash, V](
      input: Stream[F, (K, V)],
      process: (K, V) => F[Unit],
      numShards: Int
  ): Stream[F, Unit] =
    Stream
      .eval(Queue.noneTerminated[F, (K, V)]) // unbounded for simplicity
      .repeatN(numShards.toLong)
      .foldMap(Vector(_)) // gather the numShards queues into one Vector
      .flatMap { shards =>
        // enqueueing None terminates each noneTerminated queue
        val close = shards.traverse_(_.enqueue1(None))

        val writer = input
          .evalMap {
            case v @ (k, _) =>
              // floorMod: k.hash may be negative, and a negative %
              // would produce an out-of-bounds shard index
              shards(math.floorMod(k.hash, numShards)).enqueue1(v.some)
          }
          .onFinalize(close)

        val readers =
          Stream
            .emits(shards)
            .map(_.dequeue.evalMap(process.tupled)) // sequential within a shard
            .parJoinUnbounded // concurrent across shards

        readers concurrently writer
      }
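(A minimal way to exercise it, again not from the chat; the sample input and the println process are placeholders, assuming cats-effect 2's IOApp:)
  import cats.effect.{ExitCode, IO, IOApp}

  object ShardedDemo extends IOApp {
    def run(args: List[String]): IO[ExitCode] =
      sharded[IO, Int, String](
        input = Stream.emits(List(1 -> "a", 2 -> "b", 1 -> "c")).covary[IO],
        process = (k, v) => IO(println(s"$k -> $v")),
        numShards = 2
      ).compile.drain.map(_ => ExitCode.Success)
  }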
peterstorm
@peterstorm:matrix.org
[m]
I'll try it out, thanks :D
And see if I can understand it, hehe
Fabio Labella
@SystemFw
let me know if it does work (and feel free to ask clarifying questions)
peterstorm
@peterstorm:matrix.org
[m]
Well, it seems to work :D I'll try and see if I can rewrite it to my use case for learning purposes!
peterstorm
@peterstorm:matrix.org
[m]
Ok, it's a lot faster than just doing evalFilterAsync... how is that?? :D
Artem Nikiforov
@nikiforo

I have a separate question. :sweat_smile:
But this one is easily reproducible.

While searching for a solution to the previous question, I tried updating from fs2 2.3.0 to 2.4.6
Behavior differs in this test
<...>
Is it supposed to differ?

I've bisected the difference:

8a7a13f4200648e9ec8a2e48f2d8641457e4e212 is the first bad commit
commit 8a7a13f4200648e9ec8a2e48f2d8641457e4e212
Author: Michael Pilquist <mpilquist@gmail.com>
Date:   Fri Jun 5 12:45:35 2020 -0400

    Fix scope leak with Stream.retry and in general, with pulls that do not consume all elements from source

:040000 040000 08176432576c76388ef2e6893039d63e8cd56fb7 ec2a4ee774b6b86d7216668793ad201fe45d2f5c M    core

Link to commit: typelevel/fs2@8a7a13f

I'm still not sure if it's a bug.

Artem Nikiforov
@nikiforo
I created typelevel/fs2#2191 and wrote down there all the information I'm aware of.
peterstorm
@peterstorm:matrix.org
[m]
@SystemFw: Hmmm, it seems, though, that it does not emit the results in the correct order. I repurposed your implementation and came up with this:
def calculatePrimePar2(
    i: Int,
    numShards: Int
  ): Stream[F, Int] =
    Stream
      .eval(Queue.noneTerminated[F, Int]) // unbounded for simplicity
      .repeatN(numShards.toLong)
      .foldMap(Vector(_))
      .flatMap { shards =>
        val close = shards.traverse_(_.enqueue1(None))

        val writer = Stream.range(0, i)
          .evalMap { v =>
            // v is non-negative here, so plain % is a safe shard index
            shards(v % numShards).enqueue1(v.some)
          }
          .onFinalize(close)

        val readers =
          Stream
            .emits(shards)
            .map(_.dequeue.evalFilter(algebra.calculateIsPrime(_)))
            .parJoinUnbounded

        readers concurrently writer
      }
And sometimes it interleaves primes
Fabio Labella
@SystemFw
wdym?
bear in mind that the behaviour of sharding in this context is "ordered and sequential within a shard, non-ordered and concurrent across shards" (this is how Kafka works, for example)
so it's not the same behaviour as mapAsync, it's a different set of tradeoffs
so you can't use it for computing all the primes concurrently while keeping order
peterstorm
@peterstorm:matrix.org
[m]
Ah ok, I think that was one of the requirements of the other implementation from profunktor
That it was sequential all the way
But thank you very much, I still learned a lot trying to understand it
Fabio Labella
@SystemFw

think that was one of the requirements of the other implementation from profunktor

I see. Can't really follow that code too much, since as written it will non-deterministically lose elements, as well as being essentially sequential (with no concurrency at all) when it doesn't lose them

but you can still look at mapAsync for "effects concurrently, results sequentially"
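(A minimal sketch of that tradeoff, not from the chat; the check function and the limit of 8 are placeholders:)
  import cats.effect.Concurrent
  import fs2.Stream

  // runs up to 8 check effects concurrently, yet emits results
  // strictly in input order
  def inOrder[F[_]: Concurrent, A, B](in: Stream[F, A], check: A => F[B]): Stream[F, B] =
    in.mapAsync(8)(check)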
peterstorm
@peterstorm:matrix.org
[m]
Yeah, evalFilterAsync, in my case is what I need, and it works fine :)
It's just really slow compared to your version
Artem Nikiforov
@nikiforo

@nikiforo: nothing really jumps out - could you create a standalone repository that reproduces the issue?

nikiforo/fs-handle#1

Felix Palludan Hargreaves
@hejfelix
hey all, I have a protocol where I want to create some state after the first message (a session id), and use that for all following messages
I don't want to fold immediately, because that means carrying around an Option that I know is only None for 1 message
it's basically 2 layers of protocol, where the 1st layer is only a handshake, after which the stream should be processed with the 2nd layer
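(One hedged sketch of that pattern, not from the chat; Msg, Session, mkSession and handle are hypothetical names. It pulls the first message to establish the session and only then processes the rest, so no Option has to be carried around:)
  import fs2.{Pull, Stream}

  def withSession[F[_], Msg, Session](
      mkSession: Msg => F[Session],     // layer 1: handshake on the first message
      handle: (Session, Msg) => F[Unit] // layer 2: per-message processing
  )(in: Stream[F, Msg]): Stream[F, Unit] =
    in.pull.uncons1.flatMap {
      case None => Pull.done // empty stream: no handshake happened
      case Some((first, rest)) =>
        Pull.eval(mkSession(first)).flatMap { session =>
          // every remaining message sees a session that definitely exists
          rest.evalMap(handle(session, _)).pull.echo
        }
    }.stream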