Alexandru Nedelcu
@alexandru
What I propose is basically moving blocking out of the loop, so blocking(fastLoop(...))
And hopefully this fixes it. But see if there's a better way.
Vasily Kirichenko
@vasily-kirichenko
ok
Vasily Kirichenko
@vasily-kirichenko
@alexandru monix/monix#1184
Alexandru Nedelcu
@alexandru
👍
Jorge
@jvican
Has anyone experimented with bridging twitter futures with monix tasks? e.g. Task.fromTwitterFuture?
Alexandru Nedelcu
@alexandru
@jvican I'm not super familiar with Twitter's Future but should be straightforward ...
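import com.twitter.util.{Future, Return, Throw}
import monix.eval.Task

def fromTwitterFuture[A](fa: Future[A]): Task[A] =
  Task.cancelable { cb =>
    // Sentinel exception, used to recognize our own interrupt
    val cancel = new RuntimeException("cancel")
    // Twitter's Try uses Return/Throw rather than Success/Failure
    fa.respond {
      case Return(a) => cb.onSuccess(a)
      case Throw(e) =>
        e match {
          case `cancel` => // do nothing
          case _ => cb.onError(e)
        }
    }
    // Twitter's Future is interrupted via raise
    Task(fa.raise(cancel))
  }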
I haven't tested it and have no idea if it's correct, but should be something close to this
Let me know if this works
I'm curious :)
Jorge
@jvican
Sounds good, will give it a try. Was wondering if a more cooperative integration approach would be possible, but that should work 👍
Alexandru Nedelcu
@alexandru
@jvican what were you thinking of? What do you mean by cooperation?
Jorge
@jvican
I meant having an integration that is more aware of monix's engine and scheduler, partly because I don't know what's going on on the twitter future side
I was merely considering if there were some performance implications or gotchas with that kind of callback integration. But yeah, I don't think so, I was just thinking out loud :)
Alexandru Nedelcu
@alexandru
@jvican Twitter's Future does not require an ExecutionContext to work, AFAIK, which means that this should be fine.
Do test the cancelation, I'm curious if it works
There is one thing we could investigate at some point... they also have their own Local and I'm not sure if we can transport that, probably not as the required integration is very tight.
But it's all a guess on my part atm
elyphas
@elyphas
Hi, could someone please tell me
how I can keep this from triggering when one of the inputs is empty, in this case:
def getIdComparative = (hdlFolio: Observable[Int], hdlEjercicio: Observable[Int]).parMapN {
        case (folio: Int, ejercicio: Int) => folio + "-" + ejercicio + "-" + "ADQ2"
  }
Either of them (hdlFolio or hdlEjercicio) can be empty.
Piotr Gawryś
@Avasil
@elyphas I don't think it's possible because you need to run Observable to know if it's empty or not. I think the best you can do is interrupt the other in case one of them doesn't emit any elements. Would it work for you? I wonder if it shouldn't be the default behavior
elyphas
@elyphas

@Avasil,

"I think the best you can do is interrupt the other in case one of them doesn't emit any elements"

I think this would work for me, because I need both values at the same time.

How can I do it?

Piotr Gawryś
@Avasil

@elyphas A bit verbose but should help:

  val sourceA: Observable[Int] = ???
  val sourceB: Observable[Int] = ???
  val f: (Int, Int) => String = ???

  val result: Observable[String] =
    Observable.fromTask(Deferred[Task, Unit]).flatMap { d =>
      Observable.combineLatestMap2(
        sourceA.switchIfEmpty(Observable.empty[Int].doOnComplete(d.complete(()))).takeUntilEval(d.get),
        sourceB.switchIfEmpty(Observable.empty[Int].doOnComplete(d.complete(()))).takeUntilEval(d.get)
      )(f)
    }

IIRC parMapN uses combineLatestMap

Let me know if it doesn't work or you'd like an explanation of how it works

elyphas
@elyphas
@Avasil, OK, I'm going to test it in my app
elyphas
@elyphas
@Avasil, sorry! It keeps triggering, but I think that's because hdlFolio has a value of 0 while the inputBox just shows an empty string, so I think the condition has to check for 0 in hdlFolio.
Piotr Gawryś
@Avasil
So it should interrupt the other but only if the only element is 0? Can you post a few sample inputs and expected outputs? I wonder if zip wouldn't be better
elyphas
@elyphas
OK
elyphas
@elyphas

@Avasil, it starts at 0, but that is not a valid value,
so it has to be discarded.

hdlFolio:      0, 1, 2, 3 ... 10
hdlEjercicio:  2019, 2020
result:        "1-2019", "2-2019", "1-2020", "2-2020", "3-2019"

I am trying to show how it has to work rather than the observable itself, because I don't think I can put it in a better way :)

Sometimes it would be 2019 and sometimes 2020. But in between the changes on hdlEjercicio, hdlFolio also sometimes has a 0 value, mainly at the beginning of the application.

Piotr Gawryś
@Avasil
how about filter(_ != 0).switchIfEmpty(...) ?
if it starts at 0 but there are no other elements then it will be considered empty so the rest of my snippet should work
elyphas
@elyphas
ok
It works, thank you
@Avasil, by the way, it would be great if you could explain it to me. I think I understand most of the code, but I don't want to make bad assumptions.
Piotr Gawryś
@Avasil

The code uses Deferred as a signal - takeUntilEval(d.get) will process the stream until Deferred is completed.
We want to set it only if one of the sources is empty:

sourceA.switchIfEmpty(Observable.empty[Int].doOnComplete(d.complete(())))

switchIfEmpty "switches" to the other Observable if sourceA completes without emitting any element. Internally it checks if upstream called onNext before onComplete so if the real source is filtered then it might not emit any onNext at this point.

We only want to complete the Deferred as a backup, but switchIfEmpty expects the same type as sourceA, so we can't use Observable.fromTask(d.complete(())), which returns Observable[Unit]. We can solve this with an empty Observable (which can be of any type) and run d.complete when it finishes, which happens right away.
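
Putting the pieces from this thread together, a minimal sketch could look like the one below. It assumes Monix 3.x with cats-effect 2 (for `cats.effect.concurrent.Deferred`). The names `CombineUnlessEmpty`, `combine`, `guarded` and `demo` are made up for illustration. The `onErrorHandle` guard is an extra assumption, not part of the snippet above: it is there so that a second `complete` on the same Deferred does not fail the stream if both sources turn out to be empty.

```scala
import cats.effect.concurrent.Deferred
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

object CombineUnlessEmpty {
  // Hypothetical helper: combines the latest folio and ejercicio values,
  // treating 0 as "no value" and stopping both sides if one ends up empty.
  def combine(folio: Observable[Int], ejercicio: Observable[Int]): Observable[String] =
    Observable.fromTask(Deferred[Task, Unit]).flatMap { d =>
      // Completing a Deferred twice fails, so ignore that error (assumption).
      val signal: Task[Unit] = d.complete(()).onErrorHandle(_ => ())

      def guarded(source: Observable[Int]): Observable[Int] =
        source
          .filter(_ != 0)                                          // drop the invalid initial 0
          .switchIfEmpty(Observable.empty[Int].doOnComplete(signal)) // signal when nothing was emitted
          .takeUntilEval(d.get)                                    // stop once the signal is set

      Observable.combineLatestMap2(guarded(folio), guarded(ejercicio)) { (f, e) =>
        s"$f-$e-ADQ2"
      }
    }

  // Collects the combined values for two in-memory sources.
  def demo(folio: List[Int], ejercicio: List[Int]): List[String] =
    combine(Observable.fromIterable(folio), Observable.fromIterable(ejercicio))
      .toListL
      .runSyncUnsafe(5.seconds)
}
```

Calling `CombineUnlessEmpty.demo(List(0), List(2019))` exercises the "only a 0 arrived" case, where the filtered folio source counts as empty.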

elyphas
@elyphas
Great! Thank You
Pau Alarcón
@paualarco
hello! just released monix-connect 0.1.0 🎉
the set of connectors available so far are akka, dynamodb, hdfs, parquet, redis and s3, but be aware that binary compatibility is not promised until 1.x
hope you enjoy it! any feedback would be appreciated :)
Piotr Gawryś
@Avasil
Fantastic work! I watch the repo and it reminds me every day to stop slacking off :D
Pau Alarcón
@paualarco
thanks @Avasil :)
Alexandru Nedelcu
@alexandru
Nice work @paualarco 😃
Alexandru Nedelcu
@alexandru

"Fantastic work! I watch the repo and it reminds me every day to stop slacking off :D"

Haha, I'm getting the same feeling watching you @Avasil 😅

Karl Bielefeldt
@kbielefe
Quick question. Can a fiber.cancel Task fail? Or hang indefinitely?
Piotr Gawryś
@Avasil
@kbielefe IIRC it can't fail but it could hang indefinitely - cancel will return after finalizers are run (release in bracket / guarantee), here's an example from tests: https://github.com/monix/monix/blob/master/monix-eval/shared/src/test/scala/monix/eval/TaskBracketSuite.scala#L345
Alexandru Nedelcu
@alexandru
Indeed, you could startAndForget it though.
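
A minimal sketch of that suggestion, assuming Monix 3.x. The names `CancelWithoutWaiting`, `slow`, `program` and `run` are hypothetical. The one-minute finalizer only stands in for a release step that takes long enough to make `fiber.cancel` appear to hang.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

object CancelWithoutWaiting {
  // Hypothetical task that never completes and has a slow finalizer.
  val slow: Task[Unit] =
    Task.never[Unit].guarantee(Task.sleep(1.minute))

  // Start the task, then request cancellation in the background
  // instead of waiting for fiber.cancel to return.
  val program: Task[String] =
    for {
      fiber <- slow.start
      _     <- fiber.cancel.startAndForget
    } yield "done"

  // Runs the program with a timeout on the blocking call.
  def run(): String = program.runSyncUnsafe(5.seconds)
}
```

The trade-off is that the caller no longer knows when (or whether) the finalizers have finished running.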
Karl Bielefeldt
@kbielefe
Thanks.
Dominik Zajkowski
@dzajkowski

Hi guys. Question regarding the proper usage of Iterant with a reactive publisher.
Let me build the scene first:

val iterant = Iterant[Task].fromReactivePublisher[String](pub)

this should be a valid legal Iterant, right?

Following the uncons Scaladoc I can write an uncons fold, and that works like a charm.

but if I go and do something like this:

val head1 = iterant.uncons.headOptionL
val m = head1.flatMap {
  case Some(Some(res), rest) =>
    val h = rest.headOptionL
    h.runSyncUnsafe(5.minutes) // <-- (1)
    Task(1)
  case _ => Task(0)
}

the code hangs at (1).
I hijacked the implementation of the IterantFromReactivePublisher case class, and I see that the 'release' function gets called right after the first flatMap.

But I can write the same code based on a List (Iterant.fromList), and it handles the above recursion fine.

The runSyncUnsafe is just there to showcase the finding; I would much rather have headOptionL and uncons in the flatMap.

Now, is this me misusing the underlying Scope, or is this a bug?

I guess I should mention: the release ExitCase is Completed
Alexandru Nedelcu
@alexandru

@dzajkowski I don't know why this is. Your code sample feels like a protocol violation, but it might be perfectly legit.

It might be a bug. Please file an issue.

Dominik Zajkowski
@dzajkowski
@alexandru thanks