Hi, is this right?
val obsMainItem =
  publishOnEventDocument
    //.dump("In obsMainItem ************")
    .filter {
      case Right(value@_) => true
      case Left(err@_) => false
    }
    .map {
      case Right((oldItem, newItem, event)) =>
        event match {
          case NotFoundDoobie(msg) => oldItem
          case _ => newItem
        }
      case _ => ViewProcess() // This branch should never be reached
    }
The Left(err) side is for another observable where I handle the errors.
val obsProcessSideEffects =
  publishOnEventDocument
    //.dump("Darn, in processSideEffects **************")
    .map {
      case Right((oldItem@_, newItem@_, event)) =>
        event match {
          case FoundDoobie(x) => div(x)
          case SavedDoobie(x) => div(x)
          case DeletedDoobie(x) => div(x)
          case NotFoundDoobie(x) => div(x)
          case ErrorDoobie(x) => div(x)
        }
      case Left(error) => error
    }
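One possible simplification of obsMainItem, as a sketch only: `collect` combines the filter and map steps, so the fallback branch that should never be reached is not needed. The Event, NotFound, Saved, Item and Update names below are hypothetical stand-ins for the real types, which are not shown in the snippet above.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object CollectSketch {
  // Hypothetical stand-ins for the real event and item types.
  sealed trait Event
  final case class NotFound(msg: String) extends Event
  final case class Saved(msg: String) extends Event

  type Item = String
  type Update = Either[Throwable, (Item, Item, Event)]

  // Left values match no case, so collect drops them without a fallback branch.
  def mainItems(source: Observable[Update]): Observable[Item] =
    source.collect {
      case Right((oldItem, _, NotFound(_))) => oldItem
      case Right((_, newItem, _))           => newItem
    }

  // Runs the pipeline on three sample values and returns the emitted items.
  def run(): List[Item] = {
    val source = Observable[Update](
      Right(("old1", "new1", Saved("ok"))),
      Left(new RuntimeException("boom")),
      Right(("old2", "new2", NotFound("missing")))
    )
    mainItems(source).toListL.runSyncUnsafe()
  }
}
```

Calling `CollectSketch.run()` should yield only the items from the Right values; the Left is skipped and can still be handled by a separate observable.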
The most up-to-date info can be found in the Typelevel Discord, Alex is active there: https://discord.gg/hWd4eS244g
He answered a question about the CE3 roadmap recently:
Good question. I'm trying to come up with one and it is ... complicated, because it needs to have an identity that's different from Cats-Effect & fs2.
The value of Monix is in its pragmatism, its approach to FP that allows for impure interfaces to live alongside the pure ones (with established conventions), its deep integration with the host, its coherence, its fairly stable backwards compatibility.
Unfortunately, the task at hand on deciding what to do next, isn't easy to solve, and personally I've been away, on what people call an OSS hiatus. Trying to get back on that ride though.
In other news, in private I'm experimenting with a new Task. I have ideas and I'm foreseeing a philosophical divergence from cats.effect.IO. If CE evolves towards Task, the question would have been if Monix's implementation makes any sense, but CE 3 has actually moved further from Task's philosophy, and now I'm thinking Task can continue to provide value, while integrating CE3 ideas and also implement its type-classes.
Anyway, I was thinking of getting together with Monix contributors, discuss current issues, decide on a road-map, and call for help maybe, as there's plenty to work on.
This does mean that we are nowhere near a new major release. Sorry for not being able to give you a better answer.
uncancelable and then in the doOnCancel of the Task somehow stopping further fetches, sending a Stop signal upstream, but upstream from the processing part, as that needs to happen.
Observable
  .interval(1.seconds)
  .flatMap(a => Observable.fromTask(Task.delay(println(a)).as(a))) // Fetching something from API, once fetched we need to ensure it is processed
  .mapEval(a => Task.sleep(2.seconds).as(a))
  .uncancelable
  .consumeWith(Consumer.foreachParallelTask[Long](1)(a => Task.delay(println(s"arrived at the end $a")))) // This step always HAS to be invoked
  .as(ExitCode.Success)
  .doOnCancel(Task.sleep(10.seconds) *> Task.delay(println("CIAO")))
Hi, could someone please tell me why this code isn't working properly?
I want to send requests to different links when the Observable emits an event.
val eventComparative = PublishSubject[String]()
val eventComparativePublish = eventComparative.publish
val cancelEventComparativePublish = eventComparativePublish.connect()
val getMinimosRetensionISR = eventComparativePublish.dump("0").mapEval { case idComparative => ...
val getProcess = eventComparativePublish.dump("0").mapEval { case idComparative => ...
val getComparativo = eventComparativePublish.dump("0").mapEval { case idComparative => ...
val cmdPrint = button("Imprimir", cls := "myButton",
  SyncIO(
    onClick.transformLifted { event: Observable[MouseEvent] =>
      event.withLatestFrom(getIdComparative) {
        case (event@_, idComparative: String) => idComparative
      }
    } --> repoCompara.eventComparative
  )
)
PublishSubject
live: Repository
val publisher = PublishSubject[String]().publish
val cancelEventComparativePublish = publisher.connect()
val o1 = eventComparativePublish.dump("Event received!")
val publishTask = Observable.repeatEvalF(Task(eventComparative.onNext("Hi")))
Observable.zip2(o1, publishTask).completedL.runSyncUnsafe()
Hi,
I have the use case below.
Execute DB operations asynchronously; once they are done, send out a Kafka event to another microservice so that it reads from the DB. However, as of now the Kafka event is being sent even before the DB operations are complete. My code looks like this:
firstTask = Task(doSomeDBOperation).executeOn(io).asyncBoundary
secondTask = Task(doSomeDBOperation).executeOn(io).asyncBoundary
thirdTask = Task(doSomeDBUpdate).executeOn(io).asyncBoundary
Task.sequence(Seq(firstTask, secondTask, thirdTask, pushToKafkaTask))
Is there any way to ensure pushToKafkaTask happens only after the first three tasks?
applicative methods, for sequential computations you use Monad methods (flatMap)
or
Task.parSequence(List(firstTask, secondTask, thirdTask)).doOnFinish(_ => pushToKafkaTask)
if you want try/finally semantics
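A minimal sketch of the flatMap variant, assuming a Monix 3.x version where Task.parSequence is available and that pushToKafkaTask is a Task[Unit]. The dbOp helper and the log queue are hypothetical stand-ins for the real DB calls, used here only to record the order of execution.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

object SequencingSketch {
  // Runs three stand-in DB tasks, then the Kafka push, and returns the order in which they ran.
  def run(): List[String] = {
    // Thread-safe log, since parSequence may run the DB tasks concurrently.
    val log = new ConcurrentLinkedQueue[String]()

    // Hypothetical stand-in for a DB operation.
    def dbOp(name: String): Task[Unit] = Task { log.add(name); () }

    // Hypothetical stand-in for the Kafka push.
    val pushToKafkaTask: Task[Unit] = Task { log.add("kafka"); () }

    // flatMap starts pushToKafkaTask only after all three DB tasks have succeeded.
    val program: Task[Unit] =
      Task.parSequence(List(dbOp("first"), dbOp("second"), dbOp("third")))
        .flatMap(_ => pushToKafkaTask)

    program.runSyncUnsafe()
    log.toArray(new Array[String](0)).toList
  }
}
```

With flatMap the Kafka push is skipped if any DB task fails, whereas doOnFinish runs it in both the success and the failure case, so the choice depends on whether the downstream service should be notified after a failed update.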
firstTask and secondTask look like:
val firstTask = dbOperation1(k)
def dbOperation1(k: objPUT)(jt: JdbcTemplate, io: Scheduler): Task[Int] = {
  val params = Array(user.userId, DateUtils.currentTimestamp, k.getId)
  Task(jt.update(tDao.tUpdate, params: _*)).executeOn(io).asyncBoundary
}
val secondTask = dbOperation2(t.getId, t, existing)
def dbOperation2(id: String, input: objPUTGen, existing: objPUTGen = null, iStd: Boolean = true,
    isNSRefUpdate: Boolean = false)(implicit user: UserDetails, jt: JdbcTemplate): Task[_] =
  Task.sequence(Seq(
    dbOperation3(id, input),
    if (iStd) dbOperation4(id, input) else Task.unit,
    dbOperation5(id, input, existing, isNSRefUpdate)
  ))
def dbOperation3(id: String, input: TemplateGeneric)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[_] = {
  val sDel =
    s"""
      | delete from "tableName"
      | where ID = ?
    """.stripMargin
  Task(jt.update(sDel, id)).executeOn(io).asyncBoundary
}
onErrorFallbackTo(Task.unit) just before any startAndForget. It does not work, so I'm wondering whether the error gets reported in UnfoldObservable.unsafeSubscribeFn regardless of whether it is caught.
.assign API and not the usual subscribe API on the KafkaConsumer object. Is this supported? Could I get some pointers, please?