Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Serban Balamaci
    @balamaci
    the error is highlighted over the map. The count() returned Single<Long> but I don't see what the problem is to map it to a Pair, and then the result of flatMap is Flowable<Pair<String, Long>> right?
    Serban Balamaci
    @balamaci
    I think it's probably because Single is not extending Publisher inside flatMap
    Serban Balamaci
    @balamaci
    was clarified here ReactiveX/RxJava#4788
    Alessandro Vermeulen
    @spockz
    I thought flatMap and concatMap where memory friendly. However, when I run the code below for a while I end up with many rx.internal.operators.OperatorMerge$MergeSubscriber and Object[] in memory, with all kind of Operators in memory as well. Is there a way I can implement the functionality below in a more memory-friendly manner?
    public class Pinging {
    
      public Observable<Int> ping(final Integer errors) {
        final rx.Observable<Integer> pingObservable =
                  rx.Observable.just(occurredErrors).delay(pingInterval, pingIntervalTimeUnit);
    
        pingObservable.flatMap(numberOfOccurredErrors ->
          final Observable<Response> pingResponse =
            callMethod(..)
    
          pingResponse.flatMap( response ->
            ...
            if (response isOk) {
              ping(numberOfOccurredErrors + 1)
            } else {
              ping(0)
            }
          )
        )      
      }
    
      ..
      ping(0)
    }
    Alessandro Vermeulen
    @spockz
    My fear is that I have to do something like trampoline out of the recursion, but it seems like that shouldn't be necessary for something as simple as this
    Alessandro Vermeulen
    @spockz
    I now solved this 'manually' by using a publish subject that is used as workqueue, a delayed stream based on the work-queue and the worker that places work items back on the queue.
    Mark Paluch
    @mp911de
    @here RxJava 1 Single is pure [1|error], not [0|1|error], right?
    AriMeidan
    @MeidanAri_twitter
    Hi all, I think i'm doing something wrong with RxJava. I have a complex flow, and its very buggy.
    I dont know what i'm missing, can someone help me 1-on-1?
    @mp911de. Single returns a single result or error.
    ganasenit
    @ganasenit
    hi do u know any chatting site for hbase
    Amir HMovahed
    @amirhaghighim_twitter
    Hi im very new to this topic do you know any good source for testing rx java?
    Amir HMovahed
    @amirhmd
    hey do you know how to use the subject to test our observable?\
    Dorus
    @Dorus
    @AmirHMovahed You should not use subjects to test your observable. Look into the testScheduler.
    Or a quick goodle leads me to TestSubscriber.
    Amir HMovahed
    @amirhmd
    @Dorus i create an interface which receive Iterable and return Observable
    Dorus
    @Dorus
    @AmirHMovahed You mean the existing function from?
    Amir HMovahed
    @amirhmd
    I want to test the implementation to make sure this object emit items in order he receive so i come up with the idea of using ReplaySubject
    Yes
    I use the from function
    Then i want to add more complex scenario
    To have reactive functionality
    Using create
    Add schedular
    I want to use the tdd approach
    So i start from the simplest approach
    @Dorus am i on the right path?
    There are also plenty of blogs to find on the subject if you google
    i dont have any good links myself, not usually on RxJava, hopefully some others here can provide good ones.
    i'm not even sure if i'm linking the right tests here :)
    Ah there, line 101 is good.
    Amir HMovahed
    @amirhmd
    @Dorus thanks
    Justin Tuchek
    @justintuchek
    Is there a way to test observables with back pressure? such that I expose an observable and can verify it behaves as intended when it’s publisher overwhelms its subscriber?
    Monkey
    @Even201314
    ```Observable.create(new ObservableOnSubscribe<User>() {
        @Override
        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
            emitter.onNext(new User("Even201314", 14));
        }
    }).repeatUntil(new BooleanSupplier() {
        @Override
        public boolean getAsBoolean() throws Exception {
            repeatCount += 1;
            Log.d(TAG, "count: " + repeatCount);
            return repeatCount > 10;
        }
    })
    I would like to know ,if I use Observable.create() , would the method repeatUntil() be executed?
    David Karnok
    @akarnokd
    @jtuchek The Reactive-Streams TCK has such test infrastructure but we can only use it for RxJava 2 Flowables. There is no compact test support for 1.x Observable and you have to manually write TestSubscriber.requestMore calls and verify you got exactly the right amount after.
    @Even201314 You need an emitter.onComplete() otherwise the repeat won't get triggered.
    Monkey
    @Even201314
    @akarnokd Thx
    Justin Tuchek
    @justintuchek
    @akarnokd I’ll take a look, thanks for the guidance :thumbsup:
    pavel.shackih
    @pavelshackih
    Hi! I'm trying to implement very simple cache manager using RxJava2. Is there a way to simplify fromServer fun by removing ReplaySubject? Thanks!
    class ConfigManager() {
    
        @Volatile private var isDirty = true
        @Volatile private var cachedConfig: String? = null
        private var replay: ReplaySubject<String>? = null
    
        fun getConfig(): Single<String> = Observable.concat(fromCache().toObservable(), fromServer().toObservable()).firstOrError()
    
        private fun fromCache(): Maybe<String> = Maybe.create { if (isCacheExists()) it.onSuccess(cachedConfig) else it.onComplete() }
    
        private fun isCacheExists() = !isDirty && cachedConfig != null
    
        private fun fromServer(): Single<String> {
            if (replay == null) {
                replay = ReplaySubject.create()
                return Single.fromCallable { hardOperation() }
                        .doOnSuccess {
                            replay?.onNext(it)
                            replay?.onComplete()
                            setCache(it)
                        }
                        .doOnError {
                            replay?.onError(it)
                        }
            }
            return replay?.firstOrError()!!
        }
    
        private fun setCache(cache: String) {
            cachedConfig = cache
            isDirty = false
        }
    
        fun reload() {
            isDirty = true
            replay = null
        }
    }
    Eido95
    @Eido95
    Hey there. I would like to know how can I use the "distinct rxjava-async" module in Android Studio in order to be able to use the start operator.
    I forgot to mention, I'm currently using the following libraries:
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    Eido95
    @Eido95
    Never mind, I found the solution on this thread:
    RxJava. Where are the operators?
    Thanks anyway.
    David Karnok
    @akarnokd
    The 2.0 compatible version lives in RxJava2Extensions: https://github.com/akarnokd/RxJava2Extensions#asynchronous-jumpstarting-a-sequence
    Eido95
    @Eido95
    Thank you for your link @akarnokd, I'll inspect it.
    Yannick Lecaillez
    @ylecaillez
    Hi guys. I had a look at the code available at https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation If think there is a bug for empty stream (onComplete() called without onNext()) if subscriber does not request() items. Shouldn't be the statement if (e != r) { ... } could actually be if (e == r) { ... } ?
    David Karnok
    @akarnokd
    @ylecaillez thanks, fixed
    DavidMihola
    @DavidMihola
    Just a quick question: In RxJava 1.2.x, what's the difference between a BehaviorSubject.create() and a ReplaySubject.createWithSize(1)?
    David Karnok
    @akarnokd
    BehaviorSubject doesn't retain the last emitted value if it gets terminated, ReplaySubject does.