These are chat archives for ReactiveX/RxJava

7th
Jun 2017
Ivan Schütz
@i-schuetz
Jun 07 2017 14:15
hello... is there a method to compare the last and current value of an observable? similar to this: https://github.com/ReactiveCocoa/ReactiveSwift/blob/45c2bd44835060a80fa23cdc7600a1554e6788f7/Sources/Property.swift#L140
Rémon S.
@Coffee2CodeNL
Jun 07 2017 15:35
Mark Elston
@melston
Jun 07 2017 15:43
@i-schuetz, you might also be able to use Scan, where you can pass on a tuple of the previous and current value.
Ivan Schütz
@i-schuetz
Jun 07 2017 15:48
@iSDP @melston thanks! I'll give it a look later today. I just started using RxJava! I tried with scan but wasn't sure how to use it. I'll come back with this later
Rémon S.
@Coffee2CodeNL
Jun 07 2017 15:52
I published my code yesterday, if anyone wants to take a peek. It can be found here: https://github.com/iSDP/Telebot
Mark Elston
@melston
Jun 07 2017 18:26

@i-schuetz, take a look at this:

    private static String[] inputStrings = {
        "abc", "def",
        "ghi", "jkl",
        "mno"};

    // Take 2 Pairs and return a new Pair with the second element of first Pair and
    // the second element of second Pair.
    // When using this in a scan operator the first Pair represents the previous output
    // of the scan and the second Pair represents the current value coming into the scan.
    Func2<Pair<String, String>, Pair<String, String>, Pair<String, String>> scanFunc =
            (t1, t2) -> new Pair<>(t1.getValue(), t2.getValue()); // diamond avoids a raw Pair

    Observable<Pair<String, String>> o1 = Observable.from(inputStrings)
            .map(s -> new Pair<String, String>("<noval>", s))  // Initial pair has no previous value
            .scan(scanFunc); // result is a pair of previous, current values

    o1.subscribe(t -> System.out.println("Result: " + t.getKey() + " : " + t.getValue()));

The result is:

Result: <noval> : abc
Result: abc : def
Result: def : ghi
Result: ghi : jkl
Result: jkl : mno
You can now do anything you want with the Pair, as it contains both the previous and the current value.
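
For readers without RxJava on the classpath, here is a minimal plain-Java sketch of the same idea. It only models the behaviour: `ScanPairsSketch`, its nested `Pair`, and the `scan` helper are hypothetical names, not RxJava types. The helper assumes the seedless form used above, where the first item passes through unchanged and every later item is combined with the previous result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ScanPairsSketch {
    /** Minimal stand-in for the Pair type used in the chat (hypothetical). */
    static final class Pair {
        final String previous;
        final String current;

        Pair(String previous, String current) {
            this.previous = previous;
            this.current = current;
        }

        @Override
        public String toString() {
            return previous + " : " + current;
        }
    }

    /** Seedless scan: the first item passes through, then acc = f(acc, item). */
    static <T> List<T> scan(List<T> items, BinaryOperator<T> f) {
        List<T> out = new ArrayList<>();
        T acc = null;
        for (T item : items) {
            acc = out.isEmpty() ? item : f.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    static List<Pair> previousAndCurrent(List<String> values) {
        List<Pair> seeded = new ArrayList<>();
        for (String s : values) {
            seeded.add(new Pair("<noval>", s)); // no previous value known yet
        }
        // The accumulator's "current" becomes the next pair's "previous".
        return scan(seeded, (acc, next) -> new Pair(acc.current, next.current));
    }

    public static void main(String[] args) {
        for (Pair p : previousAndCurrent(Arrays.asList("abc", "def", "ghi"))) {
            System.out.println("Result: " + p);
        }
        // Result: <noval> : abc
        // Result: abc : def
        // Result: def : ghi
    }
}
```

The only state carried from one step to the next is the accumulator, which is what lets each output pair see the value that came before it.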
Ivan Schütz
@i-schuetz
Jun 07 2017 20:35
@melston I'm using Kotlin... getting Error:(20, 52) Type mismatch: inferred type is Observable<Pair<String, Array<String>!>!>! but Observable<Pair<String, String>> was expected
```
private val inputStrings = arrayOf("abc", "def", "ghi", "jkl", "mno")

init {

    val scanFunc = { t1: Pair<String, String> , t2: Pair<String, String> -> Pair(t1.second, t2.second) }

    val o1: Observable<Pair<String, String>> = Observable.fromArray(inputStrings)
            .map({ s -> Pair("<noval>", s) })
            .scan(scanFunc)

    o1.subscribe({ t -> System.out.println("Result: " + t.first + " : " + t.second) })
}
```
what am I doing wrong? Sorry for the ignorance...
Ivan Schütz
@i-schuetz
Jun 07 2017 20:45
is it fromArray? This causes map to receive the array as a parameter and not the elements
but there was no from
and just has the same result...
Rémon S.
@Coffee2CodeNL
Jun 07 2017 20:47
RxJava and Kotlin don't really like each other
As far as I've understood
Ivan Schütz
@i-schuetz
Jun 07 2017 20:48
I'm working with Android... the choice is either that or the current Java7/8 hybrid that Android supports...
haven't noticed complaints about Rx with Kotlin so far. Where did you hear/read that?
Ivan Schütz
@i-schuetz
Jun 07 2017 20:55
anyway, Observable.from seems not to exist in RxJava 2 at least, and fromArray should be correct. http://reactivex.io/RxJava/2.x/javadoc/ hmm
ahh it's because of the varargs
Ivan Schütz
@i-schuetz
Jun 07 2017 21:00
this works
val o1: Observable<Pair<String, String>> = Observable.fromArray("abc", "def", "ghi", "jkl", "mno")
    .map({ s -> Pair("<noval>", s) })  // Initial pair has no previous value
    .scan(scanFunc) // result is a pair of previous, current values

or this

val o1: Observable<Pair<String, String>> = Observable.fromArray(*inputStrings)
    .map({ s -> Pair("<noval>", s) })  // Initial pair has no previous value
    .scan(scanFunc)

(sorry also new to Kotlin so figuring this out too :P)

ViTORossonero
@ViTORossonero
Jun 07 2017 21:06
Observable#fromArray has varargs as param
so you should use the spread operator when passing an Array<String> to this function
https://stackoverflow.com/documentation/kotlin/5835/vararg-parameters-in-functions/20551/spread-operator-passing-arrays-into-vararg-functions#t=201706072105225874107
Observable.fromArray(*inputStrings)
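
The difference is easier to see from the Java side. The sketch below uses a hypothetical `fromArray` helper with the same parameter shape (`T... items`); it is not RxJava's method. Java passes an existing array straight through as the varargs array, which is why the original Java example needed no extra syntax. Forcing `T` to be `String[]` reproduces what the Kotlin compiler inferred without `*`: a single item that is the whole array.

```java
import java.util.Arrays;
import java.util.List;

final class VarargsSketch {
    /** Hypothetical stand-in with the same parameter shape as fromArray(T... items). */
    @SafeVarargs
    static <T> List<T> fromArray(T... items) {
        return Arrays.asList(items);
    }

    public static void main(String[] args) {
        String[] inputStrings = {"abc", "def", "ghi", "jkl", "mno"};

        // Java treats the array as the varargs array itself: five String items.
        List<String> spread = fromArray(inputStrings);
        System.out.println(spread.size()); // 5

        // With T fixed to String[], the array is wrapped and becomes one item,
        // which mirrors the Observable<Pair<String, Array<String>>> error above.
        List<String[]> single = VarargsSketch.<String[]>fromArray(inputStrings);
        System.out.println(single.size()); // 1
    }
}
```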
Mark Elston
@melston
Jun 07 2017 21:07
@i-schuetz, sorry to take so long to get back to you. I was in a meeting. I am testing with RxJava 1.3.0. I wasn't aware that from isn't available anymore. What varargs are you referring to?
Oh, just saw your post.
ViTORossonero
@ViTORossonero
Jun 07 2017 21:12

> RxJava and Kotlin don't really like each other

I don't think that's true. One thing I heard is that there are some issues with SAM conversions
ReactiveX/RxKotlin#103

Ivan Schütz
@i-schuetz
Jun 07 2017 21:14
@ViTORossonero thanks! (You seem to not have seen my message where I solved it, though?)
ViTORossonero
@ViTORossonero
Jun 07 2017 21:17
@i-schuetz yeah, missed it somehow :)
Ivan Schütz
@i-schuetz
Jun 07 2017 21:17
thanks anyway, it's not bad to have a confirmation and I have to read those interop docs
Derek Wickern
@dwickern
Jun 07 2017 21:29
I have an Observable[Event]. On each event, I want to subscribe to some Observable[Value] (similar to flatmap/concatmap). Except when the next Event occurs I want to immediately unsubscribe from the current Values and subscribe to the new ones. Is there an operator like that?
Mark Elston
@melston
Jun 07 2017 21:30
@dwickern, IIUC take a look at switch.
Derek Wickern
@dwickern
Jun 07 2017 21:31
ah, I haven't used that one
TIL flatmap and concatmap have a friend, switchmap
Ivan Schütz
@i-schuetz
Jun 07 2017 21:42
@melston thanks for the solution, btw. I need to append lastElement() (or similar) at the end to get the last 2
that converts the Observable to a Maybe though...
ah well otherwise I use e.g. map and get the last element in the block
to "keep" the Observable
ah not quite... map is of course called for each pair
takeLast(1) seems to be what I need
Mark Elston
@melston
Jun 07 2017 21:46
@i-schuetz, I'm not sure what you mean. Are you just looking to keep track of the final value-pair from the scan, after the initial Observable completes?
Oh. I see.
Ivan Schütz
@i-schuetz
Jun 07 2017 21:52
@melston I'm not sure though that this solves my problem... my observable isn't an array but a single object and I need to know its previous state
Heikki Vesalainen
@hvesalai
Jun 07 2017 21:53
@i-schuetz there are tons of ways to do it. As said, scan is one, but another is zip.
first = myObservable.share();
second = first.drop(1);
firstAndSecondAreTheSame = zip(first, second, (f, s) -> f == s);
i.e. zip the stream with itself delayed by one
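
Read as plain Java over a finite list, the zip-with-itself idea amounts to comparing each element with its predecessor. This is only a sketch of the pairing logic: `ZipWithSelfSketch` and `sameAsPrevious` are hypothetical names, and no Rx library is involved. It uses `Objects.equals` on the assumption that value equality is wanted, since `==` on Java objects compares references.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class ZipWithSelfSketch {
    /**
     * Pairs each element with its predecessor, like zipping a sequence
     * with a copy of itself that skips the first element.
     * Returns one boolean per adjacent pair: true when both are equal.
     */
    static <T> List<Boolean> sameAsPrevious(List<T> values) {
        List<Boolean> out = new ArrayList<>();
        for (int i = 1; i < values.size(); i++) {
            T first = values.get(i - 1); // element from the original sequence
            T second = values.get(i);    // element from the sequence shifted by one
            out.add(Objects.equals(first, second)); // equals(), not reference identity
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sameAsPrevious(Arrays.asList("a", "a", "b", "b", "c")));
        // [true, false, true, false]
    }
}
```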
Ivan Schütz
@i-schuetz
Jun 07 2017 22:08
I guess I'm confused about where the "previous" state comes from in my current use case. I don't see anything explicit about it in the code I'm looking at except the combinePrevious call
and the model of the observable just knows its current state. The "history" handling is done by the rx library
so it seems that I have to implement this functionality myself? Maybe what @iSDP suggested here: :point_up: June 7, 2017 5:35 PM
Ivan Schütz
@i-schuetz
Jun 07 2017 22:14
or am I missing something...?
Mark Elston
@melston
Jun 07 2017 22:17
@i-schuetz, I may not understand your question, but I would have thought that both my approach and the one shown by @hvesalai provide the 'previous' state (and I rather like his solution better as it is simpler). But in either case the previous state comes from the Observable.
I thought your question was how to compare a current value with the previous value. Both solutions do that. Are you looking to do something else?
Ivan Schütz
@i-schuetz
Jun 07 2017 22:17
yes, if I have e.g. an Observable of an integer
it's changed from 1 to 2
I need a method that tells me "the integer is now 2" and "it was 1 before"
but I just linked to an SO post with the same question
in both your solutions there's a "data source" which contains all the states, I don't have that
Mark Elston
@melston
Jun 07 2017 22:21
In both solutions you have (1,2) passed on to the next stage. Well, in @hvesalai's solution you actually have a boolean passed on since the function does a comparison. But he could have constructed a Pair just as easily.
Ivan Schütz
@i-schuetz
Jun 07 2017 22:22
but in e.g. your solution, you have an array, which contains all the strings
Mark Elston
@melston
Jun 07 2017 22:22
If your Observable is emitting the values then you do have that.
Ivan Schütz
@i-schuetz
Jun 07 2017 22:22
I just have the current string
Mark Elston
@melston
Jun 07 2017 22:22
The array was a convenient way of creating the Observable.
Ivan Schütz
@i-schuetz
Jun 07 2017 22:24
but where is the previous state retrieved from if the content of the observable is now overwritten?
Mark Elston
@melston
Jun 07 2017 22:24
That is what scan is doing in my example and zip is doing in @hvesalai's.
Take a look at the Scan marble diagram and the Zip marble diagram and that may help.
In the zip case you are using the same observable but from two perspectives. The second one is 'delayed' by one entry so you can zip the two perspectives and get successive indices into the source Observable.
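
On the question of where the previous value comes from, here is a plain-Java sketch of the bookkeeping that an operator such as scan does internally. `PairwiseSketch` is a hypothetical class, not an RxJava API. Values arrive one at a time through `onNext`, and only the most recent one is retained, so no array or history is needed on the caller's side.

```java
import java.util.function.BiConsumer;

final class PairwiseSketch<T> {
    private final BiConsumer<T, T> downstream; // receives (previous, current)
    private T previous;
    private boolean hasPrevious;

    PairwiseSketch(BiConsumer<T, T> downstream) {
        this.downstream = downstream;
    }

    /** Called once per emitted value; nothing is stored except the last value. */
    void onNext(T current) {
        if (hasPrevious) {
            downstream.accept(previous, current);
        }
        previous = current;
        hasPrevious = true;
    }

    public static void main(String[] args) {
        PairwiseSketch<Integer> pairwise = new PairwiseSketch<>(
                (prev, cur) -> System.out.println("was " + prev + ", now " + cur));
        pairwise.onNext(1); // first value: nothing to compare against yet
        pairwise.onNext(2); // was 1, now 2
        pairwise.onNext(3); // was 2, now 3
    }
}
```

The trade-off is that the pairing only starts from the moment this object begins receiving values; anything emitted earlier is not seen.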
Ivan Schütz
@i-schuetz
Jun 07 2017 22:28
I get scan, but I still see no way to get the previous value with it if the source is not an iterable or similar
concerning the other example I think the trick is in share()?
Mark Elston
@melston
Jun 07 2017 22:32
share is a bit harder to explain as it depends on understanding the various types of Observables but try here to start.
Ivan Schütz
@i-schuetz
Jun 07 2017 22:35
ok... but coming back to scan and zip I think that this is separate from the state management
if I don't track the state information myself, scan and zip can't help
Mark Elston
@melston
Jun 07 2017 22:36
I guess I don't really understand your problem, then. What state are you trying to manage, if not the values emitted by the Observable?
Ivan Schütz
@i-schuetz
Jun 07 2017 22:37
how would you get the previous value of an observable from only a string, in your example, instead of an array of strings?
without any additional data structures or helpers
(and without having access to the code that triggers the change in the observable)
Mark Elston
@melston
Jun 07 2017 22:40
Go back and take a look at the result of the scan example. You have every state change in the output stream. Is that not what you are looking for?
So, just for completeness, here is a sample using zip (similar to @hvesalai's example):
public static void main(String[] args) {

    String[] inputStrings = {
            "abc", "def",
            "ghi", "jkl",
            "mno"};

    Observable<String> o1 = Observable.from(inputStrings);
    Observable<String> o2 = o1.skip(1);
    Observable<Pair<String, String>> o3 = Observable.zip(o1, o2, (v1, v2) -> new Pair<>(v1, v2));

    o3.subscribe(t -> System.out.println("Result: " + t.getKey() + " : " + t.getValue()));
}
This prints out the same pairs as the scan example, except that there is no initial "<noval> : abc" line.
Ivan Schütz
@i-schuetz
Jun 07 2017 22:43
no that's not my point :D I'm asking how you would do that if you have only a string, not an array of strings
how do you get the previous state of the string
Mark Elston
@melston
Jun 07 2017 22:43
And it doesn't need share :)
Which string are you talking about?
Ivan Schütz
@i-schuetz
Jun 07 2017 22:44
replace array of strings with one string
Mark Elston
@melston
Jun 07 2017 22:44
You have an Observable emitting strings. Strings don't have state.
So, if the observable emits one string and exits there is no previous state - ever.
Ivan Schütz
@i-schuetz
Jun 07 2017 22:45
just a stateful object
it's instance A, then it's updated and now it's instance B
Mark Elston
@melston
Jun 07 2017 22:45
Where does Rx enter into this?
Ivan Schütz
@i-schuetz
I'm using this in redux, the observable is the app's state, which is an immutable object that gets "switched" each time that the app state changes
and when that happens I need the previous and the new state
Mark Elston
@melston
Jun 07 2017 22:48
Sure. But BehaviorSubject is, from the consumer's side, an Observable emitting stuff. Just replace the Observable.from in the example with your BehaviorSubject(...) and the rest is the same.
Except you have to generate the elements the BehaviorSubject emits on your own.
Ivan Schütz
@i-schuetz
Jun 07 2017 22:50
perhaps key there is the "replay" method
and why wouldn't it be possible to observe a string ? The string can be replaced with another instance
and I maybe want to see the old instance in the event handling
Mark Elston
@melston
Jun 07 2017 22:51
But that isn't the nature of an Observable. Observables emit things. They don't change.
Ivan Schütz
@i-schuetz
Jun 07 2017 22:54
and what kind of things allow for this kind of behaviour in RxJava? In ReactiveSwift (which is where I got combinePrevious) they use "Property", which seems to be basically an observer with "state"
they don't have Observable but Property instead
Mark Elston
@melston
Jun 07 2017 22:57
You may be trying to fit a round peg into a square hole. I am not familiar with Properties as you describe them. Observable appears to be a different thing. It could be that Property is a kind of wrapper around a BehaviorSubject but I don't know.
Ivan Schütz
@i-schuetz
Jun 07 2017 22:58
I'm not trying to fit anything :D just asking what I have to use...
I can change Observable into something else, if Observable is wrong for this
but at first sight doesn't feel wrong as I'm just trying to observe the state of an (immutable) object (i.e. if it gets replaced with a new instance)
probably that's what ReplaySubject or BehaviorSubject.replay are for
Ivan Schütz
@i-schuetz
Jun 07 2017 23:04
(both of which are observables...)
Ivan Schütz
@i-schuetz
Jun 07 2017 23:10
probably the cleanest implementation would be to manage the state myself, i.e. observe an array of states... but that's what ReplaySubject seems to do, so...
Mark Elston
@melston
Jun 07 2017 23:10
OK. Here is an updated example using BehaviorSubject that shows how it could be used (with zip). No need for replay.
public static void main(String[] args) {

    String[] inputStrings = {
            "abc", "def",
            "ghi", "jkl",
            "mno"};

    BehaviorSubject<String> o1 = BehaviorSubject.create();
    Observable<String> o2 = o1.skip(1);
    Observable<Pair<String, String>> o3 = Observable.zip(o1, o2, (v1, v2) -> new Pair<>(v1, v2));

    o3.subscribe(t -> System.out.println("Result: " + t.getKey() + " : " + t.getValue()));

    for (String s: inputStrings) {
        o1.onNext(s);
    }
    o1.onComplete();
}
This is updated for RxJava 2.0.4.
Ivan Schütz
@i-schuetz
Jun 07 2017 23:13
but you're still using an array
Mark Elston
@melston
Jun 07 2017 23:15
But that was to just simulate the changes to a value. I'm just iterating over the values and pushing them into the BehaviorSubject. So, in your case, every time a variable changes you have to call onNext() on the associated BehaviorSubject.
It doesn't happen automatically.
You could create a class that handles this kind of state change pretty easily, though.
Ivan Schütz
@i-schuetz
Jun 07 2017 23:16
but you're not getting in each call the old and new state of the complete array...
yeah, I'd need to keep track of the history of the object
Mark Elston
@melston
Jun 07 2017 23:17
Forget the array. You said you wanted to look at 'state changes' on a string. The array just simulates the various states the string goes through.
Ivan Schütz
@i-schuetz
Jun 07 2017 23:18
try to do your example with one single string and you may understand what I mean - you'd need to store the states somewhere to be able to pass the old and new state together to onNext
Mark Elston
@melston
Jun 07 2017 23:18
This is why the Observable/BehaviorSubject is parameterized on a String
The old/new states are handled in o3
Ivan Schütz
@i-schuetz
Jun 07 2017 23:19
yeah because your array is basically the "history"
so you don't need to keep it additionally
if you just had one string you'd need to store the states in an array like the one you're using
Mark Elston
@melston
Jun 07 2017 23:20
Right. If you set up your observable chain (as we did here) you get the old/new states as part of the chain.
Ivan Schütz
@i-schuetz
Jun 07 2017 23:20
not without storing those states somewhere
or at least without the observable emitting the old and the new state but that's something, which as I mentioned before I don't have
Ivan Schütz
@i-schuetz
Jun 07 2017 23:26
@melston thanks for your time... I'm going to sleep now.
Mark Elston
@melston
Jun 07 2017 23:29
@i-schuetz, one more time. Here is a "Property" that you can get an Observable from to compare changes to its values:
static class Property<T> {
    BehaviorSubject<T> subject;

    public Property() {
        subject = BehaviorSubject.create();
    }

    public void setValue(T val) {
        subject.onNext(val);
    }

    public Observable<T> getObservable() {
        return subject;
    }
}

public static void main(String[] args) {

    String[] inputStrings = {
            "abc", "def",
            "ghi", "jkl",
            "mno"};

    //BehaviorSubject<String> o1 = BehaviorSubject.create();
    Property<String> prop = new Property<String>();

    Observable<String> o1 = prop.getObservable();
    Observable<String> o2 = o1.skip(1);
    Observable<Pair<String, String>> o3 = Observable.zip(o1, o2, (v1, v2) -> new Pair<>(v1, v2));

    o3.subscribe(t -> System.out.println("Result: " + t.getKey() + " : " + t.getValue()));

    // Simulate changes to the Property's value
    for (String s: inputStrings) {
        prop.setValue(s);
    }
}