Alexandru Nedelcu
@alexandru
@ArturSoler hope I haven’t confused you - I have a tendency to give implementation details. Personally I think it’s better that way.
Good night.
Radu Gancea
@radusw
I think many new Monix users are hitting this akka/akka streams interop issue and we should provide a comprehensive example to follow in the documentation. At least, until monix-nio and monix-netty :)
Artur Soler
@ArturSoler
@alexandru no, you didn't confuse me. I think it's better that way too
Artur Soler
@ArturSoler
is the lack of filter (and withFilter) on Task a design decision?
Edmund Noble
@edmundnoble
Yes.
This question needs to be in the FAQ.
withFilter makes no sense at all on Task. It would throw a meaningless exception if the filter didn't match, and using patterns in for comprehensions cannot be safe because even if the pattern is guaranteed to succeed, a call to withFilter is emitted.
And thus changing the type so that the pattern no longer succeeds doesn't result in a type error.
And then your code is broken and you don't know it until runtime.
Apologies, phone dying.
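Edmund's point about the meaningless exception can be seen with plain `scala.concurrent.Future`, which (unlike Monix's `Task`) does define `withFilter`. A minimal sketch, using only the standard library:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object WithFilterDemo {
  // Future *does* define withFilter. When the predicate fails there is
  // nothing sensible to return, so the Future fails at runtime with an
  // uninformative NoSuchElementException.
  val filtered: Future[Int] = Future.successful(42).withFilter(_ > 100)

  val result: Try[Int] = Try(Await.result(filtered, 1.second))
  // result is a Failure wrapping a NoSuchElementException
}
```

This is exactly the failure mode `Task` avoids by not defining `withFilter` in the first place: the mismatch surfaces as a runtime exception rather than a type error.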
Artur Soler
@ArturSoler
np
it doesn't seem to compile if there's a type error with e.g. lists, though
like
  for {
    (f, s) <- List((1, 2, 3), (4, 5, 6))
  } yield f + s
I agree that having the exception thrown inside of Task (or Future for that matter) isn't very nice. It's just that being able to pattern match in for comprehensions can make code much more readable
Artur Soler
@ArturSoler
I guess that ideally the compiler should be able to verify at least a subset of all possible pattern matches and skip the withFilter call
Edmund Noble
@edmundnoble
I do agree. It's a flaw in the compiler.
It would be really nice to have.
I think separating out refutable from irrefutable patterns with some kind of keyword might help.
Edmund Noble
@edmundnoble
It is possible to check the types and determine that the match will always succeed. But for-comprehensions are removed from your source code long before type information is provided.
So someone would have to go in after the fact, recognize the irrefutable matches, and remove the withFilter calls.
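The desugaring being described can be written out by hand in plain Scala, using `List` (which does define `withFilter`); the values here are illustrative:

```scala
object PatternFilterDemo {
  // What `for { (a, b) <- xs } yield a + b` desugars to when the
  // pattern is refutable: a withFilter that silently drops mismatches.
  val xs: List[Any] = List((1, 2), "not a pair", (3, 4))

  val sums: List[Int] =
    xs.withFilter {
        case (_: Int, _: Int) => true
        case _                => false
      }
      .map { case (a: Int, b: Int) => a + b }
  // "not a pair" vanished without any compile-time or runtime error:
  // sums == List(3, 7)
}
```

This is the silent breakage in miniature: change the element type so the pattern can never match and, as long as the types could still be equal, nothing fails until the data quietly disappears.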
Philippe Derome
@phderome
@clayrat yes I am interested, but do this for yourself and community not just me!! I might come back to look at some of your work in the summer. Job change and lot of vacation in April and May for me...
Edmund Noble
@edmundnoble
@ArturSoler I found the original thread; the compiler error only occurs if the types cannot be equal.
scala> def foo[A](xs: List[A]) = for { (a,b) <- xs } yield a
foo: [A](xs: List[A])List[Any]
Shun Yanaura
@yanana

Hi folks, I’m trying to use Monix in Apache Spark workers to process IO-heavy jobs. But it seems impossible to use it; at the least, I think I’d have to implement AsyncScheduler myself. Spark’s SerializationDebugger complained as follows.

Serialization stack:
    - object not serializable (class: monix.execution.schedulers.TrampolineExecutionContext, value: monix.execution.schedulers.TrampolineExecutionContext@39faf275)
    - field (class: monix.execution.schedulers.ExecutorScheduler, name: monix$execution$schedulers$BatchingScheduler$$trampoline, type: class monix.execution.schedulers.TrampolineExecutionContext)
    - object (class monix.execution.schedulers.ExecutorScheduler$FromSimpleExecutor, monix.execution.schedulers.ExecutorScheduler$FromSimpleExecutor@2b19d6ea)

I think this is caused by BatchingScheduler holding a non-serializable TrampolineExecutionContext field, which is not injectable.

Sorry if my assumption is off base. Is there any workaround for this?

Edmund Noble
@edmundnoble
You should not be in a situation which serializes and sends TrampolineExecutionContexts across the wire, from what I know.
Perhaps more helpfully: it doesn't make sense to serialize it at all, because it is a bunch of ThreadLocal state.
Hmmm. I don't think there's a serializable scheduler. Not sure if @alexandru knows more.
Shun Yanaura
@yanana
I know. But as AsyncScheduler extends BatchingScheduler, I currently have no idea how to avoid it.
Thanks for quick response @edmundnoble.
Edmund Noble
@edmundnoble
Welcome. I don't quite understand how Spark works; my understanding is that it shares implicits via serialization, but actually creating a fresh one on each machine is really what's called for.
Definitely not a Spark guru, unfortunately.
Perhaps it is time to define writeObject and readObject for BatchingScheduler?
Shun Yanaura
@yanana
In my understanding, the functions called transformations are serialized and then transferred to the nodes.
Edmund Noble
@edmundnoble
Yeah, we may want to just make a dummy serializer for BatchingScheduler which doesn't write any data for the serializing and just constructs a fresh instance for deserializing.
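The "write no data, construct a fresh instance on deserialization" idea can be sketched with standard Java serialization and `readResolve`. `DummyScheduler` below is a hypothetical stand-in, not the actual Monix class:

```scala
import java.io._

// Hypothetical stand-in for a scheduler holding non-serializable,
// thread-local state (the real BatchingScheduler is assumed, not shown).
class DummyScheduler extends Serializable {
  // Non-serializable state: marked @transient so it is skipped on write...
  @transient private val local = new ThreadLocal[Int]

  // ...and readResolve replaces the deserialized shell with a fresh
  // instance, so no state ever needs to cross the wire.
  private def readResolve(): Object = new DummyScheduler
}

object SerDemo {
  // Round-trip through Java serialization, as Spark would do.
  def roundTrip(s: DummyScheduler): DummyScheduler = {
    val bytes = new ByteArrayOutputStream
    val out   = new ObjectOutputStream(bytes)
    out.writeObject(s); out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[DummyScheduler]
  }
}
```

The deserialized value is a brand-new scheduler built on the receiving machine, which matches the "create a fresh one on each machine" behavior described above.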
Shun Yanaura
@yanana
It sounds worth trying.
Edmund Noble
@edmundnoble
Absolutely. Maybe a good first PR? ;)
Shun Yanaura
@yanana
If I could inject TrampolineExecutionContext into BatchingScheduler, it might also make sense.
Thanks for everything @edmundnoble. I’m going to try it. If it works, I’ll create a PR :smile:
Edmund Noble
@edmundnoble
Yes, that sounds possible as well. Hope it works out :)
Artur Soler
@ArturSoler
given a hot observable with multiple subscribers, how would you make it so the subsystem (initialization + creation of the observable and subscribers) can be restarted by both the observable and the subscribers?
(if there is a way of signalling an error from a hot-observable subscriber to the observable that is reduced to the observable deciding to restart)
Alexandru Nedelcu
@alexandru

@ArturSoler error from where? From the data-source, you can use .onErrorRestartUnlimited or make up your own logic with .onErrorHandleWith, however this doesn’t work for shared / hot observables, if you think about it.

What is possible is to specify your restart logic before turning your observable into a shared / hot one:

source.onErrorRestartUnlimited.publish

Now, if you want your listeners to receive some sort of message that an error occurred, then things get more complicated. My advice in that case is to not rely on onError at all, but rather to “materialize” your errors in the exposed type. E.g.

def myRestart[A](source: Observable[A]): Observable[Either[Throwable, A]] =
  source.map(x => Right(x): Either[Throwable, A]).onErrorHandleWith { ex =>
    // First stream the error, then restart the materialized source
    Left(ex) +: myRestart(source)
  }

Your listeners will have to accept Either[Throwable, A] instead of just A. Which is fine, because you shouldn’t think of “restarting” your listeners, as that’s outside the scope of Monix’s observables.
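The error-materialization idea, stripped of Monix, can be sketched in plain Scala with a list of thunks standing in for the stream (names and values are illustrative):

```scala
object MaterializeDemo {
  // Each step's failure becomes a Left value in the output instead of
  // terminating the whole sequence, so downstream consumers see errors
  // as ordinary data.
  def materialize[A](steps: List[() => A]): List[Either[Throwable, A]] =
    steps.map { thunk =>
      try Right(thunk())
      catch { case scala.util.control.NonFatal(e) => Left(e) }
    }

  val results = materialize(List(
    () => 1,
    () => throw new RuntimeException("boom"),
    () => 3
  ))
  // results == List(Right(1), Left(RuntimeException: boom), Right(3))
}
```

As with the Observable version, consumers must accept `Either[Throwable, A]` instead of `A`, which is the whole point: the error is part of the exposed type rather than a channel that tears the stream down.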

Artur Soler
@ArturSoler
I hadn't thought about this approach, seems interesting
In my case, though, source would be a Task[Observable[A]] instead of just an Observable[A]
The other missing part would be the signaling of error upstream (from subscriber to the hot-observable)
Alexandru Nedelcu
@alexandru
You’re creating a cycle in that communication though, it gets a little tricky when you do that.
So from your Task[Observable] you can go to a Task[Unit], let’s say, which means that in case of error you restart the task
Artur Soler
@ArturSoler
I hadn't realized I can restart a Task
Alexandru Nedelcu
@alexandru
Yes, you can, wait a sec