Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Sep 05 2019 14:43
    @typelevel-bot banned @jdegoes
  • Jan 31 2019 21:17
    codecov-io commented #484
  • Jan 31 2019 21:08
    scala-steward opened #484
  • Jan 31 2019 18:19
    andywhite37 commented #189
  • Jan 31 2019 02:41
    kamilongus starred typelevel/cats-effect
  • Jan 30 2019 00:01
    codecov-io commented #483
  • Jan 29 2019 23:51
    deniszjukow opened #483
  • Jan 29 2019 23:37
  • Jan 29 2019 23:22
  • Jan 29 2019 20:26
    Rui-L starred typelevel/cats-effect
  • Jan 29 2019 18:01
    jdegoes commented #480
  • Jan 29 2019 17:04
    thomaav starred typelevel/cats-effect
  • Jan 28 2019 17:43
    asachdeva starred typelevel/cats-effect
  • Jan 28 2019 07:12
    alexandru commented #480
  • Jan 28 2019 05:45
    codecov-io commented #482
  • Jan 28 2019 05:35
    daron666 opened #482
  • Jan 27 2019 13:56
    codecov-io commented #481
  • Jan 27 2019 13:46
    lrodero opened #481
  • Jan 27 2019 05:47
    codecov-io commented #460
  • Jan 27 2019 05:37
    codecov-io commented #460
Mark Tomko
@mtomko
I guess depending on how we use State[A] we could add a case class Unavailable(reason: Throwable) extends State[Nothing] and pass the error on to the consumer.
Fabio Labella
@SystemFw
trait Thing[F[_], A] {
  def get: F[A]
}
object Thing {
  def create[F[_]: Concurrent: Timer, A](
      action: F[A],
      t: FiniteDuration
  ): Resource[F, Thing[F, A]] = {
    sealed trait State
    case class FirstCall(waitV: Deferred[F, A]) extends State
    case class NextCalls(v: A) extends State

    val initial = for {
      d <- Deferred[F, A]
      r <- Ref[F].of(FirstCall(d): State)
    } yield r

    Stream
      .eval(initial)
      .flatMap { state =>
        val read = new Thing[F, A] {
          def get = state.get.flatMap {
            case FirstCall(wait) => wait.get
            case NextCalls(v) => v.pure[F]
          }
        }

        val write = action.flatMap { v =>
          state.modify {
            case FirstCall(wait) => NextCalls(v) -> wait.complete(v).void
            case NextCalls(_) => NextCalls(v) -> ().pure[F]
          }.flatten
        }

        Stream.emit(read).concurrently(Stream.repeatEval(write).metered(t))
      }
      .compile
      .resource
      .lastOrError

  }
}
something along these lines
only checked for compilation so might be buggy :P
also needs to deal with errors and interruption, but shouldn't be too hard depending on the semantics you want
Mark Tomko
@mtomko
okay, reading over it now!
Fabio Labella
@SystemFw
your comment about how to handle errors make sense btw, I would recommend that as the easiest approach (you need an Either in the Deferred too, in case the error happens in the first call)
for subsequent calls you have the choice of propagating the error to the consumer or leaving the old value there
Mark Tomko
@mtomko
right, both of those are fine options. I think in my case, propagating it to the caller might be right, but I could see leaving the old one in place and logging vehemently
also I'd presumably add some retry logic
oh, this is really interesting. I think I see what's going on and I wouldn't have thought of this.
Fabio Labella
@SystemFw
it's a technique that I find invaluable once complexity grows
since you can focus on deriving the transition function
and you transform, as far as possible, a concurrency problem into a data modelling problem (which is far easier to solve)
Mark Tomko
@mtomko
yeah. I think I'd been thinking the Thing class would contain an MVar[F, A] and it's get would translate to a read on the mvar.
Fabio Labella
@SystemFw
interruption safety is the biggest complication in general, but I hope my proposal for interruption in CE3 will address that
Mark Tomko
@mtomko
But here, everything's done in one place.
Fabio Labella
@SystemFw
let me edit a small but critical bug :)
done
Mark Tomko
@mtomko
thanks!
Fabio Labella
@SystemFw
:+1:
Mark Tomko
@mtomko
I am pretty sure I see what all is going on, thank you so much! I think I see how to add the error handling (as discussed) and I'll spend some time wrapping my heard around this. I have to step away, but I really appreciate your generous help. Be well, and thanks again!
Fabio Labella
@SystemFw
:) :wave:
Christopher Davenport
@ChristopherDavenport
How does the compiler know that F is correct outside that limited scope, I'm still missing an alignment.
Oh I see it's inside the F declaration.
Mark Tomko
@mtomko
Hmm. For F[_] that have MonadError[T, A] you can call .attempt to go from F[A] to F[Either[T, A]]. Is there an inverse operation that takes an F[Either[T, A]] and converts it to an F[A], raising T in the Left case?
maybe this belongs in cats instead.
Daniel Spiewak
@djspiewak
.rethrow
Mark Tomko
@mtomko
sweet, thanks!
that's better than what I was going to call it, unattempt
Daniel Spiewak
@djspiewak
:-)
Mark Tomko
@mtomko
Here it is with some error handling. I made a design choice which I think is debatable:
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.{Concurrent, Resource, Timer}
import cats.implicits._
import fs2.Stream

import scala.concurrent.duration._

trait PeriodicMonitor[F[_], A] {
  def get: F[A]
}

object PeriodicMonitor {

  def create[F[_]: Concurrent: Timer, A](
      action: F[A],
      t: FiniteDuration
  ): Resource[F, PeriodicMonitor[F, A]] = {
    sealed trait State
    case class FirstCall(waitV: Deferred[F, Either[Throwable, A]]) extends State
    case class NextCalls(v: Either[Throwable, A]) extends State

    val initial: F[Ref[F, State]] =
      for {
        d <- Deferred[F, Either[Throwable, A]]
        r <- Ref[F].of(FirstCall(d): State)
      } yield r

    Stream
      .eval(initial)
      .flatMap { state: Ref[F, State] =>
        val read = new PeriodicMonitor[F, A] {
          def get: F[A] = state.get.flatMap {
            case FirstCall(wait) => wait.get.rethrow
            case NextCalls(v)    => v.pure[F].rethrow
          }
        }

        val write: F[Unit] = action.attempt.flatMap { v =>
          state.modify {
            case FirstCall(wait) => NextCalls(v) -> wait.complete(v).void
            case NextCalls(_)    => NextCalls(v) -> ().pure[F]
          }.flatten
        }

        Stream.emit(read).concurrently(Stream.repeatEval(write).metered(t))
      }
      .compile
      .resource
      .lastOrError

  }
}
I didn't add an Error state to the State ADT. I think what this means is that if the monitor encounters an error, it keeps working - there's no permanent transition from FirstCall or NextCalls to Error, and the stream continues processing. If the caller of PeriodicMonitor.get notices an error (raised by the reader), then the caller can take action on that. However, if I understand this code properly, it's possible the action can fail, be written, and then recover and be overwritten with a subsequent successful value. I think this is not a bad property, although it wasn't what I originally set out to do.
Mark Tomko
@mtomko
I ran a very superficial test (no errors) and it seems to do what I expected.
Adam Rosien
@arosien
:stars:
Philip
@philipschwarz

hello everyone - any suggestions as to why I am not getting the same behaviour if I replace this

  def life(b: Board): IO[Unit] =
    for {
      _ <- cls
      _ <- showCells(b)
      _ <- life(nextgen(b))
    } yield ()

with this

  def life(b: Board): IO[Unit] =
    cls *> showCells(b) *> life(nextgen(b))

cls and showCells both return IO[Unit]
Am I losing the trampolining in the second version?
https://typelevel.org/cats-effect/datatypes/io.html#stack-safety
https://github.com/typelevel/cats-effect/blob/master/core/shared/src/main/scala/cats/effect/IO.scala#L759

Fabio Labella
@SystemFw
@philipschwarz it should work if you use >>
the problem with *> is before IO even executes, it takes a strict argument
btw the first version also has a potential memory problem if you don't use better-monadic-for, since those yield () get transformed into a massive chain of map calls
Philip
@philipschwarz
Hey @SystemFw - yes, >> works - thank you very much for your help and for the explanation, and the warning ! much appreciated :thumbsup: :smiley:
Philip
@philipschwarz
FWIW, just for the curious, in ZIO it looks like *> would work https://youtu.be/TWdC7DhvD8M?t=1199
image.png
ah, but probably only because the recursion is finite in that example
in my case I would have the same issue because the life function has no base case
Fabio Labella
@SystemFw
I don't think that's the main issue. *> in cats is strict, and changing it would require changing a lot of other things
in ZIO it's probably lazy
but we do have a lazy *>, and that's >>
in fact it looks like *> in ZIO is implemented with flatMap, so it's literally >>
Philip
@philipschwarz
yes, thank you, that makes sense - I thought it was maybe kind of interesting that the right shark had different behaviour (eager/lazy) in different libraries
Raas Ahsan
@RaasAhsan
@SystemFw in your Fibers talk, you mentioned that concurrency and parallelism are generally independent of each other. Based on your definitions of both terms, how would you characterize the interaction of fibers or threads whose effects or steps aren't necessarily interleaved? e.g. several fibers running simultaneously and interacting with each other via Ref, but they are all bound to their own thread, so the discrete steps that comprise the fiber aren't necessarily interleaved. Would you consider that to be purely parallelism or is there something to be said about concurrency as well
Fabio Labella
@SystemFw
@RaasAhsan it's a tricky discussion tbh, because strictly speaking Threads are also concurrency