Ryan Peters
@sloshy
@p-pavel see this conversation I had on the cats-effect gitter https://gitter.im/typelevel/cats-effect?at=5d3b31736ec2e14223f4ed5c
TL;DR you want to use map for pure functions. Side-effects ideally should always be suspended
It guarantees that they execute when and how you specify (across effect system implementations too)
Alexandru Nedelcu
@alexandru

@sloshy @p-pavel indeed, the implementations of Task and Observable currently catch errors and are able to suspend the execution in map and flatMap, however it’s best if the side effects are clearly documented via a proper type.

In our code, when you see Task, you know that it has a side effect, whereas if you don’t then you know that the function is pure and won’t launch your nuclear missiles.

@pk044 yes, adding a test similar to that of Observable.fromAsyncStateAction would be enough, and instead of Task use IO
Paweł Kiersznowski
@pk044
@Avasil @alexandru thanks for the response - going to give it a try right now :)
Pavel Perikov
@p-pavel
@sloshy thanks, that's exactly what I thought
@alexandru thanks, I see the point. What I had in mind was a single abstraction -- Stream (being monad/monad plus/bracket etc). And yes, I'm trying to abstract over the streaming library (I remember the discussion here), mostly to get a feel for the minimal requirements of each part of my code. So I was wondering whether another abstraction (IO/Task) is required in my facade lib.
Alexandru Nedelcu
@alexandru

@p-pavel if you’re abstracting over the library in use, then the behavior of map and flatMap in Observable is not captured in the laws of MonadError or the other implemented type classes, so you can’t rely on that behavior.

If you’re working with restricted parametric polymorphism, you can’t assume any behavior that’s not specified in laws.

Therefore usage of an effect type to trigger side effects is mandatory.

To abstract over the effect type, you can use the Cats-Effect type classes, or specifically cats.effect.Sync 😉
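
A minimal plain-Scala sketch of the idea above, with no library dependencies. The `Delay` trait and the `Thunk` type are hypothetical stand-ins: `Delay` plays the role that `cats.effect.Sync` (through its `delay` method) plays in real code, and `Thunk` is a toy effect type rather than Task or IO. It only illustrates that a suspended side effect does nothing until it is explicitly run, and that polymorphic code gets this guarantee from the capability it asks for, not from how `map` happens to be implemented.

```scala
import scala.collection.mutable.ListBuffer

object SuspendSketch {
  // Hypothetical minimal capability, standing in for the `delay` method of cats.effect.Sync.
  trait Delay[F[_]] {
    def delay[A](thunk: => A): F[A]
  }

  // Hypothetical toy effect type: a description that does nothing until `run()` is called.
  final case class Thunk[A](run: () => A)

  implicit val thunkDelay: Delay[Thunk] = new Delay[Thunk] {
    def delay[A](thunk: => A): Thunk[A] = Thunk(() => thunk)
  }

  // Stand-in for a real log sink, so the effect can be observed.
  val log: ListBuffer[String] = ListBuffer.empty[String]

  // Polymorphic code: the side effect is suspended through the capability,
  // so it does not depend on `map`/`flatMap` catching or deferring anything.
  def logLine[F[_]](msg: String)(implicit F: Delay[F]): F[Unit] =
    F.delay { log += msg; () }
}
```

Building `SuspendSketch.logLine[SuspendSketch.Thunk]("hello")` leaves `log` empty; the line is only appended once `run()` is called on the result.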
Alexandru Nedelcu
@alexandru

New PR fixing #938: monix/monix#965

Alexandru Nedelcu
@alexandru
Another PR for review: monix/monix#917

And this one for Scheduler.features: monix/monix#960

Pavel Perikov
@p-pavel
@alexandru do you mean "the behaviour of map and flatMap catching errors" is not reflected in the laws of MonadError etc?
Piotr Gawryś
@Avasil
Yes
Alexandru Nedelcu
@alexandru

@p-pavel yes, the point being that catching errors is a nice extra the implementation gives you, but you can't rely on it in polymorphic code, since polymorphic code needs to rely on the common denominator between the libraries you're abstracting over.

But the fix is easy, you just wrap that log into a Task or other effect type.

Pavel Perikov
@p-pavel
@alexandru thanks, I see. I was just trying to figure out the pros of the wrapping.
Paweł Kiersznowski
@pk044
hey guys, i was wondering if it's possible to run a single test or perhaps every test in a module? ideally i'd love to get fast feedback on whether my test succeeds or fails before I run 1200 more tests :)
Piotr Gawryś
@Avasil

@pk044
To run a single module:

project evalJVM
test
// or
evalJVM/test

To run a single test:

evalJVM/testOnly *TestName

And thanks for the interest in contributing!

Paweł Kiersznowski
@pk044
@Avasil tyvm for the answer :) no problem, happy to help!
Vasily Efimov
@voidconductor

Hi everyone!
I'm using monix-kafka and there seems to be a problem with the producer.
I've got one producer which I use to send records in parallel with Task.wanderUnordered, and this gives me tons of locks on the producer

--- 321416121577 ns (44.57%), 19443 samples
  [ 0] monix.kafka.KafkaProducer$Implementation
  [ 1] monix.kafka.KafkaProducer$Implementation.$anonfun$send$2
  [ 2] monix.kafka.KafkaProducer$Implementation$$Lambda$7060.1333366705.run
  [ 3] akka.dispatch.TaskInvocation.run
  [ 4] akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec
  [ 5] akka.dispatch.forkjoin.ForkJoinTask.doExec
  [ 6] akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask
  [ 7] akka.dispatch.forkjoin.ForkJoinPool.runWorker
  [ 8] akka.dispatch.forkjoin.ForkJoinWorkerThread.run

currently I'm running the producer on a fork-join scheduler and I also see these locks in VisualVM

this gives me worse performance than the Java producer wrapped in Task.async, plus unnecessary synchronization locks, so I'll have to use the Java producer for now

Am I doing something wrong, or is there a bug in the producer implementation?

Piotr Gawryś
@Avasil

I think the synchronization is there to avoid sending anything to a closed producer, but I feel like it might be too much. I'll try to take a look this week, thanks. In the meantime, maybe @alexandru remembers if there were any additional reasons

btw how's the performance if you use Task.traverse with monix-kafka KafkaProducer vs wrapped java producer in parallel?

Alexandru Nedelcu
@alexandru

That code is old, we might be able to get rid of that synchronized. But I don't think that the lock is very problematic.

> I've got one producer which i use to send records in parallel with Task.wanderUnordered

You'll always have performance problems if you do this, no matter what you do. When writing data into a network socket, at some point some sort of synchronization is mandatory. Writing data on the network is a bottleneck. Parallelizing those writes doesn't help.

If you really need parallelism, which I doubt, but let's say that it is inevitable, the better approach is to use a concurrent buffer. So your parallel threads / actors / tasks, what have you, can all write into that concurrent buffer, and then you can have a single consumer that takes data out of that buffer and writes it to Kafka.
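
A minimal sketch of the concurrent-buffer approach described above, using only JDK classes and assuming plain threads as the parallel writers. `RecordingSink`, its `send` method, and the `Stop` sentinel are hypothetical names for illustration; a real version would call the Kafka producer inside `send`. This is not monix-kafka code.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object BufferedWrites {
  // Hypothetical stand-in for the real sink; a real version would write to Kafka here.
  final class RecordingSink {
    val written: ArrayBuffer[String] = ArrayBuffer.empty[String]
    // Only ever called from the single consumer thread, so no locking is needed.
    def send(record: String): Unit = written += record
  }

  // Hypothetical sentinel telling the consumer to stop.
  private val Stop = "__stop__"

  def run(producers: Int, perProducer: Int): List[String] = {
    // Bounded queue: writers block when it is full, which gives simple back-pressure.
    val queue = new LinkedBlockingQueue[String](1024)
    val sink  = new RecordingSink

    // Single consumer: the only thread that touches the sink.
    val consumer = new Thread(() => {
      var record = queue.take()
      while (record != Stop) {
        sink.send(record)
        record = queue.take()
      }
    })
    consumer.start()

    // Many parallel writers, all putting into the same concurrent buffer.
    val workers = (0 until producers).map { p =>
      val t = new Thread(() => (0 until perProducer).foreach(i => queue.put(s"p$p-$i")))
      t.start()
      t
    }

    workers.foreach(_.join())
    queue.put(Stop) // all writers are done, so the sentinel is the last element
    consumer.join()
    sink.written.toList
  }
}
```

Calling `BufferedWrites.run(4, 100)` returns all 400 records, with records from the same writer kept in their original relative order.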

cc @Avasil @voidconductor
@Avasil I don't remember why that block is synchronized, but it's probably related to the callback's contract and if so we could get rid of it, preferring something based on an Atomic.compareAndSet instead.
This is assuming that the producer we are using is thread-safe.
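
A small sketch of the compareAndSet idea, assuming the goal is only to make sure a callback fires at most once without a synchronized block. It uses the JDK's `AtomicBoolean` rather than Monix's `Atomic`, and the `once` helper is a hypothetical name, not part of monix-kafka.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object CallOnce {
  // Wraps a callback so that only the first invocation gets through.
  // The returned function reports whether this particular call was the one that fired.
  def once[A](cb: A => Unit): A => Boolean = {
    val called = new AtomicBoolean(false)
    a =>
      if (called.compareAndSet(false, true)) { cb(a); true }
      else false
  }
}
```

Concurrent callers race on `compareAndSet`; exactly one of them wins and invokes the callback, the others return `false` without blocking.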
@voidconductor try buffering your writes. You can do so with Observable easily.
Vasily Efimov
@voidconductor

Thanks!
@Avasil With 'Task.traverse' it's just a bit faster than monix KafkaProducer with parallelism, but still slower than wrapped java
Good news - there are no locked threads

I hope I find some time to do isolated measurements of this case, without rest of the pipeline being involved

@alexandru I'll try and see if it helps
currently we have a few streams (observables) reading from kafka, doing processing, sending to another topic and committing afterwards, so we tried sending in batches
so I wonder if multiple producers could be faster in that case, like one producer per stream

Vasily Efimov
@voidconductor
@alexandru I’ve found out the Apache KafkaProducer is thread-safe and has its own buffer for records not yet sent. So when calling send sequentially, before sending another message we have to wait until the previous one is sent and acked; with parallelism we just put those messages on the inner buffer, and when it’s full or linger.ms has passed, they are sent in a batch
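
For reference, a sketch of the producer settings that control this batching. `linger.ms`, `batch.size` and `bootstrap.servers` are standard Kafka producer configuration keys; the values below are illustrative assumptions only, not recommendations, and the `props` helper is a hypothetical name.

```scala
import java.util.Properties

object ProducerBatchingConfig {
  def props(bootstrapServers: String): Properties = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", bootstrapServers)
    // How long the producer waits for more records before sending a batch (assumed value).
    p.setProperty("linger.ms", "5")
    // Upper bound in bytes for a single per-partition batch (assumed value).
    p.setProperty("batch.size", "32768")
    p
  }
}
```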
Piotr Gawryś
@Avasil
We can probably get rid of the synchronized then, we just need to check that this holds in all of the versions we support, and separately handle closing the producer and calling the task callback only once
Thank you @voidconductor ! If you're interested in contributing the fix, that would be very welcome, otherwise I'll handle it before the next release
we could probably take advantage of it in KafkaProducerSink too
Alexandru Nedelcu
@alexandru

> calling task callback only once separately

With Monix on master, if that callback comes from the Task, you don't have to do anything other than swallowing an IllegalStateException :)

Maybe we should have something like a tryOnSuccess / tryOnError, like Promise does.
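
For comparison, this is how the Scala standard library's `Promise` behaves: `success` throws an `IllegalStateException` on a second completion, while `trySuccess` just reports whether it won. The sketch only shows the `Promise` behavior being referred to, not any Monix callback API.

```scala
import scala.concurrent.Promise
import scala.util.Try

object CompleteOnce {
  val p: Promise[Int] = Promise[Int]()

  // First completion wins.
  val first: Boolean = p.trySuccess(1)
  // Second attempt is rejected quietly instead of throwing.
  val second: Boolean = p.trySuccess(2)
  // The strict variant throws IllegalStateException when the promise is already completed.
  val strict: Try[Promise[Int]] = Try(p.success(3))
}
```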
@voidconductor if it's thread-safe, we can definitely remove that synchronization. Somebody will have to work on it though and I think right now we're busy with the Monix 3.0.0 release.
Piotr Gawryś
@Avasil
yeah, I was thinking about doing it just after the release
Vasily Efimov
@voidconductor
Great, I guess I’ll create an issue and try to submit a solution this weekend or next week
Right now I have modified producer running in my project, it just throws when sending to closed, so good enough for me, but maybe not so good in general
Piotr Gawryś
@Avasil
Awesome, ping me if you have any questions/want to consult anything
Paweł Kiersznowski
@pk044
I'm currently trying to implement unfold and decided to take a look at how fromStateAction already works, so I wrote a simple example:
test("fromstateaction example") { implicit s =>
    var received = 0

    val throwingFunction: Int => (Int, Int) = i => if (i < 10) (i, i + 1) else throw new Exception()
    Observable.fromStateAction(throwingFunction)(0)
      .subscribe { _ =>
        received += 1; Continue
      }

    assertEquals((0 to received).toList, (0 to 9).toList)
  }
received List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) != expected List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
was wondering if it's the correct output?
the 'regular' unfold would return the expected list of numbers
Paweł Kiersznowski
@pk044
i'm probably missing some basic fact about observables
Vasily Efimov
@voidconductor
Looks like it’s correct, generator function can be called 10 times before throwing with provided seed value
maybe (0 to received) should be (0 until received) here
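
A plain-Scala sketch of the counting issue, without Monix. The `emitted` helper is a hypothetical stand-in for the emission loop (call the generator until it throws); it is not how `Observable.fromStateAction` is implemented. It shows that the generator yields 10 elements, so `received` ends up as 10, and that the inclusive range `0 to received` then has 11 elements while `0 until received` has the expected 10.

```scala
object StateActionCount {
  // Same generator as in the test above: emits i, moves to i + 1, throws once i reaches 10.
  val throwingFunction: Int => (Int, Int) =
    i => if (i < 10) (i, i + 1) else throw new Exception("state exhausted")

  // Hypothetical stand-in for the emission loop: keep calling f until it throws.
  def emitted(f: Int => (Int, Int), seed: Int): List[Int] = {
    val out     = List.newBuilder[Int]
    var state   = seed
    var running = true
    while (running) {
      try {
        val (elem, next) = f(state)
        out += elem
        state = next
      } catch {
        case _: Exception => running = false
      }
    }
    out.result()
  }

  val elems: List[Int] = emitted(throwingFunction, 0)
  val received: Int    = elems.length // 10 elements: 0 through 9

  val inclusive: List[Int] = (0 to received).toList    // 0 through 10, one element too many
  val exclusive: List[Int] = (0 until received).toList // 0 through 9
}
```

Asserting on the emitted elements themselves, rather than on a range rebuilt from the count, avoids this kind of off-by-one entirely.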