Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Joseph Denman
@JosephDenman

The above program prints

before-main
before-task-1
before-task-2
after-main
body-task-1

to console

I thought it was odd that this program schedules Task.delay(println("body-task-1")) to execute after parSequence_ finishes, not before.
given that parSequence_ is advertised to complete when its sub-tasks complete.
I have two questions:
1) Is this behavior expected?
2) AFAIK, Task can perform internal context shifts arbitrarily. Does the call to contextShift above behave like internal context shifts, i.e., are internal context shifts implemented with bracket?
Oleg Pyzhcov
@oleg-py

@JosDenmark yes, this is expected. task2 is failed with error, so task1 is cancelled and then the result of parSequence is known (failure).

Except I lied and there's no "and then" relationship, it is all happening concurrently. parSequence doesn't wait for other tasks to finally be cancelled, and cancellation is not immediate, it happens concurrently with task1 which is already being executed - and it's very much possible that task1 completes before it's able to react on the cancellation signal, and the continuation of parSequence_.attempt - printing after-main - is also happening concurrently to all that. This has nothing to do with context shifts.

"Internal shifts" is not related to the bracketing nor needs it. The Task is running on a thread pool represented by a Scheduler, and if you have several thousands of tasks running on e.g. 8 threads, every several flatmaps (configurable) it's going to stop and schedule it on the same Scheduler it was running on, letting other Tasks to run until they either finish or get "rescheduled" too. There's nowhere to shift "back" to, it's still the same pool. That is different from evalOn, where you have two pools and evalOn does not happen internally.

Piotr Gawryś
@Avasil
You can also overwrite "default" Scheduler with executeOn to change where your own shifts go
Joseph Denman
@JosephDenman
@oleg-py Thanks! That's interesting. The result of parSequence_ is known so the main task continues, but all tasks may not have completed.
/cancelled
@oleg-py I'm writing a DSL that requires me to be able to run an arbitrary number of processes in parallel, where all other processes are cancelled if any one of them fails. I also need the evaluation of said processes to return when all of the processes have been cancelled/completed (basically sink semantics). Is there a combinator in the monix library that would do what I've described?
Oleg Pyzhcov
@oleg-py

@JosDenmark that is parSequence. It's just that cancellation is by nature non-deterministic, concurrent to actual task's execution, and depends on how you construct your tasks. E.g.

Task { ... } is either ran or not ran, and it's never stopped in the middle.

Task { ... } >> Task {...} can be stopped in the >> bit, but will almost certainly not unless you use a scheduler with AlwaysAsyncExecution model or change it to Task { .. } >> Task.cancelBoundary >> Task { .. }.
Task.async { ... } >> Task { ... } can be cancelled in the middle of async call, or before it gets to the 2nd task, but not in the middle of 2nd task. However, the cancellation will not undo whatever you did in the body of async. Its variant, Task.cancelable lets you specify your own logic for cancellation, but you can't block in its body.

Piotr Gawryś
@Avasil
AFAIK you can't do it using provided cancelation model for the reasons mentioned by Oleg. I had a use case for it once and I ended up doing manual flag: https://github.com/VirtusLab/akka-workshop-client/blob/cats-effect-complete-step5/src/main/scala/client/Decrypting.scala
Oleg Pyzhcov
@oleg-py
@Avasil why not IO.cancelBoundary? It makes sense if you want to recover from it later, as you can't recover from being cancelled, but I don't see why it's hardline required.
Piotr Gawryś
@Avasil
It was weird imaginary use case for a workshop. There was a Decryptor that could decrypt N items in parallel but it was sometimes failing and silently corrupting everything else in progress. I had to wait until it's actually done. Otherwise old corrupted "batch" would also corrupt the fresh one and I wouldn't even know. IO.cancelBoundary wasn't enough here and I wanted 100% correctness to beat Akka and Kotlin's implementations. :D
Matthew de Detrich
@mdedetrich
@Avasil I am having a look at the latest Monix wrt to the changes of Future isolation right now
Sorry for the delay
Bromel777
@Bromel777
Hello! Can anyone suggest a resource for a set of tasks for Monix (like in http://scala-exercises.org), if it exists?
YulawOne
@YulawOne
I'm looking for a way to implement tracking for blocking operations like vert.x do it (https://vertx.io/docs/vertx-core/java/#golden_rule) and my first thoughts are to do it in a similar way as tracing scheduler done, to wrap execution in Runnable that store execution start time (plus thread ref) and additionally run async thread which will check those start time and report if time exceeds threshold. What do you think about it?
Oleg Pyzhcov
@oleg-py
@YulawOne sounds like something easy to do with Task - race a blocking task against a one that does reporting after a certain threshold and then never terminates.
YulawOne
@YulawOne
@oleg-py That would be in place solution when you know where you could have a block, but I want to track all arbitrary executions associated with a scheduler, to prevent unwanted code blocking
Piotr Gawryś
@Avasil
@Bromel777 I'm not aware of anything like it. Apart from the documentation (including scaladocs), you can look at cats.effect.IO documentation (there is even a tutorial), there are a lot of similarities with Task. For Observable there is quite a bit of common ground with reactiveX. If you're looking to practice anything specific, maybe I could come up with something
Shane
@Shailpat3Shane_twitter
How can I do this wait for Observer to Subscribe before PublishSubject publish, So that I don't lose any message ( can I do this avoiding ReplaySubject)
Oleg Pyzhcov
@oleg-py
@Shailpat3Shane_twitter what should happen if I publish before subscription, then?
Shane
@Shailpat3Shane_twitter
@oleg-py The message should stay may be in the queue I am not sure what's the best but don't want to lose the message Whats happening some times I am losing messages if I subscribe later, Thank you
Oleg Pyzhcov
@oleg-py
Then ReplaySubject is what you want, no?
Shane
@Shailpat3Shane_twitter
@oleg-py Yes but it will keep in buffer everything we will need a large buffer and Once we get observer subscribed we don't need the buffer
Is there any other way I can achieve
rms264
@rms264

When using doOnFinish is it better to do your resource cleanup inside another task or do the cleanup and return Task.unit?

.doOnFinish{_ => 
   file.close
  Task.unit
}

.doOnFinish(_ => Task(file.close)}

or does it matter?

Oleg Pyzhcov
@oleg-py
It's "better" to use Task.bracket or Resource and do cleanups in another task, not outside.
rms264
@rms264
Did not know about bracket. I implemented it that way. Thanks for your help
etienne
@crakjie
is it normal that Observable.complete return Observable[Nothing] and not Observable[Unit] ?
Ryan Peters
@sloshy
Nothing is a subtype of every other type, so it can be used everywhere. It lets it typecheck.
Observable[Unit] would actually be able to emit elements - ()
So if you have an Observable[Nothing] that means effectively it is empty.
@crakjie here's a simple example using List you can try yourself:
scala> val a = List.empty[Nothing]
a: List[Nothing] = List()

scala> val b = List(1, 2, 3)
b: List[Int] = List(1, 2, 3)

scala> a ++ b
res0: List[Int] = List(1, 2, 3)

scala> val c = List.empty[Unit]
c: List[Unit] = List()

scala> a ++ b ++ c
res1: List[AnyVal] = List(1, 2, 3)
Note how the last list has basically lost its type information, whereas the common supertype for a ++ b is actually still List[Int]
Ryan Peters
@sloshy
You'll note that no other type extends Nothing, so if you want a List[Nothing] or an Observable[Nothing] you cannot actually have any elements in it because they cannot exist in the scala type system.
Whereas there is exactly one possible Unit element, and Unit cannot take the place of concrete types.
So Nothing is a very good type to use for something that contains nothing, or emits nothing, so you can still use it in cases with more specific types, whereas Unit typically (but not always) means some side-effect is being performed.
Nathaniel Fischer
@kag0
Is there any more documentation for Local? Especially how it could be used with other effect monads like cats IO as discussed here monix/monix#880
etienne
@crakjie

@sloshy I agree with that but the problem is due to covarience Observable[Nothing] <: Observable[Int] wich is a problem when you relie on the compiler to detect errors
exemple

def fun() : Observable[Int] = {
  Observable(1,2,3,4).completed
}

val consumer : Consumer[Int] = ...
fun().consumeWith(consumer).runToFuture

Here the consumer will never see any number wich can be quite confusing when looking at return types.

Piotr Gawryś
@Avasil

@kag0 Not much. There were a lot of changes recently and we're spending most of our efforts to push for 3.0.0 release. I have plans to write more docs/blog post with example repo but I won't find time to do it before October. Feel free to ask questions, at the very least I can copy answers to the docs

As long as you use TracingScheduler it should work for Future but you will have to call Local.isolate { f } for the ones that should be completely isolated from the rest. In the current model we pretty much always share locals unless we tell otherwise. We can do it withisolate, bind etc. which is also called in Task during running it so it works in many cases without any additional effort. We're probably going to stick with it for 3.0.0, gather feedback and reevaluate it for 3.x

I don't know about IO/ZIO, I don't expect it to fully work without any extra support.

@crakjie The behavior is expected (it's mentioned in the scaladoc). I guess it can be confusing but I'd say it's like Observable.empty[Int] or List.empty[Int]
Nathaniel Fischer
@kag0
@Avasil so I'm trying it out, does this look right?
implicit val sched = TracingScheduler(ExecutionContext.global)
val local = Local(0)

val f1 = for {
  _ <- Future.successful("shift")
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 0)
    local := 50
  }
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 50)
  }
  _ <- Local.isolate(Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 50)
    local := 100
  })
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 1- ${local()}")
    assert(local() == 50)
  }
} yield ()

val f2 = Local.isolate(for {
  _ <- Future.successful("shift")
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 0)
    local := 5
  }
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 5)
  }
  _ <- Local.isolate(Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 5)
    local := 10
  })
  _ <- Future.successful{
    println(Thread.currentThread.getName + s" 2- ${local()}")
    assert(local() == 5)
  }

} yield ())

Await.result(f1, Duration.Inf)
Await.result(f2, Duration.Inf)
Piotr Gawryś
@Avasil
yeah
is that what you expect?
Nathaniel Fischer
@kag0
I think I find myself expecting the outer Local.isolate to be implied. But it makes sense when I think about it, and that could be invoked as part of a framework or something and logic code wouldn't need to worry about it.
I'm pleasantly surprised it works so cleanly.
Swapping the futures for IOs does not seem to work so well. I was hoping a IO.contextShift(TracingScheduler(ExecutionContext.global)) would do it but it seems like not.
Piotr Gawryś
@Avasil

Local.isolate won't work for lazy structures, Task uses:

    Task {
      val current = Local.getContext()
      Local.setContext(current.mkIsolated)
      current
    }.bracket(_ => task)(backup => Task(Local.setContext(backup)))

it might help a bit but there is also some logic encoded in Task run-loop itself

automatically isolating outer Future would be awesome but challenging to implement (we have to somehow detect that it is the first Future). Maybe one day x)
Nathaniel Fischer
@kag0
So how would one rewrite the above test to use Tasks rather than Futures?
Is it as simple as swapping all the Locals for TaskLocals? Because that wouldn't be too bad to start with Future and migrate later.