These are chat archives for ReactiveX/RxJava

13th
Mar 2016
Alex Reisberg
@a-reisberg
Mar 13 2016 15:10
Quick question about SerializedSubject. Suppose I have a serialized subject, subject, and I subscribe to it with subject.subscribe(some action f). On which thread is f executed?
Dorus
@Dorus
Mar 13 2016 15:11
The thread that calls subject.onNext().
Alex Reisberg
@a-reisberg
Mar 13 2016 15:11
I experimented with it, tried to do onNext from different threads, but as long as one of the onNext's is on the main thread, all the subscriptions are on main
Dorus
@Dorus
Mar 13 2016 15:13
If you want to select a different thread for calls to f, you can use subject.observeOn(some scheduler).subscribe(some action f).
Alex Reisberg
@a-reisberg
Mar 13 2016 15:14
I understand, but I want to know what the contract is
Dorus
@Dorus
Mar 13 2016 15:14
What contract do you mean?
The contract only dictates that onNext calls (and terminal calls like onError and onCompleted) are serialized.
For all it cares, every call can be on a different thread.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:15
so in my example, if I call subject.onNext() on different threads, as long as one of those threads is main then everything seems to be called on the main thread
Dorus
@Dorus
Mar 13 2016 15:15
Probably SerializedSubject does some magic here.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:15
yeah, exactly
and I'm trying to figure out what the magic is precisely
I can make a few guesses.
1). Rx never cares about what thread does what. So any thread can make those calls; all SerializedSubject does is serialize them.
2). Rx prefers not to block, so it probably queues a call when another one is already in flight.
3). Rx doesn't introduce new threads by default unless you tell it to do so through a scheduler. So it probably reuses the first thread that has been calling onNext if another call arrives while the previous one is still going.
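The three guesses above can be sketched with the JDK alone. This is a minimal "emitter loop" illustration, assuming a design where the first thread to enter onNext delivers its own value and then drains whatever other threads queued in the meantime. EmitterLoopSketch and demo are hypothetical names, not RxJava classes, and nothing here is claimed to be SerializedSubject's actual source.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical emitter-loop serializer, written for illustration; not RxJava source. */
final class EmitterLoopSketch<T> {
    private final Consumer<T> downstream;
    private final Queue<T> queue = new ArrayDeque<>(); // guarded by "this"
    private boolean emitting;                          // guarded by "this"

    EmitterLoopSketch(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        synchronized (this) {
            if (emitting) {
                // Another thread is already delivering: hand the value over and return.
                queue.add(value);
                return;
            }
            emitting = true;
        }
        // This thread won the race, so it delivers its own value first...
        downstream.accept(value);
        // ...and then drains whatever other threads queued while it was busy.
        while (true) {
            T next;
            synchronized (this) {
                next = queue.poll();
                if (next == null) {
                    emitting = false;
                    return;
                }
            }
            downstream.accept(next);
        }
    }

    /** Emits 1 from the caller; while 1 is being delivered, thread "other" emits 2. */
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        AtomicReference<EmitterLoopSketch<Integer>> ref = new AtomicReference<>();
        ref.set(new EmitterLoopSketch<Integer>(v -> {
            log.add(v + " on " + Thread.currentThread().getName());
            if (v == 1) {
                Thread other = new Thread(() -> ref.get().onNext(2), "other");
                other.start();
                try {
                    other.join(); // "other" only queues its value, so this returns quickly
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }));
        ref.get().onNext(1);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch both values are logged on the thread that called onNext(1), even though 2 was emitted from "other". No thread blocks waiting for another and no new thread is created, which matches the three guesses.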
Alex Reisberg
@a-reisberg
Mar 13 2016 15:19
so in my example, I have an Observable created on some new thread, and then subscribe a SerializedSubject to it
Dorus
@Dorus
Mar 13 2016 15:19
Let's take a peek at the code to see what it really does. But a word of advice: none of that is in the contract, so don't depend on it!
Alex Reisberg
@a-reisberg
Mar 13 2016 15:19
I see
and then I have subject.onNext() on the main thread
when I do subject.subscribe, everything is running on main
Dorus
@Dorus
Mar 13 2016 15:20
Very well possible.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:20
but that's weird right?
why does it pick main?
Dorus
@Dorus
Mar 13 2016 15:21
Why? Main calls subject.onNext(). And subject.onNext() calls f.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:21
but the first observable is on a different thread
Dorus
@Dorus
Mar 13 2016 15:21
The code that runs subject.subscribe only hooks f to subject.onNext().
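A minimal sketch of that point, assuming a bare-bones subject that keeps a list of callbacks (TinySubject, runOn and demo are hypothetical names, not RxJava types): subscribe only registers f, and f later runs on whichever thread calls onNext.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical bare-bones subject, only for illustration; not an RxJava type. */
final class TinySubject<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers f and returns immediately; the subscribing thread is not retained. */
    void subscribe(Consumer<T> f) {
        subscribers.add(f);
    }

    /** Invokes every registered callback synchronously on the calling thread. */
    void onNext(T value) {
        for (Consumer<T> f : subscribers) {
            f.accept(value);
        }
    }

    /** Runs f on a fresh named thread and waits for it to finish. */
    static void runOn(String threadName, Runnable f) {
        Thread t = new Thread(f, threadName);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Subscribes on one thread, emits from two others, and records where f ran. */
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        TinySubject<Integer> subject = new TinySubject<>();
        runOn("subscriber", () ->
            subject.subscribe(v -> log.add(v + " on " + Thread.currentThread().getName())));
        runOn("producer-a", () -> subject.onNext(1));
        runOn("producer-b", () -> subject.onNext(2));
        return log;
    }

    public static void main(String[] args) {
        // prints "1 on producer-a" then "2 on producer-b"
        demo().forEach(System.out::println);
    }
}
```

The "subscriber" thread has already finished by the time anything is emitted, so it never shows up in the log.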
Alex Reisberg
@a-reisberg
Mar 13 2016 15:22
so the first guy is like observable.subscribe(subject)
Dorus
@Dorus
Mar 13 2016 15:22
That thread immediately returns.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:22
and the observable is on a different thread
Dorus
@Dorus
Mar 13 2016 15:22
It's not kept hostage so that it can execute calls to onNext when main calls them.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:23
so suppose I don't run onNext on main and just have observable.subscribe(subject), then it does run on the thread given by the observable
Dorus
@Dorus
Mar 13 2016 15:23
If Rx did that, any thread that calls subscribe would be lost until the subscription ends. And since it won't return, you can't even call unsubscribe. Yuck.
If you never call onNext, the subscription won't really do anything at all ;)
Alex Reisberg
@a-reisberg
Mar 13 2016 15:24
observable.subscribe(subject) calls subject.onNext
and feeds in the stuff of observable
Dorus
@Dorus
Mar 13 2016 15:24
Of course, there is a different situation that you are probably confusing this with.
The thread used to call subscribe on normal observables might also be used to run the onSubscribe action, and thus make the onNext calls.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:25
sure!
Dorus
@Dorus
Mar 13 2016 15:25
Observables like Observable.range(0, 4) will emit all items right away.
And thus they can use the subscribing thread.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:25
I don't understand what you're trying to say
Dorus
@Dorus
Mar 13 2016 15:25
A subject can't really do that, can it?
Alex Reisberg
@a-reisberg
Mar 13 2016 15:25
so I have the following: observable = Observable.just(1, 2).subscribeOn(Schedulers.computation())
I also have a subject lying around, so I do subject.subscribe(some action)
Dorus
@Dorus
Mar 13 2016 15:26
In that case, the subscribing thread will run into subscribeOn, stop, and schedule just(1, 2) on the computation scheduler.
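A rough JDK-only analogue of that hand-off, assuming an ExecutorService stands in for the computation scheduler (SubscribeOnSketch, justOneTwo and the "worker" thread name are hypothetical, and this is not how RxJava implements subscribeOn): the caller only schedules the subscription, and the source then emits on the worker thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical analogue of subscribeOn built on an ExecutorService. */
final class SubscribeOnSketch {
    /** A cold source: emits 1 and 2 synchronously on whichever thread calls it. */
    static void justOneTwo(Consumer<Integer> subscriber) {
        subscriber.accept(1);
        subscriber.accept(2);
    }

    /** Hands the act of subscribing to the executor, so the caller returns at once. */
    static void subscribeOn(ExecutorService executor, Consumer<Integer> subscriber) {
        executor.execute(() -> justOneTwo(subscriber));
    }

    /** Subscribes from the calling thread and records where the values arrive. */
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService worker =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        subscribeOn(worker, v -> log.add(v + " on " + Thread.currentThread().getName()));
        worker.shutdown(); // already submitted work still runs
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        // prints "1 on worker" then "2 on worker"
        demo().forEach(System.out::println);
    }
}
```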
Alex Reisberg
@a-reisberg
Mar 13 2016 15:26
then I do observable.subscribe(subject)
then the action that I subscribed on my subject is called on the computation scheduler
great!
now, suppose right after observable.subscribe(subject)
I have subject.onNext(100) in the main thread
then now, everything is running in the main thread
including the 1, 2 above
Dorus
@Dorus
Mar 13 2016 15:28
Are you sure about 1,2? That surprises me.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:28
it surprised me too!!!
that's why I'm here
should I paste the code here?
Dorus
@Dorus
Mar 13 2016 15:29
sure, if it's not too long. Else a gist.
Anyway, all of that is totally allowed.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:29
it's in scala. here you go
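import rx.Observable
import rx.functions.Action1
import rx.schedulers.Schedulers
import rx.subjects.{PublishSubject, SerializedSubject}

object Launcher extends App {
  val subject = new SerializedSubject(PublishSubject.create[Int])
  // Emits 1 and 2 on a computation-scheduler thread.
  val observable = Observable.just(1, 2).subscribeOn(Schedulers.computation())

  // Print each value together with the thread that delivered it.
  subject.subscribe(new Action1[Int] {
    override def call(t: Int): Unit =
      println(s"Subject subscribeOn $t, ${Thread.currentThread().getName}")
  })

  observable.subscribe(subject)

  // Emits 6 (and then onCompleted) synchronously on the main thread.
  Observable.just(6).subscribe(subject)

  // Keep the JVM alive so the computation thread gets a chance to run.
  while (true) {}
}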
Dorus
@Dorus
Mar 13 2016 15:32
What's the output?
Alex Reisberg
@a-reisberg
Mar 13 2016 15:32
everything running on main
Subject subscribeOn 6, main
Subject subscribeOn 1, main
Subject subscribeOn 2, main
now, the more wacky part is that if I switch the order:
  Observable.just(6).subscribe(subject)
  observable.subscribe(subject)
then, only 6 gets printed out
1 and 2 don't even show up
Dorus
@Dorus
Mar 13 2016 15:34
Because just(6) emits onCompleted i think
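A small sketch of why the order matters, assuming a subject that drops every value after it has seen a terminal event (TerminatingSubject and feed are hypothetical names; feed stands in for subscribing the subject to a finite source such as just(6)):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical subject that stops delivering once it has completed. */
final class TerminatingSubject<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private volatile boolean done;

    void subscribe(Consumer<T> f) {
        subscribers.add(f);
    }

    void onNext(T value) {
        if (done) {
            return; // after a terminal event, later values are dropped
        }
        for (Consumer<T> f : subscribers) {
            f.accept(value);
        }
    }

    void onCompleted() {
        done = true;
    }

    /** Stands in for source.subscribe(subject): emit every value, then complete. */
    void feed(List<T> values) {
        for (T v : values) {
            onNext(v);
        }
        onCompleted();
    }

    /** Feeds a one-element source first, then a second source that arrives too late. */
    static List<Integer> demo() {
        List<Integer> printed = new CopyOnWriteArrayList<>();
        TerminatingSubject<Integer> subject = new TerminatingSubject<>();
        subject.subscribe(printed::add);
        subject.feed(Arrays.asList(6));    // delivers 6, then completes the subject
        subject.feed(Arrays.asList(1, 2)); // dropped: the subject is already done
        return printed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [6]
    }
}
```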
Alex Reisberg
@a-reisberg
Mar 13 2016 15:34
ah, that makes sense
but the main thing is still confusing
wait, but that's still weird, since the first subscribe is also from a just, but maybe its onCompleted doesn't have time to run yet
Dorus
@Dorus
Mar 13 2016 15:41
Yeah, SerializedSubject probably tries to put all onNext messages through before it calls onCompleted.
That's probably why everything runs on main. Main is the first thing that hits the PublishSubject.
What if you subscribe to the PublishSubject instead?
Alex Reisberg
@a-reisberg
Mar 13 2016 15:43
why doesn't it put everything on the computation thread instead?
Dorus
@Dorus
Mar 13 2016 15:43
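import rx.Observable
import rx.functions.Action1
import rx.schedulers.Schedulers
import rx.subjects.{PublishSubject, SerializedSubject}

object Launcher extends App {
  val pubSub = PublishSubject.create[Int]
  val serSub = new SerializedSubject(pubSub)
  val observable = Observable.just(1, 2).subscribeOn(Schedulers.computation())

  // Listen on the serialized wrapper.
  serSub.subscribe(new Action1[Int] {
    override def call(t: Int): Unit =
      println(s"Subject subscribeOn $t, ${Thread.currentThread().getName}")
  })

  // Feed the underlying PublishSubject directly, bypassing the serializing wrapper.
  observable.subscribe(pubSub)

  Observable.just(6).subscribe(pubSub)

  // Keep the JVM alive so the computation thread gets a chance to run.
  while (true) {}
}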
Alex Reisberg
@a-reisberg
Mar 13 2016 15:44
then it behaves as expected
run on different threads
Dorus
@Dorus
Mar 13 2016 15:45
aaah
Alex Reisberg
@a-reisberg
Mar 13 2016 15:45
that makes sense right? it's just that there's some magic of SerializedSubject that confuses me
Dorus
@Dorus
Mar 13 2016 15:46
I think that's how you need to use SerializedSubject. The serSub is for subscribing and the pubSub for calls to onNext.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:47
oh, I misread your code, let me rerun it
yeah, that's correct
weird!
why isn't there a place that wrote this lol
thanks man. that's great! :)
Dorus
@Dorus
Mar 13 2016 15:52
I'm not sure. Subjects are confusing. I usually tend to avoid them.
Even in this case I'm still in doubt whether it's a bug or whether we're doing something wrong.
Alex Reisberg
@a-reisberg
Mar 13 2016 15:53
:/
so what I'm trying to achieve is to create an eventbus
using Rx
what's the alternative?
Dorus
@Dorus
Mar 13 2016 17:41
@a-reisberg I'm back. For one thing, I would only expose the observer or the observable. Preferably have a static entry point for your events, and if you really do need to expose both, expose one half as an observer and the other as an observable.
You really do not want a subject that's reachable all over your code; that would be a huge piece of global state and would quickly degenerate into spaghetti code.
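One way to picture that advice, sketched with plain JDK types rather than Rx (EventSource, EventSink and EventBusSketch are hypothetical names): the bus keeps its listener list private and hands out a read-only half and a write-only half, so no caller holds something that can both publish and subscribe.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Read-only half: code holding this can listen but cannot publish. */
interface EventSource<T> {
    void subscribe(Consumer<T> listener);
}

/** Write-only half: code holding this can publish but cannot listen. */
interface EventSink<T> {
    void publish(T event);
}

/** Hypothetical event bus that never exposes both halves through one object. */
final class EventBusSketch<T> {
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    /** Returns a view that can only register listeners. */
    EventSource<T> source() {
        return listeners::add;
    }

    /** Returns a view that can only push events to the registered listeners. */
    EventSink<T> sink() {
        return event -> {
            for (Consumer<T> listener : listeners) {
                listener.accept(event);
            }
        };
    }

    /** Wires one listener to the read side and publishes once through the write side. */
    static List<String> demo() {
        EventBusSketch<String> bus = new EventBusSketch<>();
        EventSource<String> readSide = bus.source();
        EventSink<String> writeSide = bus.sink();
        List<String> received = new CopyOnWriteArrayList<>();
        readSide.subscribe(received::add);
        writeSide.publish("hello");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [hello]
    }
}
```

With Rx the same split would presumably be done by handing out the subject typed only as an observer to producers and only as an observable to consumers.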