Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Nick Childers
    @Voltir

    So ive been playing around with alternatives, and this seems to work:

      implicit class FutureCombinators[T](val v: rx.Rx[Future[T]]) extends AnyVal {
        def flatten(initial: T)(implicit ec: ExecutionContext, ctx: Ctx.Owner): Rx[T] = {
          var result = Var(initial)
          v.foreach { x => x.foreach(result.update) }
          result
        }
      }

    And its used this way:
    id.map { x => request(x) }.flatten("...")

    @rreckel would something like that work? @fdietze what do you think?
    Nick Childers
    @Voltir
    Bleh, that actually doesnt work if you end up having the flattened future inside of a flatMap
    Roland Reckel
    @rreckel

    I tried your flatten function:

    import rx._
    import rx.async._
    import scala.concurrent.Future
    import scala.concurrent.Promise
    import scala.concurrent.ExecutionContext
    
    implicit class FutureCombinators[T](val v: rx.Rx[Future[T]]) extends AnyVal {
      def flatten(initial: T)(implicit ec: ExecutionContext, ctx: Ctx.Owner): Rx[T] = {
        val result = Var(initial)
        v.foreach { x => x.foreach(result.update) }
        result
      }
    }
    
    def request(id: Int): Future[String] = {
      val p = Promise[String]()
      js.timers.setTimeout(500) {
        println(s"Request: $id")
        p.success(id.toString)
      }
      p.future
    }
    
    import scala.concurrent.ExecutionContext.Implicits.global
    
    val id = Var[Int](0)
    val idString = id.map(i => request(i)) .flatten("...")
    
    val result = for {
      i <- id
      is <- idString
    } yield s"$i -> $is"
    
    result.foreach(println(_))
    
    id() = 10
    id() = 20
    println("End...")

    This code generates the following result in ScalaFiddle:

    0 -> ...
    10 -> ...
    20 -> ...
    End...
    Request: 0
    20 -> 0
    Request: 10
    20 -> 10
    Request: 20
    20 -> 20

    This is not exactly what I expected..... but still better than an infinite loop ;-)
    What do you think?

    Nick Childers
    @Voltir
    Interesting - I tried a similar test but had the map/flatten in the for loop and that created an infinite loop
    Maybe this and a section in the readme explaining this is sufficient?
    Felix Dietze
    @fdietze
    Are you trying with a 0.3 version or with latest master? To me it looks like the flatMap leak, which we fixed recently.
    Roland Reckel
    @rreckel
    I am using version 0.3.2
    I integrated the flatten function in my project now, and I don't have any loops anymore.
    Does an "official" latest version of scala.rx exist? I often try some snippets of code in ScalaFiddle but there the only version is 0.3.2
    Anyway I can test the code with the latest master and keep you informed.
    Roland Reckel
    @rreckel

    Hi again,
    I am sorry to report that the latest master does not resolve the loop in flatMap with toRx.
    The following script can be executed in ammonite:

    import $ivy.`com.lihaoyi::scalarx:0.4.0-SNAPSHOT`, rx._, rx.async._
    import Ctx.Owner.Unsafe._
    
    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    
    object FlatMap {
    
      def test() = {
        def request(id: Int): Future[String] = Future {
          Thread.sleep(500)
          println(s"Request: $id")
          id.toString
        }
    
        val id = Var[Int](0)
        val idString = id.flatMap(i => request(i).toRx("..."))
    
        id() = 10
        id() = 20
        println("End...")
        Await.result(Future{Thread.sleep(3000); println("Waited long enough")}, 30 seconds)
      }
    }
    
    FlatMap.test()

    and produces the following output:

    End...
    Request: 0
    Request: 10
    Request: 20
    Request: 20
    Request: 20
    Request: 20
    Request: 20
    Waited long enough
    Request: 20
    Felix Dietze
    @fdietze
    @rreckel Could you please open a PR with a failing test case?
    Nick Childers
    @Voltir
    I've been trying to tackle this problem - it's not the same issue as the flatmap problem. It also seems really hard to solve in a satisfying way.
    Making futures compose nicely in Rx contexts is hard because the re-evaluation step recreates the future, creating the cycle of infinite loop
    Roland Reckel
    @rreckel
    Call me crazy, but would it be possible to create a "toRx" function for Monix Task? Would that be easier perhaps, as those are lazy evaluated?
    I guess I will have to look at the scala.rx code a bit.
    Nick Childers
    @Voltir
    Sure - I think that's possible
    Perhaps something akin to how sttp does its effect encoding https://github.com/softwaremill/sttp
    I wouldn't want to add a dependency to monix in the core repo though
    Roland Reckel
    @rreckel
    You are right.... adding a dependency is not good.
    Nick Childers
    @Voltir
    What gives me hope is that toRx does appear to work if the definition is outside of the flatmap - I think if I can make the flatmap macro generate the equivalent of that, we might finally have a decent solution
    Minghao Liu
    @molikto
    @  val a = Var(1)  
    
    a: Var[Int] = Var@70(1)
    
    @  
    
    @ val b = Var(2)  
    b: Var[Int] = Var@6b(2)
    
    @  val c = Rx { println("c triggered "+ ( a() + b()))}  
    c triggered 3
    c: Rx.Dynamic[Unit] = Rx@2e(())
    
    
    @  
    
    @ a.kill() 
    
    
    @ b() = 3 
    c triggered 4
    
    
    @ a() = 5 
    c triggered 8
    so seems kill() doesn't work on Var?
    Nick Childers
    @Voltir
    @molikto ah, thats interesting - what happens if you dont do the b() = 3 step?
    kill on a Var just clears the downstream listeners - Vars dont really have an owner. What I think happens is when you trigger a re-evaluation of c via b() = 3, the dependency c has on a is just recreated. I guess "kill" is kind of a misleading name for what actually happens on a Var.
    Peng Li
    @freewind
    Hi guys, I have some questions when testing scala.rx code, and post a question here: https://stackoverflow.com/questions/49073908/how-to-test-vars-of-scala-rx-with-scalatest
    Thanks for helping
    Nick Childers
    @Voltir
    @freewind i took a stab at answering it, let me know if you want any clarification
    Peng Li
    @freewind
    @Voltir Thanks for your answer! It gives me some idea and I changed my test using scalatest's AsyncTestSuite and the code is:
    test("watchOrders") {
      var result = List.empty[Order]
      val stream = client.watchOrders()
      val promise = Promise[Assertion]()
      stream.fold(List.empty[Order]) {
        case (acc, Some(elem)) =>
          val result = elem :: acc
          println("result size: " + result.size)
          if (result.size == 4) promise.success({
            assert(validateTheOrders(result))
          })
          result
      }
      promise.future
    }
    The good new is if the validateTheOrders(result) is passed, the test will pass and terminate as expected
    But if the assertion is false, it will never stop, and the result size printed in console will be:
    result size: 1
    result size: 2
    result size: 3
    result size: 4
    result size: 4
    result size: 4
    result size: 4
    ...
    Peng Li
    @freewind

    @Voltir I just make it work inspired with the eventually method from haoyi's utest. Now I'm still using scalatest's SyncTestSuit, and the code is like:

      private def waitUntil[T](stream: Rx[T])(cond: T => Boolean, maxWait: Duration): T = {
        val promise = Promise[T]()
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() => {
          val list = stream.now
          if (cond(list)) {
            promise.success(list)
          }
        }, 0L, 100L, TimeUnit.MILLISECONDS)
        Await.result(promise.future, maxWait)
      }
    
      test("watchOrders") {
        val stream = client.watchOrders()
        val listRx: Rx[List[Order]] = stream.fold(List.empty[Order]) {
          case (acc, Some(elem)) => elem :: acc
          case (acc, _) => acc
        }
        val list = waitUntil(listRx)(_.size >= 4, 10.seconds)
        assert(validateTheOrders(list))
      }

    It's working great now

    Nick Childers
    @Voltir
    @/all scala.rx 0.4.0 was published
    scalway
    @scalway
    @Voltir where? I cannot find sources.
    Oh... there is still old documentation & there is no github release :).
    moritz bust
    @busti
    Why do I have to have an implicit Owner in scope when using rx.fold()() in a class / trait, but not when I use it elsewhere? Can I avoid that somehow?
    This is how scala.rx fights against memory leaks
    moritz bust
    @busti
    Well yes, but if I understand it correctly, the context is generated automagically via macros most of the time? When I use fold in a repl, I do not need to declare an ownership context...
    Is it okay to just include an implicit val ctx = Ctx.Owner.save() in my class? I have seen that somewhere...
    But unfortunately .save() lacks documentation.
    moritz bust
    @busti

    In this example:

    class Foo {
      val a = Var(0)
    
      val b = a.fold(1)(_ * _)
    }

    how do I obtain / define / pass an Ownership Context?
    It just is not very clear to me.

    Nick Childers
    @Voltir
    Pass it in as an implicit class param on Foo
    moritz bust
    @busti
    @Voltir But where does that implicit come from? I am not using any rx's elsewhere...
    Nick Childers
    @Voltir
    so, if you have some object:
    object Main {
      val foo = new Foo()
    }
    you will get a context for free
    (magic macro stuff happens to make that possible)
    if you are inside a class, then you can again make that outer class take an implicit ctx
    if you know for sure that Foo (or the class that wraps it) is going to be instantiated a finite number of times, then its safe to use the "unsafe" owner ctx