// Two independent, endlessly repeating sources built from `producer`,
// labelled "A" (100 ms argument) and "B" (200 ms argument).
val signal1 = Stream.eval(producer("A", 100.millis)).repeat
val signal2 = Stream.eval(producer("B", 200.millis)).repeat

// Both sources interleaved into one stream; elements appear as either side produces them.
val signal: Stream[IO, String] =
  signal1 merge signal2
/** A bulb backed by an integer `Ref`.
  *
  * @param ref mutable cell handed in by the creator; not read or written yet (see TODO below)
  */
class Bulb(ref: Ref[IO, Int]) {

  /** Prints "toggle" to standard output and completes with `Unit`. */
  def toggle(): IO[Unit] =
    IO.println("toggle") // todo: `ref` is not updated yet
}
/** Builds a pipe that toggles `bulb` once per incoming element.
  *
  * The incoming element itself is ignored; each one is replaced by the
  * string "Toggled" after the toggle effect has run.
  *
  * @param bulb the bulb to toggle for every element
  * @return a pipe from the input strings to "Toggled" markers
  */
def pipes(bulb: Bulb): Pipe[IO, String, String] =
  upstream =>
    upstream.evalMap { _ =>
      bulb.toggle() *> IO("Toggled")
    }
// Effect that allocates a fresh Ref initialised to 0 and wraps it in a Bulb.
// Each evaluation of this IO creates a new, independent Bulb.
val ioBulb: IO[Bulb] =
  Ref[IO].of(0).map(cell => new Bulb(cell))
@ayeo Assuming every toggle flips the internal Ref between 0 and 1, you can do something like this:
/** Entry point: toggles a bulb on every tick of two merged periodic signals for 5 seconds. */
object Main extends IOApp.Simple:

  // `Stream.emit` alone yields a single element, so a metered `emit` would fire
  // once and then end. `repeat` turns each source into an endless signal that
  // `metered` then paces at the given rate.
  val signal1: Stream[IO, String] =
    Stream.emit[IO, String]("A").repeat.metered(100.millis)
  val signal2: Stream[IO, String] =
    Stream.emit[IO, String]("B").repeat.metered(200.millis)

  /** Creates a bulb, toggles it for every element of either signal, and stops after 5 seconds. */
  def run: IO[Unit] =
    Bulb.make[IO].flatMap { bulb =>
      signal1
        .merge(signal2)
        .evalMap(_ => IO.println("toggle") *> bulb.toggle)
        .interruptAfter(5.seconds) // both signals are infinite, so this bounds the run
        .compile
        .drain
    }
/** A bulb whose only operation is an effectful toggle.
  *
  * @tparam F the effect type in which the toggle runs
  */
trait Bulb[F[_]]:

  /** Effect that toggles this bulb; produces no value. */
  def toggle: F[Unit]
object Bulb:

  /** Allocates a bulb whose state is an integer `Ref` starting at 0.
    *
    * Each `toggle` atomically rewrites the state: 0 becomes 1, and any other
    * value becomes 0.
    *
    * @tparam F effect type; needs `Functor` to map over the allocation and
    *           `Ref.Make` to create the underlying `Ref`
    * @return an effect that yields a new, independent bulb on every evaluation
    */
  def make[F[_]: Functor: Ref.Make]: F[Bulb[F]] =
    Ref.of[F, Int](0).map { state =>
      new Bulb[F]:
        def toggle: F[Unit] =
          state.update {
            case 0 => 1
            case _ => 0
          }
    }
Sleeps and timeouts don't work very well in the browser, but you can see a compiling version here: https://scastie.scala-lang.org/NXdBGJLZTwyJeFh3tsgIwg
/** Registers a user for the lightning integration.
  *
  * The checks run strictly in this order, and the first one that fails
  * decides the response:
  *
  *  1. the body must decode as `RegisterIntegrationPayload`, otherwise
  *     `BadRequest("Malformed request body")`;
  *  1. `repository.getUser` must find the payload's user, otherwise
  *     `BadRequest("No user found")`;
  *  1. `authStringToLightningAuth(auth)` must yield a `(user, pass)` pair,
  *     otherwise `BadRequest` with the returned message;
  *  1. `LightningAdServer.getAccount` must succeed, otherwise
  *     `InternalServerError("Lightning ad server failed: ...")`;
  *  1. `repository.markUserLightningRegistered` must report `true`, otherwise
  *     `InternalServerError("Failed to register for lightning integration")`.
  *
  * @param req  incoming request whose body carries the payload
  * @param auth raw lightning auth string, parsed by `authStringToLightningAuth`
  * @return the HTTP response for the first failing check, or `Ok` when all pass
  */
def registerIntegration(req: Request[IO], auth: String) = {

  // Step 5: mark the user as lightning registered and report the outcome.
  def markRegistered(payload: RegisterIntegrationPayload) =
    repository
      .markUserLightningRegistered(payload.userId)
      .flatMap { registered =>
        if (registered) Ok("Registered for lightning integration")
        else InternalServerError("Failed to register for lightning integration")
      }

  // Steps 3-4: unwrap the lightning auth header, then verify the credentials
  // against the ad server. The account name itself is not needed afterwards.
  def verifyLightningAccount(payload: RegisterIntegrationPayload) =
    authStringToLightningAuth(auth) match {
      case Left(st) => BadRequest(st)
      case Right((user, pass)) =>
        LightningAdServer
          .getAccount(user, pass)
          .flatMap {
            case Left(error) =>
              InternalServerError(s"Lightning ad server failed: $error")
            case Right(_) =>
              markRegistered(payload)
          }
    }

  // Step 2: check that the referenced user exists.
  def requireKnownUser(payload: RegisterIntegrationPayload) =
    repository
      .getUser(payload.userId)
      .flatMap {
        case None    => BadRequest("No user found")
        case Some(_) => verifyLightningAccount(payload)
      }

  // Step 1: decode the payload; `.option` turns any decoding failure into None.
  req
    .as[RegisterIntegrationPayload]
    .option
    .flatMap {
      case None          => BadRequest("Malformed request body")
      case Some(payload) => requireKnownUser(payload)
    }
}
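You could use `EitherT` or `OptionT` here (converting the `Either`s into `Option`, or vice versa).
Also note that `foo.flatMap(x => x.bar.flatMap(y => ???))` is the same as `foo.flatMap(x => x.bar).flatMap(y => ???)` (as long as the inner function does not need `x`), so you can flatten the nested `flatMap` calls.
`for` syntax may help with that.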
I'm trying to use `parReduceMapA` on a `List[Set[MyType]]`, where `MyType` has a `Semigroup` instance defined for it, but I'm getting the error: `could not find implicit value for parameter T: cats.Reducible[List]`
The code uses tagless final, so it looks roughly like this:
object MyService {
def impl[F[_]: Spawn: Parallel]: MyService[F] = new MyService[F] {
private def otherFunc(param1: String, param2: Set[MyType]): List[Set[MyType]] = ???
def myFunc(param1: String, param2: Set[MyType]): F[List[Set[MyType]]] =
for {
myVal <- otherFunc(param1, param2).parReduceMapA(Applicative[F].pure)
... etc