Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Amir HMovahed
    @amirhmd
    Add schedular
    I want to use the tdd approach
    So i start from the simplest approach
    @Dorus am i on the right path?
    There are also plenty of blogs to find on the subject if you google
    i dont have any good links myself, not usually on RxJava, hopefully some others here can provide good ones.
    i'm not even sure if i'm linking the right tests here :)
    Ah there, line 101 is good.
    Amir HMovahed
    @amirhmd
    @Dorus thanks
    Justin Tuchek
    @justintuchek
    Is there a way to test observables with back pressure? such that I expose an observable and can verify it behaves as intended when it’s publisher overwhelms its subscriber?
    Monkey
    @Even201314
    ```Observable.create(new ObservableOnSubscribe<User>() {
        @Override
        public void subscribe(ObservableEmitter<User> emitter) throws Exception {
            emitter.onNext(new User("Even201314", 14));
        }
    }).repeatUntil(new BooleanSupplier() {
        @Override
        public boolean getAsBoolean() throws Exception {
            repeatCount += 1;
            Log.d(TAG, "count: " + repeatCount);
            return repeatCount > 10;
        }
    })
    I would like to know ,if I use Observable.create() , would the method repeatUntil() be executed?
    David Karnok
    @akarnokd
    @jtuchek The Reactive-Streams TCK has such test infrastructure but we can only use it for RxJava 2 Flowables. There is no compact test support for 1.x Observable and you have to manually write TestSubscriber.requestMore calls and verify you got exactly the right amount after.
    @Even201314 You need an emitter.onComplete() otherwise the repeat won't get triggered.
    Monkey
    @Even201314
    @akarnokd Thx
    Justin Tuchek
    @justintuchek
    @akarnokd I’ll take a look, thanks for the guidance :thumbsup:
    pavel.shackih
    @pavelshackih
    Hi! I'm trying to implement very simple cache manager using RxJava2. Is there a way to simplify fromServer fun by removing ReplaySubject? Thanks!
    class ConfigManager() {
    
        @Volatile private var isDirty = true
        @Volatile private var cachedConfig: String? = null
        private var replay: ReplaySubject<String>? = null
    
        fun getConfig(): Single<String> = Observable.concat(fromCache().toObservable(), fromServer().toObservable()).firstOrError()
    
        private fun fromCache(): Maybe<String> = Maybe.create { if (isCacheExists()) it.onSuccess(cachedConfig) else it.onComplete() }
    
        private fun isCacheExists() = !isDirty && cachedConfig != null
    
        private fun fromServer(): Single<String> {
            if (replay == null) {
                replay = ReplaySubject.create()
                return Single.fromCallable { hardOperation() }
                        .doOnSuccess {
                            replay?.onNext(it)
                            replay?.onComplete()
                            setCache(it)
                        }
                        .doOnError {
                            replay?.onError(it)
                        }
            }
            return replay?.firstOrError()!!
        }
    
        private fun setCache(cache: String) {
            cachedConfig = cache
            isDirty = false
        }
    
        fun reload() {
            isDirty = true
            replay = null
        }
    }
    Eido95
    @Eido95
    Hey there. I would like to know how can I use the "distinct rxjava-async" module in Android Studio in order to be able to use the start operator.
    I forgot to mention, I'm currently using the following libraries:
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    Eido95
    @Eido95
    Never mind, I found the solution on this thread:
    RxJava. Where are the operators?
    Thanks anyway.
    David Karnok
    @akarnokd
    The 2.0 compatible version lives in RxJava2Extensions: https://github.com/akarnokd/RxJava2Extensions#asynchronous-jumpstarting-a-sequence
    Eido95
    @Eido95
    Thank you for your link @akarnokd, I'll inspect it.
    Yannick Lecaillez
    @ylecaillez
    Hi guys. I had a look at the code available at https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0#backpressure-and-cancellation If think there is a bug for empty stream (onComplete() called without onNext()) if subscriber does not request() items. Shouldn't be the statement if (e != r) { ... } could actually be if (e == r) { ... } ?
    David Karnok
    @akarnokd
    @ylecaillez thanks, fixed
    DavidMihola
    @DavidMihola
    Just a quick question: In RxJava 1.2.x, what's the difference between a BehaviorSubject.create() and a ReplaySubject.createWithSize(1)?
    David Karnok
    @akarnokd
    BehaviorSubject doesn't retain the last emitted value if it gets terminated, ReplaySubject does.
    DavidMihola
    @DavidMihola
    And "if it get's terminated" in this context means - that the number of Subscribers goes back down to 0? If so, does that also apply if I subscribe "into" the Subject, i. e. myObservable.subscribe(mySubject)?
    David Karnok
    @akarnokd
    terminated == onError or onCompleted gets called
    DavidMihola
    @DavidMihola
    Oh, yes, I am sorry. OK, so after onError/onCompleted in a ReplaySubject.createWithSize(1) I get the last regular item AND the terminating event; in a BehaviorSubject.create() I get ONLY the terminating event. Right?
    David Karnok
    @akarnokd
    Yes.
    DavidMihola
    @DavidMihola
    Perfect! As always, thank you very much for your help!
    Yannick Lecaillez
    @ylecaillez
    Hi guys ! What is the correct way to notify the second subscriber that a publisher is actually supporting only one subscriber ?
    Yannick Lecaillez
    @ylecaillez
    Oh, 1.9: "... he only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe)."
    Ronen
    @ronenhamias
    Hello Rx java i am using Rx Java for the open source project http://scalecube.io/ and got the following exception Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests. what would be the best Backpressure strategy to use when i cant afford loosing messages and dont want to buffer to many messages ? can someone better explain this exception? Thanks in advance !
    David Karnok
    @akarnokd
    @ronenhamias What is your data source? For what chain do you get the error? You could switch to a built-in source that does honor backpressure. Otherwise, you have to buffer or drop values.
    Ronen
    @ronenhamias

    i am accepting messages on netty listener ```@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    Message message = (Message) msg;
    if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("Received: {}", message);
    }

    incomingMessagesSubject.onNext(message);

    }```

    ```
    and consumers subscribe on the subject
     @Nonnull
      @Override
      public final Observable<Message> listen() {
        checkState(!stopped, "Transport is stopped");
        return incomingMessagesSubject.onBackpressureBuffer(DEFAULT_BUFFER_LIMIT).asObservable();
      }
    currently using onBackpressureBuffer
    David Karnok
    @akarnokd
    I see. I don't know enough about Netty but RxNetty does seem to work without backpressure problems. Otherwise, you have to tie the reading of a channel to the request amount of the downstream by building a custom operator and not using PublishSubject.
    Ronen
    @ronenhamias
    i will look deeper into RxNetty
    Stanislav Shakirov
    @punksta

    Hello! I need to make request after complete of changing ui.
    pseudo-code

    someUiEvents()
    .debounce()
    .filter { it is EditComplete }
    .flatMap { doRequest }
    .doOnNext { updateUi }

    Can I cancel(unsubscribe) from doRequest if got new event?

    I can create subscription inside flatMap and cancel it every time I got emitting. But can it be solved ussing operators only?
    Dorus
    @Dorus
    @punksta yeah you can use switch for that.
    Ronen
    @ronenhamias
    does anyone knows good implementation for RX java with RabbitMQ?