I tried your flatten function:
import rx._
import rx.async._
import scala.scalajs.js
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext

implicit class FutureCombinators[T](val v: rx.Rx[Future[T]]) extends AnyVal {
  // Starts at `initial` and is overwritten whenever one of the futures completes
  def flatten(initial: T)(implicit ec: ExecutionContext, ctx: Ctx.Owner): Rx[T] = {
    val result = Var(initial)
    v.foreach { x => x.foreach(result.update) }
    result
  }
}

// Simulated request: completes with the id as a string after 500 ms
def request(id: Int): Future[String] = {
  val p = Promise[String]()
  js.timers.setTimeout(500) {
    println(s"Request: $id")
    p.success(id.toString)
  }
  p.future
}

import scala.concurrent.ExecutionContext.Implicits.global

val id = Var[Int](0)
val idString = id.map(i => request(i)).flatten("...")
val result = for {
  i <- id
  is <- idString
} yield s"$i -> $is"

result.foreach(println(_))
id() = 10
id() = 20
println("End...")
This code generates the following result in ScalaFiddle:
0 -> ...
10 -> ...
20 -> ...
End...
Request: 0
20 -> 0
Request: 10
20 -> 10
Request: 20
20 -> 20
This is not exactly what I expected... but still better than an infinite loop ;-)
What do you think?
Hi again,
I am sorry to report that the latest master does not resolve the loop in flatMap with toRx.
The following script can be executed in ammonite:
import $ivy.`com.lihaoyi::scalarx:0.4.0-SNAPSHOT`, rx._, rx.async._
import Ctx.Owner.Unsafe._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FlatMap {
  def test() = {
    def request(id: Int): Future[String] = Future {
      Thread.sleep(500)
      println(s"Request: $id")
      id.toString
    }
    val id = Var[Int](0)
    val idString = id.flatMap(i => request(i).toRx("..."))
    id() = 10
    id() = 20
    println("End...")
    Await.result(Future { Thread.sleep(3000); println("Waited long enough") }, 30.seconds)
  }
}

FlatMap.test()
and produces the following output:
End...
Request: 0
Request: 10
Request: 20
Request: 20
Request: 20
Request: 20
Request: 20
Waited long enough
Request: 20
kill() doesn't work on Var?
c via b() = 3, the dependency c has on a is just recreated. I guess "kill" is kind of a misleading name for what actually happens on a Var.
test("watchOrders") {
  var result = List.empty[Order]
  val stream = client.watchOrders()
  val promise = Promise[Assertion]()
  stream.fold(List.empty[Order]) {
    case (acc, Some(elem)) =>
      val result = elem :: acc
      println("result size: " + result.size)
      if (result.size == 4) promise.success({
        assert(validateTheOrders(result))
      })
      result
  }
  promise.future
}
validateTheOrders(result) is passed, the test will pass and terminate as expected
result size printed in console will be:
result size: 1
result size: 2
result size: 3
result size: 4
result size: 4
result size: 4
result size: 4
...
@Voltir I just got it working, inspired by the eventually method from haoyi's utest. I'm still using scalatest's SyncTestSuit, and the code looks like this:
private def waitUntil[T](stream: Rx[T])(cond: T => Boolean, maxWait: Duration): T = {
  val promise = Promise[T]()
  Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() => {
    val list = stream.now
    if (cond(list)) {
      promise.success(list)
    }
  }, 0L, 100L, TimeUnit.MILLISECONDS)
  Await.result(promise.future, maxWait)
}

test("watchOrders") {
  val stream = client.watchOrders()
  val listRx: Rx[List[Order]] = stream.fold(List.empty[Order]) {
    case (acc, Some(elem)) => elem :: acc
    case (acc, _) => acc
  }
  val list = waitUntil(listRx)(_.size >= 4, 10.seconds)
  assert(validateTheOrders(list))
}
It's working great now
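One possible refinement of the waitUntil helper above, as a minimal sketch only: the scheduled executor is never shut down, so each call leaves a polling thread running, and a second promise.success would throw inside the task. The version below takes a plain `() => T` reader instead of an Rx so that it needs nothing beyond the standard library. The `PollingSketch` object and the `read` and `interval` parameters are names made up for this illustration, not part of scala.rx or scalatest.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PollingSketch {
  // Polls `read` every `interval` until `cond` holds, then returns that value.
  // Await.result throws a TimeoutException if `maxWait` elapses first.
  def waitUntil[T](read: () => T, interval: FiniteDuration = 100.millis)(
      cond: T => Boolean, maxWait: Duration): T = {
    val promise = Promise[T]()
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      def run(): Unit = {
        val value = read()
        // trySuccess does nothing if the promise is already completed
        if (cond(value)) promise.trySuccess(value)
      }
    }
    scheduler.scheduleAtFixedRate(task, 0L, interval.toMillis, TimeUnit.MILLISECONDS)
    try Await.result(promise.future, maxWait)
    finally scheduler.shutdownNow() // stop polling on success and on timeout
  }
}
```

With the test above this would presumably be called as PollingSketch.waitUntil(() => listRx.now)(_.size >= 4, 10.seconds).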
implicit val ctx = Ctx.Owner.safe() in my class? I have seen that somewhere... safe() lacks documentation.
import rx._ but my code says not found: value Obs in this code piece for providing the implicit conversion.