Rohan Sircar
@rohan-sircar

or

Task.parSequence(List(firstTask, secondTask, thirdTask)).doOnFinish(_ => pushToKafkaTask)

if you want try/finally semantics

Jasper Moeys
@Jasper-M
Task.sequence should be sequential / Monad though
Rohan Sircar
@rohan-sircar
I thought so too. Maybe it's because of the asyncBoundary calls?
Sourabh-s
@Sourabh-s
@rohan-sircar yeah, it seems to be because of the asyncBoundary calls. Sadly, even the solution you gave does not work.
Sourabh-s
@Sourabh-s
I'm adding further code snippets below to show what firstTask and secondTask look like:
val firstTask = dbOperation1(k)

def dbOperation1(k: objPUT)(jt: JdbcTemplate, io: Scheduler): Task[Int] = {
    val params = Array(user.userId, DateUtils.currentTimestamp, k.getId)

    Task(jt.update(tDao.tUpdate, params: _*)).executeOn(io).asyncBoundary
  }


val secondTask = dbOperation2(t.getId, t, existing)


def dbOperation2(id: String, input: objPUTGen, existing: objPUTGen = null, iStd: Boolean = true,
                 isNSRefUpdate: Boolean = false)(implicit user: UserDetails, jt: JdbcTemplate): Task[_] =

    Task.sequence(Seq(
      dbOperation3(id, input),
      if (iStd) dbOperation4(id, input) else Task.unit,
      dbOperation5(id, input, existing, isNSRefUpdate)
    ))


def dbOperation3(id: String, input: TemplateGeneric)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[_] = {

    val sDel =
      s"""
         | delete from "tableName"
         | where ID = ?
       """.stripMargin

    Task(jt.update(sDel, id)).executeOn(io).asyncBoundary
  }
Rohan Sircar
@rohan-sircar
Well... it should work. The code above looks fine; the problem could be in pushToKafka if it has un-suspended side effects. Otherwise idk ¯\_(ツ)_/¯
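To illustrate what "un-suspended side effects" means here, a minimal sketch (the `send` helper is hypothetical, standing in for whatever pushToKafka does):

```scala
import monix.eval.Task

// Hypothetical Kafka send, standing in for the real producer call.
def send(msg: String): Unit = println(s"sending $msg")

// Un-suspended: the side effect runs at *definition* time, before the
// returned Task is ever executed, so doOnFinish ordering is not respected.
def pushEagerly(msg: String): Task[Unit] = {
  send(msg) // runs immediately
  Task.unit
}

// Suspended: the side effect is captured inside the Task and only runs
// when the task itself runs, in the position the composition dictates.
def pushSuspended(msg: String): Task[Unit] =
  Task(send(msg))
```

If pushToKafkaTask is built like `pushEagerly`, the Kafka call would appear to run out of order even though the Task graph is correct.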
etienne
@crakjie
I have a question. I'm currently looking into a logged exception that seems to be reported to the scheduler. I assumed I could "catch" it by doing something like onErrorFallbackTo(Task.unit) just before any startAndForget. That does not work, so I'm wondering whether the exception gets reported regardless of whether it's caught, in UnfoldObservable.unsafeSubscribeFn.
Does anyone have advice that could help me properly control logs for (supposedly) async exceptions?
Mateusz Zakarczemny
@Matzz
Hi,
How do you guys manage Java memory when running in a container environment (e.g. k8s) while using the Monix IO scheduler (Scheduler.io())?
The IO scheduler is unbounded, so it may spawn an undefined number of threads. Threads keep their stacks outside the Java heap (-Xmx), so we cannot limit them that way.
Therefore, with a huge number of IO operations, Scheduler.io may lead to OOM errors in the container.
Do you deal with that / manage that somehow?
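One common mitigation is to swap the unbounded cached pool for a bounded one, so the worst-case thread count (and thus total stack memory) is known in advance. A minimal sketch; the name and pool size are illustrative:

```scala
import monix.execution.Scheduler

// A bounded alternative to Scheduler.io(): a fixed pool caps the number
// of threads, so stack memory stays predictable inside a container.
// Excess blocking work queues instead of spawning new threads.
val boundedIo: Scheduler =
  Scheduler.fixedPool(name = "bounded-io", poolSize = 64)
```

Pair this with a smaller per-thread stack (-Xss) if you need more concurrency; the tradeoff versus Scheduler.io() is that blocking tasks beyond the pool size wait in the queue rather than running immediately.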
Vish Ramachandran
@vishramachandran
This is a question on monix-kafka. I want to be able to create a consumer Observable by subscribing to a specific partition number, of a specific topic, starting at a specific offset. I manage checkpointing outside kafka. Is this possible ? This requires using Kafka's assign api and not the usual subscribe api on the KafkaConsumer object. Is this supported ? Could I get some pointers please ?
If not supported, what is the recommended workaround ?
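If monix-kafka's builder does not expose assign, one possible workaround is to wrap a manually assigned KafkaConsumer yourself. A sketch under the assumption that you manage offsets externally and poll on a single subscriber (KafkaConsumer is not thread-safe); config details are elided:

```scala
import java.time.Duration
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters._
import monix.eval.Task
import monix.reactive.Observable
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Hypothetical workaround: use Kafka's assign/seek APIs directly and
// expose the polled records as an Observable, closing the consumer
// when the stream terminates.
def assignedRecords(
    props: Properties,
    topic: String,
    partition: Int,
    startOffset: Long // externally checkpointed offset
): Observable[ConsumerRecord[String, String]] = {
  val tp = new TopicPartition(topic, partition)
  Observable
    .resource(Task {
      val c = new KafkaConsumer[String, String](props)
      c.assign(Collections.singletonList(tp)) // assign, not subscribe
      c.seek(tp, startOffset)
      c
    })(c => Task(c.close()))
    .flatMap { c =>
      Observable
        .repeatEval(c.poll(Duration.ofMillis(500)))
        .flatMap(recs => Observable.fromIterable(recs.records(tp).asScala))
    }
}
```

Since polling happens via repeatEval on one subscriber, the single-threaded access requirement of KafkaConsumer is preserved.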
Anirudh Vyas
@AnirudhVyas
hi, I am using the Monix scheduler to parallelize a long-running workload, and I need the system to stay up and running. For now I thought maybe I'd do Thread.sleep(30 days), but I need a more elegant way to do this. Any suggestions?
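One idiomatic option, sketched here, is to block the main thread on a task that never completes instead of sleeping for an arbitrary duration; the background workload keeps running on its own scheduler:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

// Parks the calling thread indefinitely without a magic sleep duration.
// Background tasks started on the scheduler continue to run; shut down
// via your process's normal termination signals instead of a timeout.
def keepAlive(): Unit =
  Task.never[Unit].runSyncUnsafe()
```

Compared to Thread.sleep(30 days), this expresses the intent directly ("run until externally stopped") and never wakes up spuriously at the end of the window.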
Eliav Lavi
@eliavlavi
Hello 👋 funny question: am I missing something, or are Task.materialize and Task.attempt effectively the same thing?
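They do capture errors the same way; the visible difference is just the wrapper type each one exposes, as this small comparison sketches:

```scala
import monix.eval.Task
import scala.util.Try

val boom: Task[Int] = Task.raiseError(new RuntimeException("boom"))

// materialize exposes the outcome as scala.util.Try
val asTry: Task[Try[Int]] = boom.materialize

// attempt exposes the same outcome as Either[Throwable, A]
val asEither: Task[Either[Throwable, Int]] = boom.attempt
```

So the choice mostly comes down to which type composes better with the rest of your code (Try-based APIs vs. Either-based error handling).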
Peter Nham
@petern-sc
Is there an easy way to go from Observable[ByteBuffer] to Observable[String]? I've attempted to use monix-nio's UTF-8 decoding, but it's not splitting correctly on line ends.
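One way to sketch this is to decode each buffer and carry the trailing partial line as state between chunks, emitting only complete lines. This assumes UTF-8 and that multi-byte characters do not straddle buffer boundaries (a robust version would thread a stateful CharsetDecoder through instead):

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import monix.reactive.Observable

// Decode chunks, keep the unterminated tail as the accumulator state,
// and emit only the complete lines seen so far.
def lines(bytes: Observable[ByteBuffer]): Observable[String] =
  bytes
    .map(bb => StandardCharsets.UTF_8.decode(bb).toString)
    .mapAccumulate("") { (carry, chunk) =>
      val parts = (carry + chunk).split("\n", -1)
      (parts.last, parts.init.toList) // (new carry, full lines to emit)
    }
    .flatMap(Observable.fromIterable)
```

The `split("\n", -1)` keeps trailing empty strings, so a chunk ending exactly on a newline leaves an empty carry rather than dropping or duplicating a line.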
Alex Nedelcu
@alexelcu:matrix.org
[m]
👋
Anirudh Vyas
@AnirudhVyas

hi there, I am trying to assemble a fat jar with monix using sbt assembly and I am getting these errors:

[error] deduplicate: different file contents found in the following:
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/io/monix/monix-internal-jctools_3/3.4.0/monix-internal-jctools_3-3.4.0.jar:scala/runtime/function/JProcedure18.class
[error] /Users/anirudh.vyas/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/scala-lang/scala3-library_3/3.1.2/scala3-library_3-3.1.2.jar:scala/runtime/function/JProcedure18.class
[error] deduplicate: different file contents found in the following:

any suggestions for me?

my build sbt has :
  libraryDependencies += "io.monix" %% "monix" % "3.4.0",
this is my assembly setting:
lazy val assemblySettings = Seq(
  assembly / assemblyMergeStrategy := {
    case PathList("module-info.class") => MergeStrategy.discard
    case path if path.contains("META-INF/services") => MergeStrategy.concat
    case PathList("META-INF", xs@_*) => MergeStrategy.discard
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  },
  assembly / test := {} // skip unit test when build uber jar
)
Alex Nedelcu
@alexelcu:matrix.org
[m]
@AnirudhVyas: this is a known issue, will release a new version with the fix soon.
Alex Nedelcu
@alexelcu:matrix.org
[m]
@AnirudhVyas: try the newly released version 3.4.1
Pierre Marais
@Deeds67

Good morning. Will a Local or ThreadLocal be propagated when converting Task -> Future -> Task? It looks to me like this isn't working. Any way I can ensure propagation here?

implicit val scheduler = Scheduler.traced
val local = Local(0)

val f = for {
  _     <- Task.fromFuture(Task(local.update(1)).runToFuture)
  value <- Task(local.get)
} yield println(s"Local value in Future $value")

Await.result(f.runToFuture, Duration.Inf)

It prints Local value in Future 0

Even Local.isolate doesn't work here

Pierre Marais
@Deeds67
Also tried explicitly with .runToFutureOpt(scheduler, Task.Options(true, true))
Alexandru Nedelcu
@alexandru

@Deeds67 I don't know why your particular sample doesn't work, but I can say that there are issues in this integration, as the propagation mechanisms are different.

Try running those tasks with an implicit Task.Options(localContextPropagation = true) in scope, and see if that works. Or switch to using a TaskLocal.
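For reference, the TaskLocal route stays entirely inside Task's own propagation mechanism, so it survives composition without touching Future's ThreadLocal machinery. A minimal sketch:

```scala
import monix.eval.{Task, TaskLocal}
import monix.execution.Scheduler.Implicits.global

// TaskLocal(default) allocates the local inside Task; write/read are
// themselves tasks, so the value travels with the Task's context.
val program: Task[Int] =
  for {
    local <- TaskLocal(0)
    _     <- local.write(1)
    value <- local.read
  } yield value

// Local context propagation must be enabled for the read to observe the write:
// program.executeWithOptions(_.enableLocalContextPropagation).runSyncUnsafe()
```

The key difference from Local is that nothing here depends on which thread or Future the intermediate steps ran on.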

Pierre Marais
@Deeds67
Thanks, TaskLocal works :+1:
Alexandru Nedelcu
@alexandru
Help needed — monix/monix#1560
Arunav Sanyal
@Khalian
Hi. I was looking through the Monix source code. It seems that in all cases https://github.com/monix/monix/blob/v3.4.0/monix-catnap/shared/src/main/scala/monix/catnap/CircuitBreaker.scala#L415 the circuit breaker results in a RejectedExecutionException. I was wondering if it would be possible to instead complete with a different task when the circuit state is open. The reason is that I want to run the circuit breaker in a "dry run" mode to see if I have set up my params (maxFailures, resetTimeout, etc.) correctly.
Alexandru Nedelcu
@alexandru
@Khalian not sure what you need there. You can always catch the RejectedExecutionException — e.g., .recoverWith { case _: RejectedExecutionException => myTask }. Does that not suffice?
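The suggested recovery can be sketched like this; note monix-catnap's rejection type is ExecutionRejectedException (from monix.execution.exceptions), so the pattern match below assumes that is what surfaces in your version:

```scala
import monix.catnap.CircuitBreaker
import monix.eval.Task
import monix.execution.exceptions.ExecutionRejectedException

// When the breaker is open, protect(...) fails with a rejection
// exception, which we recover into an alternative task.
def guarded[A](cb: CircuitBreaker[Task], task: Task[A], fallback: Task[A]): Task[A] =
  cb.protect(task).onErrorRecoverWith {
    case _: ExecutionRejectedException => fallback
  }
```

Wrapping this in a small helper keeps the raise-then-recover round trip out of call sites, even if the breaker itself still raises internally.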
Arunav Sanyal
@Khalian
Well, the problem is a bit more nuanced than that. I am effectively trying to write something called an "Observer Mode", in which we don't even circuit-break (the operating problem for me is that I do not know upfront how quickly I should be rejecting requests in production, and I want to gather some actual data first). Not to mention that I need different messaging for different use cases. Conceptually I think of that as an Option[() => Task[R]]. The latter half of the problem is solved by your suggestion, except there is now a pointless raiseError and recoverWith that could have been avoided, and the former has no solution today (hence I was thinking of writing a pull request for this model).
The other problem I am trying to solve is: if I have a badly set config and want to update it in production based on the data I see, what do I do? I suppose I can solve that by keeping a couple of pieces of state for the timeout and maxFailures, and if they change (say via AWS AppConfig as the backing dynamic store), I can just reinitialize a new CircuitBreaker.