These are chat archives for ReactiveX/RxJava

8th
Mar 2016
Roland Tepp
@luolong
Mar 08 2016 12:44
Hey, I need advice on how best to proceed
I have been playing around with RxJava for a week now and I’ve gotten some basic workflow working
What I am currently doing: Reading files from a specified location line by line, then parsing those lines to a somewhat nicer structure and doing some analysis on that structure.
What I want - since the whole operation can take a while, I want to present a nice progress bar while the part that reads files is processing.
Dorus
@Dorus
Mar 08 2016 13:09
How do you know the progress? Do the lines contain information on that? Do you know the total number of lines? The file size?
If the lines contain that info, you could publish the results and subscribe both the data analysis and the progress bar.
If you need to read the file size, you might as well combine it with the file reader logic.
I'm assuming you have an observable that publishes every line, one at a time, through onNext and does the processing on those lines inside map or other operators?
Dorus
@Dorus
Mar 08 2016 13:15
I would do something along these lines (pseudo code)
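import java.io.BufferedReader;
import java.io.IOException;
import rx.Observable;

// IProgress is a placeholder for whatever progress reporter you use.
public Observable<String> readMyFile(BufferedReader reader, long totalLines, IProgress progress) {
    return Observable.create(subscriber -> {
        long lines = 0;
        try {
            String line;
            // Stop early if the subscriber has gone away.
            while (!subscriber.isUnsubscribed() && (line = reader.readLine()) != null) {
                subscriber.onNext(line);
                // Cast first: integer division would report 0 until the very end.
                progress.set((double) ++lines / totalLines);
            }
        } catch (IOException e) {
            subscriber.onError(e);
            return;
        }
        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    });
}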
@luolong Is that helpful?
Roland Tepp
@luolong
Mar 08 2016 13:19
well, I do have an observable that walks the directory and emits a series of paths
(using Files.walk underneath)
then I have a transformation that flatmaps the files to strings using call to Files.lines
I know the total length of all the files (simple matter of walking over the same set of files and summing their sizes)
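A minimal sketch of that size-summing pass, using only the JDK. The class and method names (`TotalSize`, `totalBytes`) are made up for illustration and are not from the chat; the sketch assumes every regular file under the root belongs to the set being analysed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical helper: names are illustrative only.
final class TotalSize {

    /** Sums the sizes, in bytes, of all regular files under root. */
    static long totalBytes(Path root) throws IOException {
        // Files.walk keeps directory handles open, so close the stream when done.
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                .filter(Files::isRegularFile)
                .mapToLong(TotalSize::sizeOf)
                .sum();
        }
    }

    /** Files.size throws a checked exception, which a lambda in mapToLong cannot. */
    private static long sizeOf(Path file) {
        try {
            return Files.size(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The result can serve as the denominator of a progress fraction. Note that it is measured in bytes, while line lengths are measured in characters, so the two only match exactly for single-byte encodings.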
The whole analysis flow can be expressed in this simple snippet:
        analyser.paths()
            .compose(PathObservable.toLines())
            .map(LogLine::parse)
            .subscribe(stats.asSubscriber());
Dorus
@Dorus
Mar 08 2016 13:24
aah okay
So if I understand it right, you know the file size and how many bytes have been processed (by looking at line length), and you want to extract progress from that?
Roland Tepp
@luolong
Mar 08 2016 13:26
so after processing each line, I can detect the progress quite easily by using length of a line as a number of ticks in the progress
well, extracting progress as a value is no problem
but I do not seem to be able to read the progress while the analysis is running.
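One possible cause, offered as an assumption rather than a diagnosis: if the chain runs synchronously on the thread that calls subscribe, that thread is busy until the analysis finishes and cannot also poll the progress. The JDK-only sketch below shows the general shape of a fix: the work runs on a background thread and updates a thread-safe counter that any other thread can read. `ProgressCounter`, `advance`, `fraction`, and `analyseInBackground` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: names are illustrative only.
final class ProgressCounter {
    private final AtomicLong done = new AtomicLong();
    private final long total;

    ProgressCounter(long total) {
        this.total = total;
    }

    /** Called by the worker after each line; ticks is the line's length. */
    void advance(long ticks) {
        done.addAndGet(ticks);
    }

    /** Safe to call from any thread (e.g. a UI timer). Returns a value in [0, 1]. */
    double fraction() {
        return total == 0 ? 1.0 : Math.min(1.0, (double) done.get() / total);
    }

    /** Runs the per-line work on the pool so the caller stays free to poll fraction(). */
    static Future<?> analyseInBackground(ExecutorService pool,
                                         Iterable<String> lines,
                                         ProgressCounter progress) {
        return pool.submit(() -> {
            for (String line : lines) {
                // ... parse and analyse the line here ...
                progress.advance(line.length());
            }
        });
    }
}
```

While the returned Future is not yet done, the calling thread can read `fraction()` at whatever interval suits the progress bar.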
Dorus
@Dorus
Mar 08 2016 13:29
mm, interesting problem. I've got a few ideas, but I'm not sure if they fit.
Roland Tepp
@luolong
Mar 08 2016 13:29
in Java 8 streams version of this I used Stream.peek to intercept each line and calculate the progress
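For reference, a rough sketch of what such a Stream.peek tap might look like. This is a guess at the shape, not the code from the chat: `PeekProgress`, `processWithProgress`, and the `onProgress` callback are hypothetical, and `String::trim` merely stands in for the real parsing step. In RxJava 1.x the comparable side-effect operator is `doOnNext`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.DoubleConsumer;
import java.util.stream.Collectors;

// Hypothetical helper: names are illustrative only.
final class PeekProgress {

    /**
     * Processes the lines and reports progress after each one, using the
     * line's length as the number of ticks out of totalChars.
     */
    static List<String> processWithProgress(List<String> lines,
                                            long totalChars,
                                            DoubleConsumer onProgress) {
        AtomicLong seen = new AtomicLong();
        return lines.stream()
            // peek observes each line without changing what flows downstream.
            .peek(line -> onProgress.accept(
                (double) seen.addAndGet(line.length()) / totalChars))
            .map(String::trim) // stand-in for the real parsing step
            .collect(Collectors.toList());
    }
}
```

The sketch ends in `collect` rather than `count()` on purpose: since Java 9, `count()` may skip the pipeline entirely when the size is known from the source, in which case `peek` never fires.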
Dorus
@Dorus
Mar 08 2016 13:30
you could use .do (doOnNext in RxJava) or multicast.
to intercept the lines.
Roland Tepp
@luolong
Mar 08 2016 13:30
multicast … sounds promising...
Roland Tepp
@luolong
Mar 08 2016 13:32
aha .. seems like what I need ...
khm … the .do operator also seems kind of useful…
which one would you recommend taking a look at first?
Dorus
@Dorus
Mar 08 2016 13:34
do is probably easier
With publish, I wonder if the overload publish(Func1<>) does what I think it does. If it's similar to RxJS's let, it could be easy too.
source.publish(ob ->
    Observable.zip(
        ob.countLines(),        // pseudo code: countLines() is not a real operator
        ob.map(LogLine::parse),
        (count, line) -> { ... }
    )
)
dwursteisen
@dwursteisen
Mar 08 2016 13:53
When building an Observable using Observable.create(...), is backpressure support mandatory? (i.e., should I set a producer?)
Dorus
@Dorus
Mar 08 2016 14:09
I doubt it, considering the existence of MissingBackpressureException (and that many sources can't implement it either).
dwursteisen
@dwursteisen
Mar 08 2016 14:11
Yup, but is it better if I add backpressure support? I wonder whether adding it to a web service call makes any sense: a call like this is "one-shot".
David Stemmer
@weefbellington
Mar 08 2016 16:01
If you want backpressure support for your OnSubscribe
I haven't used it, but a lot of people are talking about it in the ASG (Android Study Group) Slack group
dwursteisen
@dwursteisen
Mar 08 2016 16:15
thanks !