Hi again,
I am sorry to report that the latest master does not resolve the loop in flatMap with toRx.
The following script can be executed in ammonite:
import $ivy.`com.lihaoyi::scalarx:0.4.0-SNAPSHOT`, rx._, rx.async._
import Ctx.Owner.Unsafe._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object FlatMap {
  def test() = {
    def request(id: Int): Future[String] = Future {
      Thread.sleep(500)
      println(s"Request: $id")
      id.toString
    }
    val id = Var[Int](0)
    val idString = id.flatMap(i => request(i).toRx("..."))
    id() = 10
    id() = 20
    println("End...")
    Await.result(Future { Thread.sleep(3000); println("Waited long enough") }, 30 seconds)
  }
}
FlatMap.test()
and produces the following output:
End...
Request: 0
Request: 10
Request: 20
Request: 20
Request: 20
Request: 20
Request: 20
Waited long enough
Request: 20
kill() doesn't work on Var?
c via b() = 3, the dependency c has on a is just recreated. I guess "kill" is kind of a misleading name for what actually happens on a Var.
test("watchOrders") {
  var result = List.empty[Order]
  val stream = client.watchOrders()
  val promise = Promise[Assertion]()
  stream.fold(List.empty[Order]) {
    case (acc, Some(elem)) =>
      val result = elem :: acc
      println("result size: " + result.size)
      if (result.size == 4) promise.success({
        assert(validateTheOrders(result))
      })
      result
  }
  promise.future
}
validateTheOrders(result) is passed, the test will pass and terminate as expected
result size printed in console will be:
result size: 1
result size: 2
result size: 3
result size: 4
result size: 4
result size: 4
result size: 4
...
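A side note on the repeated "result size: 4" lines: if the fold body really is re-run after the list reaches four elements (which is what the output suggests, though I have not confirmed why), then promise.success would be called more than once, and a Scala Promise throws IllegalStateException on a second completion. A minimal sketch using only the standard library, with PromiseOnce and completeAll as made-up names for illustration:

```scala
import scala.concurrent.Promise
import scala.util.Try

object PromiseOnce {
  // Tries to complete one promise with each value in turn.
  // trySuccess returns true only for the attempt that actually completes it.
  def completeAll(values: List[Int]): List[Boolean] = {
    val promise = Promise[Int]()
    values.map(v => promise.trySuccess(v))
  }

  // success (unlike trySuccess) throws if the promise is already completed.
  def secondSuccessFails(): Boolean = {
    val promise = Promise[Int]()
    promise.success(4)
    Try(promise.success(4)).isFailure
  }
}
```

So in a callback that may fire repeatedly, trySuccess is the safer call; it does not explain why the callback keeps firing.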
@Voltir I just got it working, inspired by the eventually method from haoyi's utest. I'm still using scalatest's SyncTestSuit, and the code looks like this:
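import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

private def waitUntil[T](stream: Rx[T])(cond: T => Boolean, maxWait: Duration): T = {
  val promise = Promise[T]()
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  // Poll the current value every 100 ms until the condition holds.
  scheduler.scheduleAtFixedRate(() => {
    val value = stream.now
    // trySuccess is a no-op once the promise is already completed.
    if (cond(value)) promise.trySuccess(value)
  }, 0L, 100L, TimeUnit.MILLISECONDS)
  // Stop polling whether the wait succeeds or times out.
  try Await.result(promise.future, maxWait)
  finally scheduler.shutdown()
}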
test("watchOrders") {
  val stream = client.watchOrders()
  val listRx: Rx[List[Order]] = stream.fold(List.empty[Order]) {
    case (acc, Some(elem)) => elem :: acc
    case (acc, _) => acc
  }
  val list = waitUntil(listRx)(_.size >= 4, 10.seconds)
  assert(validateTheOrders(list))
}
It's working great now
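For anyone who wants the same polling idea without a scheduler thread, here is a rough sketch that blocks the calling thread instead. It is not the utest or scalatest implementation; Eventually, read and interval are names I made up, and it returns None on timeout rather than throwing.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object Eventually {
  // Polls `read` until `cond` holds or `maxWait` has elapsed.
  // Returns Some(value) on success and None on timeout.
  def waitUntil[T](read: () => T, maxWait: FiniteDuration,
                   interval: FiniteDuration = 100.millis)(cond: T => Boolean): Option[T] = {
    val deadline = maxWait.fromNow

    @tailrec
    def loop(): Option[T] = {
      val value = read()
      if (cond(value)) Some(value)
      else if (deadline.isOverdue()) None
      else {
        // Sleep on the calling thread between polls.
        Thread.sleep(interval.toMillis)
        loop()
      }
    }

    loop()
  }
}
```

With an Rx this would presumably be called as Eventually.waitUntil(() => listRx.now, 10.seconds)(_.size >= 4), using the same .now accessor as above.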
implicit val ctx = Ctx.Owner.save() in my class? I have seen that somewhere... .save() lacks documentation.
import rx._ but my code says not found: value Obs in this code piece for providing the implicit conversion.
"scalarx" % "0.2.8", but latest is "0.4.0"
implicit def rxFrag[T](r: Rx[T])(implicit dataCtx: Ctx.Data, ev: T => Frag): Frag = {
  import Ctx.Owner.Unsafe._
  def rSafe: dom.Node = span(r()).render
  var last = rSafe
  r.triggerLater {
    val newLast = rSafe
    js.Dynamic.global.last = last
    last.parentNode.replaceChild(newLast, last)
    last = newLast
  }
  last
}
implicit Ctx.Data
...