These are chat archives for ReactiveX/RxJava

Feb 2017
Feb 17 2017 14:37
Is it possible to map one Observable to another using the scheduler specified with subscribeOn?
Pavel Meledin
Feb 17 2017 15:12

Hi everyone, could somebody please give me a hint about which sequence of operators to use to accomplish the following task:

  1. As input for a stream I have pairs of (time and price)
  2. Then I need to group them into 5-second intervals based on the time column, so that I get pairs of (time, list of prices).
  3. Then I need to convert each (time, prices) pair into the following format: case class Bar(time: Int, o: Int, h: Int, l: Int, c: Int, v: Int), where o is the first price in the period, h is the highest price, l is the lowest price, and c is the latest price in the list.
  4. Then each such Bar needs to be sent downstream to the subscribers.

It is very important to get an updated Bar from step 4 every time a new (time, price) pair is pushed into the stream.

Appreciate any help :-)

Feb 17 2017 18:24
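@btbvoy you can use something like groupBy + scan:
        items()
            .groupBy {
                // bucket key: index of the 5-second interval the item's timestamp (ms) falls into
                it.time / TimeUnit.SECONDS.toMillis(5)
            }
            .flatMap {
                // after each new item, emit the list of items seen so far in this bucket
                it.scan(ArrayList<Item>()) { acc, el -> acc.add(el); acc }
                    .filter { it.isNotEmpty() } // skip scan's initial empty list
            }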

Although there may be memory issues if items() is infinite, since groupBy keeps every group open and each group's list keeps growing.
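
The bucketing and per-tick update described above can also be sketched without RxJava, to show the accumulator one might hand to scan instead of a growing list. This is only a minimal sketch under assumptions: the names BarSketch, Bar, bucketOf and runningBars are hypothetical, times are taken to be milliseconds, and the v field from the question is left out because the thread does not define it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: folds (timeMillis, price) ticks into running OHLC bars. */
final class BarSketch {

    /** Width of one bucket, matching TimeUnit.SECONDS.toMillis(5) in the thread. */
    static final long BUCKET_MILLIS = 5_000L;

    /** Immutable bar for one bucket; v is omitted because the thread does not define it. */
    static final class Bar {
        final long bucket;
        final int o, h, l, c;

        Bar(long bucket, int o, int h, int l, int c) {
            this.bucket = bucket;
            this.o = o;
            this.h = h;
            this.l = l;
            this.c = c;
        }

        /** Bar for the first price seen in a bucket. */
        static Bar first(long bucket, int price) {
            return new Bar(bucket, price, price, price, price);
        }

        /** Returns a new bar with one more price folded in (the scan step). */
        Bar plus(int price) {
            return new Bar(bucket, o, Math.max(h, price), Math.min(l, price), price);
        }

        @Override
        public String toString() {
            return "Bar(bucket=" + bucket + ", o=" + o + ", h=" + h + ", l=" + l + ", c=" + c + ")";
        }
    }

    /** Same key as the groupBy selector: integer division by the bucket width. */
    static long bucketOf(long timeMillis) {
        return timeMillis / BUCKET_MILLIS;
    }

    /** Feeds ticks {timeMillis, price} in order and returns the bar produced after each tick. */
    static List<Bar> runningBars(long[][] ticks) {
        // One entry per bucket ever seen; like groupBy, this grows on an infinite input.
        Map<Long, Bar> open = new LinkedHashMap<>();
        List<Bar> emitted = new ArrayList<>();
        for (long[] tick : ticks) {
            long bucket = bucketOf(tick[0]);
            int price = (int) tick[1];
            Bar prev = open.get(bucket);
            Bar next = (prev == null) ? Bar.first(bucket, price) : prev.plus(price);
            open.put(bucket, next);
            emitted.add(next);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[][] ticks = { {1_000, 10}, {2_500, 14}, {4_999, 9}, {5_000, 11}, {7_200, 12} };
        for (Bar b : runningBars(ticks)) {
            System.out.println(b);
        }
    }
}
```

Folding into a fixed-size Bar keeps the state per bucket constant, but the number of buckets still grows with the input, so the memory concern for an infinite source remains unless old buckets are dropped.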
Pavel Meledin
Feb 17 2017 20:04
@ViTORossonero thanks for the example. Would it work with the window operator?
Feb 17 2017 23:07
@btbvoy I think that depends on what the input stream emits and when:
does the time in each input pair correlate with the actual time at which that event is emitted?
If yes, window is what you need.
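
To illustrate the distinction drawn above, here is a small sketch, assuming that a time-based window slices by the moment an item arrives rather than by the timestamp it carries. The class WindowVsEventTime, the method bucketBy, and the sample rows are all hypothetical. Each row is {eventMillis, arrivalMillis, price}, simulating a replay in which ten seconds of historical ticks arrive within half a second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: the same ticks bucketed by event time and by arrival time. */
final class WindowVsEventTime {

    static final long WIDTH_MILLIS = 5_000L;

    /**
     * Groups prices (row[2]) by row[column] / WIDTH_MILLIS.
     * column 0 selects the event timestamp, column 1 the arrival timestamp.
     */
    static Map<Long, List<Long>> bucketBy(long[][] rows, int column) {
        Map<Long, List<Long>> out = new TreeMap<>();
        for (long[] row : rows) {
            long key = row[column] / WIDTH_MILLIS;
            out.computeIfAbsent(key, k -> new ArrayList<>()).add(row[2]);
        }
        return out;
    }

    public static void main(String[] args) {
        // Replayed data: events span 10 seconds but all arrive within 400 ms.
        long[][] rows = {
            {0, 100, 10},
            {4_000, 200, 12},
            {6_000, 300, 11},
            {9_000, 400, 13},
        };
        System.out.println("by event time:   " + bucketBy(rows, 0));
        System.out.println("by arrival time: " + bucketBy(rows, 1));
    }
}
```

When event time and arrival time agree, both groupings coincide and a time-based window is the simpler choice. When they diverge, as in a replay, keying on the event timestamp (the groupBy approach) is the one that preserves the intended 5-second bars.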