loop and switchLatest/chain/mergeConcurrently depending on the concurrency you need. Something like this to do the nth of a Stream might help:
const nth = <A>(n: number, stream: Stream<A>) =>
  switchLatest(loop(
    (acc, a) => acc === n
      ? { seed: 0, value: now(a) }
      : { seed: acc + 1, value: empty() },
    0,
    stream
  ))
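To see what the accumulator in the snippet above does, here is a plain TypeScript sketch that applies the same step logic to an array instead of a Stream. It does not use @most/core at all: `step` and `everyNthModel` are hypothetical names for illustration, arrays stand in for `now(a)` and `empty()`, and the model ignores timing and whatever switchLatest does when inner streams overlap. One thing it makes visible is that the seed resets to 0 after a match, so an item is picked every time the counter reaches n, not just once.

```typescript
// Plain-array model of the accumulator passed to loop above.
// Hypothetical helper names; no @most/core involved.
type Step<A> = { seed: number; value: A[] };

// Same branching as the snippet: emit when the counter reaches n, then reset.
const step = <A>(n: number, acc: number, a: A): Step<A> =>
  acc === n
    ? { seed: 0, value: [a] }        // stands in for now(a)
    : { seed: acc + 1, value: [] };  // stands in for empty()

const everyNthModel = <A>(n: number, items: A[]): A[] => {
  let acc = 0;
  const out: A[] = [];
  for (const item of items) {
    const { seed, value } = step(n, acc, item);
    acc = seed;
    out.push(...value);
  }
  return out;
};

const picked = everyNthModel(2, ["a", "b", "c", "d", "e", "f"]);
console.log(picked); // logs the items "c" and "f"
```

If only the first match is wanted, the real stream version would presumably need something that ends the stream after the first emission; that is outside what this model covers.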
runEffects of a perpetual stream? I'm using this big giant aggregate stream that I just pass into runEffects and am leaking memory somehow, at a rate of about 3 MB/hour, maybe because of how I'm generating random values using patterns like:
periodic(t)
  |> filter(() => Math.random() < 0.4)
  |> delay(d)
  |> map(() => fnThatGeneratesLotsOfObjects())
  |> ... // and more stuff that generates objects
SchedulerImpl.scheduleTask, so I assume maybe it's the scheduler's Timeline growing unboundedly
runEffects periodically
now, at, and some of the other synchronous constructors use it only once to schedule the start of the stream, periodic being the exception, which schedules over and over again. On the transformation side though, delay, debounce, and throttle utilize the scheduler pretty aggressively, but it'll only ever have a single scheduled task at a time per execution context
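A toy model of the periodic case described above may help: each run of the task schedules exactly one follow-up. This is a sketch in plain TypeScript, not @most/core's SchedulerImpl or Timeline; `ToyTimeline`, `maxPending`, and `tick` are hypothetical names, and the model assumes a task is removed from the timeline before it runs. Under that assumption the number of pending tasks stays at one no matter how long the source runs.

```typescript
// Toy virtual-time scheduler; a hypothetical model, not @most/core's SchedulerImpl.
type Task = { time: number; run: (now: number) => void };

class ToyTimeline {
  private tasks: Task[] = [];
  maxPending = 0; // high-water mark of queued tasks

  schedule(time: number, run: (now: number) => void): void {
    this.tasks.push({ time, run });
    this.tasks.sort((a, b) => a.time - b.time);
    this.maxPending = Math.max(this.maxPending, this.tasks.length);
  }

  // Run every task due at or before `until`, removing each one before it runs.
  runUntil(until: number): void {
    while (this.tasks.length > 0 && this.tasks[0].time <= until) {
      const task = this.tasks.shift()!;
      task.run(task.time);
    }
  }
}

const timeline = new ToyTimeline();
let ticks = 0;

// A periodic source: each run schedules exactly one follow-up task.
const tick = (now: number): void => {
  ticks += 1;
  timeline.schedule(now + 10, tick);
};

timeline.schedule(0, tick);
timeline.runUntil(1000);

console.log(ticks, timeline.maxPending); // 101 1
```

If memory grows in the real setup, this model suggests looking for something that holds on to tasks or their closures after they run, rather than at the rescheduling pattern itself.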
@TylorS the heap chart seems to agree with your observations, if I'm reading it right. DelaySink.event has the most scheduleTask memory usage, followed by TapSink.event and then FilterSink.event.
The thing I don't get is just exactly how streams that have ended get cleaned up. If I schedule the streams in the exact same way, except that I give them an end and run them consecutively, the heap allocations get garbage collected.
The only thing I can think is happening is that maybe scheduled events are hanging around in the Timeline and this is why memory usage is accumulating? Maybe somehow running cancel for past events would solve my problem?
runEffects via this Sink instance. When sink.end or sink.error is called it will dispose of the Disposable returned by calling Stream.run(sink, scheduler)
stream.run(sink, scheduler) and collect disposables into their own graph of sorts, and that allows ultimately collecting them into a single Disposable instance when running the resulting Stream
ScheduledTask instances that are returned by the scheduler implement the same Disposable interface, so they also get kept in that Disposable graph
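The disposable graph idea above can be sketched in plain TypeScript. This is only a toy model: `FakeTask` and `combine` are hypothetical names and not the @most/core or @most/disposable API. The only thing taken from the discussion is that everything, scheduled tasks included, shares a `dispose()` interface and gets collected into a single root that is disposed on end or error.

```typescript
// Toy model of a disposable graph; names are hypothetical, not @most/core's API.
interface Disposable {
  dispose(): void;
}

// A stand-in for a scheduled task: disposing it marks it cancelled.
class FakeTask implements Disposable {
  cancelled = false;
  dispose(): void {
    this.cancelled = true;
  }
}

// Combine many disposables into one, disposing the children at most once.
const combine = (children: Disposable[]): Disposable => {
  let disposed = false;
  return {
    dispose(): void {
      if (disposed) return;
      disposed = true;
      children.forEach((child) => child.dispose());
    },
  };
};

const delayTask = new FakeTask();
const periodicTask = new FakeTask();

// Inner nodes combine their children, so the root reaches every leaf.
const inner = combine([delayTask]);
const root = combine([inner, periodicTask]);

// Stand-in for what happens on end or error: the root is disposed.
root.dispose();
console.log(delayTask.cancelled, periodicTask.cancelled); // true true
```

In this model, a stream that never ends never reaches `root.dispose()`, so anything referenced from the graph stays reachable for as long as the stream runs.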
tap(stream => runEffects(stream, scheduler))
dispose is getting called more regularly!
switchLatest to get that effect, but I'm seeing now how that's probably deferring dispose indefinitely!)
Disposable stuff works in practice
Hi,
We have been struggling with some strange behaviour regarding combineArray and sample. Could you take a look at this test and let me know if you have any idea why result$ is not triggering? The pipe function is from fp-ts. The test passes if I remove the hold from s2$:
it('should trigger with result', () => {
  const scheduler = defaultScheduler();
  const s$ = at(100, 'test');
  const s2$ = pipe(
    map(() => 'test-2', s$),
    hold
  );
  const result$ = sample(
    combineArray(
      (x, y) => ({ x, y, }),
      [s2$, s$]
    ), s2$);
  const expected = {
    x: 'test-2',
    y: 'test',
  };
  const test_X$ = tap((x) => {
    expect(x).to.eql(expected);
  }, result$);
  runEffects(test_X$, scheduler);
})
stream$ <-- has a (outer) dispose function
stream$
.takeWhile <--- can end here, and (outer) dispose called here.
.continueWith <--- [1] logic to deal with early termination
.chain <--- logic here ...
.takeWhile <--- [2] ... can be used to end stream$ and call (outer) dispose.
would want it to look like this:
stream$ <-- has a (outer) dispose function
stream$
.takeWhile <--- can end here, and (outer) dispose called here.
.chain <--- logic here ...
.takeWhile <--- [2] ... can be used to end stream$ and call (outer) dispose.
.subscribe (complete:....) <--- would want logic in [1] to go here, but [2] gets in the way
stream$