firstTask
and secondTask
look like :val firstTask = dbOperation1(k)
def dbOperation1(k: objPUT)(jt: JdbcTemplate, io: Scheduler): Task[Int] = {
val params = Array(user.userId, DateUtils.currentTimestamp, k.getId)
Task(jt.update(tDao.tUpdate, params: _*)).executeOn(io).asyncBoundary
}
val secondTask = dbOperation2(t.getId, t, existing)
def dbOperation2(id: String,input: objPUTGen, existing: objPUTGen = null,iStd: Boolean = true,
isNSRefUpdate: Boolean = false,)(implicit user: UserDetails, jt: JdbcTemplate): Task[_] =
Task.sequence(Seq(dbOperation3(id, input),
if (iStd) dbOperation4( id, input) else Task.unit, dbOperation5(id, input, existing, isNSRefUpdate) ))
def dbOperation3(id: String, input: TemplateGeneric)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[_] = {
val sDel =
s"""
| delete from "tableName"
| where ID = ?
""".stripMargin
Task(jt.update(sDel, id)).executeOn(io).asyncBoundary
}
onErrorFallbackTo(Task.unit)
just before any startAndForget
. It's do not work so I'm wondering if the it's not reported what ever it's catch or not in UnfoldObservable.unsafeSubscribeFn
.assign
api and not the usual subscribe
api on the KafkaConsumer object. Is this supported ? Could I get some pointers please ?
hi there, I am trying to assemble a fat jar with monix using sbt assembly and I am getting these errors:
[error] deduplicate: different file contents found in the following:
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/io/monix/monix-internal-jctools_3/3.4.0/monix-internal-jctools_3-3.4.0.jar:scala/runtime/function/JProcedure18.class
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/scala-lang/scala3-library_3/3.1.2/scala3-library_3-3.1.2.jar:scala/runtime/function/JProcedure18.class
[error] deduplicate: different file contents found in the following:
any suggestions for me?
libraryDependencies += "io.monix" %% "monix" % "3.4.0",
lazy val assemblySettings = Seq(
assembly / assemblyMergeStrategy := {
case PathList("module-info.class") => MergeStrategy.discard
case path if path.contains("META-INF/services") => MergeStrategy.concat
case PathList("META-INF", xs@_*) => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
},
assembly / test := {} // skip unit test when build uber jar
)
Good morning. Will a Local
or ThreadLocal
be propagated when converting Task
-> Future
-> Task
? Looks to me like this isn't working. Any way I can ensure propagation here?
implicit val scheduler = Scheduler.traced
val local = Local(0)
for {
_ <- Task.fromFuture(Task(local.update(1)).runToFuture)
value <- Task(local.get)
} yield println(s"Local value in Future $value")
Await.result(f.runToFuture, Duration.Inf)
It prints Local value in Future 0
Even Local.isolate
doesn't work here
@Deeds67 I don't know why your particular sample doesn't work, but I can say that there are issues in this integration, as the propagation mechanisms are different.
Try running those tasks by setting an implicit Task.Options(localContextPropagation=true)
in that context, and see if that works. Or switch to usage of a TaskLocal
.
FrmPedido.scala:276:60: Symbol 'type cats.NonEmptyParallel.Aux' is missing from the classpath.
[error] This symbol is required by 'value monix.reactive.Observable.observableNonEmptyParallel'.
[error] Make sure that type Aux is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'Observable.class' was compiled against an incompatible version of cats.NonEmptyParallel.
ConcurrentSubject
. But ConcurrentSubject
buffers all messages in a Vector without removing them after consumption. In my case, I have a single producer and a single consumer, and there is no requirement for multi-cast to multiple consumers. Is there a simpler recipe here that is more memory efficient?
Hi there!
Is there a way do echoRepeated
, but only a limited amount of times and/or with changing timeout?
Or a more generalized approach, echoRepeated
, but with a "retryPolicy" of sorts.
I have a case where I should be emitting last know element from source, but in exponential delay fashion.
I quickly drafted the ugly way to do it:
def backoff[A](maxRetries: Int, backoffStrategy: BackoffStrategy): Observable[A] => Observable[A] =
source => {
@tailrec def
backoff0(source0: Observable[A], retries: Int = 0): Observable[A] = {
val delay = backoffStrategy.computeDelayBeforeNextRetry(
RetryPolicyContext.builder().retriesAttempted(retries).build(),
).toInt
if (retries >= maxRetries)
source0
else
backoff0(source0.echoOnce(delay.millis), retries + 1)
}
backoff0(source)
}
But was wondering if there is something more elegant?
Is there an easy way to disallow returning Task
in Task#map
? A somewhat common error that occurs for us is that people will return Task
in Task#map
, which leads to Task[Task[T]]
, but type erasure makes is so that this is indifferentiable from Task[Unit]
, leading to dangling tasks.
As an example, the following compiles but is bugged, since the inner doSomething(x)
will never run.
def f(): Task[Unit] = {
for {
x <- getSomething()
} yield {
doSomething(x) // doSomething returns a Task.
}
}
We currently ban Task[Unit] for this reason, but I'm wondering if there's a more standardized approach to handle this edge case?