Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Dorus
    @Dorus
    There are also plenty of blogs to find on the subject if you google
    i dont have any good links myself, not usually on RxJava, hopefully some others here can provide good ones.
    i'm not even sure if i'm linking the right tests here :)
    Ah there, line 101 is good.
    Amir HMovahed
    @amirhmd
    @Dorus thanks
    Justin Tuchek
    @justintuchek
    Is there a way to test observables with back pressure? such that I expose an observable and can verify it behaves as intended when it’s publisher overwhelms its subscriber?
    Monkey
    @Even201314
    ```Observable.create(new ObservableOnSubscribe<User>() {
        @Override
        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
            emitter.onNext(new User("Even201314", 14));
        }
    }).repeatUntil(new BooleanSupplier() {
        @Override
        public boolean getAsBoolean() throws Exception {
            repeatCount += 1;
            Log.d(TAG, "count: " + repeatCount);
            return repeatCount > 10;
        }
    })
    I would like to know ,if I use Observable.create() , would the method repeatUntil() be executed?
    David Karnok
    @akarnokd
    @jtuchek The Reactive-Streams TCK has such test infrastructure but we can only use it for RxJava 2 Flowables. There is no compact test support for 1.x Observable and you have to manually write TestSubscriber.requestMore calls and verify you got exactly the right amount after.
    @Even201314 You need an emitter.onComplete() otherwise the repeat won't get triggered.
    Monkey
    @Even201314
    @akarnokd Thx
    Justin Tuchek
    @justintuchek
    @akarnokd I’ll take a look, thanks for the guidance :thumbsup:
    pavel.shackih
    @pavelshackih
    Hi! I'm trying to implement very simple cache manager using RxJava2. Is there a way to simplify fromServer fun by removing ReplaySubject? Thanks!
    class ConfigManager() {
    
        @Volatile private var isDirty = true
        @Volatile private var cachedConfig: String? = null
        private var replay: ReplaySubject<String>? = null
    
        fun getConfig(): Single<String> = Observable.concat(fromCache().toObservable(), fromServer().toObservable()).firstOrError()
    
        private fun fromCache(): Maybe<String> = Maybe.create { if (isCacheExists()) it.onSuccess(cachedConfig) else it.onComplete() }
    
        private fun isCacheExists() = !isDirty && cachedConfig != null
    
        private fun fromServer(): Single<String> {
            if (replay == null) {
                replay = ReplaySubject.create()
                return Single.fromCallable { hardOperation() }
                        .doOnSuccess {
                            replay?.onNext(it)
                            replay?.onComplete()
                            setCache(it)
                        }
                        .doOnError {
                            replay?.onError(it)
                        }
            }
            return replay?.firstOrError()!!
        }
    
        private fun setCache(cache: String) {
            cachedConfig = cache
            isDirty = false
        }
    
        fun reload() {
            isDirty = true
            replay = null
        }
    }
    Eido95
    @Eido95
    Hey there. I would like to know how can I use the "distinct rxjava-async" module in Android Studio in order to be able to use the start operator.
    I forgot to mention, I'm currently using the following libraries:
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    Eido95
    @Eido95
    Never mind, I found the solution on this thread:
    RxJava. Where are the operators?
    Thanks anyway.
    David Karnok
    @akarnokd
    The 2.0 compatible version lives in RxJava2Extensions: https://github.com/akarnokd/RxJava2Extensions#asynchronous-jumpstarting-a-sequence
    Eido95
    @Eido95
    Thank you for your link @akarnokd, I'll inspect it.
    Yannick Lecaillez
    @ylecaillez
    Hi guys. I had a look at the code available at https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation If think there is a bug for empty stream (onComplete() called without onNext()) if subscriber does not request() items. Shouldn't be the statement if (e != r) { ... } could actually be if (e == r) { ... } ?
    David Karnok
    @akarnokd
    @ylecaillez thanks, fixed
    DavidMihola
    @DavidMihola
    Just a quick question: In RxJava 1.2.x, what's the difference between a BehaviorSubject.create() and a ReplaySubject.createWithSize(1)?
    David Karnok
    @akarnokd
    BehaviorSubject doesn't retain the last emitted value if it gets terminated, ReplaySubject does.
    DavidMihola
    @DavidMihola
    And "if it get's terminated" in this context means - that the number of Subscribers goes back down to 0? If so, does that also apply if I subscribe "into" the Subject, i. e. myObservable.subscribe(mySubject)?
    David Karnok
    @akarnokd
    terminated == onError or onCompleted gets called
    DavidMihola
    @DavidMihola
    Oh, yes, I am sorry. OK, so after onError/onCompleted in a ReplaySubject.createWithSize(1) I get the last regular item AND the terminating event; in a BehaviorSubject.create() I get ONLY the terminating event. Right?
    David Karnok
    @akarnokd
    Yes.
    DavidMihola
    @DavidMihola
    Perfect! As always, thank you very much for your help!
    Yannick Lecaillez
    @ylecaillez
    Hi guys ! What is the correct way to notify the second subscriber that a publisher is actually supporting only one subscriber ?
    Yannick Lecaillez
    @ylecaillez
    Oh, 1.9: "... he only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe)."
    Ronen
    @ronenhamias
    Hello Rx java i am using Rx Java for the open source project http://scalecube.io/ and got the following exception Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests. what would be the best Backpressure strategy to use when i cant afford loosing messages and dont want to buffer to many messages ? can someone better explain this exception? Thanks in advance !
    David Karnok
    @akarnokd
    @ronenhamias What is your data source? For what chain do you get the error? You could switch to a built-in source that does honor backpressure. Otherwise, you have to buffer or drop values.
    Ronen
    @ronenhamias

    i am accepting messages on netty listener ```@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    Message message = (Message) msg;
    if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("Received: {}", message);
    }

    incomingMessagesSubject.onNext(message);

    }```

    ```
    and consumers subscribe on the subject
     @Nonnull
      @Override
      public final Observable<Message> listen() {
        checkState(!stopped, "Transport is stopped");
        return incomingMessagesSubject.onBackpressureBuffer(DEFAULT_BUFFER_LIMIT).asObservable();
      }
    currently using onBackpressureBuffer
    David Karnok
    @akarnokd
    I see. I don't know enough about Netty but RxNetty does seem to work without backpressure problems. Otherwise, you have to tie the reading of a channel to the request amount of the downstream by building a custom operator and not using PublishSubject.
    Ronen
    @ronenhamias
    i will look deeper into RxNetty
    Stanislav Shakirov
    @punksta

    Hello! I need to make request after complete of changing ui.
    pseudo-code

    someUiEvents()
    .debounce()
    .filter { it is EditComplete }
    .flatMap { doRequest }
    .doOnNext { updateUi }

    Can I cancel(unsubscribe) from doRequest if got new event?

    I can create subscription inside flatMap and cancel it every time I got emitting. But can it be solved ussing operators only?
    Dorus
    @Dorus
    @punksta yeah you can use switch for that.
    Ronen
    @ronenhamias
    does anyone knows good implementation for RX java with RabbitMQ?
    Renan Ferrari
    @renanferrari

    Hey guys!

    I'm trying to figure out how to wrap listeners that always calls its callbacks from a specific thread into an Observable that conforms to the Scheduler defined by .subscribeOn(). I have a detailed StackOverflow question here: http://stackoverflow.com/q/40853783/518179

    I've been trying to figure this one out for a couple of weeks now. Any help would be highly appreciated.

    Thank you!

    Dorus
    @Dorus
    @renanferrari try observeOn, or pass a scheduler to one of your operators.
    Renan Ferrari
    @renanferrari
    @Dorus That does not solve the problem.
    Dorus
    @Dorus
    @renanferrari Sorry i didnt read that close enough, will see if i can find some answers tomorrow (doesnt look like anyone else is around, weird normally you get more replies here). One question, do you want to change Firebase's SDK to use a different thread, or do you want to change Rx to make downstream operators use a different thread?
    Dorus
    @Dorus
    @renanferrari I looked into it a bit more, and it seems it's 100% pointless to use subscribeOn as firebase handles calling stuff in the background itself. However if you need to process data returned from firebase elsewhere (on another thread), you need to schedule it yourself. Use .observeOn for that.
    Can you explain why observeOn does not solve your problem?