Artur Krysiak
@venth
I’m not sure about multicasting. Did you mean a fan-out scenario or a fan-in one?
Teiva Harsanyi
@teivah
fan-out
mmh, in your implementation, each call to next should be done in its own goroutine when we have, for example, two subscriptions
otherwise the first observer would block the second one
you see what I mean?
Artur Krysiak
@venth
I see and in case of fan out
For example, in RxJava the responsibility would be on the connect and publish operators. I don’t remember the exact implementation, but connect would buffer sequence elements and publish would create an output channel for each subscription. Then, each time an element appeared, it would be passed to all created output channels
Artur Krysiak
@venth
I think that specific operators could be a challenge, like the connect/publish pair or window. For me the question is how to deal with an error sequence. What is an error in golang? Imagine that the right result of a function indicates an error. What shall be emitted? An error with which argument? What about panic?
Teiva Harsanyi
@teivah

I don’t remember the exact implementation, but connect would buffer sequence elements and publish would create an output channel for each subscription. Then, each time an element appeared, it would be passed to all created output channels

I understand. I believe it's also time to think about back-pressure, with different strategies: buffer everything, drop messages not consumed in time, etc.

Artur Krysiak
@venth
Perhaps we can tackle it by extending the emitter interface with a request operation. If a source handles back pressure, then each time the request operation is called it would adapt and delegate the call to the wrapped source
Imagine that at first you use a create operator that wraps a channel. Without back pressure, each time something is on the channel you call emitter.next with an element
With back pressure, when request is called on the observable constructed by the create operator, you first fetch up to the requested number of elements into an intermediate buffer. In the emission loop you read from the buffer instead of the wrapped channel
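A minimal sketch of the request idea described above, not the library's implementation. The `requestSource` type and its `Request` and `Emit` methods are hypothetical names, and fetching without blocking is an assumption made to keep the example deterministic.

```go
package main

import "fmt"

// requestSource is a hypothetical wrapper around a channel. Items are
// moved into an intermediate buffer only when they are requested.
type requestSource struct {
	in      <-chan int
	pending []int
}

// Request moves up to n items that are already available on the wrapped
// channel into the buffer, without blocking, and returns how many it moved.
func (s *requestSource) Request(n int) int {
	moved := 0
	for moved < n {
		select {
		case v, ok := <-s.in:
			if !ok {
				return moved // wrapped channel is closed
			}
			s.pending = append(s.pending, v)
			moved++
		default:
			return moved // nothing more is available right now
		}
	}
	return moved
}

// Emit is the emission loop: it reads from the buffer, not from the channel.
func (s *requestSource) Emit(next func(int)) {
	for _, v := range s.pending {
		next(v)
	}
	s.pending = s.pending[:0]
}

func main() {
	ch := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		ch <- i
	}
	src := &requestSource{in: ch}

	fmt.Println("fetched:", src.Request(2)) // fetched: 2
	src.Emit(func(v int) { fmt.Println("next:", v) })
}
```

Only the two requested items reach the observer; the other three stay on the wrapped channel until the next request.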
Teiva Harsanyi
@teivah
I understand but for me the problem remains at the parent observable level. I don't really see how we could use channels to manage hot observables.
Let's say we have one hot observable and two observers
we may decide to publish each item in both channels
but the publication to one channel may block the second one if the first observer is under pressure
we can decide to spawn the publication to a channel in a dedicated goroutine, but this would break the sequentiality guarantee
that's before even thinking about how to handle things at the observer level, I mean (even though I agree your solution could be appropriate)
Artur Krysiak
@venth
Hmm, if you have a hot observable and you are not subscribed, then nothing will be pushed to the channel.
Ahh, I see, you mean that the channel blocks the push until someone reads from it
I’d say that now the buffer and the buffer strategy come into play
so in the case of a hot observable, the chosen strategy could be drop if we cannot push into the channel
Draft implementation would be probably more descriptive 😀
Teiva Harsanyi
@teivah

so in the case of a hot observable, the chosen strategy could be drop if we cannot push into the channel

How do you check whether you can push or not into the channel?

you mean to wrap it in a kind of method handling mutexes to check whether the channel is free?
Artur Krysiak
@venth
I would use a select statement and check whether the channel is ready
if it's ready it means that I can push at least one element
I believe
like: we can use select with a default clause to implement non-blocking sends, receives, and even non-blocking multi-way selects.
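The non-blocking send mentioned above can be sketched as follows. `trySend` is a hypothetical helper name; the `select` with a `default` clause is standard Go.

```go
package main

import "fmt"

// trySend attempts a non-blocking send on ch. It reports whether the value
// was handed off, either to a waiting receiver or to a free buffer slot.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// Nobody is ready to receive and there is no buffer space.
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting

	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1)) // true: one free slot
	fmt.Println(trySend(buffered, 2)) // false: buffer is full
}
```

With an unbuffered channel the send succeeds only if a receiver is already blocked on the channel at that moment, so a drop strategy built this way would probably want a buffered channel per observer.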
Artur Krysiak
@venth
Still an exploratory test implementation would be nice. I didn’t verify whether I can send to a channel and what happens if I send and nobody listens
Teiva Harsanyi
@teivah

Still an exploratory test implementation would be nice. I didn’t verify whether I can send to a channel and what happens if I send and nobody listens

It actually works

I mean if we take the link, it actually prints "no message sent" if there is no receiver at the time we try to send a message to a channel
mmh interesting, I was not familiar with this non-blocking send
thanks :)
Teiva Harsanyi
@teivah
in any case, we need a way to publish items from the parent observable to something
either by publishing to a central channel and routing somehow to the observers
or by emitting elements directly to the observers
basically, if there are two subscriptions, we call the first observer.onNext, wait for its completion, then call the second observer.onNext
everything done synchronously
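A small sketch of the synchronous option described above, to make the call order visible. `Observer`, `emitSync`, and `traceSync` are hypothetical names for illustration only.

```go
package main

import "fmt"

// Observer is a hypothetical stand-in for an observer's onNext handler.
type Observer func(item int)

// emitSync delivers each item to every observer in subscription order.
// The next call starts only after the previous one has returned, so a
// slow observer delays every observer after it.
func emitSync(items []int, observers ...Observer) {
	for _, item := range items {
		for _, onNext := range observers {
			onNext(item)
		}
	}
}

// traceSync records the order in which two observers are called.
func traceSync(items []int) []string {
	var calls []string
	first := func(i int) { calls = append(calls, fmt.Sprintf("first:%d", i)) }
	second := func(i int) { calls = append(calls, fmt.Sprintf("second:%d", i)) }
	emitSync(items, first, second)
	return calls
}

func main() {
	fmt.Println(traceSync([]int{1, 2})) // [first:1 second:1 first:2 second:2]
}
```

Ordering is preserved for every observer, at the cost that the second observer never sees an item before the first one has finished with it.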
Artur Krysiak
@venth
If we created a channel for each subscription (handled by the publish operator), then we would achieve fan-out
Teiva Harsanyi
@teivah
yep and we could easily implement strategies like drop for example by just checking if the target channel is available or not
I'm trying to implement a first design (based on the branch implementing real cold observables)
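The idea of one channel per subscription with a drop strategy can be sketched roughly as below. This is not the code from the branch: `publisher`, `Subscribe`, `Emit`, `Close`, and `drain` are hypothetical names, the buffer size stands in for the back-pressure option, and the sketch is not safe for concurrent subscription.

```go
package main

import "fmt"

// publisher is a hypothetical fan-out helper: one buffered channel per
// subscription. An item that does not fit is dropped for that subscriber only.
type publisher struct {
	subs []chan int
}

// Subscribe registers a new output channel with the given buffer size.
func (p *publisher) Subscribe(buffer int) <-chan int {
	ch := make(chan int, buffer)
	p.subs = append(p.subs, ch)
	return ch
}

// Emit offers the item to every subscriber without blocking and returns
// how many subscribers had to drop it.
func (p *publisher) Emit(item int) (dropped int) {
	for _, ch := range p.subs {
		select {
		case ch <- item:
		default:
			dropped++
		}
	}
	return dropped
}

// Close signals completion to every subscriber.
func (p *publisher) Close() {
	for _, ch := range p.subs {
		close(ch)
	}
}

// drain reads everything left on a closed channel.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	p := &publisher{}
	slow := p.Subscribe(1)  // room for a single pending item
	roomy := p.Subscribe(3) // room for all three items
	for _, item := range []int{1, 2, 3} {
		p.Emit(item)
	}
	p.Close()
	fmt.Println(drain(slow), drain(roomy)) // [1] [1 2 3]
}
```

Because each subscriber has its own channel, a full buffer on one side never blocks delivery to the other, and items stay in order within each channel.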
Artur Krysiak
@venth
Awesome 😀
Teiva Harsanyi
@teivah
I feel like I'm starting to have something :)
obs := Just(1, 2, 3).Publish()
got1 := make([]interface{}, 0)

obs.Subscribe(handlers.NextFunc(func(i interface{}) {
    got1 = append(got1, i)
    time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithDropBackpressureStrategy())
obs.Connect()

time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
=> Display 1
obs := Just(1, 2, 3).Publish()
got1 := make([]interface{}, 0)

obs.Subscribe(handlers.NextFunc(func(i interface{}) {
    got1 = append(got1, i)
    time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithBufferBackpressureStrategy(3))
obs.Connect()

time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
=> Display 1, 2, 3
(branch hot_observable)
in the first example, the observer is not able to consume all the items because of the time.Sleep(200 * time.Millisecond), so it only consumes 1