Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Piotr Gawryś
@Avasil
You can also overwrite "default" Scheduler with executeOn to change where your own shifts go
Joseph Denman
@JosephDenman
@oleg-py Thanks! That's interesting. The result of parSequence_ is known so the main task continues, but all tasks may not have completed.
/cancelled
@oleg-py I'm writing a DSL that requires me to be able to run an arbitrary number of processes in parallel, where all other processes are cancelled if any one of them fails. I also need the evaluation of said processes to return when all of the processes have been cancelled/completed (basically sink semantics). Is there a combinator in the monix library that would do what I've described?
Oleg Pyzhcov
@oleg-py

@JosDenmark that is parSequence. It's just that cancellation is by nature non-deterministic, concurrent to actual task's execution, and depends on how you construct your tasks. E.g.

Task { ... } is either ran or not ran, and it's never stopped in the middle.

Task { ... } >> Task {...} can be stopped in the >> bit, but will almost certainly not unless you use a scheduler with AlwaysAsyncExecution model or change it to Task { .. } >> Task.cancelBoundary >> Task { .. }.
Task.async { ... } >> Task { ... } can be cancelled in the middle of async call, or before it gets to the 2nd task, but not in the middle of 2nd task. However, the cancellation will not undo whatever you did in the body of async. Its variant, Task.cancelable lets you specify your own logic for cancellation, but you can't block in its body.

Piotr Gawryś
@Avasil
AFAIK you can't do it using provided cancelation model for the reasons mentioned by Oleg. I had a use case for it once and I ended up doing manual flag: https://github.com/VirtusLab/akka-workshop-client/blob/cats-effect-complete-step5/src/main/scala/client/Decrypting.scala
Oleg Pyzhcov
@oleg-py
@Avasil why not IO.cancelBoundary? It makes sense if you want to recover from it later, as you can't recover from being cancelled, but I don't see why it's hardline required.
Piotr Gawryś
@Avasil
It was weird imaginary use case for a workshop. There was a Decryptor that could decrypt N items in parallel but it was sometimes failing and silently corrupting everything else in progress. I had to wait until it's actually done. Otherwise old corrupted "batch" would also corrupt the fresh one and I wouldn't even know. IO.cancelBoundary wasn't enough here and I wanted 100% correctness to beat Akka and Kotlin's implementations. :D
Matthew de Detrich
@mdedetrich
@Avasil I am having a look at the latest Monix wrt to the changes of Future isolation right now
Sorry for the delay
Bromel777
@Bromel777
Hello! Can anyone suggest a resource for a set of tasks for Monix (like in http://scala-exercises.org), if it exists?
YulawOne
@YulawOne
I'm looking for a way to implement tracking for blocking operations like vert.x do it (https://vertx.io/docs/vertx-core/java/#golden_rule) and my first thoughts are to do it in a similar way as tracing scheduler done, to wrap execution in Runnable that store execution start time (plus thread ref) and additionally run async thread which will check those start time and report if time exceeds threshold. What do you think about it?
Oleg Pyzhcov
@oleg-py
@YulawOne sounds like something easy to do with Task - race a blocking task against a one that does reporting after a certain threshold and then never terminates.
YulawOne
@YulawOne
@oleg-py That would be in place solution when you know where you could have a block, but I want to track all arbitrary executions associated with a scheduler, to prevent unwanted code blocking
Piotr Gawryś
@Avasil
@Bromel777 I'm not aware of anything like it. Apart from the documentation (including scaladocs), you can look at cats.effect.IO documentation (there is even a tutorial), there are a lot of similarities with Task. For Observable there is quite a bit of common ground with reactiveX. If you're looking to practice anything specific, maybe I could come up with something
Shane
@Shailpat3Shane_twitter
How can I do this wait for Observer to Subscribe before PublishSubject publish, So that I don't lose any message ( can I do this avoiding ReplaySubject)
Oleg Pyzhcov
@oleg-py
@Shailpat3Shane_twitter what should happen if I publish before subscription, then?
Shane
@Shailpat3Shane_twitter
@oleg-py The message should stay may be in the queue I am not sure what's the best but don't want to lose the message Whats happening some times I am losing messages if I subscribe later, Thank you
Oleg Pyzhcov
@oleg-py
Then ReplaySubject is what you want, no?
Shane
@Shailpat3Shane_twitter
@oleg-py Yes but it will keep in buffer everything we will need a large buffer and Once we get observer subscribed we don't need the buffer
Is there any other way I can achieve
rms264
@rms264

When using doOnFinish is it better to do your resource cleanup inside another task or do the cleanup and return Task.unit?

.doOnFinish{_ => 
   file.close
  Task.unit
}

.doOnFinish(_ => Task(file.close)}

or does it matter?

Oleg Pyzhcov
@oleg-py
It's "better" to use Task.bracket or Resource and do cleanups in another task, not outside.
rms264
@rms264
Did not know about bracket. I implemented it that way. Thanks for your help
etienne
@crakjie
is it normal that Observable.complete return Observable[Nothing] and not Observable[Unit] ?
Ryan Peters
@sloshy
Nothing is a subtype of every other type, so it can be used everywhere. It lets it typecheck.
Observable[Unit] would actually be able to emit elements - ()
So if you have an Observable[Nothing] that means effectively it is empty.
@crakjie here's a simple example using List you can try yourself:
scala> val a = List.empty[Nothing]
a: List[Nothing] = List()

scala> val b = List(1, 2, 3)
b: List[Int] = List(1, 2, 3)

scala> a ++ b
res0: List[Int] = List(1, 2, 3)

scala> val c = List.empty[Unit]
c: List[Unit] = List()

scala> a ++ b ++ c
res1: List[AnyVal] = List(1, 2, 3)
Note how the last list has basically lost its type information, whereas the common supertype for a ++ b is actually still List[Int]
Ryan Peters
@sloshy
You'll note that no other type extends Nothing, so if you want a List[Nothing] or an Observable[Nothing] you cannot actually have any elements in it because they cannot exist in the scala type system.
Whereas there is exactly one possible Unit element, and Unit cannot take the place of concrete types.
So Nothing is a very good type to use for something that contains nothing, or emits nothing, so you can still use it in cases with more specific types, whereas Unit typically (but not always) means some side-effect is being performed.
Nathaniel Fischer
@kag0
Is there any more documentation for Local? Especially how it could be used with other effect monads like cats IO as discussed here monix/monix#880
etienne
@crakjie

@sloshy I agree with that but the problem is due to covarience Observable[Nothing] <: Observable[Int] wich is a problem when you relie on the compiler to detect errors
exemple

def fun() : Observable[Int] = {
  Observable(1,2,3,4).completed
}

val consumer : Consumer[Int] = ...
fun().consumeWith(consumer).runToFuture

Here the consumer will never see any number wich can be quite confusing when looking at return types.

Piotr Gawryś
@Avasil

@kag0 Not much. There were a lot of changes recently and we're spending most of our efforts to push for 3.0.0 release. I have plans to write more docs/blog post with example repo but I won't find time to do it before October. Feel free to ask questions, at the very least I can copy answers to the docs

As long as you use TracingScheduler it should work for Future but you will have to call Local.isolate { f } for the ones that should be completely isolated from the rest. In the current model we pretty much always share locals unless we tell otherwise. We can do it withisolate, bind etc. which is also called in Task during running it so it works in many cases without any additional effort. We're probably going to stick with it for 3.0.0, gather feedback and reevaluate it for 3.x

I don't know about IO/ZIO, I don't expect it to fully work without any extra support.

@crakjie The behavior is expected (it's mentioned in the scaladoc). I guess it can be confusing but I'd say it's like Observable.empty[Int] or List.empty[Int]
Nathaniel Fischer
@kag0
@Avasil so I'm trying it out, does this look right?
implicit val sched = TracingScheduler(ExecutionContext.global)
val local = Local(0)

val f1 = for {
  _ <- Future.successful("shift")
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 0)
    local := 50
  }
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 50)
  }
  _ <- Local.isolate(Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 50)
    local := 100
  })
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 50)
  }
} yield ()

val f2 = Local.isolate(for {
  _ <- Future.successful("shift")
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 0)
    local := 5
  }
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 5)
  }
  _ <- Local.isolate(Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 5)
    local := 10
  })
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 5)
  }

} yield ())

Await.result(f1, Duration.Inf)
Await.result(f2, Duration.Inf)
Piotr Gawryś
@Avasil
yeah
is that what you expect?
Nathaniel Fischer
@kag0
I think I find myself expecting the outer Local.isolate to be implied. But it makes sense when I think about it, and that could be invoked as part of a framework or something and logic code wouldn't need to worry about it.
I'm pleasantly surprised it works so cleanly.
Swapping the futures for IOs does not seem to work so well. I was hoping a IO.contextShift(TracingScheduler(ExecutionContext.global)) would do it but it seems like not.
Piotr Gawryś
@Avasil

Local.isolate won't work for lazy structures, Task uses:

    Task {
      val current = Local.getContext()
      Local.setContext(current.mkIsolated)
      current
    }.bracket(_ => task)(backup => Task(Local.setContext(backup)))

it might help a bit but there is also some logic encoded in Task run-loop itself

automatically isolating outer Future would be awesome but challenging to implement (we have to somehow detect that it is the first Future). Maybe one day x)
Nathaniel Fischer
@kag0
So how would one rewrite the above test to use Tasks rather than Futures?
Is it as simple as swapping all the Locals for TaskLocals? Because that wouldn't be too bad to start with Future and migrate later.
Piotr Gawryś
@Avasil
Yes, and if you call the Task separately, let's say runToFuture instead of Task.parZipthen you get outer isolate for free
in many cases it's probably all you need if you want to set sth like correlationId
Nathaniel Fischer
@kag0
Yes, that's exactly what I need to set 🙄 There are very few other cross cutting concerns I'd want to put in something like a Local, but when you need it nothing's better.
I'm interested to watch how it evolves in 3.x. It would be great if local context propagation made its way into cats, but I don't know how likely that really is.
Piotr Gawryś
@Avasil
I feel like many proficient cats users/maintainers prefer to use Reader / ApplicativeLocal for tracing so there is not enough demand to incorporate it into c.e.IO directly. I think @oleg-py who came up with our current Local model was thinking about supporting IO / F[_] without any support there so who knows, maybe it will happen one day
Nathaniel Fischer
@kag0
That makes sense. For me (not a super heavy cats user) I'd rather log a warning if a correlation id is missing on the thread than add a new type everywhere. Tracing just doesn't warrant the rigor.