Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
ziggystar
@ziggystar
No, I cannot control it. Can I make the Observer buffering?
Or make it drop extensive events?
Piotr Gawryś
@Avasil
Yes, take a look at BufferedSubscriber
There are many options via OverflowStrategy
ziggystar
@ziggystar
Hmm, what would I do with a Stop response from onNext? I cannot stop the clicking, but I could stop reporting it.
Piotr Gawryś
@Avasil
maybe cancel label and remove actionListener if possible? Might make sense to do it on release as well
ziggystar
@ziggystar
That appears to be correct. I'd like to thank you very much for your help, I've got to go to bed now, unfortunately. This has helped me a great deal, especially the idea with the Resource. I hope this works out when I try to rebuild the application in this way, because this handles cleaning up nicely. Thanks again and bye.
Piotr Gawryś
@Avasil
No problem, I'm happy to help
Gabriel Claramunt
@gclaramunt
I'm using Scalatest... what's the best way to test the result of a Task ? I'm using runToFuture, but Scalatest's AsyncSpec doesn't seem to like CompletableFuture ... any recommendation?
TapanVaishnav
@TapanVaishnav
@gclaramunt How about using Monix's TestSchedular?
https://monix.io/docs/2x/execution/scheduler.html#injecting-time-and-tests
Gabriel Claramunt
@gclaramunt
cool thanks! In my case I was doing something very silly like forgetting to map on the result :)
Piotr Gawryś
@Avasil

I think whatever works with scala.concurrent.Future should work with monix.execution.CancelableFuture. Maybe it doesn't work for supertypes and you need to cast it to normal Future?
You can also runSyncUnsafe it in tests

TestScheduler is also a great advice, you can use it like this: https://github.com/monix/monix/blob/master/monix-eval/shared/src/test/scala/monix/eval/TaskRaceSuite.scala

Yann Moisan
@YannMoisan
Hello ! I've just read https://monix.io/docs/3x/best-practices/blocking.html and I'm wondering why the deadlock example is not shorter :
  implicit val ec = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  def addOne(x: Int) = Future(x + 1)

  def multiply(x: Int, y: Int) = Future {
    val result = addOne(x)
    Await.result(result, Duration.Inf)
  }
matfournier
@matfournier
Hello. Observables question. What if I have a response that both is an answer I need and something that spawns another observable? E.g. I'm fetching remote pages in an observable (say following some sort of "next" token) from an asyncStateAction. For any response, it has 10 pages I need to emit. Those pages may have child pages (with their own cursors, so I'm walking some sort of remote tree) I need to fetch. A few questions: the 10 pages I get, do I convert that into another Observable.fromIterator from my List[Pages] I dig out of the response so I'm always emitting "Page", rather than List[Page] on my observable? Not sure how to handle the child page, as I both want to emit the parent page and spawn a new observable to start remote paging the child.
TapanVaishnav
@TapanVaishnav
@matfournier can you provide some demo code for all this?
matfournier
@matfournier
Working on that now, it will be a bit though. Converting a work problem to a simpler problem.
matfournier
@matfournier
That will have to be a tomorrow problem. I think I'm close. Will have code tomorrow evening.
TapanVaishnav
@TapanVaishnav
@matfournier :+1:
Vasily Shiyan
@xicmiah
Hi! Is there an Observable equivalent to fs2 through?
As in
def through[B](f: Observable[A] => Observable[B]): Observable[B] = f(this)
matfournier
@matfournier
Is flatMap stack safe? I came across this issue back from 2.3, which indicates that flatMap isn't stack safe in the 3.0 release? monix/monix#481
matfournier
@matfournier
@TapanVaishnav here : https://gist.github.com/matfournier/7e76330ae7970447cc249b46c2dca23e , apologies for the janky code from trying to build a simple example!
Piotr Gawryś
@Avasil

@matfournier flatMap for Observable is not stack-safe in 3.x, I think it's not possible in current protocol. Alex has some ideas for brand new protocol that would be stack safe but that's 4.x in the far future territory

I will take a look at your example tomorrow (CET timezone)

@xicmiah There used to be transform but it has been removed, I don't know why. If you'd like to have it back then please show your support here :D monix/monix#891
Right now you could add it as syntax with implicit class or use scala.util.chaining if you're on Scala 2.13
Piotr Gawryś
@Avasil
@YannMoisan Hello, probably for no particular reason, I think both examples are ok
matfournier
@matfournier
@Avasil thanks, appreciate any insights! I have a problem where I make N-many observables (many of length 1) which seems ... expensive. Gotta be a better way to do it.
Piotr Gawryś
@Avasil

@matfournier I looked at the code, I actually don't think creating single element Observable is as bad as you think. I wouldn't worry about it too much before measuring that it's a bottleneck. Observable.now is really cheap, it's like 1 allocation + whatever's going on in flatMap. It feels like calls for pages would dominate the work to do

If you want to do buffering, you can try bufferIntrospective or other buffering operator (perhaps bufferTumbling inside flatMap). If there's no batch API then maybe concurrent calls would work too, e.g. mergeMap or Task.wander

Overall the code looks good to me, I don't really have too much to add. :D Though I will be happy to help out if it turns out to be a problem

though one of the linked issues says flatMap is not stacksafe, need to test this... if it's not that's a problem!

Why is it a problem, do you flatMap in infinite loop somewhere?

matfournier
@matfournier
Nope, I don't recursve on flatMap. Just something I came across! Ha. Perils of doing code and writing notes for yourself at the same time. Thanks!
Yeah, in practice the calls for pages dominates. If you are unlucky there are hundreds. If you are really unlucky, semi-reliable third parties (cough facebook cough) will have a bug with cyclical hierarchies and give you page cursors forever until you OOM on accumulation...
TapanVaishnav
@TapanVaishnav
@matfournier Hi, thanks for the code. I'm also not that good with all the efficiency stuff of monix but as Piotr said creating a single element Observable might not be as bad as you think. I think I could think of is creating an unbounded Observable and doing BFS (breadth first search; just for the clarification, sorry for stating the obvious) on your remote tree, emitting the parent page and then appending all the child pages at the end of the Observable designed with ConcurrentQueue(!?) or any other iterable, and traversing in the similar manner till the end.
Does this make any sense?
matfournier
@matfournier
Makes sense. Thankfully I have other contracts that kick in that stop me from going too deep, either time bound (caller has long since timed out) or size bound (these get surfaced to a front-end which croaks after ~5k elements anyway). Those conditions aren't in the example but have been easy to model.
Wojtek Pituła
@Krever
hay, any hints on how to turn Observable into java.io.InputStream ?
Piotr Gawryś
@Avasil
@Krever We have a PR in progress: monix/monix#1050 feedback is welcome :D
Wojtek Pituła
@Krever
ahh nice, sadly I need it right now :P but good to know it will come
Piotr Gawryś
@Avasil
If you'd rather avoid beta testing then I guess there is an option of converting to fs2 / akka-streams which already have this function
Matthew de Detrich
@mdedetrich
When using guaranteeCase is it possible to somehow get the successful value from the Task?
Fabio Labella
@SystemFw
@mdedetrich you might want to look into Concurrent#continual
Matthew de Detrich
@mdedetrich
Hmm, let me answer it this, would the following implementation be correct
      /**
        * Returns a new `TaskResult` in which f` is scheduled to be run on
        * completion. This would typically be used to release any
        * resources acquired by this `Task`.
        *
        * The returned `TaskResult` completes when both the source and the task
        * returned by `f` complete.
        *
        * NOTE: The given function is only called when the task is
        * complete.  However the function does not get called if the task
        * gets canceled.  Cancellation is a process that's concurrent with
        * the execution of a task and hence needs special handling.
        *
        * See [[doOnCancel]] for specifying a callback to call on
        * canceling a task.
        *
        * @param f The function argument. The first boolean parameter indicates
        * if the `TaskResult` finished due to an Exception or it being on the Left
        *
        */
      def doOnFinish(f: Option[(Boolean, Throwable)] => TaskResult[Unit]): TaskResult[Unit] = {
        EitherT.right(result.value.flatMap {
          case Left(e)  => f(Some(false, e)).value.void
          case Right(_) => f(None).value.void
        }.guaranteeCase {
          case ExitCase.Completed => Task.unit
          case ExitCase.Completed => Task.unit
          case ExitCase.Error(e) => f(Some(true, e)).value.void
        })
      }
Where type TaskResult[T] = EitherT[Task, GeneralError, T]
So basically I am trying to do a monix doOnFinish but I also want to handle the Left case if I am in an EitherT
Fabio Labella
@SystemFw
can you not call guaranteeCase on the EitherT directly?
Matthew de Detrich
@mdedetrich
I can but I can't get the success value which is what I need
Otherwise f would get called multiple times
Fabio Labella
@SystemFw
to get the success value, you might want to look at continual instead, as I said :)
Matthew de Detrich
@mdedetrich
Is that any better than the proposed solution?
Also I am trying to wrap my head around Concurrent.continual
Fabio Labella
@SystemFw
continual gives you a guarantee, everything else does not
value.flatMap is not interruption safe
Piotr Gawryś
@Avasil
In flatMap(f).guaranteeCase you might call both f (success) and case ExitCase.Canceled => if you're unlucky and it gets canceled in between
Matthew de Detrich
@mdedetrich
Okay thanks, I will look into continual then