    I think toList() will make it a blocking observable?
    actually nevermind that isn't right
    Hello! Is it possible to collect items from a ReplaySubject into a List?
    Justin Tuchek
    @xgear-public do you mean like ReplaySubject->getValues(…) http://reactivex.io/RxJava/javadoc/rx/subjects/ReplaySubject.html
    Mikhail Mustakimov
    What do you think about this MVP sketch? What's good, what's bad? https://github.com/Mikhail57/Irknet
    Dmitriy Zaitsev
    Is there a fundamental difference between these two snippets?
    Serban Balamaci
    Hi, I'm trying to convert rxjava v1 code to v2, and I'm kinda stuck on
    I get a compile error: Error:(111, 49) java: incompatible types: no instance(s) of type variable(s) R,K,V exist so that io.reactivex.Single<R> conforms to org.reactivestreams.Publisher<? extends R>
    the error is highlighted over the map. The count() returned Single<Long> but I don't see what the problem is to map it to a Pair, and then the result of flatMap is Flowable<Pair<String, Long>> right?
    Serban Balamaci
    I think it's probably because Single does not implement Publisher, which is what the function passed to flatMap has to return
    Serban Balamaci
    was clarified here ReactiveX/RxJava#4788
    Alessandro Vermeulen
    I thought flatMap and concatMap were memory friendly. However, when I run the code below for a while I end up with many rx.internal.operators.OperatorMerge$MergeSubscriber and Object[] instances in memory, along with all kinds of operators. Is there a way I can implement the functionality below in a more memory-friendly manner?
    public class Pinging {
      public Observable<Int> ping(final Integer errors) {
        final rx.Observable<Integer> pingObservable =
                  rx.Observable.just(occurredErrors).delay(pingInterval, pingIntervalTimeUnit);
        pingObservable.flatMap(numberOfOccurredErrors ->
          final Observable<Response> pingResponse =
          pingResponse.flatMap( response ->
            if (response isOk) {
              ping(numberOfOccurredErrors + 1)
            } else {
    Alessandro Vermeulen
    My fear is that I have to do something like trampoline out of the recursion, but it seems like that shouldn't be necessary for something as simple as this
    Alessandro Vermeulen
    I have now solved this 'manually' with a PublishSubject used as a work queue, a delayed stream based on that work queue, and a worker that places work items back on the queue.
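The work-queue idea described above can be sketched without any Rx types. This is only an illustration under stated assumptions, not the code from the chat: `PingQueue`, `pingOk`, and `maxPings` are hypothetical names, the ping is a plain predicate instead of a delayed Observable, and a simple `ArrayDeque` stands in for the PublishSubject. The point it shows is that each finished ping puts the next work item back on the queue, so nothing nests and the queue never holds more than one item.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.IntPredicate;

final class PingQueue {
    /**
     * Runs pings one at a time from a work queue and returns how many succeeded.
     * pingOk is a hypothetical stand-in for the real ping call; it receives the
     * attempt number and reports whether the ping succeeded.
     */
    static int pingUntilFailure(IntPredicate pingOk, int maxPings) {
        Queue<Integer> work = new ArrayDeque<>();
        work.add(0); // seed the queue with the first attempt
        int succeeded = 0;
        while (!work.isEmpty()) {
            int attempt = work.poll();
            if (attempt >= maxPings) {
                break; // safety limit so the sketch always terminates
            }
            if (pingOk.test(attempt)) {
                succeeded++;
                work.add(attempt + 1); // the worker places the next item back on the queue
            }
            // on failure nothing is re-queued, so the loop drains and stops
        }
        return succeeded;
    }
}
```

Because the loop replaces the recursive call, memory use stays constant no matter how long the pinging runs; in an Rx version the delay would sit between taking an item off the queue and handing it to the worker.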
    Mark Paluch
    @here RxJava 1 Single is pure [1|error], not [0|1|error], right?
    Hi all, I think I'm doing something wrong with RxJava. I have a complex flow, and it's very buggy.
    I don't know what I'm missing, can someone help me 1-on-1?
    @mp911de. Single returns a single result or error.
    Hi, do you know of any chat site for HBase?
    Amir HMovahed
    Hi, I'm very new to this topic. Do you know any good sources on testing RxJava?
    Amir HMovahed
    Hey, do you know how to use a subject to test our observable?
    @AmirHMovahed You should not use subjects to test your observable. Look into the TestScheduler.
    Or a quick google leads me to TestSubscriber.
    Amir HMovahed
    @Dorus I created an interface which receives an Iterable and returns an Observable
    @AmirHMovahed You mean the existing from() function?
    Amir HMovahed
    I want to test the implementation to make sure the object emits items in the order it receives them, so I came up with the idea of using ReplaySubject
    I use the from() function
    Then I want to add more complex scenarios
    To have reactive functionality
    Using create
    Add a scheduler
    I want to use the TDD approach
    So I start from the simplest approach
    @Dorus am I on the right path?
    There are also plenty of blogs to find on the subject if you google
    I don't have any good links myself, I'm not usually on RxJava. Hopefully some others here can provide good ones.
    I'm not even sure if I'm linking the right tests here :)
    Ah there, line 101 is good.
    Amir HMovahed
    @Dorus thanks
    Justin Tuchek
    Is there a way to test observables with backpressure? That is, I expose an observable and want to verify it behaves as intended when its publisher overwhelms its subscriber.
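A minimal sketch of one way to check this by hand: drive the subscription with explicit `request(n)` calls and assert that exactly that many items arrived. It deliberately uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the Reactive Streams ones, instead of RxJava's own test classes. `BackpressureCheck`, `range`, and `Recording` are hypothetical names for this illustration, and the publisher is simplified: it is synchronous, not thread-safe, and does not validate non-positive requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureCheck {
    /** Publisher of 0..count-1 that emits only as many items as were requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items, never more than remain.
                for (long i = 0; i < n && next < count && !done; i++) {
                    Integer value = next++;
                    subscriber.onNext(value);
                }
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscriber that requests nothing on its own and records every signal. */
    static final class Recording implements Flow.Subscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        Flow.Subscription subscription;
        Throwable error;
        boolean completed;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }
    }
}
```

A test then subscribes a `Recording`, checks that nothing arrives before the first request, calls `subscription.request(2)`, and asserts that `items` holds exactly two elements and `completed` is still false.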
    ```
    Observable.create(new ObservableOnSubscribe<User>() {
        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
            emitter.onNext(new User("Even201314", 14));
        }
    }).repeatUntil(new BooleanSupplier() {
        public boolean getAsBoolean() throws Exception {
            repeatCount += 1;
            Log.d(TAG, "count: " + repeatCount);
            return repeatCount > 10;
        }
    });
    ```
    I would like to know: if I use Observable.create(), will repeatUntil() be executed?
    David Karnok
    @jtuchek The Reactive-Streams TCK has such test infrastructure but we can only use it for RxJava 2 Flowables. There is no compact test support for 1.x Observable and you have to manually write TestSubscriber.requestMore calls and verify you got exactly the right amount after.
    @Even201314 You need an emitter.onComplete() otherwise the repeat won't get triggered.
    @akarnokd Thx
    Justin Tuchek
    @akarnokd I’ll take a look, thanks for the guidance :thumbsup:
    Hi! I'm trying to implement a very simple cache manager using RxJava2. Is there a way to simplify the fromServer function by removing the ReplaySubject? Thanks!
    class ConfigManager() {
        @Volatile private var isDirty = true
        @Volatile private var cachedConfig: String? = null
        private var replay: ReplaySubject<String>? = null
        fun getConfig(): Single<String> = Observable.concat(fromCache().toObservable(), fromServer().toObservable()).firstOrError()
        private fun fromCache(): Maybe<String> = Maybe.create { if (isCacheExists()) it.onSuccess(cachedConfig) else it.onComplete() }
        private fun isCacheExists() = !isDirty && cachedConfig != null
        private fun fromServer(): Single<String> {
            if (replay == null) {
                replay = ReplaySubject.create()
                return Single.fromCallable { hardOperation() }
                        .doOnSuccess {
                        .doOnError {
            return replay?.firstOrError()!!
        private fun setCache(cache: String) {
            cachedConfig = cache
            isDirty = false
        fun reload() {
            isDirty = true
            replay = null
    Hey there. I would like to know how I can use the "distinct rxjava-async" module in Android Studio so that I can use the start operator.