Piotr Gawryś
@Avasil
Observable can be used to consume from MVar: Observable.repeatEvalF(var0.take)
How would you like to feed elements there? Do you want to keep using MVar or something else like ConcurrentQueue?
Piotr Gawryś
@Avasil
@fayimora I'm not familiar with Play but runToFuture seems right if you need to interop with library using Future. That's what I did when I was mixing Task with Akka HTTP. I don't know how Action is implemented - perhaps there is a way to make it work with Task directly
Fayi Femi-Balogun
@fayimora
Akka HTTP, perfect. Play uses akka-http.
Monix definitely needs some better examples. I'll help when I get a better understanding and experiment a little.
Piotr Gawryś
@Avasil
That would definitely be very welcome. :D
Glen Marchesani
@fizzy33
@Avasil good question. I am trying to expose something as close to mhtml.Var[String] as possible, so I am investigating ConcurrentQueue as a solution too.
Piotr Gawryś
@Avasil
It looks like monix.reactive.subjects.Var
Glen Marchesani
@fizzy33
@Avasil That will do it, thanks.
Aleksandrov Vladimir
@invis87
Hi guys!
I am new to Monix and want to ask for help: how do I create a stream of events (it looks like that is Observable in Monix) and a value that always holds the last element from that stream? All other elements can be ignored.
Dicky Arinal
@arinal
@Avasil Thanks, it works perfectly!
Piotr Gawryś
@Avasil

@invis87

There are at least two ways, depending on whether you want to be pure or not, I will show you both.

Impure version using Hot Observable:

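  import monix.eval.Task
  import monix.execution.Scheduler
  import monix.reactive.Observable
  import monix.reactive.observables.ConnectableObservable
  import scala.concurrent.duration._

  implicit val s: Scheduler = Scheduler.global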

  val stream: Observable[Long] =
    Observable
      .intervalAtFixedRate(100.millis)
      .doOnNext(l => Task(println(s"Processing $l")))

  // BehaviorSubject will always emit latest value as first element
  val behaviorObservable: ConnectableObservable[Long] = stream.behavior(initialValue = 0L)

  val impure = for {
    // starts emitting elements to all subscribers, at this point `stream` will start executing
    _ <- Task(behaviorObservable.connect())
    lastElement: Task[Long] = behaviorObservable.firstL
    _ <- lastElement.map(el => println(s"lastElement: $el"))
    _ <- lastElement.map(el => println(s"lastElement: $el")).delayExecution(500.millis)
    _ <- lastElement.map(el => println(s"lastElement: $el")).delayExecution(1.second)
  } yield ()

  impure.runSyncUnsafe()

Pure version using regular Observable + Ref:

  import cats.effect.concurrent.Ref

  val pure = for {
    lastElement <- Ref[Task].of(0L)
    // will start running in the background
    _ <- stream.doOnNext(l => lastElement.set(l)).completedL.forkAndForget
    _ <- lastElement.get.map(el => println(s"lastElement: $el"))
    _ <- lastElement.get.map(el => println(s"lastElement: $el")).delayExecution(500.millis)
    _ <- lastElement.get.map(el => println(s"lastElement: $el")).delayExecution(1.second)
  } yield ()

  pure.runSyncUnsafe()
Aleksandrov Vladimir
@invis87
@Avasil thanks a lot, will try :) !
Aleksandrov Vladimir
@invis87
OK, but we store the last element in a Task. Is there a way to store it naked, without the wrapper?
Aleksandrov Vladimir
@invis87
I want to write an auth token refresher with Monix that stores the last token in some val currentToken: AuthToken (and yes, it would be amazing if it were an actual value, not a variable). It sounds impossible, but maybe there are ways to do that, or something similar to what I want.
Aleksandrov Vladimir
@invis87
    var currentValue: Long = 0

    val stream: Observable[Long] =
      Observable
        .intervalAtFixedRate(300.millis)
        .doOnNext(l => println(s"processing: $l"))
        .doOnNext(v => currentValue = v)

    stream.subscribe()
How bad is it :) ? Super bad or ooookey? currentValue should probably be volatile.
Piotr Gawryś
@Avasil

It's bad if you want pure FP but you can't have a pure mutable object without any effect like Task. In your case I think this approach is fine.

Yes, it needs to be either volatile or Atomic. Maybe try something like

def currentToken: AuthToken = currentValue

@volatile private var currentValue: Long = 0

or

def currentToken: AuthToken = currentValue.get

private val currentValue: AtomicLong = AtomicLong(0L)

so it can't be mutated from outside.
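
A minimal sketch of the Atomic variant with consistent types, since the snippets above use Long as a stand-in for the token. It relies only on the JDK's AtomicReference rather than Monix's own atomics, and the AuthToken case class and TokenHolder name are hypothetical, introduced purely for illustration:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical token type; the conversation never defines AuthToken.
final case class AuthToken(value: String)

// Hypothetical holder: reads are public, the mutable cell stays private.
final class TokenHolder(initial: AuthToken) {
  // AtomicReference makes writes from the stream's thread visible to readers.
  private val current = new AtomicReference[AuthToken](initial)

  // Read side: returns the most recently stored token.
  def currentToken: AuthToken = current.get()

  // Write side: intended to be called from the stream's side effect,
  // for example inside doOnNext.
  def update(token: AuthToken): Unit = current.set(token)
}
```

Callers only see currentToken, so the refresher stream is the single place that calls update.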

Aleksandrov Vladimir
@invis87
"so it can't be mutated from outside." - yep, of course, the way I wrote it is just a prototype to show the idea. I am not sure about changing a var inside an Observable.
Glen Marchesani
@fizzy33
@Avasil poking around, it looks like monix.reactive.subjects.Var is only available for Scala 2.13.0. I can't find a recent build of Monix for Scala 2.12.0. Not a big deal, just confirming.
Piotr Gawryś
@Avasil
@invis87 It's fine as long as you take concurrency into account
@fizzy33 Are you sure? I think Var should be available for all Scala versions
If you need Scala 2.13 then there is a snapshot: 3.0.0-eddc79d-SNAPSHOT
Glen Marchesani
@fizzy33
No, definitely not sure, @Avasil. I found my mistake: it was the ordering of versions. I now see 3.0.0-RC3 for Scala 2.11, 2.12 and 2.13, thanks.
I just looked at the top and assumed that is the most recent
Daniel Karch
@danielkarch

Hi, I just read through https://blog.softwaremill.com/thread-shifting-in-cats-effect-and-zio-9c184708067b and was wondering how this works in monix.
Suppose I run

t1 *> t2 *> t3

and I want t1 and t3 to run on the same scheduler, but t2 is wrapping some library call that uses its own thread pool which is not in my control,
how can I achieve this? I found executeOn, but from reading the documentation I don't understand if it would shift back after the task is run.

Piotr Gawryś
@Avasil

@danielkarch
It will always come back like in ZIO:

  implicit val s1 = Scheduler.singleThread("s1") // default scheduler
  val s2 = Scheduler.singleThread("s2")
  val s3 = Scheduler.singleThread("s3")

  def task(i: Int) = Task(println(s"$i: Executing on ${Thread.currentThread().getName}"))

  val t =
    for {
      _ <- task(1).executeOn(s2) // executes on s2
      _ <- task(2) // executes on s1
      _ <- task(3).executeOn(s3) // executes on s3
      _ <- task(4) // executes on s1
      // executes both tasks on s2
      _ <- (task(5) >> Task.shift >> task(6)).executeOn(s2)
    } yield ()

  t.runSyncUnsafe()

Task.async and fromFuture worked like in Cats-Effect until quite recently; on master, execution always stays on the default Scheduler.

Daniel Karch
@danielkarch
Thank you. So I get this behaviour with 3.0.0-RC3?
What is the purpose of the Task.shift in the example?
Piotr Gawryś
@Avasil

> Thank you. So I get this behaviour with 3.0.0-RC3?

For executeOn yes, I think it works like this even in 2.x but I'm not sure. For Task builders it is not released yet

> What is the purpose of the Task.shift in the example?

shift without argument shifts to the default thread pool, not the current one. cats.effect.IO with evalOn would execute 5 on s2, then shift back to s1 and execute 6 on s1. Task.executeOn overrides the default for the duration of a Task so it is safe in this regard. I think it will also happen for IO in 3.0 but it needs a major change in the internals

Daniel Karch
@danielkarch
But would the example above not behave the same without the Task.shift? I.e., execute both 5 and 6 on s2?
Piotr Gawryś
@Avasil
Yeah, it would behave the same, I just added it for fuller picture
Daniel Karch
@danielkarch
Understood :thumbsup:
Pierre-Henri Toussaint
@botrunner_gitlab

Hi All,

I just learned about TaskLocal.isolate and started using it to isolate tasks from incoming HTTP requests, to make sure that MDC and other tracing utilities are set properly. It works well for that use case.

However I found a strange behavior when using Task.fromFuture. My goal was to emulate a 3rd party library, blocking and clearing the MDC.

  test("isolate contexts from Future" ) {

    implicit val scheduler: Scheduler = monix.execution.Scheduler.Implicits.traced
    implicit val opts: Task.Options   = Task.defaultOptions.enableLocalContextPropagation

    val s2 = Scheduler.singleThread("s2")

    val local: Local[Int] = Local[Int](-1)
    val test = (i: Int) => {
      for {
        _ <- Task(local.update(i))
        _ <- TaskLocal.isolate(Task.fromFuture(Future {
          Thread.sleep(10)
          local.clear()
        }(s2)))
        r <- Task(assert(local.get == i))
      } yield r
    }
    Task.gather(Range(0, 5).map(i => TaskLocal.isolate(test(i)))).runSyncUnsafeOpt()
  }

The code above fails, but only with the addition of Thread.sleep.
I could make the test pass by shifting back just after Task.fromFuture, or using Task.executeOn.

Am I missing something?
I tested with the latest on master, as I thought that this fix could help: monix/monix#966

Piotr Gawryś
@Avasil
Thanks for the report, @botrunner_gitlab. I can investigate tomorrow. Does it work with Task.deferFuture? Task.fromFuture takes the Future eagerly, so it might be the case that the Future starts executing before isolation takes place.
Pierre-Henri Toussaint
@botrunner_gitlab
It does not, but I learned something :)
This works (with Task.shift):

    val local: Local[Int] = Local[Int](-1)
    val test = (i: Int) => {
      for {
        _ <- Task(local.update(i))
        _ <- TaskLocal.isolate(Task.deferFuture(Future {
          Thread.sleep(10)
          local.clear()
        }) <* Task.shift)
        r <- Task(assert(local.get == i))
      } yield r
    }
    Task.gather(Range(0, 5).map(i => TaskLocal.isolate(test(i)))).runSyncUnsafeOpt()
Oleg Pyzhcov
@oleg-py
It might work by accident, non-deterministically, since you have several threads
megri
@megri
Is Monix tested to handle very long running processes? It's a bit of a curious question now that I've built my server, but the intention is to let the server schedule tasks on a by-week interval, as well as broadcasting a single stream indefinitely.
Piotr Gawryś
@Avasil

@oleg-py I think it is just yet another reason to revisit restoreLocals in 3.1.0 :P monix/monix#986

@botrunner_gitlab I think I minimized the failing case a bit, and my fix should work with your original example. Are you able to verify it from my PR?

Piotr Gawryś
@Avasil
@megri In my experience it should work fine, and it is something we definitely support, so let us know if you stumble upon any issues.
Pierre-Henri Toussaint
@botrunner_gitlab
@Avasil I just tested it and it works fine with my previously failing tests. Thanks a lot for investigating!
megri
@megri
Hmm, I want to map a Resource[Coeval, Whatever] to Resource[Task, Whatever]. I've found .mapK and applied it by defining a def coevalToTask[A](coeval: Coeval[A]): Task[A] = coeval.to[Task], but this feels like I'm just repeating the signature of .to[Task]. I'm pretty new to the FunctionK-concept so maybe I'm missing some imports?
Oleg Pyzhcov
@oleg-py
@megri you should be able to do resource.mapK(TaskLike[Coeval]). See #865 for more details
(assuming you're on RC3)
megri
@megri
@oleg-py Sweet, thanks
Paul K
@pk044
@Avasil monix/monix#955 I think this issue can be closed, unless you want something more to be implemented in terms of unfold?
Piotr Gawryś
@Avasil
Closed, thanks!