    Brian Cavalier
    @briancavalier
    The sink API is simply a way to inform a sink that an event happened at a particular time. Actually, scheduling events is the responsibility of a scheduler.
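    For reference, the Sink interface from @most/types boils down to three callbacks, each stamped with the scheduler Time at which it was called (a paraphrase of the published typings, not the exact source):
    // Sink<A>, paraphrased: a sink is told about an event, the end of the
    // stream, or an error, each tagged with a Time (a number) from the scheduler.
    type Time = number

    interface Sink<A> {
      event(time: Time, value: A): void
      end(time: Time): void
      error(time: Time, err: Error): void
    }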
    Micah
    @micahscopes
    This makes sense now! The experiment was a failure, but I still like the high-level "warp" API attempted there. If it were possible to implement an API like that for streams with prescheduled/preschedulable events, I'd be interested in understanding how. It seems like you'd need to get a Stream's Scheduler from that stream and somehow create a duplicate of the Scheduler itself containing just the re-scheduled (and transformed) events from that stream. Sounds complicated, but maybe it wouldn't be too hard to do.
    Antti Niemenpää
    @niant
    I have a question about sample and the adapter. If I call the adapter right after runEffects, it never seems to trigger the dependent sample stream, unless I wrap the call in some async operation (setTimeout). Does anyone know why? I have some theories, but no idea if they are correct.
    import { now, runEffects, sample, tap } from '@most/core';
    import { createAdapter } from '@most/adapter';
    import { newDefaultScheduler } from '@most/scheduler';

    const x$ = now('x');
    const [setY, y$] = createAdapter();
    
    const test$ = tap(
      (x) => {
        console.log('never here', x);
      },
      sample(x$, y$)
    );
    
    runEffects(test$, newDefaultScheduler());
    
    setY('y'); // never triggers the sample stream
    
    // This async version would work:
    // setTimeout(() => setY('y'))
    Tylor Steinberger
    @TylorS
    @micahscopes It's not as interesting as what you're looking to do, but our mergeMapConcurrently/chain implementation uses relative schedulers when creating new streams to augment the Scheduler instance, and it may be of some interest to you - https://github.com/mostjs/core/blob/master/packages/core/src/combinator/mergeConcurrently.ts
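    For context on "relative schedulers": @most/scheduler exports schedulerRelativeTo, which wraps an existing scheduler so that tasks scheduled through it are offset from a given origin time. A minimal sketch (the imports are real @most/scheduler exports; the usage here is only illustrative):
    import { currentTime, newDefaultScheduler, schedulerRelativeTo } from '@most/scheduler'

    const scheduler = newDefaultScheduler()

    // A scheduler whose local time 0 is "now" on the outer scheduler. Combinators
    // can hand this to an inner stream so it starts at its own time origin.
    const relativeScheduler = schedulerRelativeTo(currentTime(scheduler), scheduler)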
    Tylor Steinberger
    @TylorS

    @niant You're running into the impedance mismatch of @most/adapter and the rest of @most/core. @most/core will always start emitting events in the next tick of the event loop. Here are the associated docs. It helps avoid race conditions within the library and will allow a Stream to synchronously return a Disposable instance before emitting any events so they can always be canceled. @most/adapter does not follow this rule, and it is up to the user to use it "appropriately" for the sake of adapting to external libraries that don't lend themselves to a more declarative approach that most favors.

    So what's actually happening? When you call setY synchronously, no x$ values have occurred yet, so there are no events to sample from. When you call setY asynchronously, the scheduled x value has had time to emit, so setY does have a value to sample from.
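    To see the always-async rule at the lowest level, you can run a stream with a hand-rolled sink: the Disposable comes back synchronously, and the event only arrives on a later tick (a minimal sketch, not taken from the docs):
    import { now } from '@most/core'
    import { newDefaultScheduler } from '@most/scheduler'
    import type { Sink, Time } from '@most/types'

    const sink: Sink<string> = {
      event: (t: Time, x: string) => console.log('event at', t, x),
      end: (t: Time) => console.log('end at', t),
      error: (t: Time, e: Error) => { throw e }
    }

    // run() hands back the Disposable synchronously...
    const disposable = now('x').run(sink, newDefaultScheduler())
    console.log('got a Disposable, no events yet')
    // ...and the 'event at ...' log only appears on a later tick; calling
    // disposable.dispose() before then would cancel the event entirely.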

    Antti Niemenpää
    @niant
    Thanks @TylorS for the explanation! I had read the "always async" part of the documentation before, which is why I was confused: it seemed not to apply in this example. But since adapter does not follow that rule, that explains it perfectly.
    Micah
    @micahscopes
    @TylorS I hope you'll find it flattering and won't be offended that I basically just stole your "most-virtual-scheduler" code and changed all the abstractions to make a media oriented scheduler :sweat_smile: ... it's basically perfect for what I need. I'm planning on adding some more features, but for now I just renamed everything :grimacing:
    Micah
    @micahscopes
    Also planning on adding some Web Audio-based scheduling tools to progress through the scheduler at a dynamic, precise rate
    gonna use it with this: https://www.npmjs.com/package/wa-metro
    (Where the metronome callback will just pulse the timer by 1 "tick" at a time. Standard MIDI files usually schedule events at something like 96 ticks/beat)
    Tylor Steinberger
    @TylorS
    Definitely not offended, OSS is meant to be shared and reused IMO :smile:
    I'm glad you were able to make some progress on what you need
    Micah
    @micahscopes
    @TylorS I tried it out and it worked great! Except for the timing issues caused by doing timing in the main thread, where all the heavy React stuff and game animation were also being computed. So my next step is to attempt using the @most stuff in an AudioWorklet processor... stay tuned, I'll share more when I've made some progress. I'm really pumped to be able to do generative music stuff with @most and lodash or whatever; it's a long-time dream come true
    If I'm successful, it seems like it'd be useful to share any extra utilities I make for doing that in my little fork
    Micah
    @micahscopes
    I have a periodic stream and I'd like to make a new periodic stream derived from it that only includes every Nth event. How can I do this?
    Tylor Steinberger
    @TylorS
    Hey @micahscopes, you might be able to do what you need using loop and switchLatest/chain/mergeConcurrently, depending on the concurrency you need. Something like this, which takes every nth event of a Stream, might help:
    const nth = <A>(n: number, stream: Stream<A>) => 
      switchLatest(loop(
        (acc, a) => acc === n  
          ? { seed: 0, value: now(a) } 
          : { seed: acc + 1, value: empty() }, 
        0, 
        stream
      ))
    Tylor Steinberger
    @TylorS
    From there you should be able to construct a higher-order stream for the periodic portions
    const stream = chain(
      () => nth(n2, periodic(n3)),
      periodic(n1)
    )
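    A hedged usage sketch of the two snippets above, with made-up periods standing in for n1/n2/n3 and assuming the nth helper defined earlier:
    import { chain, periodic, runEffects, tap } from '@most/core'
    import { newDefaultScheduler } from '@most/scheduler'

    // Every second, start a 250ms periodic and keep only its every-4th tick.
    // Note: with chain, each inner periodic keeps running; use switchLatest
    // instead if only the most recent inner stream should survive.
    const everyFourth$ = chain(
      () => nth(4, periodic(250)),
      periodic(1000)
    )

    runEffects(
      tap(() => console.log('4th tick'), everyFourth$),
      newDefaultScheduler()
    )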
    Micah
    @micahscopes
    awesome, thanks @TylorS
    Micah
    @micahscopes
    by the way, I got my scheduler working inside of an audioworklet processor, sending MIDI events to another audioworklet processor! it's awesome
    Micah
    @micahscopes
    just wanted to share that I've pretty much finished prototyping using @most/core inside of an AudioWorkletProcessor to load, generate, schedule and transform MIDI events that get played by this synth
    I'm planning on releasing this stuff as a little toolkit once I get a chance to clean it up and organize it!
    Micah
    @micahscopes
    Is there a way to manually clean up references to old events in a scheduler during a runEffects of a perpetual stream? I'm passing one big aggregate stream into runEffects and am leaking memory somehow, at a rate of roughly 3 MB/hour, maybe because of how I'm generating random values using patterns like:
    periodic(t)
      |> filter(() => Math.random() < 0.4)
      |> delay(d)
      |> map(() => fnThatGeneratesLotsOfObjects())
      |> ... // and more stuff that generates objects
    ^ There's the heap profile; you can see that most of the pileup is in SchedulerImpl.scheduleTask, so I assume it's the scheduler's Timeline growing unboundedly
    Maybe the simplest way to deal with this would be to just call runEffects periodically
    Micah
    @micahscopes
    I did do a test and found that there's nothing sticking around after runEffects finishes, so it's airtight on that level
    (screenshot of the heap profile mentioned above): Screenshot from 2021-09-04 11-58-19.png
    Tylor Steinberger
    @TylorS

    Hey @micahscopes :wave:

    I'm glad to hear you've got your prototype working, congrats!

    There is one method for cleaning up a scheduled task from the Timeline, here on the scheduler, but oftentimes you won't have direct access to those scheduled tasks https://github.com/mostjs/core/blob/master/packages/types/index.d.ts#L50
    I'm not familiar enough with the heap profile to decipher and help from there, but only a few operators make heavy use of the scheduler
    Tylor Steinberger
    @TylorS
    now, at, and some of the other synchronous constructors use it only once, to schedule the start of the stream; periodic is the exception, scheduling over and over again. On the transformation side, delay, debounce, and throttle use the scheduler pretty aggressively, but each will only ever have a single scheduled task at a time per execution context
    Tylor Steinberger
    @TylorS
    Actually, that's not true of delay - it creates n tasks for n events. So if you're encountering a lot of memory usage, it's most likely coming from delay
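    To make that concrete, here is a simplified, hypothetical DelayLikeSink (not the actual @most/core delay source) showing why n events turn into n entries in the scheduler's Timeline:
    import { delay as delayTask } from '@most/scheduler'
    import type { Scheduler, Sink, Task, Time } from '@most/types'

    // Forward each event to the downstream sink dt milliseconds later by
    // scheduling one task per event; each task lives in the Timeline until
    // it runs or is disposed.
    class DelayLikeSink<A> implements Sink<A> {
      constructor(
        private dt: number,
        private sink: Sink<A>,
        private scheduler: Scheduler
      ) {}

      event(t: Time, x: A): void {
        const task: Task = {
          run: (time: Time) => this.sink.event(time, x),
          error: (time: Time, e: Error) => this.sink.error(time, e),
          dispose: () => {}
        }
        // One ScheduledTask per event: this is where the Timeline grows.
        delayTask(this.dt, task, this.scheduler)
      }

      end(t: Time): void { this.sink.end(t) }
      error(t: Time, e: Error): void { this.sink.error(t, e) }
    }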
    Micah
    @micahscopes

    @TylorS the heap chart seems to agree with your observations, if I'm reading it right. DelaySink.event has the most scheduleTask memory usage, followed by TapSink.event and then FilterSink.event.

    The thing I don't get is exactly how streams that have ended get cleaned up. If I schedule the streams in the exact same way, except that I give them an end and run them consecutively, the heap allocations do get garbage collected.

    The only thing I can think of is that scheduled events are hanging around in the Timeline, and that's why memory usage is accumulating. Maybe running cancel for past events would solve my problem?

    Tylor Steinberger
    @TylorS
    It's pretty much built into runEffects via this Sink instance. When sink.end or sink.error is called, it will dispose of the Disposable returned by calling Stream.run(sink, scheduler)
    As you construct a stream graph, many combinators will call stream.run(sink, scheduler) and collect disposables into their own graph of sorts, which ultimately allows collecting them into a single Disposable instance when running the resulting Stream
    Those ScheduledTask instances that are returned by the scheduler implement the same Disposable interface, so they also get kept in that Disposable graph
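    In sketch form, the pattern described above looks roughly like this (runAndDispose is a hypothetical, simplified stand-in for what runEffects does; the real implementation handles more cases):
    import type { Disposable, Scheduler, Sink, Stream, Time } from '@most/types'

    // Run a stream with a sink that disposes the collected Disposable graph
    // as soon as the stream ends or errors.
    const runAndDispose = <A>(stream: Stream<A>, scheduler: Scheduler): Promise<void> =>
      new Promise((resolve, reject) => {
        let disposable: Disposable
        const sink: Sink<A> = {
          event: (_t: Time, _x: A) => {},
          end: (_t: Time) => { disposable.dispose(); resolve() },
          error: (_t: Time, e: Error) => { disposable.dispose(); reject(e) }
        }
        // run() returns its Disposable synchronously, before any events can
        // arrive, so it is assigned by the time end/error can fire.
        disposable = stream.run(sink, scheduler)
      })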
    Micah
    @micahscopes
    Mmm, I see. So when I'm using mergeArray to collect all my little streams into one giant stream, I'm also collecting their Disposables into a single Disposable instance
    Something I've thought about doing instead of that mega merged stream is using higher-order streams that get passed to tap(stream => runEffects(stream, scheduler))
    Seems like that should solve my issue if it means dispose is getting called more regularly!
    Micah
    @micahscopes
    So for example if I wanted to loop a little melody over 12 bars I might do something like:
    periodic(12*BAR)
    |> map(() => melody$ |> until(12*BAR))
    |> tap(stream => runEffects(stream, scheduler))
    (currently I'm using switchLatest to get that effect, but I'm seeing now how that's probably deferring dispose indefinitely!)
    @TylorS wow... okay thank you! I think I'm starting to understand how this Disposable stuff works in practice
    Micah
    @micahscopes
    K so I have verified that this ^ is saving me at least 4x on accumulated heap allocations over ~10 hours of simulated music play!
    That's huge
    matt penrice
    @elmpp
    Hi there. I'd love someone's opinion on this mostjs question - cujojs/most#551
    jarkkoskyttala
    @jarkkoskyttala

    Hi,

    We have been struggling with some strange behaviour regarding combineArray and sample. Could you take a look at this test and let me know if you have any idea why result$ is not triggering? The pipe function is from fp-ts. The test passes if I remove the hold from s2$

    it('should trigger with result', () => {
      const scheduler = defaultScheduler();
      const s$ = at(100, 'test');

      const s2$ = pipe(
        map(() => 'test-2', s$),
        hold
      );

      const result$ = sample(
        combineArray(
          (x, y) => ({ x, y }),
          [s2$, s$]
        ),
        s2$
      );

      const expected = {
        x: 'test-2',
        y: 'test',
      };

      const test_X$ = tap((x) => {
        expect(x).to.eql(expected);
      }, result$);

      runEffects(test_X$, scheduler);
    })
    jarkkoskyttala
    @jarkkoskyttala
    I can also make the test pass by switching the positions of s2$ and s$ in the combineArray array, so that it takes [s$, s2$].
    Micah
    @micahscopes
    Nothing like a couple of well-placed multicast calls to fix all the bugs caused by side effects happening multiple times
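    For anyone following along: multicast shares a single run of a stream among all of its consumers, so effectful upstream operators run once per event instead of once per consumer. A small sketch (the stream and logging are made up for illustration):
    import { map, multicast, periodic, runEffects, tap } from '@most/core'
    import { newDefaultScheduler } from '@most/scheduler'

    const scheduler = newDefaultScheduler()

    // Without multicast, each consumer below would re-run the map (and its
    // side effect) per tick; with multicast, the upstream runs once and both
    // consumers see the same value.
    const shared$ = multicast(
      map(() => { console.log('expensive side effect'); return Math.random() }, periodic(1000))
    )

    runEffects(tap(x => console.log('consumer A', x), shared$), scheduler)
    runEffects(tap(x => console.log('consumer B', x), shared$), scheduler)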
    Galileo Sanchez
    @galileopy
    Intuitively, I sense that event streams should lead to a natural pattern for rate limiting, but I haven't given this too much thought yet. Has anybody ever tried this?