Mateusz Górski
@goral09
separating those two concerns makes sense. Although for building the communication channel, as in the MVar examples, it looks easier to get off the ground with MVar than with Ref + Deferred
Ionuț G. Stan
@igstan
"Concurrent Programming in ML" has probably one of the best coverage of these concurrency concepts. Really underated book, IMO.
Fabio Labella
@SystemFw
@goral09 I disagree
MVar gives a lot more ways to end up in deadlock
it's rare that you can get away with one MVar
you usually need multiple, and each can be empty or full multiple times, and there's blocking involved in each transition
so the states you need to think about are many many more
(you can probably achieve slightly more fine-grained semantics)
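For illustration, a minimal sketch of the failure mode Fabio describes, assuming cats-effect 2's cats.effect.concurrent.MVar (the names here are made up for the example): two fibers acquire two full MVars in opposite orders, and in some interleavings each ends up holding one MVar while blocked taking the other, so neither join ever completes.

import cats.effect._, cats.effect.concurrent.MVar
import cats.implicits._

def lockOrder[F[_]: Concurrent]: F[Unit] =
  for {
    m1 <- MVar.of[F, Unit](())
    m2 <- MVar.of[F, Unit](())
    // fiber 1 takes m1 then m2; fiber 2 takes them in the opposite order
    f1 <- (m1.take *> m2.take *> m2.put(()) *> m1.put(())).start
    f2 <- (m2.take *> m1.take *> m1.put(()) *> m2.put(())).start
    _  <- f1.join *> f2.join // may hang forever, depending on the interleaving
  } yield ()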
Mateusz Górski
@goral09
hmm, deadlock? I thought that since you can either put or read/take from it (waiting asynchronously in both cases), the only deadlock I can think of is when all the clients are read-only or all are write-only
Fabio Labella
@SystemFw
@goral09 that's one MVar
Mateusz Górski
@goral09
yes
Fabio Labella
@SystemFw
you can't implement a queue with one MVar
Mateusz Górski
@goral09
single-element queue?
Fabio Labella
@SystemFw
a single-element queue is hardly a queue...
but sure, a single-element queue is easy to implement with MVar since an MVar basically is a single-element queue
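A quick sketch of that equivalence, again assuming cats-effect 2's MVar: put semantically blocks while the MVar is full and take blocks while it is empty, which is exactly the behaviour of a one-element blocking queue.

import cats.effect._, cats.effect.concurrent.MVar
import cats.implicits._

def oneElementQueue[F[_]: Concurrent]: F[Int] =
  for {
    mvar <- MVar.empty[F, Int]
    _    <- mvar.put(1)       // succeeds: the MVar starts empty
    f    <- mvar.put(2).start // blocks: full until someone takes
    one  <- mvar.take         // frees the slot, unblocking the put
    _    <- f.join
    two  <- mvar.take
  } yield one + two           // 3: values come out in put order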
Mateusz Górski
@goral09
worker#1 -> mvar.put(1)
worker#2 -> mvar.put(2)
…
worker#n -> mvar.put(n)

// they all block asyncly if mvar is non-empty
// clients reading forever
client#1 -> mvar.take
client#2 -> mvar.take
…
client#n -> mvar.take
I imagine if all workers and clients share the same mvar instance we get something that looks like a queue
Fabio Labella
@SystemFw
you mean take, not read there
but that depends on the semantics of reawakening
Mateusz Górski
@goral09
probably yes, I don't know the API
Fabio Labella
@SystemFw
but it still doesn't fit the bill imo
imagine an unbounded queue
I want workers to enqueue and keep going
Mateusz Górski
@goral09
yeah, it doesn't fit this design
Fabio Labella
@SystemFw
since one of the main usages of a queue is decoupling producer speed from consumer speed
Mateusz Górski
@goral09
but this is good for back pressure
Fabio Labella
@SystemFw
what you have is synchronous
well, not even that
since the back pressure is fixed to one elem
Mateusz Górski
@goral09
that is true
Fabio Labella
@SystemFw
you want to be able to control it
e.g. with a bounded queue
Mateusz Górski
@goral09
well, you can store a batch of elements in the MVar
Fabio Labella
@SystemFw
which can be implemented with a semaphore (also built on top of Ref + Deferred) on top of an unbounded queue
Mateusz Górski
@goral09
FYI I am not playing devil's advocate here, I am just trying to understand the cases in which one can be better than the other
Fabio Labella
@SystemFw
once you start going in that direction you'll see that a Ref + Deferred implementation is easier
@goral09 absolutely, this is a great conversation :)
(obviously there is a way to implement a queue with MVar, but it's fairly complicated, you can see examples in Parallel and Concurrent Programming in Haskell by Simon Marlow, and that implementation has some characteristics that are better than the queue we have, and others that are worse)
let me write down a simple queue with Ref and Deferred, to give you an idea
Mateusz Górski
@goral09
after playing with an example in my head I realize that MVar by itself is probably a poor tool for a back-pressured queue
Fabio Labella
@SystemFw
the fs2 one has other concerns that make it harder to understand
Fabio Labella
@SystemFw
untested so it might be buggy, but this is the general idea
object Ex {
  import cats.implicits._
  import cats.effect._, concurrent._
  import cats.effect.implicits._

  /**
    * unbounded queue, `dequeue` semantically blocks on empty queue
    */
  trait Queue[F[_], A] {
    def enqueue(a: A): F[Unit]
    def dequeue: F[A]
  }
  object Queue {
    def unbounded[F[_]: Concurrent, A]: F[Queue[F, A]] =
      // state: the enqueued elements, plus the Deferreds of any dequeuers
      // blocked on an empty queue (at most one side is non-empty at a time)
      Ref[F]
        .of(Vector.empty[A] -> Vector.empty[Deferred[F, A]])
        .map { state =>
          new Queue[F, A] {
            // `modify` atomically computes the next state plus an action,
            // which `flatten` then runs outside the atomic update
            def enqueue(a: A) =
              state.modify {
                case (elems, deqs) if deqs.isEmpty =>
                  // no one is waiting: just append the element
                  (elems :+ a, deqs) -> ().pure[F]
                case (elems, deq +: deqs) =>
                  // hand the element to the longest-waiting dequeuer
                  (elems, deqs) -> deq.complete(a).start.void
              }.flatten

            def dequeue =
              Deferred[F, A].flatMap { wait =>
                state.modify {
                  case (elems, deqs) if elems.isEmpty =>
                    // empty: register ourselves and semantically block
                    (elems, deqs :+ wait) -> wait.get
                  case (e +: elems, deqs) =>
                    // non-empty: pop the head element
                    (elems, deqs) -> e.pure[F]
                }.flatten
              }
          }
        }
  }
}
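As a follow-up, the bounded variant Fabio mentioned earlier could look like the sketch below (untested, same caveats as the queue itself; Semaphore is cats-effect 2's cats.effect.concurrent.Semaphore, and BoundedEx is a made-up name): a semaphore with maxSize permits gates enqueue, and each dequeue returns a permit, so producers semantically block once maxSize elements are in flight.

object BoundedEx {
  import cats.implicits._
  import cats.effect._, concurrent._

  def bounded[F[_]: Concurrent, A](maxSize: Long): F[Ex.Queue[F, A]] =
    (Semaphore[F](maxSize), Ex.Queue.unbounded[F, A]).mapN { (permits, q) =>
      new Ex.Queue[F, A] {
        // one permit per in-flight element: blocked producers are the back pressure
        def enqueue(a: A) = permits.acquire *> q.enqueue(a)
        // each dequeued element frees a slot for a waiting producer
        def dequeue = q.dequeue.flatTap(_ => permits.release)
      }
    }
}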
Mateusz Górski
@goral09
really cool @SystemFw :thumbsup: thanks!
Fabio Labella
@SystemFw
if there are bugs, they might be me mixing up front and back of the queue or something, but the ideas you need are there
Mateusz Górski
@goral09
roger that
Fabio Labella
@SystemFw
deq.complete(a).start.void <-- arguably this is overkill and you could just use deq.complete(a)
Mateusz Górski
@goral09
Thanks @SystemFw once again
Fabio Labella
@SystemFw
let me know if the implementation makes sense or you have further questions
Mateusz Górski
@goral09
it makes sense, sure.
it reminds me a bit of monix.Observer, which uses Future to signal the availability of the data