These are chat archives for ReactiveX/RxJava

30th
Dec 2015
Javier Domingo Cansino
@txomon
Dec 30 2015 12:36
hmmm thinking about how to make a reactive repository pattern, I had thought about having the repo send a Observable<List<Item>>, and sending an Observable<Item> that would send the updates for each item
of course, in my business logic, I would need to convert the Observable<List<Item>> into a stream of Observable<Item>, and concat the updates from Observable<Item>
but I have no idea on how to do it :(
Michael Nitschinger
@daschl
Dec 30 2015 12:38
@txomon can you explain a little more?
happy to help :)
Javier Domingo Cansino
@txomon
Dec 30 2015 12:39
@daschl so I have a set of items that can be updated
Michael Nitschinger
@daschl
Dec 30 2015 12:39
okay, can you show me some simple API or so?
Javier Domingo Cansino
@txomon
Dec 30 2015 12:40
the design I have come up with would be
sure
public interface ContactRepository {
    Observable<List<Contact>> getContactList();

    Observable<Contact> getContactUpdates();
}
Michael Nitschinger
@daschl
Dec 30 2015 12:41
okay
so let me ask something
Javier Domingo Cansino
@txomon
Dec 30 2015 12:41
(I have deleted the other call which isn't interesting for the usecase)
Michael Nitschinger
@daschl
Dec 30 2015 12:41
okay gotcha I think, so whats the problem from there?
Javier Domingo Cansino
@txomon
Dec 30 2015 12:43
just in case, this is an app I am building to train myself in rx, android etc. so the idea is, the Observable<List<Contact>> getContactList() returns a cold observable and the Observable<Contact> getContactUpdates() returns a Hot observable
Michael Nitschinger
@daschl
Dec 30 2015 12:44
ah okay, so you mean that every time an update comes in a new contact is pushed?
Javier Domingo Cansino
@txomon
Dec 30 2015 12:44
yep
Michael Nitschinger
@daschl
Dec 30 2015 12:44
alright
Javier Domingo Cansino
@txomon
Dec 30 2015 12:44
the user of the interface is prepared to update contacts by having new objects sent directly
Michael Nitschinger
@daschl
Dec 30 2015 12:45
okay, and where is the trouble for you implementing?
Javier Domingo Cansino
@txomon
Dec 30 2015 12:47
so, the idea is to create a cold Observable<Contact> that, concatenates the return getContactList(), which has to be converted into Observable<Contact> items, and the buffered getContactUpdates return
lets call cl to the return of getContactList(), cu to the return of getContactUpdates
Michael Nitschinger
@daschl
Dec 30 2015 12:48
so you want, every time Contact is emitted on the hot observable, “merge” it with the data on the cold one?
Javier Domingo Cansino
@txomon
Dec 30 2015 12:48
nope
not here
that is made by the user, now I need to generate a single stream of updates
Michael Nitschinger
@daschl
Dec 30 2015 12:49
okay, so you want to know how to create a hot observable?
Javier Domingo Cansino
@txomon
Dec 30 2015 12:49
the idea is to convert cl into an Observable<Contact> that emits all the items in cl
Michael Nitschinger
@daschl
Dec 30 2015 12:50
aah gotcha
Javier Domingo Cansino
@txomon
Dec 30 2015 12:50
then concatenate that to the output of cu
this is the interface provided by the repository, meaning that I have an initial fetch, and updates to that initial fetch
Michael Nitschinger
@daschl
Dec 30 2015 12:51
So you wantfrom N List<Contact> to N * list.size() emits in the first step
for the second part - you can’t concatenate on cu since its infinite, right?
if we speak in rx concat terms
Javier Domingo Cansino
@txomon
Dec 30 2015 12:52
cl is Observable.just(List<Contact>)
Michael Nitschinger
@daschl
Dec 30 2015 12:52
why does clnot use Observable.from(List<Contact>) ?
Javier Domingo Cansino
@txomon
Dec 30 2015 12:53
hmmmm that would really simplify this xD
Michael Nitschinger
@daschl
Dec 30 2015 12:53
but even if you have many lists emitted
you can simply extract it
imagine:
Observable.just(List, List, List).flatMap(l -> Observable.from(l)).forEach(println)
that will flatten your original list into one emit for each containing element
Javier Domingo Cansino
@txomon
Dec 30 2015 12:54
oh...
I didn't know flatMap() did that
ok, so we have step one
Michael Nitschinger
@daschl
Dec 30 2015 12:55
well its not flatMap itself, but the combination of FlatMap and from
Javier Domingo Cansino
@txomon
Dec 30 2015 12:55
yes, I had done that, but with map instead of flatmap
Michael Nitschinger
@daschl
Dec 30 2015 12:56
yeah then you get the Observable<Observable>> thing
:D
Javier Domingo Cansino
@txomon
Dec 30 2015 12:56
indeed xD
Michael Nitschinger
@daschl
Dec 30 2015 12:56
and flatmap “flattens” it out for you
into one target observable
Javier Domingo Cansino
@txomon
Dec 30 2015 12:56
now the second step would be the concat
Michael Nitschinger
@daschl
Dec 30 2015 12:56
yes, but concat appens obs1 to obs2 when obs2 is done
but your hot stream, is it finite or infinite?
updates sounds infinite to me
Javier Domingo Cansino
@txomon
Dec 30 2015 12:57
cl is finite (one time) and cu is infinite
so it would be to concat initialization + updates
(initialization goes first)
Michael Nitschinger
@daschl
Dec 30 2015 13:00
ah well then did you try the concat operator?
Javier Domingo Cansino
@txomon
Dec 30 2015 13:00
    return Observable.concat(
            contactRepository.getContactList().flatMap((a) -> Observable.from(a)),
            contactRepository.getContactUpdates()
    );
that was what I had done
Michael Nitschinger
@daschl
Dec 30 2015 13:01
okay
Javier Domingo Cansino
@txomon
Dec 30 2015 13:01
the thing is that I don't have really clear where should I do the buffering
Michael Nitschinger
@daschl
Dec 30 2015 13:01
what do you mean with buffering? who should buffer what?
Javier Domingo Cansino
@txomon
Dec 30 2015 13:02
sorry, I am mixing stuff
Michael Nitschinger
@daschl
Dec 30 2015 13:02
:)
Javier Domingo Cansino
@txomon
Dec 30 2015 13:02
so contactRepository is the data layer
now I want that the business layer (where this function is), merges all the items until someone subscribes
Michael Nitschinger
@daschl
Dec 30 2015 13:03
aah now I see where you are going
Javier Domingo Cansino
@txomon
Dec 30 2015 13:04
that would be using contact repository to create a hot observable with all the items
Michael Nitschinger
@daschl
Dec 30 2015 13:04
so first, even if one of the inner observables is hot, the outer one is cold so you’ll only see events if someone subscribes.. and what happens to the hot one below depends on the subject that you use
Javier Domingo Cansino
@txomon
Dec 30 2015 13:04
yeah, the concat needs something after it
Michael Nitschinger
@daschl
Dec 30 2015 13:04
keep in mind that hot observables, when wrapped, can become cold again (check Observable.defer for example)
Javier Domingo Cansino
@txomon
Dec 30 2015 13:05
sure
Michael Nitschinger
@daschl
Dec 30 2015 13:05
so you want to cache it basically?
Javier Domingo Cansino
@txomon
Dec 30 2015 13:05
I want to cache, deleting outdated items
Michael Nitschinger
@daschl
Dec 30 2015 13:05
there is a cache operator ;)
Javier Domingo Cansino
@txomon
Dec 30 2015 13:05
oh
let me see..
replay I think is the stuff you want
and you can bound it so it throws away old stuff
Javier Domingo Cansino
@txomon
Dec 30 2015 13:07
but what I need to do is to create a cache that caches the latest items of every Contact id
.cache() doesn't let me filter them
Michael Nitschinger
@daschl
Dec 30 2015 13:08
aaah now I get what you want :D
Javier Domingo Cansino
@txomon
Dec 30 2015 13:08
I would need to hold a list of contacts within my business logic that caches all the contacts, and updates them cached
Michael Nitschinger
@daschl
Dec 30 2015 13:08
you always want access to the latest contact informating including updates
latest one for every contact
Javier Domingo Cansino
@txomon
Dec 30 2015 13:08
indeed, this is now the second layer
Michael Nitschinger
@daschl
Dec 30 2015 13:09
well, how many contacts do you have? because if you store aggregated list, this could get huuuge right?
Javier Domingo Cansino
@txomon
Dec 30 2015 13:09
I need to hold them anyway, so it doens't matter
anyway, this is going to be a listview later in the app
Michael Nitschinger
@daschl
Dec 30 2015 13:10
well, there is a way to do it
remember, there is always a way in rx :D
Javier Domingo Cansino
@txomon
Dec 30 2015 13:10
yeah
I was thinking on distinct
Michael Nitschinger
@daschl
Dec 30 2015 13:10
so then before the cache() (which is your last operator then), you add
Javier Domingo Cansino
@txomon
Dec 30 2015 13:11
btw, I think the documentation is incomplete...
Michael Nitschinger
@daschl
Dec 30 2015 13:12
no, you want scan()
Javier Domingo Cansino
@txomon
Dec 30 2015 13:12
documented I mean
Michael Nitschinger
@daschl
Dec 30 2015 13:12
sorry I meant replay instead of cache for the latest
but before that put scan
initialize it with an empty Map (or set) and then on every call you get access to it and add it
then it will be emitted
Javier Domingo Cansino
@txomon
Dec 30 2015 13:13
hmmm ok
Michael Nitschinger
@daschl
Dec 30 2015 13:13
check out the docs for it.. scan is normally used to add values for example, but since the Map will be stored as state
its pretty neat
let me see maybe I can show it quickly
Javier Domingo Cansino
@txomon
Dec 30 2015 13:15
I will be using just a list
Michael Nitschinger
@daschl
Dec 30 2015 13:15
don’t use a list
Javier Domingo Cansino
@txomon
Dec 30 2015 13:15
I don't really need a Map, do I?
why?
Michael Nitschinger
@daschl
Dec 30 2015 13:15
well, you only want the last version of each
if you append to a list you get an ever growing list (unless you cut it down) of changes
Javier Domingo Cansino
@txomon
Dec 30 2015 13:16
yes, so I remove the item and append the new one
Michael Nitschinger
@daschl
Dec 30 2015 13:16
I thought you only want the latest version of each contact
Javier Domingo Cansino
@txomon
Dec 30 2015 13:16
that way I can show them by update time
Michael Nitschinger
@daschl
Dec 30 2015 13:22
try this
    public static void main(String... args) throws Exception {
        Observable
          .interval(1, TimeUnit.SECONDS)
          .map(num -> "user" + (num%5) + "-v" + num)
          .scan(new HashMap<>(), (map, value) -> {
              String[] info = value.split("-");
              map.put(info[0], info[1]);
              return map;
          })
          .filter(map -> !map.isEmpty())
          .toBlocking()
          .forEach(System.out::println);
Javier Domingo Cansino
@txomon
Dec 30 2015 13:22
    return Observable.concat(
            contactRepository.getContactList().flatMap((a) -> Observable.from(a)),
            contactRepository.getContactUpdates()
    ).scan(
            new ArrayList<Contact>(),
            (list, contact) -> {
                for(Contact contactInList : list){
                    if(contactInList.getUserId() == contact.getUserId()){
                        list.remove(contactInList);
                    }
                }
                list.add(contact);
                return list;
            }
    );
Michael Nitschinger
@daschl
Dec 30 2015 13:23
in my sample code, you’ll see how the map builds up and then refreshes:
{user0=v0}
{user1=v1, user0=v0}
{user1=v1, user2=v2, user0=v0}
{user1=v1, user2=v2, user0=v0, user3=v3}
{user1=v1, user2=v2, user0=v0, user3=v3, user4=v4}
{user1=v1, user2=v2, user0=v5, user3=v3, user4=v4}
{user1=v6, user2=v2, user0=v5, user3=v3, user4=v4}
{user1=v6, user2=v7, user0=v5, user3=v3, user4=v4}
{user1=v6, user2=v7, user0=v5, user3=v8, user4=v4}
{user1=v6, user2=v7, user0=v5, user3=v8, user4=v9}
{user1=v6, user2=v7, user0=v10, user3=v8, user4=v9}
Javier Domingo Cansino
@txomon
Dec 30 2015 13:24
hmmm
Michael Nitschinger
@daschl
Dec 30 2015 13:24
so it always keeps the latest state emitted for the object
@txomon your for loop has o(N) complexity while the hash replace is just O(1)
Javier Domingo Cansino
@txomon
Dec 30 2015 13:25
@daschl yeah, but I need it in order
Michael Nitschinger
@daschl
Dec 30 2015 13:25
okay, then maybe use a queue where you can push/pop?
Javier Domingo Cansino
@txomon
Dec 30 2015 13:25
but items are not updated in order
Michael Nitschinger
@daschl
Dec 30 2015 13:25
so what order do you need?
Javier Domingo Cansino
@txomon
Dec 30 2015 13:26
yes, I have just realized I haven't described the app
Michael Nitschinger
@daschl
Dec 30 2015 13:26
Note that there is also a TreeMap which sorts the keys by its Comparator
Javier Domingo Cansino
@txomon
Dec 30 2015 13:27
the app is meant to be a view of your contacts, where instead of being ordered by alphabetical order or so, it's ordered by the latest updated
such as the call app in android, which shows the recently contacted ones
Michael Nitschinger
@daschl
Dec 30 2015 13:29
@txomon implement the comparable on that property and use a https://docs.oracle.com/javase/7/docs/api/java/util/TreeSet.html
so your set will be ordered by the update date
Dorus
@Dorus
Dec 30 2015 13:29
@txomon This somehow remind me of DynamicData. It's in C# but it also has observable collections, might give you some ideas.
Javier Domingo Cansino
@txomon
Dec 30 2015 13:29
oh so instead of ordering them by the Container you use, you order them by a property of the object
ok
Michael Nitschinger
@daschl
Dec 30 2015 13:30
so you get an automatic updating list of contacts which emits a new version of itself on every change!
Javier Domingo Cansino
@txomon
Dec 30 2015 13:30
also, @daschl how may I have that hashmap in my business logic?
Michael Nitschinger
@daschl
Dec 30 2015 13:31
like in my example above
instead of your scan use mine, but use the TreeSet
Javier Domingo Cansino
@txomon
Dec 30 2015 13:31
hmmm but that doesn't let me create a layer above that...
Michael Nitschinger
@daschl
Dec 30 2015 13:31
why?
Javier Domingo Cansino
@txomon
Dec 30 2015 13:31
let me think about it
Javier Domingo Cansino
@txomon
Dec 30 2015 13:32
so ListViewAdapter needs a list underneath, which I should ping when the dataset is updated
@daschl but isn't generating a new list...
ohhhh
ok
so I am not really generating a new list each time
is just that I am getting pinged when it updates
ok
let me have a look on that...
@daschl so can that list be stored in the class rather than in the pipeline?
I suppose it can...
Michael Nitschinger
@daschl
Dec 30 2015 13:35
what do you mean?
Javier Domingo Cansino
@txomon
Dec 30 2015 13:36
nah, just java problems, I can substitute the new HashMap by a reference to an internal variable =)
Michael Nitschinger
@daschl
Dec 30 2015 13:36
heh okay.. gotta run for now I hope I could help
Javier Domingo Cansino
@txomon
Dec 30 2015 13:36
indeed! thank you very much!
Michael Nitschinger
@daschl
Dec 30 2015 13:37
cheers
if you are looking for a database which has an rx SDK, ping me ;)
Javier Domingo Cansino
@txomon
Dec 30 2015 13:37
haha ok!
@Dorus indeed it looks like it