Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Piotr Gawryś
@Avasil

I think whatever works with scala.concurrent.Future should work with monix.execution.CancelableFuture. Maybe it doesn't work for supertypes and you need to cast it to normal Future?
You can also runSyncUnsafe it in tests

TestScheduler is also a great advice, you can use it like this: https://github.com/monix/monix/blob/master/monix-eval/shared/src/test/scala/monix/eval/TaskRaceSuite.scala

Yann Moisan
@YannMoisan
Hello ! I've just read https://monix.io/docs/3x/best-practices/blocking.html and I'm wondering why the deadlock example is not shorter :
  implicit val ec = ExecutionContext
    .fromExecutor(Executors.newFixedThreadPool(1))

  def addOne(x: Int) = Future(x + 1)

  def multiply(x: Int, y: Int) = Future {
    val result = addOne(x)
    Await.result(result, Duration.Inf)
  }
matfournier
@matfournier
Hello. Observables question. What if I have a response that both is an answer I need and something that spawns another observable? E.g. I'm fetching remote pages in an observable (say following some sort of "next" token) from an asyncStateAction. For any response, it has 10 pages I need to emit. Those pages may have child pages (with their own cursors, so I'm walking some sort of remote tree) I need to fetch. A few questions: the 10 pages I get, do I convert that into another Observable.fromIterator from my List[Pages] I dig out of the response so I'm always emitting "Page", rather than List[Page] on my observable? Not sure how to handle the child page, as I both want to emit the parent page and spawn a new observable to start remote paging the child.
TapanVaishnav
@TapanVaishnav
@matfournier can you provide some demo code for all this?
matfournier
@matfournier
Working on that now, it will be a bit though. Converting a work problem to a simpler problem.
matfournier
@matfournier
That will have to be a tomorrow problem. I think I'm close. Will have code tomorrow evening.
TapanVaishnav
@TapanVaishnav
@matfournier :+1:
Vasily Shiyan
@xicmiah
Hi! Is there an Observable equivalent to fs2 through?
As in
def through[B](f: Observable[A] => Observable[B]): Observable[B] = f(this)
matfournier
@matfournier
Is flatMap stack safe? I came across this issue back from 2.3, which indicates that flatMap isn't stack safe in the 3.0 release? monix/monix#481
matfournier
@matfournier
@TapanVaishnav here : https://gist.github.com/matfournier/7e76330ae7970447cc249b46c2dca23e , apologies for the janky code from trying to build a simple example!
Piotr Gawryś
@Avasil

@matfournier flatMap for Observable is not stack-safe in 3.x, I think it's not possible in current protocol. Alex has some ideas for brand new protocol that would be stack safe but that's 4.x in the far future territory

I will take a look at your example tomorrow (CET timezone)

@xicmiah There used to be transform but it has been removed, I don't know why. If you'd like to have it back then please show your support here :D monix/monix#891
Right now you could add it as syntax with implicit class or use scala.util.chaining if you're on Scala 2.13
Piotr Gawryś
@Avasil
@YannMoisan Hello, probably for no particular reason, I think both examples are ok
matfournier
@matfournier
@Avasil thanks, appreciate any insights! I have a problem where I make N-many observables (many of length 1) which seems ... expensive. Gotta be a better way to do it.
Piotr Gawryś
@Avasil

@matfournier I looked at the code, I actually don't think creating single element Observable is as bad as you think. I wouldn't worry about it too much before measuring that it's a bottleneck. Observable.now is really cheap, it's like 1 allocation + whatever's going on in flatMap. It feels like calls for pages would dominate the work to do

If you want to do buffering, you can try bufferIntrospective or other buffering operator (perhaps bufferTumbling inside flatMap). If there's no batch API then maybe concurrent calls would work too, e.g. mergeMap or Task.wander

Overall the code looks good to me, I don't really have too much to add. :D Though I will be happy to help out if it turns out to be a problem

though one of the linked issues says flatMap is not stacksafe, need to test this... if it's not that's a problem!

Why is it a problem, do you flatMap in infinite loop somewhere?

matfournier
@matfournier
Nope, I don't recursve on flatMap. Just something I came across! Ha. Perils of doing code and writing notes for yourself at the same time. Thanks!
Yeah, in practice the calls for pages dominates. If you are unlucky there are hundreds. If you are really unlucky, semi-reliable third parties (cough facebook cough) will have a bug with cyclical hierarchies and give you page cursors forever until you OOM on accumulation...
TapanVaishnav
@TapanVaishnav
@matfournier Hi, thanks for the code. I'm also not that good with all the efficiency stuff of monix but as Piotr said creating a single element Observable might not be as bad as you think. I think I could think of is creating an unbounded Observable and doing BFS (breadth first search; just for the clarification, sorry for stating the obvious) on your remote tree, emitting the parent page and then appending all the child pages at the end of the Observable designed with ConcurrentQueue(!?) or any other iterable, and traversing in the similar manner till the end.
Does this make any sense?
matfournier
@matfournier
Makes sense. Thankfully I have other contracts that kick in that stop me from going too deep, either time bound (caller has long since timed out) or size bound (these get surfaced to a front-end which croaks after ~5k elements anyway). Those conditions aren't in the example but have been easy to model.
Wojtek Pituła
@Krever
hay, any hints on how to turn Observable into java.io.InputStream ?
Piotr Gawryś
@Avasil
@Krever We have a PR in progress: monix/monix#1050 feedback is welcome :D
Wojtek Pituła
@Krever
ahh nice, sadly I need it right now :P but good to know it will come
Piotr Gawryś
@Avasil
If you'd rather avoid beta testing then I guess there is an option of converting to fs2 / akka-streams which already have this function
Matthew de Detrich
@mdedetrich
When using guaranteeCase is it possible to somehow get the successful value from the Task?
Fabio Labella
@SystemFw
@mdedetrich you might want to look into Concurrent#continual
Matthew de Detrich
@mdedetrich
Hmm, let me answer it this, would the following implementation be correct
      /**
        * Returns a new `TaskResult` in which f` is scheduled to be run on
        * completion. This would typically be used to release any
        * resources acquired by this `Task`.
        *
        * The returned `TaskResult` completes when both the source and the task
        * returned by `f` complete.
        *
        * NOTE: The given function is only called when the task is
        * complete.  However the function does not get called if the task
        * gets canceled.  Cancellation is a process that's concurrent with
        * the execution of a task and hence needs special handling.
        *
        * See [[doOnCancel]] for specifying a callback to call on
        * canceling a task.
        *
        * @param f The function argument. The first boolean parameter indicates
        * if the `TaskResult` finished due to an Exception or it being on the Left
        *
        */
      def doOnFinish(f: Option[(Boolean, Throwable)] => TaskResult[Unit]): TaskResult[Unit] = {
        EitherT.right(result.value.flatMap {
          case Left(e)  => f(Some(false, e)).value.void
          case Right(_) => f(None).value.void
        }.guaranteeCase {
          case ExitCase.Completed => Task.unit
          case ExitCase.Completed => Task.unit
          case ExitCase.Error(e) => f(Some(true, e)).value.void
        })
      }
Where type TaskResult[T] = EitherT[Task, GeneralError, T]
So basically I am trying to do a monix doOnFinish but I also want to handle the Left case if I am in an EitherT
Fabio Labella
@SystemFw
can you not call guaranteeCase on the EitherT directly?
Matthew de Detrich
@mdedetrich
I can but I can't get the success value which is what I need
Otherwise f would get called multiple times
Fabio Labella
@SystemFw
to get the success value, you might want to look at continual instead, as I said :)
Matthew de Detrich
@mdedetrich
Is that any better than the proposed solution?
Also I am trying to wrap my head around Concurrent.continual
Fabio Labella
@SystemFw
continual gives you a guarantee, everything else does not
value.flatMap is not interruption safe
Piotr Gawryś
@Avasil
In flatMap(f).guaranteeCase you might call both f (success) and case ExitCase.Canceled => if you're unlucky and it gets canceled in between
Matthew de Detrich
@mdedetrich
Okay thanks, I will look into continual then
Paweł Kiersznowski
@pk044
hey guys, are there any more changes I should make to monix/monix#1054 ?
TapanVaishnav
@TapanVaishnav
@pk044 Hi, added some comments, let me know what you think.
Matthew de Detrich
@mdedetrich
@Avasil @SystemFw Regarding the question I made last week about Concurrent.continual, would something like this be the correct solution?
      def doOnFinish(f: Option[(Boolean, Throwable)] => TaskResult[Unit]): TaskResult[T] =
        EitherT(
          Concurrent
            .continual(result.value) {
              case Left(e) => f(Some(true, e)).value
              case Right(value) =>
                value match {
                  case Left(generalError) => f(Some(false, generalError)).value
                  case Right(_)           => f(None).value
                }
            }
            .flatMap { _ =>
              result.value
            })
Fabio Labella
@SystemFw
I think there is no need to manually unwrap/rewrap the EitherT, just say result.continual { case ...
well, I guess it depends on how you are surfacing your errors (in the Task or in the EitherT layer)
Matthew de Detrich
@mdedetrich

I think there is no need to manually unwrap/rewrap

I need to do this to get the Left(generalError) case

Otherwise I only have T
matfournier
@matfournier
More of a style question. I have some observable producing Either[E, A] from a asyncStateAction, so really Task[Either[E, A]] at any point if I get a Left[E] I need to shutdown the entire stream and not bother (no recovery, just log). Am I better off just raising a failed task early, and thus having much simpler downstream observable actions, simpler consumers (no Either unwrapping), or is the preference to leave it as Either[E, A] ? I don't want to wade into the whole failed task vs Task[Either] vs EitherT[Task,] vs MonadError debate but curious what people have done.
Piotr Gawryś
@Avasil
@matfournier In your case I would either fail or do collectWhile { case Right(value) => value } (it will complete successfully IIRC). Streaming Either[E, A] when nobody can really do anything with E seems pointless
matfournier
@matfournier
Yeah, figured as much. Thanks.
Richard
@Executioner1939
Hi :-). What is the reason for defaulting threads created with the Scheduler to be daemon threads?