Teiva Harsanyi
@teivah
basically if there are two subscriptions, they call the first observer.onNext, wait for its completion, then call the second observer.onNext
everything is done synchronously
Artur Krysiak
@venth
If we created a channel for each subscription (handled by the publish operator), then we would achieve fan-out
Teiva Harsanyi
@teivah
yep and we could easily implement strategies like drop for example by just checking if the target channel is available or not
I'm trying to implement a first design (based on the branch implementing real cold observables)
Artur Krysiak
@venth
Awesome 😀
Teiva Harsanyi
@teivah
I feel like I'm starting to have something :)
obs := Just(1, 2, 3).Publish()
got1 := make([]interface{}, 0)

obs.Subscribe(handlers.NextFunc(func(i interface{}) {
    got1 = append(got1, i)
    time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithDropBackpressureStrategy())
obs.Connect()

time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
=> Display 1
obs := Just(1, 2, 3).Publish()
got1 := make([]interface{}, 0)

obs.Subscribe(handlers.NextFunc(func(i interface{}) {
    got1 = append(got1, i)
    time.Sleep(200 * time.Millisecond) // Pause the observer on purpose
}), options.WithBufferBackpressureStrategy(3))
obs.Connect()

time.Sleep(500 * time.Millisecond) // Ugly wait just for the example
fmt.Printf("%v\n", got1)
=> Display 1, 2, 3
(branch hot_observable)
in the first example, the observer is not able to consume all the items because of the time.Sleep(200 * time.Millisecond), so 2 and 3 are dropped and it's only able to consume 1
in the second example it's the same problem, but we use WithBufferBackpressureStrategy to create a buffered channel of 3 items, so it can buffer and then consume 1, 2, 3
Teiva Harsanyi
@teivah
looks good, don't you think?
Artur Krysiak
@venth
It does 👍😀 What do you think of following the RxJava path for back pressure? They introduced a specialized observable, Flowable. Thanks to Flowable, back pressure is applied properly through the whole pipeline.
Avelino
@avelino
Guys, I'm sorry I can't keep up with the project this week. It's extremely busy at my company. I believe that next week I can get back to the project
Teiva Harsanyi
@teivah
no problem ;)
Avelino
@avelino
Thank you for being active in the project
Artur Krysiak
@venth
👍
Teiva Harsanyi
@teivah

They introduced a specialized observable, Flowable. Thanks to Flowable, back pressure is applied properly through the whole pipeline.

What do we have exactly in RxJava? Observable for cold observables, Flowable for hot observables with back pressure, and ConnectableObservable for hot observables without back pressure that start publishing once connect() is called?

Artur Krysiak
@venth
Yep. I think so.
Artur Krysiak
@venth
Plenty of issues 👍
Alexander Matyushentsev
@alexmt
Greetings! Just want to say hi. Great project 👍 Looking forward to the v2 release.
Teiva Harsanyi
@teivah
just gave a fresh update to the issues and a fresh merge in the v2 branch by introducing new features (new operators, new observable type: hot vs cold, back-pressure)
this is really a first draft and any feedback is more than welcome
Konstantin Itskov
@trivigy
Hey everyone. Just wanted to check in on the project, introduce myself, and talk to someone about contribution and direction.
jimmytan
@jimmytan
hi everyone, just want to say hi. I am still learning Go and am very interested in the language. Very interesting project
Teiva Harsanyi
@teivah
Hey! The v2 is progressing very well. Please check the README to get some insights on the main structural changes compared to v1.
Any comment/remark is welcome!
DonyChen
@donychen1134
Hi everyone, I recently started learning RxGo and I'm confused by a few problems.
  1. Installing RxGo with "go get -u github.com/reactivex/rxgo/v2" always reports an error, because there is no v2 folder in the project. I have to use "go get -u github.com/reactivex/rxgo" to install it.
  2. The "BufferWithTime" example panics because nil is passed as the options argument.
  3. Do we have add/delete/update observer functions in RxGo? It seems these functions exist in RxJava. I want to add observers to and delete observers from an observable, but I haven't found a way to do it.
Wei Lun
@WLun001
Is anyone using RxGo in production?
Avelino
@avelino
I started contributing to RxGo's core because I keep software in production that uses it, before version 2 (implemented by Teiva) existed. I have 3 applications in production using version 2
Wei Lun
@WLun001
Does RxGo have a roadmap?
Darren
@darrensapalo
First time using Gitter. Dropping by to say hello :)
Artur Krysiak
@venth
@teivah I've tried to find the sources and issues for v2. Where could I find them?
Alan D. Cabrera
@maguro
Hello all!
Alan D. Cabrera
@maguro
@teivah, IIUC, ReactiveX/RxGo#289 is good to go.
Lee sungju
@lsj4401
Hello, is this an active project?