Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Fabio Labella
@SystemFw
you mean take, not read there
but that depends on the semantics of reawakening
Mateusz Górski
@goral09
probably yes, I don't know the API
Fabio Labella
@SystemFw
but it still doesn't fit the bill imo
imagine an unbounded queue
I want workers to enqueue and keep going
Mateusz Górski
@goral09
yeah, not fit this design
Fabio Labella
@SystemFw
since one of the main usages of queue is decoupling producers speed and consumers speed
Mateusz Górski
@goral09
but this is good for back pressure
Fabio Labella
@SystemFw
what you have is synchronous
well, not even that
since the back pressure is fixed to one elem
Mateusz Górski
@goral09
that is true
Fabio Labella
@SystemFw
you want to able to control it
e.g. with a bounded queue
Mateusz Górski
@goral09
well, you can store a batch of elements in MVar
Fabio Labella
@SystemFw
which can be implement with a semaphore (also built on top of Ref + Deferred) on top of an unbounded queue
Mateusz Górski
@goral09
FYI I am not being a devil's advocate here, I am just trying to understand cases in which one can be better than other
Fabio Labella
@SystemFw
once you start going in that direction you'll see that a Ref + Deferred implementation is easier
@goral09 absolutely, this is a great conversation :)
(obviously there is a way to implement a queue with MVar, but it's fairly complicated, you can see examples in Parallel and Concurrent Programming in Haskell by Simon Marlow, and that implementation has some characteristics that are better than the queue we have, and others that are worse)
let me write down a simple queue with Ref and Deferred, to give you an idea
Mateusz Górski
@goral09
after playing with an example in my head I realize that MVar in itself is probably poor tool for back pressured queue
Fabio Labella
@SystemFw
the fs2 one has other concerns that make it harder to understand
Fabio Labella
@SystemFw
untested so it might be buggy, but this is the general idea
object Ex {
  import cats.implicits._
  import cats.effect._, concurrent._
  import cats.effect.implicits._

  /**
    * unbounded queue, `dequeue` semantically blocks on empty queue
    */
  trait Queue[F[_], A] {
    def enqueue(a: A): F[Unit]
    def dequeue: F[A]
  }
  object Queue {
    def unbounded[F[_]: Concurrent, A]: F[Queue[F, A]] =
      Ref[F]
        .of(Vector.empty[A] -> Vector.empty[Deferred[F, A]])
        .map { state =>
          new Queue[F, A] {
            def enqueue(a: A) =
              state.modify {
                case (elems, deqs) if deqs.isEmpty =>
                  (elems :+ a, deqs) -> ().pure[F]
                case (elems, deq +: deqs) =>
                  (elems, deqs) -> deq.complete(a).start.void
              }.flatten

            def dequeue =
              Deferred[F, A].flatMap { wait =>
                state.modify {
                  case (elems, deqs) if elems.isEmpty =>
                    (elems, deqs :+ wait) -> wait.get
                  case (e +: elems, deqs) =>
                    (elems, deqs) -> e.pure[F]
                }.flatten
              }
          }
        }
  }
}
Mateusz Górski
@goral09
really cool @SystemFw :thumbsup: thanks!
Fabio Labella
@SystemFw
if there are bugs, they might be me mixing up front and back of the queue or something, but the ideas you need are there
Mateusz Górski
@goral09
roger that
Fabio Labella
@SystemFw
deq.complete(a).start.void <-- arguably this is overkill and you could just deq.complete(a)
Mateusz Górski
@goral09
Thanks @SystemFw once again
Fabio Labella
@SystemFw
let me know if the implementation makes sense or you have further questions
Mateusz Górski
@goral09
it makes sense, sure.
it reminds me a bit of monix.Observer, it uses Future to signal the availability of the data
Fabio Labella
@SystemFw
I originally called Deferred Promise
the main difference is that a scala.Promise encapsulates Either[Throwable, A]
whereas Deferred encapsulates an A, you have to introduce the possibility of failure explicitly
if you give me 15 mins I can show you a queue based on MVar (translating from Haskell)
Mateusz Górski
@goral09
it's ok , you don't have to :) thanks
so Deferred doesn't have the notion of failure? no TimeoutException, no ThreadInterruptedException etc
Fabio Labella
@SystemFw
well, it has the notion of failure as in MemoryError or something
but it doesn't have a user defined failure in it
if you want to achieve that
you have to create a Deferred[F, Either[Throwable, A]]
and then fa.attempt.flatTap(deferred.complete) and deferred.get.rethrow
and you get back the behaviour of a scala Promise (but pure)
Mateusz Górski
@goral09
got it
it's really cool like from this small, orthogonal pieces you can create bigger blocks
Paulius Imbrasas
@CremboC
oh shit
wrong channel, sorry. Gitter is really terrible sometimes