Dominik Charousset
@Neverlord
Depending on how long the migration goes, maybe you can skip 0.18 entirely and jump to 0.19. ;)
Good luck!
Dominik Lohmann
@dominiklohmann
Probably another 1-3 months, depending on customer needs :)
Dominik Lohmann
@dominiklohmann

Totally separate question:

// Parent actor: install a down handler and spawn the child with monitoring.
self->set_down_handler(...);
auto handle = self->spawn<caf::monitored>(my_typed_actor, args...);

// Child actor: bail out with an error instead of returning a behavior if the
// arguments fail validation.
typed_actor<...>::behavior_type my_typed_actor(typed_actor<...>::pointer self, Args... args) {
  if (!validate(args...)) {
    self->quit(make_error(...));
    return typed_actor<...>::behavior_type::make_empty_behavior();
  }
  return {
    // ...
  };
}

If the validation inside the my_typed_actor function fails, the down handler is never triggered, even though the actor shuts down correctly. Is this a bug, or are we simply abusing the API here?

Dominik Charousset
@Neverlord
I'd say it should trigger down messages. Do you have a minimal example that reproduces this?
Dominik Lohmann
@dominiklohmann
I'll open a PR with a unit test, assuming it still fails with 0.18 you can take it from there.
Dominik Charousset
@Neverlord
👍
Yifei Yang
@Yifei-yang7
Hi, may I know if message communication between cross-node actors is able to saturate the network bandwidth? For example if I try to transfer a large Arrow table between cross-node actors. Thank you!
Dominik Charousset
@Neverlord
The general advice we give is to prefer small messages. Utilizing the bandwidth shouldn't be an issue, but if you force the middleman to serialize KB or even MB worth of data, it will 'block' it for quite a bit. So you could see latency spikes from such 'jumbo messages'. I'd say build a small prototype and measure how much latency you actually cause. The Prometheus metrics for the middleman should give you a good idea. Capping the Arrow tables at some sensible size could also help, if that's feasible for your application.
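
A minimal sketch of the chunking idea, purely illustrative: table_handle, table_chunk, and split_into_chunks stand in for hypothetical application-level types and helpers; they are not CAF or Arrow API.

// Sketch only: send a large table as several bounded messages instead of one
// jumbo message, so the middleman never serializes more than max_rows at once.
void send_table(caf::event_based_actor* self, const caf::actor& dest,
                const table_handle& table, size_t max_rows) {
  for (auto& chunk : split_into_chunks(table, max_rows))
    self->send(dest, std::move(chunk)); // each chunk is a small, inspectable type
}
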
Yifei Yang
@Yifei-yang7
Thanks for your advice. Regarding serialization in the middleman: since Arrow provides built-in serialization methods, I call them inside inspect. So I guess the middleman can just transfer the bytes already serialized by the Arrow API, without incurring the serialization latency inside the middleman itself. Is that correct? But yeah, for sure I will benchmark a bit to see the real effect. :)
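
A minimal sketch of what such an inspect overload could look like; table_to_bytes and table_from_bytes are hypothetical helpers wrapping Arrow's own IPC (de)serialization, using std::string as the byte container:

// Sketch only: the inspector sees a single virtual "bytes" field, so the
// middleman just copies the buffer that Arrow already produced.
struct table_msg {
  std::shared_ptr<arrow::Table> table;
};

template <class Inspector>
bool inspect(Inspector& f, table_msg& x) {
  auto get = [&x] { return table_to_bytes(x.table); };   // serialize via Arrow
  auto set = [&x](std::string bytes) {                   // deserialize via Arrow
    x.table = table_from_bytes(bytes);
    return true;
  };
  return f.object(x).fields(f.field("bytes", get, set));
}
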
Dominik Charousset
@Neverlord
Even if the serialization itself takes no time, you still have to copy data into the socket buffer. If that's a couple MB worth of data, it still can cause quite a bit of delay. Measurements are your friends. :)
Yifei Yang
@Yifei-yang7
got it, thanks! :-)
Kyle Klenk
@KyleKlenk
Hi, I would like to know if CAF can be configured to use green threads instead of hardware threads? We are running into issues where CAF spawns more threads than cores on some of our systems, and our admins are recommending that we consider using green threads.
Dominik Charousset
@Neverlord
CAF usually starts more threads than cores since slightly "overbooking" the system usually results in better performance. Some threads invariably have to wait at some point (e.g. due to memory access), and the extra threads can fill the gaps. The closest analogue to green threads in CAF are the actors; they all share a single thread pool for execution. The size of that thread pool can be configured with the parameter caf.scheduler.max-threads (https://actor-framework.readthedocs.io/en/stable/ConfiguringActorApplications.html).
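
For reference, in a caf-application.conf this would look like the snippet below (4 is just an example value); the same option can also be passed at startup as --caf.scheduler.max-threads=4.

caf {
  scheduler {
    max-threads = 4
  }
}
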
Kyle Klenk
@KyleKlenk
Thanks for the explanation and pointing me to the configuration for specifying the number of threads.
Dominik Lohmann
@dominiklohmann
I've been looking at Flow recently and was wondering how result types of observables integrate with typed actors. Is there an example for this, or are they just always lazily typed via caf::message?
Ah nvm, just came across caf::async::{consumer,producer}_resource<T>. That neatly solves what I wanted to do.
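
A rough sketch of that pattern, with the flow/async names as I understand them (treat from_resource and the exact headers as assumptions to verify against the CAF sources):

// Sketch only: a typed actor receives the pull end of an async buffer and
// turns it into an observable inside its own context.
using sink_actor = caf::typed_actor<
  caf::result<void>(caf::async::consumer_resource<int>)>;

sink_actor::behavior_type sink_impl(sink_actor::pointer self) {
  return {
    [self](caf::async::consumer_resource<int> pull) {
      self->make_observable()
        .from_resource(std::move(pull))           // observable<int> fed by the buffer
        .for_each([](int x) { /* consume x */ });
    },
  };
}
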
Dominik Charousset
@Neverlord
In general: an observable is private to an actor, so that wouldn't go into an actor interface. That's what the new stream<T> is going to be all about. The async building blocks only work for concurrency.
There's also some documentation available, btw.
Just do sphinx-build in the manual directory and you'll have the version that has some initial documentation for the new flow API. :)
Or take the zip file.
Dominik Lohmann
@dominiklohmann

I see. I can see how a new stream<T> built on top of data flows can work much better than the existing streaming API, which was a pain to debug and had lots of shutdown order issues.

If I understand data flows correctly, they must be fully configured before their hosting actors launch. This is a limitation compared to the existing experimental streaming API, which was fully configurable externally and integrated into the typed actor API. Will this limitation also exist for the new stream<T>? I'm curious about your plans on the streaming front in general.

Dominik Charousset
@Neverlord
You can spin up flows at any time. You've probably seen the new spawn_inactive setups? You only need those if you're using observe_on, because the target actor for this operator must not run yet.
And, yeah. The old streams are going away for a reason. ;)
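
A rough sketch of the observe_on case; the return shape of spawn_inactive (pointer plus launch callable) is an assumption based on the flow examples and may differ:

// Sketch only: the sink must not be running yet when observe_on targets it,
// hence spawn_inactive for both actors and explicit launch afterwards.
void demo(caf::actor_system& sys) {
  auto [src, launch_src] = sys.spawn_inactive<caf::event_based_actor>();
  auto [snk, launch_snk] = sys.spawn_inactive<caf::event_based_actor>();
  src->make_observable()
    .iota(1)
    .take(10)
    .observe_on(snk)                              // items continue on the sink actor
    .for_each([](int x) { /* runs on snk */ });
  launch_src();
  launch_snk();
}
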
Dominik Lohmann
@dominiklohmann
I think there's a bug in the image for the concat operator: The concatenated stream can only start after the first stream is done, right?
Nvm, it can actually do the first one eagerly; that issue only occurs with at least three observables
Dominik Lohmann
@dominiklohmann
Currently just trying to get a feel for the API and what its use cases and limitations are, so I started by comparing it to how we use the current streaming APIs. Kinda excited to toy around with it. I assume the new streaming stuff on top of it is all targeted for CAF 0.19?
Dominik Charousset
@Neverlord

Let me know if you do find bugs. The current flow branch is pretty much on its way back to master.

For CAF 0.19 we'll have flows and websockets (with caf_net), I'd like to see the new streams making the cut as well.

Dominik Lohmann
@dominiklohmann

From my experiments, the new streams would likely be required for us because flows cannot replace the old streams yet, unless the old API is staying for 0.19. The lack of dynamic setup of flows between actors after they've started will be a major hassle for us, because it requires rewriting how we start our ingestion pipeline. I'll let you know if I encounter any major issues, but so far I've seen none in the playground I've set up for my experiments.

If you need any design input for the new streams I'm always open to chat since we're really deep into the streaming internals by now. Would be really interested in some design doc if you have something like that.

Dominik Charousset
@Neverlord

There's no design document, but here's a rough outline for how I picture the new API at the moment:

An actor creates a new stream simply from a factory object. Something like:

auto out = self->make_stream([self](observer<int> new_listener) {
  return self->make_observable().iota(1).take(100).subscribe(new_listener);
});
// decltype(out) = stream<int>

The stream handle itself is just a reference to the actor plus an ID. On the consumer side, you'd do self->observe_stream(hdl) to obtain an observable<T> and then you can apply all the operators you'd want.
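
Based on that outline, the consumer side would presumably look roughly like this; purely a sketch of the proposed observe_stream API, nothing that exists yet:

// Sketch of the proposed API only: observe_stream(hdl) yields an observable
// that the consumer can pipe through the usual flow operators.
void consume(caf::event_based_actor* self, caf::stream<int> hdl) {
  self->observe_stream(hdl)
    .map([](int x) { return x * x; })
    .for_each([](int squared) { /* use the value */ });
}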

Dominik Lohmann
@dominiklohmann
How does the stream handshake work in the new model? Mostly interested in how I would translate our current sources, stages, and sinks.
Also, this probably means that an actor can finally host multiple streams, right?
Dominik Charousset
@Neverlord
If you do observe_stream, CAF sends an open handshake. On the receiver side, this creates an observer<T> for the sender and calls the factory. There may be some optional arguments that allow you to fine-tune what the messaging after that looks like (micro-batch sizes, max. delay, etc.).
You can multiplex inside one actor however you choose. In practice, you'd probably first create a (hot) observable and then bind that to the make_stream factory to just call my_observable.subscribe(new_listener). You can also open up multiple streams and then merge or zip_with on the inputs.
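
Sticking with the proposed names, multiplexing inside one actor might then look roughly like this; again just a sketch of the API as described above, with my_hot_observable standing in for whatever hot observable the actor already maintains:

// Sketch of the proposed API only: expose one hot observable as a stream and
// merge the inputs of two observed streams inside a single actor.
void multiplex(caf::event_based_actor* self, caf::stream<int> in1,
               caf::stream<int> in2) {
  // Producer side: bind an existing hot observable to the make_stream factory.
  auto out = self->make_stream([self](caf::flow::observer<int> new_listener) {
    return my_hot_observable(self).subscribe(new_listener);
  });
  // Consumer side: merge two incoming streams into one flow.
  self->observe_stream(in1)
    .merge(self->observe_stream(in2))
    .for_each([](int x) { /* ... */ });
}
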
Dominik Lohmann
@dominiklohmann
Is the "open handshake" a special message again like for the current streaming similar to down or exit messages, or is it a regular message such that it can also be delegated? The latter would be really nice to have.
Dominik Charousset
@Neverlord
It needs to be a special message because otherwise the user would need to provide a handler.
Dominik Lohmann
@dominiklohmann
I'd actually prefer that, because that means it'd also be in the signature of typed actors and that the message can be delegated or responded to at a later point in time, or even rejected with an error.
Dominik Charousset
@Neverlord
I don't think that's feasible, though. It would force each actor that wants to use streams to add the same boilerplate over and over again.
Maybe overriding the default handler is an option.
Dominik Lohmann
@dominiklohmann

How much boilerplate would that be? I imagine it to just be a single function call, which seems acceptable for embedding it in the typed actor signature:

[self](caf::stream<T> handle) -> caf::result<caf::stream_ack<T>> {
  auto [observable, ack] = self->observe_stream(handle);
  std::move(observable).for_each(...);
  return ack; // could also return an error, or a delegated<stream_ack<T>> here
},

If I understand correctly, a stream in the new model is just a shared SPMC buffer resource with a few extra features built on top to allow for some routing and back-pressure propagation. Any actor can create a hot observable from a stream, and similarly any actor can turn a hot observable into a stream. Is this mental model correct?

Maybe there's not even a need for special handlers, because it's every actor's responsibility to handle the hot observable <-> stream conversion itself, which would allow for some very flexible setups.
Dominik Charousset
@Neverlord
I think we talk about different things. :)
That's just sending a stream to the actor and then calling observe_stream. That's going to be fine. I was talking about the internal workings of observe_stream.
Dominik Lohmann
@dominiklohmann
I totally understand that that can fail and requires a special handler. So we're on the same page then. :)
Dominik Charousset
@Neverlord
Looks like it.
Dominik Lohmann
@dominiklohmann
Is there an easy way to detect for a given response promise whether the request associated with it timed out already?
Dominik Charousset
@Neverlord
No, the actor only knows once it sees the timeout message.
Dominik Charousset
@Neverlord

I've just assembled a new web_socket::accept function that nicely integrates with flows: https://github.com/actor-framework/actor-framework/blob/070b3ae157ed7866cf3c9785ca8fceead415ae92/examples/web_socket/echo.cpp.

I wouldn't recommend checking out the caf-net branch quite yet, though. 🙂

nohous
@nohous
Is this the right place to talk about possible CAF applications?
Dominik Charousset
@Neverlord
@nohous sure, welcome to the chat! :)