You can set the threads in a Scheduler to be non-daemonic, let's say:
implicit val scheduler = Scheduler.fixedPool("my-pool", 4, daemonic = false)
val _ = scheduler.scheduleAtFixedRate(1, 120, TimeUnit.MILLISECONDS, () => println("a"))
Alternatively, if you want to be pure, you can use Task for your scheduled actions. To schedule a Task you can write a recursive function, or just use a stream like Observable from monix-reactive, Iterant from monix-tail, fs2.Stream (a different library), or whatever you prefer, like this:
https://monix.io/docs/3x/reactive/observable.html#intervalatfixedrate
Then you just use TaskApp in Main, or call task.runSyncUnsafe() yourself.
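For example, scheduling a Task at a fixed rate with Observable might look like this (a sketch; the tick function and the take(3) cutoff are ours, just to keep it finite):

```scala
import scala.concurrent.duration._
import monix.eval.Task
import monix.reactive.Observable

// Hypothetical effect to run on every tick (the name is ours, not from the chat).
def tick(n: Long): Task[Long] = Task { println(s"tick $n"); n }

// Emit 0, 1, 2, ... at a fixed rate and run the Task for each element;
// take(3) keeps the example finite so it can actually complete.
val scheduled: Task[List[Long]] =
  Observable
    .intervalAtFixedRate(10.millis)
    .take(3)
    .mapEval(tick)
    .toListL
```

Nothing runs until the Task is executed, e.g. via TaskApp or runSyncUnsafe.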
map is for pure functions. Side effects should ideally always be suspended.
@sloshy @p-pavel indeed, the implementations of Task and Observable currently catch errors and are able to suspend execution in map and flatMap; however, it's best if side effects are clearly documented via a proper type.
In our code, when you see Task, you know that it has a side effect, whereas if you don't, then you know the function is pure and won't launch your nuclear missiles.
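A sketch of that convention (both helpers are hypothetical names, not from the chat):

```scala
import monix.eval.Task

// Side effect wrapped in Task: the return type documents that running
// this does something to the outside world.
def log(msg: String): Task[Unit] = Task(println(msg))

// No Task in the signature, so callers know this is pure and
// won't launch any nuclear missiles.
def add(a: Int, b: Int): Int = a + b
```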
Observable.fromAsyncStateAction would be enough, and instead of Task use IO.
@p-pavel if you're abstracting over the underlying library, then the behavior of map and flatMap on Observable is not captured in the laws of MonadError or the other implemented type classes, so you can't expect that behavior.
If you're working with restricted parametric polymorphism, you can't assume any behavior that isn't specified in laws.
Therefore, using an effect type to trigger side effects is mandatory.
cats.effect.Sync 😉
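Under restricted parametric polymorphism that looks something like this (a sketch; the logged helper is our own name):

```scala
import cats.effect.Sync
import monix.eval.Task

// Polymorphic code can only rely on type-class laws, so side effects
// must be suspended explicitly through the effect type, e.g. Sync[F].delay.
def logged[F[_]](msg: String)(implicit F: Sync[F]): F[Unit] =
  F.delay(println(msg))

// Works for any Sync instance, e.g. monix's Task:
val task: Task[Unit] = logged[Task]("suspended, not yet executed")
```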
Scheduler.features: monix/monix#960
@p-pavel yes, the point being that catching errors is a nice extra the implementation gives you, but you can't rely on it in polymorphic code, since polymorphic code needs to rely on the common denominator between the libraries you're abstracting over.
But the fix is easy, you just wrap that log into a Task
or other effect type.
Hi everyone!
I'm using monix-kafka and there seems to be a problem with the producer.
I've got one producer which I use to send records in parallel with Task.wanderUnordered; this gives me tons of locks on the producer:
--- 321416121577 ns (44.57%), 19443 samples
[ 0] monix.kafka.KafkaProducer$Implementation
[ 1] monix.kafka.KafkaProducer$Implementation.$anonfun$send$2
[ 2] monix.kafka.KafkaProducer$Implementation$$Lambda$7060.1333366705.run
[ 3] akka.dispatch.TaskInvocation.run
[ 4] akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec
[ 5] akka.dispatch.forkjoin.ForkJoinTask.doExec
[ 6] akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask
[ 7] akka.dispatch.forkjoin.ForkJoinPool.runWorker
[ 8] akka.dispatch.forkjoin.ForkJoinWorkerThread.run
Currently I'm running the producer on a fork-join scheduler and I also see these locks in VisualVM.
This gives me worse performance compared to the Java producer wrapped in Task.async, plus unnecessary synchronization locks, so I'll have to use the Java producer for now.
Am I doing something wrong, or is there a bug in the producer implementation?
I think the synchronization is there to avoid sending anything to a closed producer, but I feel like it might be too much. I'll try to take a look this week, thanks. In the meantime, maybe @alexandru remembers if there were any additional reasons.
btw, how's the performance if you use Task.traverse with the monix-kafka KafkaProducer vs the wrapped Java producer in parallel?
That code is old, we might be able to get rid of that synchronized. But I don't think that the lock is very problematic.
> I've got one producer which I use to send records in parallel with Task.wanderUnordered
You'll always have performance problems if you do this, no matter what you do. When writing data into a network socket, at some point some sort of synchronization is mandatory. Writing data to the network is a bottleneck; parallelizing those writes doesn't help.
If you really need parallelism, which I doubt, but let's say it is inevitable, the better approach is to use a concurrent buffer. Your parallel threads / actors / tasks / what have you can all write into that concurrent buffer, and then you can have a single consumer that takes data out of that buffer and writes it to Kafka.
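A minimal sketch of that buffer pattern using only the standard library (the names and the simulated downstream write are ours):

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

// Many producers write into a bounded concurrent buffer; a single
// consumer drains it and performs the actual (bottlenecked) write,
// here simulated by appending to a list instead of hitting Kafka.
val buffer  = new ArrayBlockingQueue[String](1024)
val written = ListBuffer.empty[String]

val producers = (1 to 4).map { i =>
  new Thread(() => buffer.put(s"record-$i")) // blocks if the buffer is full
}
producers.foreach(_.start())
producers.foreach(_.join())

// Single consumer: the only place that touches the downstream "socket".
var next = buffer.poll(100, TimeUnit.MILLISECONDS)
while (next != null) {
  written += next
  next = buffer.poll(100, TimeUnit.MILLISECONDS)
}
```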
Atomic.compareAndSet instead.
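For illustration, a classic compare-and-set retry loop; monix's monix.execution.atomic.Atomic exposes the same compareAndSet shape as the java.util.concurrent.atomic classes used here (the clamped-add helper is hypothetical):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

val counter = new AtomicInteger(0)

// Lock-free update: read the current value, compute the next one,
// and retry if another thread changed the value in between.
@tailrec
def addClamped(delta: Int, max: Int): Int = {
  val current = counter.get()
  val next    = math.min(current + delta, max)
  if (counter.compareAndSet(current, next)) next
  else addClamped(delta, max) // lost the race, retry
}

val result = addClamped(7, 5) // clamped to the maximum of 5
```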
Observable easily.
Thanks!
@Avasil With Task.traverse it's just a bit faster than the monix KafkaProducer with parallelism, but still slower than the wrapped Java one.
Good news: there are no locked threads.
I hope to find some time to do isolated measurements of this case, without the rest of the pipeline being involved.
@alexandru I'll try and see if it helps
Currently we have a few streams (observables) reading from Kafka, doing processing, sending to another topic and committing afterwards, so we tried sending in batches.
So I wonder if multiple producers could be faster in that case, like one producer per stream.
KafkaProducer is thread-safe and has its own buffer for records not yet sent. So when calling send sequentially, we have to wait until the previous message is sent and acked before sending another one; with parallelism we just put those messages on the inner buffer, and when it's full or linger.ms has passed, they're sent in a batch.
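The batching knobs mentioned above are plain Kafka producer settings; for reference, they might be configured like this (the values are illustrative, not recommendations):

```scala
import java.util.Properties

// Producer settings that control the batching behavior described above.
val props = new Properties()
props.put("linger.ms", "5")      // wait up to 5 ms for a batch to fill
props.put("batch.size", "16384") // maximum batch size in bytes
```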
KafkaProducerSink too.
tryOnSuccess / tryOnError, like Promise does.
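For comparison, scala.concurrent.Promise's try* methods show the same contract: they attempt completion and report whether it succeeded:

```scala
import scala.concurrent.Promise
import scala.util.Success

// try* completes the promise only if it isn't completed yet,
// returning whether this call won the race.
val p      = Promise[Int]()
val first  = p.trySuccess(1) // true: completes the promise
val second = p.trySuccess(2) // false: already completed, value stays 1
```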
3.0.0 release.
import monix.execution.Ack.Continue
import monix.reactive.Observable

test("fromStateAction example") { implicit s =>
  var received = 0
  // Emits 0 to 9, then the state function throws and the stream ends with onError.
  val throwingFunction: Int => (Int, Int) = i => if (i < 10) (i, i + 1) else throw new Exception()
  Observable.fromStateAction(throwingFunction)(0)
    .subscribe { _ =>
      received += 1; Continue
    }
  s.tick() // flush anything scheduled on the TestScheduler
  // 10 elements (0 to 9) were received before the error.
  assertEquals((0 until received).toList, (0 to 9).toList)
}