These are chat archives for pozadi/kefir

24th
Mar 2017
Reko Tiira
@rekotiira
Mar 24 2017 13:22

@rpominov I ran into a situation like this with Kefir:

stream
    .flatMapConcat(it => Kefir.fromPromise(funcThatProducesPromise(it)))
    .log('value')

One would expect only one funcThatProducesPromise call to be running at any given time, but for some reason Kefir calls the function that produces the flat-mapped stream eagerly and buffers the produced streams instead. In effect, funcThatProducesPromise is called as soon as stream produces a new value, even if the previous one is still running. To me this seems like a pretty confusing design choice. I think it'd be more logical to buffer the inputs and only call the producer function when the previous stream has ended (this obviously affects flatMapConcurLimit as well). I tested the same thing with Bacon to see whether it's just me who feels this is wrong, and it seems that at least Bacon does it the way I'd expect. I "fixed" it by making the call to the promise-producing function lazy:

.flatMapConcat(it => Kefir.constant().flatMap(() => Kefir.fromPromise(funcThatProducesPromise(it))))

But still, I'm wondering whether this can be considered a "bug". It can lead to quite substantial problems as well. For example, in my case I was implementing a worker queue with a concurrency limit using flatMapConcurLimit. It worked fine when new jobs weren't coming in that often, but sometimes, when multiple jobs started within a short time, the worker queue ended up draining all connections from the Postgres pool, which caused the whole worker process to hang completely and do nothing. Nasty.
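
To illustrate the behaviour I'd expect, here is a minimal sketch in plain JavaScript with no Kefir involved. The names createLazyQueue and worker are hypothetical and not part of Kefir or Bacon, and the callback-style done signal is just an assumption to keep the example small and deterministic. The point is only that inputs are buffered and the worker is called lazily, when a slot is free:

```javascript
// Hypothetical helper (not Kefir API): buffers *inputs* and only calls
// `worker` when fewer than `limit` jobs are currently running.
function createLazyQueue(limit, worker) {
  const pending = []; // inputs that have not been started yet
  let running = 0;    // number of jobs currently in flight

  function startNext() {
    while (running < limit && pending.length > 0) {
      const input = pending.shift();
      running += 1;
      let finished = false;
      // The worker gets the input and a callback to signal completion.
      worker(input, () => {
        if (finished) return; // ignore duplicate completion signals
        finished = true;
        running -= 1;
        startNext();          // a slot is free, start the next buffered input
      });
    }
  }

  return {
    push(input) {
      pending.push(input);
      startNext();
    },
    get running() { return running; },
    get pending() { return pending.length; },
  };
}

// Usage: with a limit of 1, only the first job starts immediately.
const started = [];
const completions = [];
const queue = createLazyQueue(1, (job, done) => {
  started.push(job);      // record that the worker was actually invoked
  completions.push(done); // keep `done` so we can finish the job later
});

queue.push('a');
queue.push('b');
queue.push('c');
// At this point started is ['a']; 'b' and 'c' are still buffered.

completions[0](); // finish 'a'
// Now started is ['a', 'b'], and 'c' is still buffered.
```

With this shape, a burst of jobs never opens more than limit connections at once, because the function that grabs a connection isn't called until a slot frees up.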