Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Ryan Peters
@sloshy
Monix has auto-cancellation on map/flatMap calls as an option
This blog post might be a bit outdated, but it lays out the difference in how they work: https://monix.io/blog/2018/03/20/monix-vs-cats-effect.html
Ryan Peters
@sloshy
For sleeping - use Task.sleep instead, as it is non-blocking compared to Thread.sleep
Ryan Peters
@sloshy
If you're running a shell command you might want to consider either making it a background process that you can handle independently or using some kind of timeout to insert your own boundary to cancel on before retrying the command.
Geovanny Junio
@geovannyjs

If you're running a shell command you might want to consider either making it a background process that you can handle independently or using some kind of timeout to insert your own boundary to cancel on before retrying the command.

Thank you @sloshy :thumbsup: I will look for some docs of how to make it a background process and handle it independently

Piotr Gawryś
@Avasil
Keep in mind that even calling shellProcess.timeout(5.second) won't actually stop the process if it doesn't have any cancelation capability (it's not possible to cancel arbitrary synchronous actions). I've never used shell commands from Scala so can't give you any details but in your case it might be possible to send CTRL + C there on cancelation
Geovanny Junio
@geovannyjs
@Avasil thank you :thumbsup:
Geovanny Junio
@geovannyjs

I was trying to use Monix.Task to run the command in background, but I found out another construction of scala.sys.process that already runs the process in the background and has a destroy method:

import scala.sys.process._
val cmd = "/my/shell/command".run()
cmd.destroy()

Anyway it was a good opportunity to be introduced to Monix, until yesterday it was unknow to me. Thank you guys @sloshy @Avasil I really appreciate your help :thumbsup:

Ryan Peters
@sloshy
Huh, TIL you can destroy commands. Good to know @geovannyjs
Jake Schwartz
@jakehschwartz_twitter

Hey, Im switching to monix and trying to figure out the best way to keep my program running in between the runs of my scheduler. I currently have:

  val _ = scheduler.scheduleAtFixedRate(1, 120, TimeUnit.SECONDS, () => { ... })
  while (true) { Thread.sleep(1000000)}

While this works, I didnt know if there was a better way to do it using the library. Thanks!

Piotr Gawryś
@Avasil

You can set threads in Scheduler to not be daemonic, let's say:

  implicit val scheduler = Scheduler.fixedPool("my-pool", 4, daemonic = false)

  val _ = scheduler.scheduleAtFixedRate(1, 120, TimeUnit.MILLISECONDS, () => println("a"))

Alternatively, if you want to be pure, you can use Task for your scheduled actions. To schedule Task you can write a recursive function or just use a stream like Observable from monix-reactive, Iterant from monix-tail, fs2.Stream (different library) or whatever you prefer, like this:

https://monix.io/docs/3x/reactive/observable.html#intervalatfixedrate

Then you just use TaskApp in Main or call task.runSyncUnsafe() yourself

Pavel Perikov
@p-pavel

Hi. Supposing I have some side effectual function (e.g.

log(msg: Any): Unit

) is there any reason I'd better run the invocation in Task in situations like

val s: Observable[String] = ???
   s.map(log) //vs s.evalMap(msg => Task(log(msg)))

?

Paweł Kiersznowski
@pk044
hey guys, i wanted to contribute to Monix so I decided to take a look at this issue: monix/monix#957 - i already implemented the function, but i've been thinking a lot about what should be tested
I wonder if writing a similar test to the one testing async execution (in AsyncStateActionObservableSuite) would be a good approach? perhaps do you have any suggestions how I should test it?
Piotr Gawryś
@Avasil
@pk044 in this case I think it's enough to just put 1 sanity check test in AsyncStateActionObservableSuite that it works for cats.effect.IO, it's like this in DoOnNextSuite
Alexandru Nedelcu
@alexandru

Hi all, I’m looking for input on this PR: monix/monix#960

cc @mdedetrich any thoughts?

Ryan Peters
@sloshy
@p-pavel see this conversation I had on the cats-effect gitter https://gitter.im/typelevel/cats-effect?at=5d3b31736ec2e14223f4ed5c
TL;DR you want to use map for pure functions. Side-effects ideally should always be suspended
It guarantees that they execute when and how you specify (across effect system implementations too)
Alexandru Nedelcu
@alexandru

@sloshy @p-pavel indeed, the implementation of Task and Observable currently catch errors and are able to suspend the execution in map and flatMap, however it’s best if the side effects are clearly documented via a proper type.

In our code, when you see Task, you know that it has a side effect, whereas if you don’t then you know that the function is pure and won’t launch your nuclear missiles.

@pk044 yes, adding a test similar to that of Observable.fromAsyncStateAction would be enough, and instead of Task use IO
Paweł Kiersznowski
@pk044
@Avasil @alexandru thanks for the response - going to give it a try right now :)
Pavel Perikov
@p-pavel
@sloshy thanks that exactly what I thought of
@alexandru thanks I see the point. What I thought is getting the single abstraction -- Stream (being monad/monad plus/bracket etc). And yes, I'm trying to abstract the streaming library (I can remember the discussion here) mostly to get myself the feeling of minimal demands of parts of my code. So I was thinking if another abstraction (IO/Task) is required in my facade lib.
Alexandru Nedelcu
@alexandru

@p-pavel if you’re abstracting over the used library, then the behavior in map and flatMap from Observable is not captured in the laws of MonadError or the other implemented type classes, so you can’t expect the behavior.

If you’re working with restricted parametric polymorphism, you can’t assume any behavior that’s not specified in laws.

Therefore usage of an effect type to trigger side effects is mandatory.

To abstract over the effect type, you can use the Cats-Effect type classes, or specifically cats.effect.Sync 😉
Alexandru Nedelcu
@alexandru

New PR fixing #938: monix/monix#965

Alexandru Nedelcu
@alexandru
Another PR for review: monix/monix#917

And this one for Scheduler.features: monix/monix#960

Pavel Perikov
@p-pavel
@alexandru do you mean "the behaviour of map and flatMap catching errors" is not reflected in the laws of MonadError etc?
Piotr Gawryś
@Avasil
Yes
Alexandru Nedelcu
@alexandru

@p-pavel yes, the point being that catching errors is a nice extra the implementation gives you, but you can't rely on it in polymorphic code, since polymorphic code needs to rely on the common denominator between the libraries you're abstracting over.

But the fix is easy, you just wrap that log into a Task or other effect type.

Pavel Perikov
@p-pavel
@alexandru thanks I see. Just tried to figure out pros of the wrapping.
Paweł Kiersznowski
@pk044
hey guys, i was wondering if it's possible to run a single test or perhaps every test in a module? ideally i'd love to get fast feedback if my test works succeeds/fails before I run 1200 more tests :)
Piotr Gawryś
@Avasil

@pk044
To run single module:

project evalJVM
test 
// or
evalJVM/test

To run single test:

evalJVM/testOnly *TestName

And thanks for the interest in contributing!

Paweł Kiersznowski
@pk044
@Avasil tyvm for the answer :) no problem, happy to help!
Vasily Efimov
@voidconductor

Hi everyone!
I'm using monix-kafka and it seems to be problem with producer.
I've got one producer which i use to send records in parallel with Task.wanderUnordered, this gives me tons of locks on producer

--- 321416121577 ns (44.57%), 19443 samples
  [ 0] monix.kafka.KafkaProducer$Implementation
  [ 1] monix.kafka.KafkaProducer$Implementation.$anonfun$send$2
  [ 2] monix.kafka.KafkaProducer$Implementation$$Lambda$7060.1333366705.run
  [ 3] akka.dispatch.TaskInvocation.run
  [ 4] akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec
  [ 5] akka.dispatch.forkjoin.ForkJoinTask.doExec
  [ 6] akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask
  [ 7] akka.dispatch.forkjoin.ForkJoinPool.runWorker
  [ 8] akka.dispatch.forkjoin.ForkJoinWorkerThread.run

currently I'm running producer on fork-join scheduler and also see these locks on visualVM

this gives me worse performance compared to java producer wrapped in Task.async and unnecessary synchronization locks, so I'll have to use java producer for now

Am I doing something wrong or is there bug within producer implementation?

Piotr Gawryś
@Avasil

I think the synchronization is there to avoid sending anything to closed producer but I feel like it might be too much. I'll try to take a look this week, thanks. In the meantime, maybe @alexandru remembers if there were any additional reasons

btw how's the performance if you use Task.traverse with monix-kafka KafkaProducer vs wrapped java producer in parallel?

Alexandru Nedelcu
@alexandru

That code is old, we might be able to get rid of that synchronized. But I don't think think that the lock is very problematic.

I've got one producer which i use to send records in parallel with Task.wanderUnordered

You'll always have performance problems if you do this, no matter what you do. When writing data into a network socket, at some point some sort of synchronization is mandatory. Writing data on the network is a bottleneck. Parallelizing those writes doesn't help.

If you really need parallelism, which I doubt, but lets say that it is inevitable, the better approach is to use a concurrent buffer. So your parallel threads / actors / tasks whathaveyou can all write into that concurrent buffer and then you can have a single consumer that takes data out of that buffer and writes it to Kafka.

cc @Avasil @voidconductor
@Avasil I don't remember why that block is synchronized, but it's probably related to the callback's contract and if so we could get rid of it, preferring something based on an Atomic.compareAndSet instead.
This is assuming that the producer we are using is thread-safe.
@voidconductor try buffering your writes. You can do so with Observable easily.
Vasily Efimov
@voidconductor

Thanks!
@Avasil With 'Task.traverse' it's just a bit faster than monix KafkaProducer with parallelism, but still slower than wrapped java
Good news - there are no locked threads

I hope I find some time to do isolated measurements of this case, without rest of the pipeline being involved

@alexandru I'll try and see if it helps
currently we have a few streams (observables) reading from kafka, doing processing, sending to another topic and after comiting, so we tried sending in batches
so I wonder if multiple producers could be faster in that case, like one producer per stream

Vasily Efimov
@voidconductor
@alexandru I’ve found out apache KafkaProducer is tread-safe and has own buffer for records not yet sent. So if calling send sequentially - before sending another message we’ll have to wait until previous one is sent and acked, with parallelism we just put those messages on inner buffer, and when it’s full or linger.ms has passed - they sent in batch
Piotr Gawryś
@Avasil
We can probably get rid of the synchronized then, just need to check if that's the case in all of the versions we support and handle case of closing producer/calling task callback only once separately
Thank you @voidconductor ! If you are interested in contributing it, it's very welcome, otherwise I'll handle it before the next release