These are chat archives for ReactiveX/RxJava

31st
Aug 2018
blocklinkrob
@blocklinkrob
Aug 31 2018 19:35
Hello folks, I'm currently trying to port an application I had built in Electron using node.js to JavaFX. I relied heavily on RX in general for the initial effort (rxjs in particular of course) and I am having some trouble getting the semantics right for RxJava. Should mention I am using Kotlin, but I am not (intentionally, anyways) using RxKotlin per se.
I had written a lot of code to extend the Observable prototype in rxjs, and I am trying to translate these into extension methods in Kotlin.
function debounceOnMissedHeartbeat<TKey, TValue> (dueTime:number, onDebounceItemFactory:(key:TKey) => TValue, scheduler:Rx.Scheduler) {
  let sources:Rx.GroupedObservable = this
  return Rx.Observable.create(o => {
    return sources.subscribe(innerSource => {
      let key:TKey = innerSource.key
      let debouncedStream = innerSource.debounceWithSelector(dueTime, () => onDebounceItemFactory(key), scheduler)
      o.onNext(debouncedStream)
    },
    ex => o.onError(ex),
    () => o.onCompleted()
    )
  })
}
Rx.Observable.prototype.debounceOnMissedHeartbeat = debounceOnMissedHeartbeat
function debounceWithSelector (dueTime:number, itemSelector, scheduler:Rx.Scheduler) {
  let source = this
  return Rx.Observable.create(o => {
    let disposables = new Rx.CompositeDisposable()
    let debounceDisposable = new Rx.SerialDisposable()
    disposables.add(debounceDisposable)
    let debounce = () => {
      debounceDisposable.setDisposable(
        scheduler.scheduleFuture(
          '',
          dueTime,
          () => {
            let debouncedItem = itemSelector()
            o.onNext(debouncedItem)
          }
        )
      )
    }
    disposables.add(
      source.subscribe(
        item => {
          debounce()
          o.onNext(item)
        },
        ex => {
          try {
            o.onError(ex)
          } catch (err1) {
          }
        },
        () => o.onCompleted()
      )
    )
    debounce()
    return disposables
  })
}
Rx.Observable.prototype.debounceWithSelector = debounceWithSelector
blocklinkrob
@blocklinkrob
Aug 31 2018 19:42
In particular, in the first piece of code, where I call o.onNext(debouncedStream), this does not seem to be valid in RxJava...but works fine in rxjs
Here's my attempt in Kotlin btw:
fun <TKey, Any> Observable<GroupedObservable<TKey, Any>>.debounceOnMissedHeartbeat(dueTime: Long, onDebounceItemFactory:(key: TKey) -> Any, scheduler: Scheduler) =
        Observable.create<Any> { o ->
                this.subscribe({innerSource ->
                        val key: TKey? = innerSource.key
                        val debouncedStream = innerSource.debounceWithSelector(dueTime, onDebounceItemFactory(key), scheduler)
                        o.onNext(debouncedStream)
                },
                { ex: Throwable -> o.onError(ex) },
                { o.onComplete() })
        }