self->current_mailbox_element()->move_content_to_message() to have something to hold onto.
caf::binary_serializer, and the output of that changed between 0.17 and 0.18, so we're in the process of transitioning our database state in a forward-compatible way.
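For context, here is a minimal sketch of how state can be turned into bytes with caf::binary_serializer; my_state, its fields, and to_bytes are placeholders for illustration, not the actual types from our code base:

#include <cstdint>
#include <vector>

#include "caf/actor_system.hpp"
#include "caf/binary_serializer.hpp"
#include "caf/byte_buffer.hpp"

// Placeholder state type; the real types look different.
struct my_state {
  int32_t version = 0;
  std::vector<int32_t> values;
};

// inspect() is what binary_serializer walks to produce the byte output.
template <class Inspector>
bool inspect(Inspector& f, my_state& x) {
  return f.object(x).fields(f.field("version", x.version),
                            f.field("values", x.values));
}

// Serialize the state into a byte buffer that can be persisted.
caf::byte_buffer to_bytes(caf::actor_system& sys, my_state& st) {
  caf::byte_buffer buf;
  caf::binary_serializer sink{sys, buf};
  if (!sink.apply(st)) {
    // Real code would check sink.get_error() here.
  }
  return buf;
}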
Totally separate question:
self->set_down_handler(...);
auto handle = self->spawn<caf::monitored>(my_typed_actor, args...);
typed_actor<...>::behavior_type my_typed_actor(typed_actor<...>::pointer self, Args... args) {
  if (!validate(args...)) {
    self->quit(make_error(...));
    return typed_actor<...>::behavior_type::make_empty_behavior();
  }
  return {
    // ...
  };
}
If the validation inside the my_typed_actor function fails, the down handler will never be triggered, despite the actor shutting down correctly. Is this a bug, or are we simply abusing the API here?
inspect. So I guess the middleman can just transfer the serialized bytes from the Arrow API without incurring the serialization latency inside the middleman itself. May I know if this is true? But yeah, for sure I will benchmark a bit to see the real effect. :)
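To sketch what I mean (arrow_batch and ipc_bytes are made-up names, and whether this actually avoids meaningful work is exactly what the benchmark should show): the inspect overload exposes only the pre-serialized bytes, so CAF copies them as one blob instead of walking the Arrow structure.

#include <cstdint>
#include <vector>

// Made-up wrapper around a record batch that was already serialized via
// the Arrow IPC API before handing it to CAF.
struct arrow_batch {
  std::vector<uint8_t> ipc_bytes; // pre-serialized Arrow payload
};

// CAF only sees a single byte field, so the middleman copies the blob
// instead of re-serializing the columnar data field by field.
template <class Inspector>
bool inspect(Inspector& f, arrow_batch& x) {
  return f.object(x).fields(f.field("ipc_bytes", x.ipc_bytes));
}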
caf.scheduler.max-threads (https://actor-framework.readthedocs.io/en/stable/ConfiguringActorApplications.html).
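For reference, the same knob can also be set programmatically; a minimal sketch, assuming a custom actor_system_config subclass:

#include "caf/actor_system_config.hpp"

// Equivalent to passing --caf.scheduler.max-threads=4 on the command line
// or setting caf.scheduler.max-threads in the configuration file.
struct config : caf::actor_system_config {
  config() {
    set("caf.scheduler.max-threads", 4u);
  }
};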
caf::async::{consumer,producer}_resource<T>. That neatly solves what I wanted to do.
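In case it helps anyone else reading along, this is roughly the shape of what I ended up with; a sketch with placeholder names, the important part being that make_spsc_buffer_resource returns a connected consumer/producer pair:

#include "caf/actor_system.hpp"
#include "caf/async/spsc_buffer.hpp"
#include "caf/event_based_actor.hpp"

void wire_up(caf::actor_system& sys) {
  // Connected pair: values written through `push` become readable via `pull`.
  auto [pull, push] = caf::async::make_spsc_buffer_resource<int>();
  // Producer side: feed the buffer from an observable.
  sys.spawn([out = push](caf::event_based_actor* self) {
    self->make_observable().iota(1).take(10).subscribe(out);
  });
  // Consumer side: turn the resource into an observable and process values.
  sys.spawn([in = pull](caf::event_based_actor* self) {
    self->make_observable()
      .from_resource(in)
      .for_each([](int x) {
        // process x
      });
  });
}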
sphinx-build in the manual directory and you'll have the version that has some initial documentation for the new flow API. :)
I see. I can see how a new stream<T> built on top of data flows can work much better than the existing streaming API, which was a pain to debug and had lots of shutdown order issues.
If I understand data flows correctly, they must be fully configured before their hosting actors launch. This is a limitation compared to the existing experimental streaming API, which was fully configurable externally and integrated into the typed actor API. Will this limitation also exist for the new stream<T>? I'm curious about your plans on the streaming front in general.
Based on my experiments, the new streams would likely be required for us, because flows cannot replace the old streams yet, unless the old API is staying for 0.19. The lack of dynamic setup with flows between actors after they've started will be a major hassle for us, because it requires rewriting how we start our ingestion pipeline. I'll let you know if I encounter any major issues, but so far I've seen none in the playground I've set up for my experiments.
If you need any design input for the new streams I'm always open to chat since we're really deep into the streaming internals by now. Would be really interested in some design doc if you have something like that.
There's no design document, but here's a rough outline for how I picture the new API at the moment:
An actor creates a new stream simply from a factory object. Something like:
auto out = self->make_stream([self](observer<int> new_listener) {
  return self->make_observable().iota(1).take(100).subscribe(new_listener);
});
// decltype(out) = stream<int>
The stream handle itself is just a reference to the actor plus an ID. On the consumer side, you'd do self->observe_stream(hdl) to obtain an observable<T> and then you can apply all the operators you'd want.
When a consumer calls observe_stream, CAF sends an open handshake. On the receiver side, this creates an observer<T> for the sender and calls the factory. There may be some optional arguments that allow you to fine-tune what the messaging after that looks like (micro-batch sizes, max. delay, etc.).
You could also create the observable first and then bind that to the make_stream factory to just call my_observable.subscribe(new_listener). You can also open up multiple streams and then merge or zip_with on the inputs.
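To put that into code, purely hypothetically, using the proposed make_stream/observe_stream names from above; none of this exists in CAF today:

// Proposed API only; nothing below exists yet.
// Bind an existing observable to the factory:
auto src = self->make_observable().iota(1).take(100);
auto out = self->make_stream([src](caf::flow::observer<int> new_listener) mutable {
  return src.subscribe(new_listener);
});
// Open two incoming streams and merge them into a single observable:
auto [in1, ack1] = self->observe_stream(hdl1);
auto [in2, ack2] = self->observe_stream(hdl2);
std::move(in1).merge(std::move(in2)).for_each([](int x) {
  // ...
});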
How much boilerplate would that be? I imagine it to just be a single function call, which seems acceptable for embedding it in the typed actor signature:
[self](caf::stream<T> handle) -> caf::result<caf::stream_ack<T>> {
  auto [observable, ack] = self->observe_stream(handle);
  std::move(observable).for_each(...);
  return ack; // could also return an error, or a delegated<stream_ack<T>> here
},
If I understand correctly, a stream in the new model is just a shared spmc buffer resource with a few extra features built on top to allow for some routing and back pressure propagation. Any actor can create a hot observable from a stream, and similarly any actor can turn a hot observable into a stream. Is this mental model correct?
observe_stream. That's going to be fine. I was talking about the internal workings of observe_stream.