Teiva Harsanyi
@teivah
I also need to integrate what we have in observer_mock.go into the assertion API (ReactiveX/RxGo#109) and make something more generic
Teiva Harsanyi
@teivah
btw, @avelino what about posting a message in RxGo (official)/Lobby to inform people to come here?
if they did not follow the Github issue
Avelino
@avelino
send msg on channel @teivah pls
Teiva Harsanyi
@teivah
done ;)
I'm not a ReactiveX member so I don't know if people are going to listen to me but ok :)
Avelino
@avelino
thanks, I change channel title
Artur Krysiak
@venth
@teivah
About the double subscription: in v2, would the second subscription receive the same elements of a just sequence as the first?
Teiva Harsanyi
@teivah
well, to be discussed maybe but in RxJava for example if you create an observable from an array, the second subscription is going to receive the same elements as the first one
here we just lose the messages as it is sent in a channel and retrieved during the first subscription
(and there's no way to retry it for the time being)
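The loss described above can be reproduced with plain channels. This is a minimal sketch, not RxGo code: `fromItems` and `subscribe` are hypothetical helpers that only mimic a source whose items live in a single channel.

```go
package main

import "fmt"

// fromItems is a hypothetical helper: it pushes the items into a
// buffered channel and closes it, like a channel-backed "just" source.
func fromItems(items ...int) <-chan int {
	ch := make(chan int, len(items))
	for _, v := range items {
		ch <- v
	}
	close(ch)
	return ch
}

// subscribe drains whatever is still left on the channel.
func subscribe(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	src := fromItems(1, 2, 3)
	first := subscribe(src)  // consumes every item
	second := subscribe(src) // the channel is already empty and closed
	fmt.Println(first, second) // [1 2 3] []
}
```

The second subscriber gets nothing because a channel hands each value to exactly one receiver.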
Artur Krysiak
@venth
If we add a create function that receives a function as its argument, we could overcome the channel issue
this function would get an emitter as its argument and would be responsible for emitting the sequence elements
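A rough sketch of that idea, assuming a hypothetical `Emitter` interface and `Create` function (neither is taken from the RxGo API). Because the observable stores the producer function instead of a channel, every subscription runs it again and sees the full sequence.

```go
package main

import "fmt"

// Emitter is a hypothetical interface handed to the producer function.
type Emitter interface {
	Next(v int)
	Error(err error)
	Complete()
}

// Observable only stores the producer; nothing is emitted until Subscribe.
type Observable struct {
	producer func(Emitter)
}

// Create is a hypothetical constructor taking the producer function.
func Create(producer func(Emitter)) Observable {
	return Observable{producer: producer}
}

// collector is a simple Emitter that records what it receives.
type collector struct {
	items []int
	err   error
	done  bool
}

func (c *collector) Next(v int)      { c.items = append(c.items, v) }
func (c *collector) Error(err error) { c.err = err }
func (c *collector) Complete()       { c.done = true }

// Subscribe re-runs the producer for this subscriber only.
func (o Observable) Subscribe() *collector {
	c := &collector{}
	o.producer(c)
	return c
}

func main() {
	just := Create(func(e Emitter) {
		for _, v := range []int{1, 2, 3} {
			e.Next(v)
		}
		e.Complete()
	})
	first := just.Subscribe()
	second := just.Subscribe()
	fmt.Println(first.items, second.items) // [1 2 3] [1 2 3]
}
```

With the same shape, an empty source would be a producer that only calls `Complete`, and an error source one that only calls `Error`.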
Teiva Harsanyi
@teivah
you mean as an end-user or as a maintainer of RxGo?
Artur Krysiak
@venth
Either. Such a function eases the implementation of plenty of operators
Like empty - just emit complete
Teiva Harsanyi
@teivah
yes agreed
Artur Krysiak
@venth
error - just emit error
and so forth
Teiva Harsanyi
@teivah
but I'm just pointing out that the current behavior is not similar to what we have in the other Rx implementations
Artur Krysiak
@venth
That’s right. The current behavior doesn’t comply with other implementations
Teiva Harsanyi
@teivah
I know the philosophy is to be based as much as possible on idiomatic Go code
yet, for this very use-case I don't really get why we should have something different in RxGo
just my opinion obviously
Artur Krysiak
@venth
I agree with you.
Teiva Harsanyi
@teivah
and obviously, if we change that in the v2, it has to be highlighted to make sure end-users understand that the behavior is different
Artur Krysiak
@venth
+1
Teiva Harsanyi
@teivah
We need to start thinking about how to handle hot observables: values are emitted regardless of whether there is an actual subscription, and multicasting (one item caught by several observers)
we need something to manage a kind of publish/subscribe
either by implementing something from scratch, or reusing a proven library for such use case
does anyone have an idea about something which already exists for this?
Artur Krysiak
@venth
A hot observable can be implemented as follows: take the create operator and pass a closure with an emitter and a subscription as arguments. If the subscription is active (something has subscribed), use next, error, etc.; otherwise just pull an element and forget it. Nothing blocks. The hot source keeps working
I’m not sure about multicasting. Did you mean a fan-out scenario or fan-in?
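One way to picture the "emit if subscribed, otherwise forget" behavior is the sketch below. The `Hot` type and its methods are hypothetical names, and delivery is deliberately synchronous to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// Hot is a hypothetical hot source: Emit delivers to whoever is
// subscribed at that moment and silently drops the item otherwise.
type Hot struct {
	mu        sync.Mutex
	observers []func(int)
}

// Subscribe registers an observer for items emitted from now on.
func (h *Hot) Subscribe(onNext func(int)) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.observers = append(h.observers, onNext)
}

// Emit never waits for a subscriber to show up.
func (h *Hot) Emit(v int) {
	h.mu.Lock()
	snapshot := append([]func(int){}, h.observers...)
	h.mu.Unlock()
	for _, onNext := range snapshot {
		onNext(v) // synchronous: a slow observer delays the next one
	}
}

func main() {
	h := &Hot{}
	h.Emit(1) // nobody is listening, the item is forgotten

	var got []int
	h.Subscribe(func(v int) { got = append(got, v) })
	h.Emit(2)
	h.Emit(3)
	fmt.Println(got) // [2 3]
}
```

Since each observer is called in turn inside `Emit`, one slow observer holds up the others.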
Teiva Harsanyi
@teivah
fan-out
mmh, in your implementation, each call to next should be done in its own goroutine when we have, say, two subscriptions
otherwise the first observer would block the second one
you see what I mean?
Artur Krysiak
@venth
I see, and in the case of fan-out:
Like e.g. in RxJava, the responsibility would be on the connect and publish operators. I don’t remember the exact implementation, but connect would buffer sequence elements and publish would create an output channel for each subscription. Then each time an element appears, it would be passed to all created output channels
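The fan-out part of that description could look roughly like this in Go. It is only a sketch under assumptions: `Connectable`, `Publish`, `Subscribe` and `Connect` are hypothetical names, not the RxJava or RxGo implementation, and each output channel is sized to hold the whole sequence so that `Connect` never blocks.

```go
package main

import "fmt"

// Connectable is a hypothetical stand-in for what publish would return.
type Connectable struct {
	source  []int
	outputs []chan int
}

// Publish wraps a fixed sequence; nothing is emitted until Connect.
func Publish(items ...int) *Connectable {
	return &Connectable{source: items}
}

// Subscribe creates one output channel per subscription.
// In this sketch it must be called before Connect.
func (c *Connectable) Subscribe() <-chan int {
	out := make(chan int, len(c.source)) // assumption: buffer fits everything
	c.outputs = append(c.outputs, out)
	return out
}

// Connect passes every element to all created output channels,
// then closes them to signal completion.
func (c *Connectable) Connect() {
	for _, v := range c.source {
		for _, out := range c.outputs {
			out <- v
		}
	}
	for _, out := range c.outputs {
		close(out)
	}
}

// drain collects everything from one output channel.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	p := Publish(1, 2, 3)
	a, b := p.Subscribe(), p.Subscribe()
	p.Connect()
	fmt.Println(drain(a), drain(b)) // [1 2 3] [1 2 3]
}
```

With smaller or unbuffered channels, `Connect` would stall on the slowest subscriber.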
Artur Krysiak
@venth
I think specific operators, like the connect/publish pair or window, could be a challenge. For me the question is how to deal with an error sequence. What is an error in Go? Imagine that a right result of a function indicates an error. What should be emitted? An error with which argument? What about panic?
Teiva Harsanyi
@teivah

I don’t remember the exact implementation but connect would buffer sequence elements and publish for each subscription create output channel. Then each time an element would appear it will be passed to all created output channels

I understand. I believe, it's also time to think about back-pressure. Like having different strategies: buffer everything, drop messages not consumed on time etc.
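The two strategies mentioned here can be contrasted in a few lines. This is an illustrative sketch: `Strategy`, `BufferAll` and `DropWhenFull` are hypothetical names, and no consumer runs while the producer offers items, so the effect of each policy is easy to see.

```go
package main

import "fmt"

// Strategy is a hypothetical interface: Offer reports whether the item was kept.
type Strategy interface {
	Offer(v int) bool
}

// BufferAll keeps every item in a growing slice: nothing is lost,
// but memory use is unbounded if the consumer never catches up.
type BufferAll struct{ Queue []int }

func (b *BufferAll) Offer(v int) bool {
	b.Queue = append(b.Queue, v)
	return true
}

// DropWhenFull keeps at most cap(Ch) pending items and drops the rest.
type DropWhenFull struct{ Ch chan int }

func (d *DropWhenFull) Offer(v int) bool {
	select {
	case d.Ch <- v:
		return true
	default: // buffer full: the item was not consumed on time
		return false
	}
}

// produce offers 1..n and returns how many items the strategy kept.
func produce(s Strategy, n int) int {
	kept := 0
	for v := 1; v <= n; v++ {
		if s.Offer(v) {
			kept++
		}
	}
	return kept
}

func main() {
	fmt.Println(produce(&BufferAll{}, 5))                         // 5
	fmt.Println(produce(&DropWhenFull{Ch: make(chan int, 2)}, 5)) // 2
}
```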

Artur Krysiak
@venth
Perhaps we can tackle it by extending the emitter interface with a request operation. If a source handles back pressure, then each time the request operation is called it would adapt and delegate the call to the handled source
Imagine that at first you use the create operator to wrap a channel. Without back pressure, each time something is on the channel you call emitter.next with the element
With back pressure, when request is first called on the observable constructed by the create operator, you fetch up to the requested number of elements into an intermediate buffer. In the emission loop you read from the buffer instead of the wrapped channel
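A possible reading of this request-based scheme, sketched with hypothetical names (`Requestable`, `Request`, `Emit`). One assumption beyond the message: `Request` does not block, so it returns early when the wrapped channel has nothing ready.

```go
package main

import "fmt"

// Requestable is a hypothetical wrapper around a channel that only
// pulls elements when the downstream asks for them.
type Requestable struct {
	source <-chan int
	buffer []int
}

// Request fetches up to n elements into the intermediate buffer.
// It stops early if the channel is closed or has nothing ready.
func (r *Requestable) Request(n int) {
	for i := 0; i < n; i++ {
		select {
		case v, ok := <-r.source:
			if !ok {
				return
			}
			r.buffer = append(r.buffer, v)
		default:
			return
		}
	}
}

// Emit is the emission loop: it reads from the buffer,
// never directly from the wrapped channel.
func (r *Requestable) Emit(next func(int)) {
	for _, v := range r.buffer {
		next(v)
	}
	r.buffer = r.buffer[:0]
}

func main() {
	src := make(chan int, 5)
	for v := 1; v <= 5; v++ {
		src <- v
	}
	close(src)

	r := &Requestable{source: src}
	var got []int
	r.Request(2)
	r.Emit(func(v int) { got = append(got, v) })
	fmt.Println(got) // [1 2]

	r.Request(10) // more than what is left: only 3, 4, 5 are fetched
	r.Emit(func(v int) { got = append(got, v) })
	fmt.Println(got) // [1 2 3 4 5]
}
```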
Teiva Harsanyi
@teivah
I understand but for me the problem remains at the parent observable level. I don't really see how we could use channels to manage hot observables.
Let's say we have one hot observable and two observers
we may decide to publish each item in both channels