Teiva Harsanyi
@teivah
thanks :)
Teiva Harsanyi
@teivah
in any case, we need a way to publish items from the parent observable to something
either by publishing to a central channel and routing somehow to the observers
or by emitting elements directly to the observers
basically if there are two subscriptions, we call the first observer.onNext, wait for its completion, then call the second observer.onNext
everything done synchronously
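(A minimal sketch of the synchronous delivery described above, not RxGo code. The `Observer` interface, `recorder` type, and `emitSync` function are hypothetical names made up for illustration. Each `OnNext` call returns before the next observer is called.)

```go
package main

import "fmt"

// Observer is a hypothetical minimal observer, not the RxGo interface.
type Observer interface {
	OnNext(item interface{})
}

// recorder is an illustrative observer that tags and records what it receives.
type recorder struct {
	name string
	log  *[]string
}

func (r recorder) OnNext(item interface{}) {
	*r.log = append(*r.log, fmt.Sprintf("%s:%v", r.name, item))
}

// emitSync delivers each item to every observer in subscription order.
// Each OnNext call completes before the next one starts.
func emitSync(items []interface{}, observers []Observer) {
	for _, item := range items {
		for _, o := range observers {
			o.OnNext(item)
		}
	}
}

// deliveryOrder returns the order in which two observers saw two items.
func deliveryOrder() []string {
	var log []string
	observers := []Observer{recorder{"first", &log}, recorder{"second", &log}}
	emitSync([]interface{}{1, 2}, observers)
	return log
}

func main() {
	fmt.Println(deliveryOrder()) // [first:1 second:1 first:2 second:2]
}
```

The downside of this approach is that one slow observer delays every other observer.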
Artur Krysiak
@venth
If we created a channel for each subscription (handled by the publish operator), then we would achieve fan-out
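(A rough sketch of the channel-per-subscription idea using plain Go channels. The `publisher` type and its `subscribe` and `connect` methods are hypothetical and only loosely mirror a publish operator. Sends block here, so no item is lost.)

```go
package main

import (
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for the publish operator.
type publisher struct {
	subs []chan interface{}
}

// subscribe creates a dedicated channel for one subscription.
func (p *publisher) subscribe() <-chan interface{} {
	ch := make(chan interface{})
	p.subs = append(p.subs, ch)
	return ch
}

// connect sends every item to every subscription channel, then closes them.
func (p *publisher) connect(items ...interface{}) {
	for _, item := range items {
		for _, ch := range p.subs {
			ch <- item // blocks until this subscriber has received the item
		}
	}
	for _, ch := range p.subs {
		close(ch)
	}
}

// fanOut runs n subscribers and returns what each one received.
func fanOut(n int, items ...interface{}) [][]interface{} {
	p := &publisher{}
	got := make([][]interface{}, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		ch := p.subscribe()
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for item := range ch {
				got[i] = append(got[i], item) // each goroutine owns got[i]
			}
		}(i)
	}
	p.connect(items...)
	wg.Wait()
	return got
}

func main() {
	fmt.Println(fanOut(2, 1, 2, 3)) // [[1 2 3] [1 2 3]]
}
```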
Teiva Harsanyi
@teivah
yep and we could easily implement strategies like drop for example by just checking if the target channel is available or not
I'm trying to implement a first design (based on the branch implementing real cold observables)
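(One way the "check if the target channel is available" idea could look in plain Go: a non-blocking send with `select` and `default`. The `publishOrDrop` helper is a hypothetical name, not something from the branch.)

```go
package main

import "fmt"

// publishOrDrop offers item to each subscription channel without blocking.
// It returns how many subscribers had to skip the item.
func publishOrDrop(subs []chan interface{}, item interface{}) int {
	dropped := 0
	for _, ch := range subs {
		select {
		case ch <- item: // the channel had room, or a receiver was waiting
		default: // the channel is not available: drop for this subscriber
			dropped++
		}
	}
	return dropped
}

func main() {
	free := make(chan interface{}, 1)
	busy := make(chan interface{}, 1)
	busy <- "pending" // simulate a subscriber that is still behind

	dropped := publishOrDrop([]chan interface{}{free, busy}, 42)
	fmt.Println(dropped, <-free, <-busy) // 1 42 pending
}
```

Because the send never blocks, a slow subscriber only loses its own items and does not hold up the others.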
Artur Krysiak
@venth
Awesome 😀
Teiva Harsanyi
@teivah
I feel like I'm starting to have something :)
obs := Just(1, 2, 3).Publish()
got1 := make([]interface{}, 0)

obs.Subscribe(handlers.NextFunc(func(i interface{}) {
    got1 = append(got1, i)
    time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithDropBackpressureStrategy())
obs.Connect()

time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
=> Display 1
obs := Just(1, 2, 3).Publish()
got1 := make([]interface{}, 0)

obs.Subscribe(handlers.NextFunc(func(i interface{}) {
    got1 = append(got1, i)
    time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithBufferBackpressureStrategy(3))
obs.Connect()

time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
=> Display 1, 2, 3
(branch hot_observable)
in the first example, the observer is not able to consume all the items because of the time.Sleep(200 * time.Millisecond) so it's just able to consume 1
in the second example it's the same problem but we use the WithBufferBackpressureStrategy to create a buffered channel of 3 items so it can buffer and then consume 1, 2, 3
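(A deterministic, simplified model of the two results above, written with plain channels rather than the branch's API. The `deliver` function is hypothetical. The slow observer is modeled by draining the channel only after all items have been offered, and a capacity of 1 stands in for the single item the observer manages to take in the drop example.)

```go
package main

import "fmt"

// deliver offers items to a channel of the given capacity without blocking,
// then drains it. Draining only after all sends models an observer that is
// too slow to keep up with the source.
func deliver(capacity int, items ...interface{}) []interface{} {
	ch := make(chan interface{}, capacity)
	for _, item := range items {
		select {
		case ch <- item: // there was room for the item
		default: // the channel is full: the item is dropped
		}
	}
	close(ch)

	var got []interface{}
	for item := range ch {
		got = append(got, item)
	}
	return got
}

func main() {
	fmt.Println(deliver(1, 1, 2, 3)) // [1]
	fmt.Println(deliver(3, 1, 2, 3)) // [1 2 3]
}
```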
Teiva Harsanyi
@teivah
looks good don't you think ?
Artur Krysiak
@venth
It does 👍😀 What do you think of following the RxJava path for back pressure? They introduced a specialized observable, Flowable. Thanks to Flowable, back pressure is applied properly through the whole pipeline.
Avelino
@avelino
Guys, I'm sorry I can't keep up with the project this week. It's extremely busy at my company. I believe that next week I can get back to the project
Teiva Harsanyi
@teivah
no problem ;)
Avelino
@avelino
Thank you for being active in the project
Artur Krysiak
@venth
👍
Teiva Harsanyi
@teivah

They introduced a specialized observable, Flowable. Thanks to Flowable, back pressure is applied properly through the whole pipeline.

What do we have exactly in RxJava? Observable for cold observables, Flowable for hot observables with back-pressure, and ConnectableObservable for hot observables without back-pressure that start publishing once connect() is called?

Artur Krysiak
@venth
Yep. I think so.
Artur Krysiak
@venth
Plenty of issues 👍
Alexander Matyushentsev
@alexmt
Greetings! Just want to say hi. Great project 👍 Looking forward to the v2 release.
Teiva Harsanyi
@teivah
just gave a fresh update to the issues and a fresh merge in the v2 branch by introducing new features (new operators, new observable type: hot vs cold, back-pressure)
this is really a first draft and any feedback is more than welcome
Konstantin Itskov
@trivigy
Hey everyone, and anyone. Just wanted to check in on the project, introduce myself, and talk to someone about contribution and direction.
jimmytan
@jimmytan
hi everyone, just want to say hi. I am still learning Go and very interested in the language. Very interesting project
Teiva Harsanyi
@teivah
Hey! The v2 is progressing very well. Please check the README to get some insights on the main structural changes compared to v1.
Any comment/remark is welcome!
DonyChen
@donychen1134
Hi everyone, I recently started learning RxGo and I'm confused by a few problems.
  1. Installing RxGo with "go get -u github.com/reactivex/rxgo/v2" always reports an error, because there is no v2 folder in the project. I have to use "go get -u github.com/reactivex/rxgo" to install.
  2. The "BufferWithTime" example panics because of the "nil" options input.
  3. Do we have add/delete/update observer functions in RxGo? It seems these functions exist in RxJava. I want to add/delete an observer on an observable, but I haven't found them.
Wei Lun
@WLun001
Is anyone using RxGo in production?
Avelino
@avelino
I started contributing to RxGo's core because I keep software in production that uses it, from before the existing version 2 implemented by Teiva. I have 3 applications in production using version 2
Wei Lun
@WLun001
Does RxGo have a roadmap?
Darren
@darrensapalo
First time to use Gitter. Dropping by to say hello :)
Artur Krysiak
@venth
@teivah I've tried to find the sources and issues for v2. Where can I find them?
Alan D. Cabrera
@maguro
Hello all!
Alan D. Cabrera
@maguro
@teivah , iiuc, ReactiveX/RxGo#289 is good to go.
Lee sungju
@lsj4401
hello, Is this an active project?