assign API and not the usual subscribe API on the KafkaConsumer object. Is this supported? Could I get some pointers, please?
Hi there, I am trying to assemble a fat JAR with Monix using sbt-assembly, and I am getting these errors:
[error] deduplicate: different file contents found in the following:
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/io/monix/monix-internal-jctools_3/3.4.0/monix-internal-jctools_3-3.4.0.jar:scala/runtime/function/JProcedure18.class
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/scala-lang/scala3-library_3/3.1.2/scala3-library_3-3.1.2.jar:scala/runtime/function/JProcedure18.class
[error] deduplicate: different file contents found in the following:
any suggestions for me?
libraryDependencies += "io.monix" %% "monix" % "3.4.0",
lazy val assemblySettings = Seq(
  assembly / assemblyMergeStrategy := {
    case PathList("module-info.class") => MergeStrategy.discard
    case path if path.contains("META-INF/services") => MergeStrategy.concat
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  },
  assembly / test := {} // skip unit tests when building the uber JAR
)
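One thing that may be worth checking: "deduplicate" is sbt-assembly's default strategy, so seeing it in the error suggests the custom strategy above is not in effect for those paths. That can happen when assemblySettings is defined but never added to the project. A minimal sketch, assuming a single root project (the name `root` is hypothetical) and that keeping the first copy of the duplicated classes is acceptable:

```scala
// build.sbt (sketch; `root` is a hypothetical project name)
lazy val root = (project in file("."))
  // Without this line the Seq is only defined, never applied to the build.
  .settings(assemblySettings)
  .settings(
    libraryDependencies += "io.monix" %% "monix" % "3.4.0"
  )
```

With the settings applied, the catch-all `case _ => MergeStrategy.first` should cover scala/runtime/function/JProcedure18.class as well.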
Good morning. Will a Local or ThreadLocal be propagated when converting Task -> Future -> Task? It looks to me like this isn't working. Is there any way I can ensure propagation here?
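import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.misc.Local
import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val scheduler: Scheduler = Scheduler.traced
val local = Local(0)
// Update the Local inside a Task that is run as a Future, then read it back in a Task.
val f = for {
  _ <- Task.fromFuture(Task(local.update(1)).runToFuture)
  value <- Task(local.get)
} yield println(s"Local value in Future $value")
Await.result(f.runToFuture, Duration.Inf)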
It prints Local value in Future 0
Even Local.isolate doesn't work here.
@Deeds67 I don't know why your particular sample doesn't work, but I can say that there are issues in this integration, as the propagation mechanisms are different.
Try running those tasks by setting an implicit Task.Options(localContextPropagation=true) in that context, and see if that works. Or switch to using a TaskLocal.
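For reference, a minimal sketch of the TaskLocal route, assuming the whole flow can stay inside Task rather than round-tripping through Future (it does not address the Future conversion itself). Propagation is enabled here with executeWithOptions:

```scala
import monix.eval.{Task, TaskLocal}
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

// The local is created, written and read within one Task chain.
val program: Task[Int] =
  for {
    local <- TaskLocal(0)
    _     <- local.write(1)
    value <- local.read
  } yield value

// TaskLocal needs local context propagation switched on for the run.
val result: Int = Await.result(
  program.executeWithOptions(_.enableLocalContextPropagation).runToFuture,
  5.seconds
)
```

Here the write and the read share the same Task run, so the read should observe the written value.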
FrmPedido.scala:276:60: Symbol 'type cats.NonEmptyParallel.Aux' is missing from the classpath.
[error] This symbol is required by 'value monix.reactive.Observable.observableNonEmptyParallel'.
[error] Make sure that type Aux is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'Observable.class' was compiled against an incompatible version of cats.NonEmptyParallel.
ConcurrentSubject. But ConcurrentSubject buffers all messages in a Vector without removing them after consumption. In my case, I have a single producer and a single consumer, and there is no requirement for multicast to multiple consumers. Is there a simpler recipe here that is more memory efficient?
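One possible recipe for the single-producer, single-consumer case is Observable.create with an overflow strategy, which buffers in a queue that is drained as the consumer keeps up instead of retaining history. This is only a sketch under that assumption; the producer body below (pushing 1 to 3) is a made-up stand-in for the real source:

```scala
import monix.execution.Cancelable
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Observable, OverflowStrategy}
import scala.concurrent.Await
import scala.concurrent.duration._

// The function runs once per subscription, so this fits a single consumer.
val source: Observable[Int] =
  Observable.create[Int](OverflowStrategy.Unbounded) { subscriber =>
    // Hypothetical producer: push a few events, then signal completion.
    (1 to 3).foreach(i => subscriber.onNext(i))
    subscriber.onComplete()
    Cancelable.empty
  }

val received: List[Int] = Await.result(source.toListL.runToFuture, 5.seconds)
```

If an unbounded queue is a concern, a bounded strategy such as OverflowStrategy.DropOld could be swapped in, at the cost of losing events under pressure.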
Hi there!
Is there a way to do echoRepeated, but only a limited number of times and/or with a changing timeout?
Or, as a more generalized approach, echoRepeated with a "retryPolicy" of sorts?
I have a case where I should be emitting the last known element from the source, but with exponentially increasing delays.
I quickly drafted the ugly way to do it:
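import scala.annotation.tailrec
import scala.concurrent.duration._

def backoff[A](maxRetries: Int, backoffStrategy: BackoffStrategy): Observable[A] => Observable[A] =
  source => {
    // Wraps the source in one echoOnce layer per retry, using the strategy's delay for that attempt.
    @tailrec
    def backoff0(source0: Observable[A], retries: Int = 0): Observable[A] = {
      // Assumes the AWS SDK v2 BackoffStrategy, which returns a java.time.Duration (hence toMillis).
      val delay = backoffStrategy.computeDelayBeforeNextRetry(
        RetryPolicyContext.builder().retriesAttempted(retries).build()
      ).toMillis
      if (retries >= maxRetries)
        source0
      else
        backoff0(source0.echoOnce(delay.millis), retries + 1)
    }
    backoff0(source)
  }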
But was wondering if there is something more elegant?
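One alternative that might read a bit cleaner is to build the echo schedule per element with switchMap, so the schedule restarts whenever the source emits something new. This is a sketch, not a built-in operator: echoWithDelays and its delays parameter are hypothetical names, and the delays are taken as a plain list rather than from a BackoffStrategy.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper: emits each element, then re-emits it once per entry
// in `delays`, waiting that long before each echo. A new source element
// cancels the pending echoes of the previous one.
def echoWithDelays[A](delays: Seq[FiniteDuration]): Observable[A] => Observable[A] =
  source =>
    source.switchMap { a =>
      Observable.now(a) ++
        Observable.fromIterable(delays).concatMap(d => Observable.now(a).delayExecution(d))
    }

// Exponential schedule, written out by hand.
val delays = List(100.millis, 200.millis, 400.millis)

// A source that emits one element and then stays open, like a live stream.
val source: Observable[Int] = Observable.now(1) ++ Observable.never

val echoed: List[Int] =
  Await.result(echoWithDelays[Int](delays)(source).take(4).toListL.runToFuture, 5.seconds)
```

The number of echoes is bounded by the length of the delay list, which takes the place of maxRetries.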
Is there an easy way to disallow returning Task in Task#map? A somewhat common error that occurs for us is that people will return Task in Task#map, which leads to Task[Task[T]], but type erasure makes it so that this is indistinguishable from Task[Unit], leading to dangling tasks.
As an example, the following compiles but is bugged, since the inner doSomething(x) will never run.
def f(): Task[Unit] = {
  for {
    x <- getSomething()
  } yield {
    doSomething(x) // doSomething returns a Task.
  }
}
We currently ban Task[Unit] for this reason, but I'm wondering if there's a more standardized approach to handle this edge case?
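For what it's worth, this looks like Scala's value discarding (a non-Unit result silently dropped where Unit is expected) rather than erasure. If so, the scalac option -Wvalue-discard (-Ywarn-value-discard on 2.12), combined with -Xfatal-warnings, may flag the yield above. The fix in the code itself is to sequence the inner Task with flatMap. A sketch, with made-up stand-ins for getSomething and doSomething:

```scala
import java.util.concurrent.atomic.AtomicInteger
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

// Hypothetical stand-ins for the functions in the question.
val sideEffects = new AtomicInteger(0)
def getSomething(): Task[Int] = Task(42)
def doSomething(x: Int): Task[Unit] = Task { sideEffects.incrementAndGet(); () }

// Binding the inner Task with `<-` (flatMap) makes it part of the chain,
// so it actually runs when f() is executed.
def f(): Task[Unit] =
  for {
    x <- getSomething()
    _ <- doSomething(x)
  } yield ()

f().runSyncUnsafe()
```

After the run, the counter reflects that doSomething was executed once.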