I don’t think this has anything to do with map
or functor laws, as replacing map
with the corresponding Rx
construct leads to the same problem:
import $ivy.`com.lihaoyi::scalarx:0.3.2`, rx._
object Example {
val v1 = Var(0)
val v2 = Rx { v1() }
val v22 = Rx { v1() }
val v23 = Rx { v22() }
val v3 = Rx { v23() }
def q(implicit trackDependency: Ctx.Data) = {
if (v1() == 0) v2()
else {
val b = v3() != v2()
if (b)
103
else
17
}
}
val v = Rx { q }
val list = {
var result = List.empty[Int]
val obs = v.trigger { result = result :+ v.now }
v1() = 1
v1() = 2
result
}
}
println(Example.list)
This prints List(0, 103, 17)
instead of List(0, 17)
, too.
Hi,
I've got a problem with calling Futures out of Rx.
Just copy and paste the following code into ScalaFiddle:
import rx._
import rx.async._
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global
def request(id: Int): Future[String] = {
val p = Promise[String]()
js.timers.setTimeout(500) {
println(s"Request: $id")
p.success(id.toString)
}
p.future
}
val id = Var[Int](0)
val idString = id.flatMap(i => request(i).toRx("..."))
id() = 10
println("End...")
And you fall into a loop, with the following output:
End...
Request: 0
Request: 10
Request: 10
Request: 10
Request: 10
Request: 10
Request: 10
Request: 10
etc....
Is this a known behaviour? or is this a bug?
Is there a workaround for this kind of pattern?
Thanks for your help.
Roll
@rreckel Yea, currently the way flatMap works, it recreates the inner rx whenever the inner rx changes - this interacts poorly with toRx because, as you see in your test, it throws it in an infinite loop.
Im having a hard time coming up with an alternate definition of toRx that actually works within flatmap and am considering just removing it all together.
So ive been playing around with alternatives, and this seems to work:
implicit class FutureCombinators[T](val v: rx.Rx[Future[T]]) extends AnyVal {
def flatten(initial: T)(implicit ec: ExecutionContext, ctx: Ctx.Owner): Rx[T] = {
var result = Var(initial)
v.foreach { x => x.foreach(result.update) }
result
}
}
And its used this way:id.map { x => request(x) }.flatten("...")
I tried your flatten function:
import rx._
import rx.async._
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext
implicit class FutureCombinators[T](val v: rx.Rx[Future[T]]) extends AnyVal {
def flatten(initial: T)(implicit ec: ExecutionContext, ctx: Ctx.Owner): Rx[T] = {
val result = Var(initial)
v.foreach { x => x.foreach(result.update) }
result
}
}
def request(id: Int): Future[String] = {
val p = Promise[String]()
js.timers.setTimeout(500) {
println(s"Request: $id")
p.success(id.toString)
}
p.future
}
import scala.concurrent.ExecutionContext.Implicits.global
val id = Var[Int](0)
val idString = id.map(i => request(i)) .flatten("...")
val result = for {
i <- id
is <- idString
} yield s"$i -> $is"
result.foreach(println(_))
id() = 10
id() = 20
println("End...")
This code generates the following result in ScalaFiddle:
0 -> ...
10 -> ...
20 -> ...
End...
Request: 0
20 -> 0
Request: 10
20 -> 10
Request: 20
20 -> 20
This is not exactly what I expected..... but still better than an infinite loop ;-)
What do you think?
Hi again,
I am sorry to report that the latest master does not resolve the loop in flatMap with toRx.
The following script can be executed in ammonite:
import $ivy.`com.lihaoyi::scalarx:0.4.0-SNAPSHOT`, rx._, rx.async._
import Ctx.Owner.Unsafe._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object FlatMap {
def test() = {
def request(id: Int): Future[String] = Future {
Thread.sleep(500)
println(s"Request: $id")
id.toString
}
val id = Var[Int](0)
val idString = id.flatMap(i => request(i).toRx("..."))
id() = 10
id() = 20
println("End...")
Await.result(Future{Thread.sleep(3000); println("Waited long enough")}, 30 seconds)
}
}
FlatMap.test()
and produces the following output:
End...
Request: 0
Request: 10
Request: 20
Request: 20
Request: 20
Request: 20
Request: 20
Waited long enough
Request: 20
kill()
doesn't work on Var
?
c
via b() = 3
, the dependency c
has on a
is just recreated. I guess "kill" is kind of a misleading name for what actually happens on a Var
.
test("watchOrders") {
var result = List.empty[Order]
val stream = client.watchOrders()
val promise = Promise[Assertion]()
stream.fold(List.empty[Order]) {
case (acc, Some(elem)) =>
val result = elem :: acc
println("result size: " + result.size)
if (result.size == 4) promise.success({
assert(validateTheOrders(result))
})
result
}
promise.future
}
validateTheOrders(result)
is passed, the test will pass and terminate as expected
result size
printed in console will be:result size: 1
result size: 2
result size: 3
result size: 4
result size: 4
result size: 4
result size: 4
...
@Voltir I just make it work inspired with the eventually
method from haoyi's utest. Now I'm still using scalatest's SyncTestSuit, and the code is like:
private def waitUntil[T](stream: Rx[T])(cond: T => Boolean, maxWait: Duration): T = {
val promise = Promise[T]()
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() => {
val list = stream.now
if (cond(list)) {
promise.success(list)
}
}, 0L, 100L, TimeUnit.MILLISECONDS)
Await.result(promise.future, maxWait)
}
test("watchOrders") {
val stream = client.watchOrders()
val listRx: Rx[List[Order]] = stream.fold(List.empty[Order]) {
case (acc, Some(elem)) => elem :: acc
case (acc, _) => acc
}
val list = waitUntil(listRx)(_.size >= 4, 10.seconds)
assert(validateTheOrders(list))
}
It's working great now
implicit val ctx = Ctx.Owner.save()
in my class? I have seen that somewhere....save()
lacks documentation.