Hi,
I have the below use case:
Execute DB operations asynchronously, and once they are done, send out a Kafka event to another microservice so that it reads from the DB. However, as of now the Kafka event is sent even before the DB operation completes. My code looks as below:
val firstTask = Task(doSomeDBOperation).executeOn(io).asyncBoundary
val secondTask = Task(doSomeDBOperation).executeOn(io).asyncBoundary
val thirdTask = Task(doSomeDBUpdate).executeOn(io).asyncBoundary

Task.sequence(Seq(firstTask, secondTask, thirdTask, pushToKafkaTask))
Is there any way to ensure pushToKafkaTask happens only after the first three tasks?
For parallel computations you use Applicative methods; for sequential computations you use Monad methods (flatMap). Or:

Task.parSequence(List(firstTask, secondTask, thirdTask)).doOnFinish(_ => pushToKafkaTask)

if you want try/finally semantics.
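To make the ordering guarantee concrete, here is a dependency-free sketch of the same idea using scala.concurrent.Future, whose flatMap also only starts the next step after the previous one completes (Monix Task composes the same way, but lazily; all names below are illustrative stand-ins, not the code from the question):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.collection.mutable.ListBuffer

object OrderingDemo extends App {
  val log = ListBuffer.empty[String] // records completion order

  // Hypothetical stand-ins for the DB operations and the Kafka push.
  def dbOp(name: String): Future[Unit] =
    Future { Thread.sleep(10); log.synchronized { log += name }; () }
  def pushToKafka(): Future[Unit] =
    Future { log.synchronized { log += "kafka" }; () }

  // Monadic composition: each step starts only after the previous finishes.
  val pipeline =
    dbOp("first")
      .flatMap(_ => dbOp("second"))
      .flatMap(_ => dbOp("third"))
      .flatMap(_ => pushToKafka()) // guaranteed to run last

  Await.result(pipeline, 5.seconds)
  assert(log.last == "kafka") // the Kafka push always comes last
  println(log.mkString(", ")) // first, second, third, kafka
}
```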
firstTask and secondTask look like:

val firstTask = dbOperation1(k)
def dbOperation1(k: objPUT)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[Int] = {
  val params = Array(user.userId, DateUtils.currentTimestamp, k.getId)
  Task(jt.update(tDao.tUpdate, params: _*)).executeOn(io).asyncBoundary
}
val secondTask = dbOperation2(t.getId, t, existing)

def dbOperation2(id: String, input: objPUTGen, existing: objPUTGen = null, iStd: Boolean = true,
                 isNSRefUpdate: Boolean = false)(implicit user: UserDetails, jt: JdbcTemplate): Task[_] =
  Task.sequence(Seq(
    dbOperation3(id, input),
    if (iStd) dbOperation4(id, input) else Task.unit,
    dbOperation5(id, input, existing, isNSRefUpdate)
  ))
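The `if (iStd) dbOperation4(id, input) else Task.unit` trick, substituting a no-op for a conditionally skipped step inside a sequenced pipeline, works the same way with plain Futures via Future.unit. A dependency-free sketch (step names are illustrative; note that unlike Monix's lazy Task, these Futures start eagerly, so this only illustrates the conditional no-op, not lazy sequencing):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConditionalStepDemo extends App {
  var executed = List.empty[String]

  def step(name: String): Future[Unit] =
    Future { synchronized { executed = executed :+ name }; () }

  // Build the pipeline; the optional step collapses to a no-op when disabled.
  def pipeline(includeOptional: Boolean): Future[Unit] =
    Future.sequence(Seq(
      step("step3"),
      if (includeOptional) step("step4") else Future.unit, // no-op when skipped
      step("step5")
    )).map(_ => ())

  Await.result(pipeline(includeOptional = false), 5.seconds)
  assert(!executed.contains("step4")) // the optional step never ran
  assert(executed.toSet == Set("step3", "step5"))
}
```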
def dbOperation3(id: String, input: TemplateGeneric)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[_] = {
  val sDel =
    s"""
       | delete from "tableName"
       | where ID = ?
     """.stripMargin
  Task(jt.update(sDel, id)).executeOn(io).asyncBoundary
}
I added onErrorFallbackTo(Task.unit) just before any startAndForget. It does not work, so I'm wondering whether whatever it catches is simply not reported in UnfoldObservable.unsafeSubscribeFn.
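For context on why a fallback can make failures invisible: once an error is replaced by a fallback value, nothing downstream (including any error reporter) can observe it. A dependency-free sketch using Future.recoverWith as a stand-in for onErrorFallbackTo (illustrative only, not the Monix internals in question):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SwallowedErrorDemo extends App {
  val failing: Future[Unit] = Future.failed(new RuntimeException("boom"))

  // Equivalent in spirit to onErrorFallbackTo(Task.unit): the error is
  // replaced by a successful unit value before anyone can observe it.
  val silenced: Future[Unit] = failing.recoverWith { case _ => Future.unit }

  Await.result(silenced, 5.seconds) // completes normally; "boom" is gone
  assert(silenced.value.exists(_.isSuccess))
}
```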
I would like to use the .assign API and not the usual subscribe API on the KafkaConsumer object. Is this supported? Could I get some pointers please?
Hi there, I am trying to assemble a fat jar with Monix using sbt-assembly and I am getting these errors:
[error] deduplicate: different file contents found in the following:
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/io/monix/monix-internal-jctools_3/3.4.0/monix-internal-jctools_3-3.4.0.jar:scala/runtime/function/JProcedure18.class
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/scala-lang/scala3-library_3/3.1.2/scala3-library_3-3.1.2.jar:scala/runtime/function/JProcedure18.class
[error] deduplicate: different file contents found in the following:
any suggestions for me?
libraryDependencies += "io.monix" %% "monix" % "3.4.0"

lazy val assemblySettings = Seq(
  assembly / assemblyMergeStrategy := {
    case PathList("module-info.class") => MergeStrategy.discard
    case path if path.contains("META-INF/services") => MergeStrategy.concat
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  },
  assembly / test := {} // skip unit tests when building the uber jar
)
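One thing worth checking before adding more merge rules: the `case _ => MergeStrategy.first` fallback above would normally already paper over the scala/runtime duplicate shown in the error, so if the error still appears, `assemblySettings` may simply never be attached to the project being assembled. A hedged sketch (the project definition is illustrative, not taken from this build):

```scala
// build.sbt — the merge strategy only takes effect if the settings
// sequence is actually applied to the project you run `assembly` on.
lazy val root = (project in file("."))
  .settings(assemblySettings) // without this, the default strategy is used
```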
Good morning. Will a Local or ThreadLocal be propagated when converting Task -> Future -> Task? It looks to me like this isn't working. Is there any way I can ensure propagation here?
implicit val scheduler = Scheduler.traced
val local = Local(0)

val f = for {
  _ <- Task.fromFuture(Task(local.update(1)).runToFuture)
  value <- Task(local.get)
} yield println(s"Local value in Future $value")

Await.result(f.runToFuture, Duration.Inf)

It prints Local value in Future 0
Even Local.isolate doesn't work here.
@Deeds67 I don't know why your particular sample doesn't work, but I can say that there are issues in this integration, as the propagation mechanisms are different.
Try running those tasks by setting an implicit Task.Options(localContextPropagation = true) in that context, and see if that works. Or switch to using a TaskLocal.
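For intuition about why the value is lost: a plain JVM ThreadLocal is tied to one thread, so a value set on the caller is invisible to pool threads unless a traced scheduler (or TaskLocal with localContextPropagation) explicitly captures and restores it across boundaries. A dependency-free illustration of the underlying behavior (not Monix code):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ThreadLocalDemo extends App {
  val local = new ThreadLocal[Int] { override def initialValue(): Int = 0 }

  local.set(1) // set on the main thread

  // Read from a pool thread: the update is not propagated.
  val seenOnPool = Await.result(Future(local.get), 5.seconds)

  assert(local.get == 1)  // still visible on the main thread
  assert(seenOnPool == 0) // the pool thread sees only the initial value
}
```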