Ref[F, State[A]]
as your main component
@ChristopherDavenport do you mean
, so that's why there is no F[_] parameter, it's normally in the outer definition
/** A read-only handle that yields a value of type `A` inside the effect `F`.
  *
  * @tparam F the effect type in which the value is obtained
  * @tparam A the type of the value being exposed
  */
trait Thing[F[_], A] {

  /** Effect that produces the value exposed by this handle. */
  def get: F[A]
}
object Thing {

  /** Builds a [[Thing]] whose value is refreshed by a background stream.
    *
    * The background stream repeatedly evaluates `action`, metered by `t`, and
    * stores each result in a `Ref`. Until the first result has been stored,
    * `get` waits on a `Deferred`; afterwards it returns the most recently
    * stored value.
    *
    * NOTE(review): nothing here handles a failing `action`. A `get` issued
    * before the first value arrives presumably never returns in that case —
    * confirm how `concurrently` surfaces the failure.
    *
    * @param action the effect evaluated on every refresh
    * @param t      the metering interval passed to `metered`
    * @return a resource yielding the read handle; the refresh stream runs
    *         concurrently with it (presumably until the resource is released)
    */
  def create[F[_]: Concurrent: Timer, A](
      action: F[A],
      t: FiniteDuration
  ): Resource[F, Thing[F, A]] = {
    // Two-phase state: before the first value (readers wait on the Deferred)
    // and after it (readers see the latest value directly).
    sealed trait State
    case class FirstCall(waitV: Deferred[F, A]) extends State
    case class NextCalls(v: A) extends State

    // Start in FirstCall with a fresh, not yet completed Deferred.
    val mkCell: F[Ref[F, State]] =
      Deferred[F, A].flatMap(pending => Ref[F].of(FirstCall(pending): State))

    // Read side: wait for the first value, or return the latest one.
    def reader(cell: Ref[F, State]): Thing[F, A] =
      new Thing[F, A] {
        def get: F[A] =
          cell.get.flatMap {
            case FirstCall(pending) => pending.get
            case NextCalls(latest)  => Concurrent[F].pure(latest)
          }
      }

    // Write side: run the action, store its result, and on the very first
    // result also complete the Deferred so that waiting readers wake up.
    def refresh(cell: Ref[F, State]): F[Unit] =
      action.flatMap { latest =>
        cell
          .modify[F[Unit]] {
            case FirstCall(pending) => (NextCalls(latest), pending.complete(latest).void)
            case NextCalls(_)       => (NextCalls(latest), Concurrent[F].unit)
          }
          .flatten
      }

    Stream
      .eval(mkCell)
      .flatMap { cell =>
        val refreshLoop = Stream.repeatEval(refresh(cell)).metered(t)
        Stream.emit(reader(cell)).concurrently(refreshLoop)
      }
      .compile
      .resource
      .lastOrError
  }
}
something along these lines
cats
instead.
unattempt
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.{Concurrent, Resource, Timer}
import cats.implicits._
import fs2.Stream
import scala.concurrent.duration._
/** A read-only view of a value of type `A` obtained inside the effect `F`.
  *
  * @tparam F the effect type in which the value is obtained
  * @tparam A the type of the monitored value
  */
trait PeriodicMonitor[F[_], A] {

  /** Effect that produces the monitored value. */
  def get: F[A]
}
object PeriodicMonitor {

  /** Builds a [[PeriodicMonitor]] backed by a background refresh stream.
    *
    * The background stream repeatedly evaluates `action.attempt`, metered by
    * `t`, and stores the outcome (success or failure) in a `Ref`. `get` waits
    * for the first outcome if none has been stored yet, and otherwise returns
    * the latest one; a stored failure is re-raised in `F`.
    *
    * A failure is not sticky: every refresh overwrites the stored outcome, so
    * a later successful run of `action` replaces an earlier error.
    *
    * @param action the effect evaluated on every refresh
    * @param t      the metering interval passed to `metered`
    * @return a resource yielding the monitor; the refresh stream runs
    *         concurrently with it (presumably until the resource is released)
    */
  def create[F[_]: Concurrent: Timer, A](
      action: F[A],
      t: FiniteDuration
  ): Resource[F, PeriodicMonitor[F, A]] = {
    // Outcome of a single run of `action`.
    type Outcome = Either[Throwable, A]

    // Before the first outcome readers wait on the Deferred; afterwards they
    // read the latest stored outcome.
    sealed trait State
    case class FirstCall(waitV: Deferred[F, Outcome]) extends State
    case class NextCalls(v: Outcome) extends State

    val newState: F[Ref[F, State]] =
      Deferred[F, Outcome].flatMap(gate => Ref[F].of(FirstCall(gate): State))

    // Read side: surface the stored (or awaited) outcome, raising failures in F.
    def monitor(cell: Ref[F, State]): PeriodicMonitor[F, A] =
      new PeriodicMonitor[F, A] {
        def get: F[A] =
          cell.get.flatMap {
            case FirstCall(gate)    => gate.get.rethrow
            case NextCalls(outcome) => Concurrent[F].fromEither(outcome)
          }
      }

    // Write side: capture the outcome, store it, and on the first outcome also
    // complete the Deferred so that waiting readers are released.
    def poll(cell: Ref[F, State]): F[Unit] =
      action.attempt.flatMap { outcome =>
        cell
          .modify[F[Unit]] {
            case FirstCall(gate) => (NextCalls(outcome), gate.complete(outcome).void)
            case NextCalls(_)    => (NextCalls(outcome), Concurrent[F].unit)
          }
          .flatten
      }

    Stream
      .eval(newState)
      .flatMap { cell =>
        val refreshLoop = Stream.repeatEval(poll(cell)).metered(t)
        Stream.emit(monitor(cell)).concurrently(refreshLoop)
      }
      .compile
      .resource
      .lastOrError
  }
}
Error
state to the State
ADT. I think what this means is that if the monitor encounters an error, it keeps working: there is no permanent transition from `FirstCall` or `NextCalls` to `Error`, and the stream continues processing. If the caller of `PeriodicMonitor.get` notices an error (raised by the reader), the caller can take action on it. However, if I understand this code properly, it is possible for the `action` to fail, have that failure written to the state, and then recover and have it overwritten by a subsequent successful value. I think this is not a bad property, although it wasn't what I originally set out to do.
hello everyone - any suggestions as to why I am not getting the same behaviour if I replace this
/** Runs `cls`, then `showCells(b)`, then recurses on `nextgen(b)`.
  *
  * There is no terminating case, so the loop only ends if one of the effects
  * fails or the `IO` is cancelled. The recursive call sits inside a `flatMap`
  * function, so it is only evaluated when the preceding effects have run.
  */
def life(b: Board): IO[Unit] =
  cls
    .flatMap(_ => showCells(b))
    .flatMap(_ => life(nextgen(b)))
    .map(_ => ())
with this
/** Runs `cls`, then `showCells(b)`, then recurses on `nextgen(b)`.
  *
  * There is no terminating case, so the loop only ends if one of the effects
  * fails or the `IO` is cancelled.
  *
  * The recursive step must be deferred. Writing it as
  * `cls *> showCells(b) *> life(nextgen(b))` never gets as far as running
  * anything: `*>` takes its argument strictly, so `life(nextgen(b))` is called
  * while the expression is still being built, which recurses without bound and
  * overflows the stack. Passing the recursive call inside a `flatMap` function
  * delays it until the preceding effects have actually run.
  */
def life(b: Board): IO[Unit] =
  (cls *> showCells(b)).flatMap(_ => life(nextgen(b)))
`cls` and `showCells` both return `IO[Unit]`.
Am I losing the trampolining in the second version?
https://typelevel.org/cats-effect/datatypes/io.html#stack-safety
https://github.com/typelevel/cats-effect/blob/master/core/shared/src/main/scala/cats/effect/IO.scala#L759
`*>` is evaluated before the IO even executes: it takes a strict argument, so its right-hand side is computed while the expression is still being built.