These are chat archives for ReactiveX/RxJava

29th
Mar 2017
Laurent
@Crystark
Mar 29 2017 13:36

Hey there. I'm using RxNetty and receiving an Observable<ByteBuf> as I download a very large gzip file. The thing is I don't want to download the whole file before starting to read it. I'd like to be able to do the following:

Observable<ByteBuf> o = ...;
Observable<String> byLine = o
    .compose(toCompositeByteBuf) // Single<CompositeByteBuf>
    .compose(toByteBufInputStream) // Single<ByteBufInputStream>
    .compose(toGZIPInputStream) // Single<GZIPInputStream>
    .compose(splitLines); // Observable<String>

The final observable would emit as many lines as are in the original file. My problem is in toCompositeByteBufand toByteBufInputStream: I don't want to wait for the CompositeByteBuf to be collected before starting to emit the lines and I need the InputStream to be disposed of only when all ByteBuf have been added to the CompositeByteBuf.
I was looking at Observable.using but I'm not sure how it can answer my problem. Do you guys have any idea how I can solve that ?

Laurent
@Crystark
Mar 29 2017 15:19
The more I think about this the more it seems I would need an observable-backed InputStream but that seems a bit too much for my current skills.
Michael Zinn
@RedNifre
Mar 29 2017 19:50
Is it possible to only get the most recent element of an observable or null if the observable has no elements yet?
BehaviorSubject comes close but it also gives me all the following elements. I only need one.
(I'm totally new to rx)
Fabricio
@ofabricio
Mar 29 2017 20:00
it seems it's BehaviorSubject what you want
Michael Zinn
@RedNifre
Mar 29 2017 20:01
So I just unsubscribe after the first onNext to make sure I only get one element?
Fabricio
@ofabricio
Mar 29 2017 20:01
if you only need one then .take(1)
Michael Zinn
@RedNifre
Mar 29 2017 20:01
ah! However, what if the subject is empty and I don't want to do anything if the first element comes later?
Fabricio
@ofabricio
Mar 29 2017 20:02
behavior requires an initial value (it can be initialized with null)
what are you trying to do?
Michael Zinn
@RedNifre
Mar 29 2017 20:06
Open a dialog box where the user can enter a username and password. It's an app, there's a persistence that provides an Observable<Credentials>. The idea is that when the dialog gets opened the input fields get filled with the username and password, if it exists, but the text fields shouldn't update later (it would override what the user is entering).
I COULD add a synchronous "getCredentials" method to the persistence, but I hope there's a way to have the Observable<Credentials> to be the only source of credentials comming from the persistence.
Fabricio
@ofabricio
Mar 29 2017 20:07
then thats it.
getCredentials().take(1).subscribe(cred => {
  if (cred == null) return;
  nameField = cred.name; 
  etc.
});
getCred stream must be a behaviorSub
Michael Zinn
@RedNifre
Mar 29 2017 20:09
Currently, the cred can't be null, the observable only emits valid credentials at the moment.
Fabricio
@ofabricio
Mar 29 2017 20:10
but the credential is filled in the login right?
Michael Zinn
@RedNifre
Mar 29 2017 20:11
if there are persisted credentials those will be filled into the dialog when the dialog opens. When the user entered credentials and hits save, the dialog pushes those credentials into another BehaviorSubject which is also injected in the persistence. The persistence then first saves those credentials and then pushes it to the other BehaviorSubject, which is also the one the dialog subscribes to. On startup, the persistence also pushes the credentials into the BehaviorSubject, if they exist.
Fabricio
@ofabricio
Mar 29 2017 20:16
so you're afraid it'll replace the field?
Michael Zinn
@RedNifre
Mar 29 2017 20:17
It won't happen in practice because the dialog is the only thing writing to the persistence and the dialog closes when you save. It works, but it doesn't feel correct.
Fabricio
@ofabricio
Mar 29 2017 20:18
but with take(1) it won't replace
Michael Zinn
@RedNifre
Mar 29 2017 20:18
Well, if I put an initial null in.
Michael Zinn
@RedNifre
Mar 29 2017 20:24
Well, given that it works I guess I'll leave it at that.
Thank you for your help :)
Fabricio
@ofabricio
Mar 29 2017 20:26
when the user starts typing anything you could unsubscribe too
Michael Zinn
@RedNifre
Mar 29 2017 20:26
Oh, that's a good idea.
Fabricio
@ofabricio
Mar 29 2017 20:27
getCredentials().takeUntil(typing$)
Michael Zinn
@RedNifre
Mar 29 2017 20:28
Another thing: The persistence provides an Observable<Credentials> where the credentials contain a URL, username and password, right? I'd like to make retrofit calls to that url with username and password as basic auth parameters. How do I map that Observable<Credentials> to an Observable<StuffYouCanGetFromThatRetrofitCall> ?
Hm, that might be more of a Retrofit question than an RxJava question I guess...
Fabricio
@ofabricio
Mar 29 2017 20:31
If I understood, that's a simple .mergeMap(cred => anotherCall(cred)), the stuff abt retrofit I dunno :smile:
Denis Stoyanov
@xgrommx
Mar 29 2017 21:59
@ofabricio do u use js examples? :smile:
Fabricio
@ofabricio
Mar 29 2017 22:00
@xgrommx it's universal :laughing:
Denis Stoyanov
@xgrommx
Mar 29 2017 22:04
I know that it is true (as u remember I also use js for .net and java rx)
one interface and contract and too much implementations :smile:
Denis Udovcic
@dudovcic
Mar 29 2017 22:58
Hey, Im in need to learn Java for a job interview soon, any recommendations in fastest way to learn most of Java OOP + SQL implementation with the language ?
Im already familiar with Nodejs and PHP + frameworks for those two so Im not a complete beginner