Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Harris
    @hjubb
    Hello all, does anyone know if the marble diagrams used in the javadoc are handled through the RxJava repo or are they maintained from a broader Rx source?
    xgear-public
    @xgear-public

    Hello! I have following construction:

    ...flatMapIterable(users)
    .map(user -> new UserUiWrapper(user))
    .toList()

    But after toList() doOnNext doesn't work, how to fix that?

    Harris
    @hjubb
    I think toList() will make it a blocking observable?
    actually nevermind that isn't right
    xgear-public
    @xgear-public
    Hello! Is it possbile to collect items from ReplaySubject into List?
    Justin Tuchek
    @justintuchek
    @xgear-public do you mean like ReplaySubject->getValues(…) http://reactivex.io/RxJava/javadoc/rx/subjects/ReplaySubject.html
    Mikhail Mustakimov
    @Mikhail57
    What do you think about this MVP sketch? What's good, what's bad? https://github.com/Mikhail57/Irknet
    Dmitriy Zaitsev
    @DmitriyZaitsev
    Is there a fundamental difference between these two snippets?
    stream.subscribe(subscriber)
    stream.subscribeOn(Schedulers.immediate()).subscribe(subscriber)
    Serban Balamaci
    @balamaci
    Hi, I'm trying to convert rxjava v1 code to v2, and I'm kinda stuck on
    https://gist.github.com/balamaci/802ce99d910bdbfc7a98ec0f9f5de45b
    I get a compile error: Error:(111, 49) java: incompatible types: no instance(s) of type variable(s) R,K,V exist so that io.reactivex.Single<R> conforms to org.reactivestreams.Publisher<? extends R>
    the error is highlighted over the map. The count() returned Single<Long> but I don't see what the problem is to map it to a Pair, and then the result of flatMap is Flowable<Pair<String, Long>> right?
    Serban Balamaci
    @balamaci
    I think it's probably because Single is not extending Publisher inside flatMap
    Serban Balamaci
    @balamaci
    was clarified here ReactiveX/RxJava#4788
    Alessandro Vermeulen
    @spockz
    I thought flatMap and concatMap where memory friendly. However, when I run the code below for a while I end up with many rx.internal.operators.OperatorMerge$MergeSubscriber and Object[] in memory, with all kind of Operators in memory as well. Is there a way I can implement the functionality below in a more memory-friendly manner?
    public class Pinging {
    
      public Observable<Int> ping(final Integer errors) {
        final rx.Observable<Integer> pingObservable =
                  rx.Observable.just(occurredErrors).delay(pingInterval, pingIntervalTimeUnit);
    
        pingObservable.flatMap(numberOfOccurredErrors ->
          final Observable<Response> pingResponse =
            callMethod(..)
    
          pingResponse.flatMap( response ->
            ...
            if (response isOk) {
              ping(numberOfOccurredErrors + 1)
            } else {
              ping(0)
            }
          )
        )      
      }
    
      ..
      ping(0)
    }
    Alessandro Vermeulen
    @spockz
    My fear is that I have to do something like trampoline out of the recursion, but it seems like that shouldn't be necessary for something as simple as this
    Alessandro Vermeulen
    @spockz
    I now solved this 'manually' by using a publish subject that is used as workqueue, a delayed stream based on the work-queue and the worker that places work items back on the queue.
    Mark Paluch
    @mp911de
    @here RxJava 1 Single is pure [1|error], not [0|1|error], right?
    AriMeidan
    @MeidanAri_twitter
    Hi all, I think i'm doing something wrong with RxJava. I have a complex flow, and its very buggy.
    I dont know what i'm missing, can someone help me 1-on-1?
    @mp911de. Single returns a single result or error.
    ganasenit
    @ganasenit
    hi do u know any chatting site for hbase
    Amir HMovahed
    @amirhaghighim_twitter
    Hi im very new to this topic do you know any good source for testing rx java?
    Amir HMovahed
    @amirhmd
    hey do you know how to use the subject to test our observable?\
    Dorus
    @Dorus
    @AmirHMovahed You should not use subjects to test your observable. Look into the testScheduler.
    Or a quick goodle leads me to TestSubscriber.
    Amir HMovahed
    @amirhmd
    @Dorus i create an interface which receive Iterable and return Observable
    Dorus
    @Dorus
    @AmirHMovahed You mean the existing function from?
    Amir HMovahed
    @amirhmd
    I want to test the implementation to make sure this object emit items in order he receive so i come up with the idea of using ReplaySubject
    Yes
    I use the from function
    Then i want to add more complex scenario
    To have reactive functionality
    Using create
    Add schedular
    I want to use the tdd approach
    So i start from the simplest approach
    @Dorus am i on the right path?
    There are also plenty of blogs to find on the subject if you google
    i dont have any good links myself, not usually on RxJava, hopefully some others here can provide good ones.
    i'm not even sure if i'm linking the right tests here :)
    Ah there, line 101 is good.
    Amir HMovahed
    @amirhmd
    @Dorus thanks
    Justin Tuchek
    @justintuchek
    Is there a way to test observables with back pressure? such that I expose an observable and can verify it behaves as intended when it’s publisher overwhelms its subscriber?
    Monkey
    @Even201314
    ```Observable.create(new ObservableOnSubscribe<User>() {
        @Override
        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
            emitter.onNext(new User("Even201314", 14));
        }
    }).repeatUntil(new BooleanSupplier() {
        @Override
        public boolean getAsBoolean() throws Exception {
            repeatCount += 1;
            Log.d(TAG, "count: " + repeatCount);
            return repeatCount > 10;
        }
    })
    I would like to know ,if I use Observable.create() , would the method repeatUntil() be executed?
    David Karnok
    @akarnokd
    @jtuchek The Reactive-Streams TCK has such test infrastructure but we can only use it for RxJava 2 Flowables. There is no compact test support for 1.x Observable and you have to manually write TestSubscriber.requestMore calls and verify you got exactly the right amount after.
    @Even201314 You need an emitter.onComplete() otherwise the repeat won't get triggered.
    Monkey
    @Even201314
    @akarnokd Thx
    Justin Tuchek
    @justintuchek
    @akarnokd I’ll take a look, thanks for the guidance :thumbsup:
    pavel.shackih
    @pavelshackih
    Hi! I'm trying to implement very simple cache manager using RxJava2. Is there a way to simplify fromServer fun by removing ReplaySubject? Thanks!
    class ConfigManager() {
    
        @Volatile private var isDirty = true
        @Volatile private var cachedConfig: String? = null
        private var replay: ReplaySubject<String>? = null
    
        fun getConfig(): Single<String> = Observable.concat(fromCache().toObservable(), fromServer().toObservable()).firstOrError()
    
        private fun fromCache(): Maybe<String> = Maybe.create { if (isCacheExists()) it.onSuccess(cachedConfig) else it.onComplete() }
    
        private fun isCacheExists() = !isDirty && cachedConfig != null
    
        private fun fromServer(): Single<String> {
            if (replay == null) {
                replay = ReplaySubject.create()
                return Single.fromCallable { hardOperation() }
                        .doOnSuccess {
                            replay?.onNext(it)
                            replay?.onComplete()
                            setCache(it)
                        }
                        .doOnError {
                            replay?.onError(it)
                        }
            }
            return replay?.firstOrError()!!
        }
    
        private fun setCache(cache: String) {
            cachedConfig = cache
            isDirty = false
        }
    
        fun reload() {
            isDirty = true
            replay = null
        }
    }