Artur Krysiak
@venth
As in RxJava, for example, the responsibility would lie with the connect and publish operators. I don't remember the exact implementation, but connect would buffer the sequence elements and publish would create an output channel for each subscription. Then each time an element appears, it would be passed to all the created output channels
I think that specific operators, like the connect/publish pair or window, could be a challenge. For me the question is how to deal with an error sequence. What is an error in Go? Imagine that the right result of a function indicates an error. What should be emitted? An error with which argument? What about panic?
Teiva Harsanyi
@teivah

I don’t remember the exact implementation but connect would buffer sequence elements and publish for each subscription create output channel. Then each time an element would appear it will be passed to all created output channels

I understand. I believe it's also time to think about back-pressure, for example by having different strategies: buffer everything, drop messages not consumed in time, etc.

Artur Krysiak
@venth
Perhaps we can tackle it by extending the emitter interface with a request operation. If a source handles back pressure, then each time the request operation is called it would adapt and delegate the call to the handled source
Imagine that at first you use the create operator to wrap a channel. Without back pressure, each time something arrives on the channel you call emitter.next with the element
With back pressure, when request is called on the observable constructed by the create operator, you first fetch up to the requested number of elements into an intermediate buffer. In the emission loop you read from the buffer instead of the wrapped channel
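A minimal sketch of the request idea described above, in plain Go and not taken from RxGo. The names `requestSource`, `request` and `emit` are hypothetical, and the sketch assumes that "fetch up to the requested number" means taking only what is already available on the wrapped channel, without blocking:

```go
package main

import "fmt"

// requestSource wraps a channel and an intermediate buffer.
// Both the type and its methods are hypothetical, for illustration only.
type requestSource struct {
	in  <-chan int
	buf []int
}

// request moves up to n elements that are already available on the
// wrapped channel into the buffer and returns how many were fetched.
func (s *requestSource) request(n int) int {
	fetched := 0
	for fetched < n {
		select {
		case v, ok := <-s.in:
			if !ok {
				return fetched // wrapped channel is closed
			}
			s.buf = append(s.buf, v)
			fetched++
		default:
			return fetched // nothing more available right now
		}
	}
	return fetched
}

// emit is the emission loop: it reads from the buffer, not from the channel.
func (s *requestSource) emit(next func(int)) {
	for _, v := range s.buf {
		next(v)
	}
	s.buf = s.buf[:0]
}

func main() {
	ch := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		ch <- i
	}
	src := &requestSource{in: ch}
	src.request(2)
	src.emit(func(v int) { fmt.Println(v) }) // prints 1 then 2
}
```

The downstream consumer controls the pace here: nothing leaves the wrapped channel until request is called.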
Teiva Harsanyi
@teivah
I understand but for me the problem remains at the parent observable level. I don't really see how we could use channels to manage hot observables.
Let's say we have one hot observable and two observers
we may decide to publish each item in both channels
but the publication in one channel may block the second one if the first observer is under pressure
we can decide to spawn the publication to a channel in a dedicated goroutine but this would break the sequentiality guarantee
before even thinking about how to handle things at the observer level, I mean (even though I agree your solution could be appropriate)
Artur Krysiak
@venth
Hmm, if you have a hot observable and you are not subscribed, then nothing will be pushed to the channel.
Ahh, I see, you mean that the channel blocks the push until someone reads from it
I'd say that now the buffer and the buffer strategy come into play
so in the case of a hot observable, the chosen strategy could be drop if we cannot push into the channel
Draft implementation would be probably more descriptive 😀
Teiva Harsanyi
@teivah

so In case of hot observable a chosen strategy could be drop, if we cannot push into channel

How do you check whether you can push or not into the channel?

you mean to wrap it in a kind of method handling mutexes to check whether the channel is free?
Artur Krysiak
@venth
I would use a select statement and check whether the channel is ready
if it's ready it means that I can push at least one element
I believe
like: we can use select with a default clause to implement non-blocking sends, receives, and even non-blocking multi-way selects.
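A minimal sketch of the non-blocking send mentioned above, using only the standard `select` with a `default` clause. The helper name `trySend` is hypothetical:

```go
package main

import "fmt"

// trySend attempts a non-blocking send. It reports false instead of
// blocking when no receiver is ready and the buffer (if any) is full.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	fmt.Println(trySend(unbuffered, 1)) // false: nobody is receiving

	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1)) // true: the buffer has room
	fmt.Println(trySend(buffered, 2)) // false: the buffer is full
}
```

A false result is the point where a drop strategy would discard the element.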
Artur Krysiak
@venth
Still an exploratory test implementation would be nice. I didn’t verify whether I can send to a channel and what happens if I send and nobody listens
Teiva Harsanyi
@teivah

Still an exploratory test implementation would be nice. I didn’t verify whether I can send to a channel and what happens if I send and nobody listens

It actually works

I mean if we take the link, it actually prints "no message sent" if there is no receiver at the time we try to send a message to a channel
mmh interesting, I was not familiar with this non-blocking send
thanks :)
Teiva Harsanyi
@teivah
in any case, we need a way to publish items from the parent observable to something
either by publishing to a central channel and routing somehow to the observers
or by emitting elements directly to the observers
basically if there are two subscriptions, they call the first observer.onNext, wait for its completion, then call the second observer.onNext
everything done synchronously
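A minimal sketch of the second option, emitting directly and synchronously to the observers. The `observer` type and `emitSync` function are hypothetical and only illustrate the call order:

```go
package main

import "fmt"

// observer is a hypothetical stand-in for an observer's onNext handler.
type observer func(item interface{})

// emitSync calls each observer in subscription order and waits for
// one call to return before making the next.
func emitSync(observers []observer, item interface{}) {
	for _, onNext := range observers {
		onNext(item)
	}
}

func main() {
	var log []string
	observers := []observer{
		func(i interface{}) { log = append(log, fmt.Sprintf("first:%v", i)) },
		func(i interface{}) { log = append(log, fmt.Sprintf("second:%v", i)) },
	}
	for _, item := range []interface{}{1, 2} {
		emitSync(observers, item)
	}
	fmt.Println(log) // [first:1 second:1 first:2 second:2]
}
```

Ordering is preserved, but a slow first observer delays every observer after it.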
Artur Krysiak
@venth
If we created a channel for each subscription (handled by the publish operator), then we would achieve fan-out
Teiva Harsanyi
@teivah
yep and we could easily implement strategies like drop for example by just checking if the target channel is available or not
I'm trying to implement a first design (based on the branch implementing real cold observables)
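A minimal sketch of the fan-out with a drop strategy discussed above, in plain Go and not the RxGo implementation. The `publisher` type, its methods and `drain` are hypothetical. Buffered channels stand in for observers of different speeds so that the result is deterministic:

```go
package main

import "fmt"

// publisher fans each item out to one channel per subscription.
type publisher struct {
	subs []chan interface{}
}

// subscribe registers a subscription whose channel holds up to size items.
func (p *publisher) subscribe(size int) <-chan interface{} {
	ch := make(chan interface{}, size)
	p.subs = append(p.subs, ch)
	return ch
}

// publish offers item to every subscription in order and returns how many
// accepted it. A subscription that is not ready simply misses the item.
func (p *publisher) publish(item interface{}) int {
	delivered := 0
	for _, ch := range p.subs {
		select {
		case ch <- item:
			delivered++
		default:
			// Drop strategy: the target channel is not available.
		}
	}
	return delivered
}

// drain collects whatever is currently buffered, without blocking.
func drain(ch <-chan interface{}) []interface{} {
	var got []interface{}
	for {
		select {
		case v := <-ch:
			got = append(got, v)
		default:
			return got
		}
	}
}

func main() {
	p := &publisher{}
	slow := p.subscribe(1) // room for one pending item
	fast := p.subscribe(3) // room for all three items
	for i := 1; i <= 3; i++ {
		p.publish(i)
	}
	fmt.Println(drain(slow)) // [1]
	fmt.Println(drain(fast)) // [1 2 3]
}
```

Because publish never blocks, one subscription under pressure cannot stall the others, and items still reach each channel in order.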
Artur Krysiak
@venth
Awesome 😀
Teiva Harsanyi
@teivah
I feel like I'm starting to have something :)
obs := Just(1, 2, 3).Publish()
got1 := make([]interface{}, 0)

obs.Subscribe(handlers.NextFunc(func(i interface{}) {
    got1 = append(got1, i)
    time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithDropBackpressureStrategy())
obs.Connect()

time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
=> Display 1
obs := Just(1, 2, 3).Publish()
got1 := make([]interface{}, 0)

obs.Subscribe(handlers.NextFunc(func(i interface{}) {
    got1 = append(got1, i)
    time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithBufferBackpressureStrategy(3))
obs.Connect()

time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
=> Display 1, 2, 3
(branch hot_observable)
in the first example, the observer is not able to consume all the items because of the time.Sleep(200 * time.Millisecond) so it's just able to consume 1
in the second example it's the same problem but we use the WithBufferBackpressureStrategy to create a buffered channel of 3 items so it can buffer and then consume 1, 2, 3
Teiva Harsanyi
@teivah
looks good, don't you think?
Artur Krysiak
@venth
It does 👍😀 What do you think of following the RxJava path in the case of back pressure? They introduced a specialized observable, Flowable. Thanks to the introduction of Flowable, back pressure is applied properly through the whole pipeline.
Avelino
@avelino
Guys, I'm sorry I can't keep up with the project this week. It's extremely busy at my company. I believe that next week I can get close to the project again
Teiva Harsanyi
@teivah
no problem ;)
Avelino
@avelino
Thank you for being active in the project