Comfy Cats Effect room. Ask questions. Rant about config file formats. Be nice!
.parTupled
etc syntax
Parallel
is in core
Blocker
and IOApp
correctly, your threads will be optimized for you, so you just need to worry about the memory footprint of all of your fibers. You can literally have tens of millions of them as long as they don't close over large state
parSequence
or parTupled
on Concurrent companion since those are from core
parTraverseN
is just an encoding of a common "parTraverse with semaphore" pattern that people kept asking about all the time they didn't have problems with partial unification
F[_]
style so I need the imports somewhat regularly
SignallingRef[F, Boolean](false).flatMap { paused =>
val doStuff = fs2.Stream.eval(myEffect).repeat.pauseWhen(paused)
pauseLogicStream(paused) concurrently doStuff
}
final override implicit def extremelyUnsafeIOAssertionToFuture(
test: IO[Assertion]
)(implicit pos: Position): Future[Assertion] = {
val result: Future[Assertion] = test.unsafeToFuture()
ctx.tick(1000.day) // Advance the clock
if (result.value.isDefined)
result
else {
fail(
s"""Test probably deadlocked. Test `IO` didn't resolve after simulating 1000 days of time.
| Remaining tasks: ${ctx.state.tasks}""".stripMargin
)(pos)
}
}
tick()
time relates to added real execution time
parTraverseN
or parSequenceN
which is pretty much what we were recommending before putting them into a lib:parSequence
+ semaphore
has a problem that it starts all IO
in the list and then suspends them. If your list is like a 1 000 000 you can get OOM.
follow up on the repeating effect thingy. I wrote some 30 lines of code without using fs2. I tested it with some unit tests. Will appreciate if anyone here would take a quick look and point out blunders if any
def create[F[_]](
effect: F[Unit],
repeatDuration: FiniteDuration
)(implicit F: Concurrent[F],
T: Timer[F]
): F[Repeating[F]] = {
Ref[F]
.of(none[Fiber[F, Unit]])
.map { ref =>
new Repeating[F] {
def pause: F[Boolean] =
for {
fiberO <- ref.modify(existing => (None, existing))
success <- fiberO.traverse(_.cancel).map(_.isDefined)
} yield success
def resume: F[Boolean] = {
def loop: F[Unit] =
T.sleep(repeatDuration) >> effect >> loop
running.ifM(
false.pure[F],
for {
fiber <- F.start(loop)
success <- ref.modify { existing =>
if (existing.isDefined) (existing, false)
else (Some(fiber), true)
}
_ <- if (!success) fiber.cancel else F.unit
} yield success
)
}
def running: F[Boolean] = ref.get.map(_.isDefined)
}
}
.flatTap(_.resume)
}
The definition of Repeating
is just as the instance suggested.