cats
instead.
unattempt
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.{Concurrent, Resource, Timer}
import cats.implicits._
import fs2.Stream
import scala.concurrent.duration._
trait PeriodicMonitor[F[_], A] {
def get: F[A]
}
object PeriodicMonitor {
def create[F[_]: Concurrent: Timer, A](
action: F[A],
t: FiniteDuration
): Resource[F, PeriodicMonitor[F, A]] = {
sealed trait State
case class FirstCall(waitV: Deferred[F, Either[Throwable, A]]) extends State
case class NextCalls(v: Either[Throwable, A]) extends State
val initial: F[Ref[F, State]] =
for {
d <- Deferred[F, Either[Throwable, A]]
r <- Ref[F].of(FirstCall(d): State)
} yield r
Stream
.eval(initial)
.flatMap { state: Ref[F, State] =>
val read = new PeriodicMonitor[F, A] {
def get: F[A] = state.get.flatMap {
case FirstCall(wait) => wait.get.rethrow
case NextCalls(v) => v.pure[F].rethrow
}
}
val write: F[Unit] = action.attempt.flatMap { v =>
state.modify {
case FirstCall(wait) => NextCalls(v) -> wait.complete(v).void
case NextCalls(_) => NextCalls(v) -> ().pure[F]
}.flatten
}
Stream.emit(read).concurrently(Stream.repeatEval(write).metered(t))
}
.compile
.resource
.lastOrError
}
}
Error
state to the State
ADT. I think what this means is that if the monitor encounters an error, it keeps working - there's no permanent transition from FirstCall
or NextCalls
to Error
, and the stream continues processing. If the caller of PeriodicMonitor.get
notices an error (raised by the reader), then the caller can take action on that. However, if I understand this code properly, it's possible the action
can fail, be written, and then recover and be overwritten with a subsequent successful value. I think this is not a bad property, although it wasn't what I originally set out to do.
hello everyone - any suggestions as to why I am not getting the same behaviour if I replace this
def life(b: Board): IO[Unit] =
for {
_ <- cls
_ <- showCells(b)
_ <- life(nextgen(b))
} yield ()
with this
def life(b: Board): IO[Unit] =
cls *> showCells(b) *> life(nextgen(b))
cls and showCells both return IO[Unit]
Am I losing the trampolining in the second version?
https://typelevel.org/cats-effect/datatypes/io.html#stack-safety
https://github.com/typelevel/cats-effect/blob/master/core/shared/src/main/scala/cats/effect/IO.scala#L759
*>
is before IO even executes, it takes a strict argument
better-monadic-for
, since those yield ()
get transformed into a massive chain of map
calls
*>
, and that's >>
*>
in ZIO is implemented with flatMap
, so it's literally >>
Ref
, but they are all bound to their own thread, so the discrete steps that comprise the fiber aren't necessarily interleaved. Would you consider that to be purely parallelism or is there something to be said about concurrency as well
Concurrency is not parallelism
on the Go (!) blog, uses a very similar one
a < b
or b < a
(where <
in this case is happened before), and therefore a
and b
are concurrent
happens before
is more general (Lamport really is a genius), but doesn't help with the aspect that matters of the talk, i.e. the logical thread as a structuring abstraction