Dominik Lohmann
@dominiklohmann
Currently just trying to get a feel for the API and what its use cases and limitations are, so I started by comparing it to how we use the current streaming APIs. Kinda excited to toy around with it. I assume the new streaming stuff on top of it is all targeted for CAF 0.19?
Dominik Charousset
@Neverlord

Let me know if you do find bugs. The current flow branch is pretty much on its way back to master.

For CAF 0.19 we'll have flows and websockets (with caf_net), I'd like to see the new streams making the cut as well.

Dominik Lohmann
@dominiklohmann

Based on my experiments, new streams would likely be required for us, because flows cannot replace the old streams yet (unless the old streams are staying for 0.19). The lack of dynamic setup with flows between actors after they've started will be a major hassle for us, because it requires rewriting how we start our ingestion pipeline. I'll let you know if I encounter any major issues, but so far I've seen none in the playground I've set up for my experiments.

If you need any design input for the new streams I'm always open to chat since we're really deep into the streaming internals by now. Would be really interested in some design doc if you have something like that.

Dominik Charousset
@Neverlord

There's no design document, but here's a rough outline for how I picture the new API at the moment:

An actor creates a new stream simply from a factory object. Something like:

auto out = self->make_stream([self](observer<int> new_listener) {
  return self->make_observable().iota(1).take(100).subscribe(new_listener);
});
// decltype(out) = stream<int>

The stream handle itself is just a reference to the actor plus an ID. On the consumer side, you'd do self->observe_stream(hdl) to obtain an observable<T> and then you can apply all the operators you'd want.
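Putting the producer and consumer sides together, a round-trip sketch might look like this (pseudocode against the proposed API from the outline above; `observe_stream`, `stream<T>`, and the operator signatures are all speculative):

```cpp
// Pseudocode against the proposed API; all signatures are speculative.
// Producer actor: wrap a cold observable in a stream handle.
stream<int> out = self->make_stream([self](observer<int> new_listener) {
  return self->make_observable().iota(1).take(100).subscribe(new_listener);
});
// ... send `out` to a consumer inside a regular message ...

// Consumer actor: turn the handle back into an observable and apply operators.
self->observe_stream(out)
  .filter([](int x) { return x % 2 == 0; })
  .for_each([](int x) { /* process each item */ });
```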

Dominik Lohmann
@dominiklohmann
How does the stream handshake work in the new model? Mostly interested in how I would translate our current sources, stages, and sinks.
Also, this probably means that an actor can finally host multiple streams, right?
Dominik Charousset
@Neverlord
If you do observe_stream, CAF sends an open handshake. On the receiver side, this creates an observer<T> for the sender and calls the factory. There may be some optional arguments that allow you to fine-tune what the messaging after that looks like (micro-batch sizes, max. delay, etc.).
You can multiplex inside one actor however you choose. In practice, you'd probably first create a (hot) observable and then bind that to the make_stream factory to just call my_observable.subscribe(new_listener). You can also open up multiple streams and then merge or zip_with on the inputs.
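A sketch of that multiplexing, again in pseudocode against the proposed API (`publish`, `merge`, and the stream calls are assumed names, and `src`, `hdl1`, `hdl2` are placeholders, not a finalized interface):

```cpp
// Pseudocode; signatures are speculative.
// Bind one hot observable to the stream factory, so every new listener
// taps into the same live sequence:
auto hot = self->make_observable().from_resource(src).publish();
auto out = self->make_stream([hot](observer<int> new_listener) {
  return hot.subscribe(new_listener);
});

// Consume several streams in one actor by merging their observables:
self->observe_stream(hdl1)
  .merge(self->observe_stream(hdl2))
  .for_each([](int x) { /* items from either input */ });
```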
Dominik Lohmann
@dominiklohmann
Is the "open handshake" a special message again, like in the current streaming (similar to down or exit messages), or is it a regular message so that it can also be delegated? The latter would be really nice to have.
Dominik Charousset
@Neverlord
It needs to be a special message because otherwise the user would need to provide a handler.
Dominik Lohmann
@dominiklohmann
I'd actually prefer that, because that means it'd also be in the signature of typed actors and that the message can be delegated or responded to at a later point in time, or even rejected with an error.
Dominik Charousset
@Neverlord
I don't think that's feasible, though. It would force each actor that wants to use streams to add the same boilerplate over and over again.
Maybe overriding the default handler is an option.
Dominik Lohmann
@dominiklohmann

How much boilerplate would that be? I imagine it to just be a single function call, which seems acceptable for embedding it in the typed actor signature:

[self](caf::stream<T> handle) -> caf::result<caf::stream_ack<T>> {
  auto [observable, ack] = self->observe_stream(handle);
  std::move(observable).for_each(...);
  return ack; // could also return an error, or a delegated<stream_ack<T>> here
},

If I understand correctly, a stream in the new model is just a shared SPMC buffer resource with a few extra features built on top to allow for some routing and back pressure propagation. Any actor can create a hot observable from a stream, and similarly any actor can turn a hot observable into a stream. Is this mental model correct?

Maybe there's not even a need for special handlers, because it's every actor's responsibility to handle the hot observable <-> stream conversion itself, which would allow for some very flexible setups.
Dominik Charousset
@Neverlord
I think we're talking about different things. :)
That's just sending a stream to the actor and then calling observe_stream. That's going to be fine. I was talking about the internal workings of observe_stream.
Dominik Lohmann
@dominiklohmann
I totally understand that that can fail and requires a special handler. So we're on the same page then. :)
Dominik Charousset
@Neverlord
Looks like it.
Dominik Lohmann
@dominiklohmann
Is there an easy way to detect for a given response promise whether the request associated with it timed out already?
Dominik Charousset
@Neverlord
No, the actor only knows once it sees the timeout message.
Dominik Charousset
@Neverlord

I've just assembled a new web_socket::accept function that nicely integrates with flows: https://github.com/actor-framework/actor-framework/blob/070b3ae157ed7866cf3c9785ca8fceead415ae92/examples/web_socket/echo.cpp

I wouldn't recommend checking out the caf-net branch quite yet, though. 🙂

nohous
@nohous
Is this the right place to talk about possible CAF applications?
Dominik Charousset
@Neverlord
@nohous sure, welcome to the chat! :)
Agor Maxime
@4rzael
Hi everyone!
I'm working on a fairly large embedded project running on an MCU (ESP32), where it seems like an actor model would make a lot of sense. Has anyone here used CAF in similar environments?
I was wondering how applicable it is to embedded programming, more specifically regarding memory usage:
  • I'm trying to avoid dynamic allocations after initialization. Are there operations that would not be usable in this context? Actor instantiation? Message passing? Promises / delaying?
  • Is there any way to provide custom allocators to the framework?
  • What is the memory footprint of an event-based actor? The docs seem to state "a few hundred bytes". Is that correct?
    I know this question is quite broad, and I'm sorry for that. I'm just trying to understand feasibility before going a lot deeper into how the framework works.
Dominik Charousset
@Neverlord
I don't have experience running in such an environment, but regarding the other questions: spawning an actor as well as message passing rely on memory allocations. There's no support for passing custom allocators to the framework. Such an allocator would have to cover a wide range of different usage scenarios (messages vs. actors vs. per-actor state), so adding allocator support would probably not provide much value, and passing different allocators to different system components would get tricky. A few hundred bytes: that should still hold (unless you subtype one of the actors and make it huge); I haven't measured it in a while, though. An actor may also allocate during initialization depending on what you do, e.g., setting custom handlers. The behavior is also heap-allocated, so there are a couple of allocations per spawn.
CAF uses ref counting, so at least it generally releases memory as soon as possible.
Farafonov Alexey
@farafonov-alexey

Hello, @Neverlord! We have a library (DLL) that exports some functions. Inside these functions we have this pseudocode:

caf::scoped_actor self{system_->system()};
self->request(actor1, atom1, ...)
  .receive(
    [&](....) {
      ....
    },
    [&](caf::error& err) {
      ....
    });

Because these library methods should return a result, we use blocking behavior.

If we use this library from a Qt application, actor1 receives atom1 almost instantly (1-2ms) after calling request. However, if we call the same method from a console app, actor1 receives atom1 after a delay of 10-15ms. Because we call this function multiple times, this delay seems critical. We don't understand the reason for this behavior nor how to overcome it :(.

CAF version 0.17.4
Dominik Charousset
@Neverlord
10-15ms seems like a lot. I don't really see what "Qt app" vs "console app" entails, though. Did you maybe reproduce this with some small-ish code that I could look at?
Farafonov Alexey
@farafonov-alexey
@Neverlord Well, not really smallish =), but as minimal as I could make it right now: https://github.com/farafonov-alexey/caf-request-recieve
From request until the actor receives, I currently get approx. 15-20ms. Could you please take a look? Maybe I'm missing something.
(screenshot attached)
Dominik Charousset
@Neverlord
How are your scheduler settings? Relaxed sleep is 10ms by default: https://actor-framework.readthedocs.io/en/stable/ConfiguringActorApplications.html#configuration-files
Farafonov Alexey
@farafonov-alexey
Well, I think it's the default. How can I set this value programmatically?
Dominik Charousset
@Neverlord
If you're using the default scaffold with CAF_MAIN, you can pass --dump-config or, in source code, use get_or(<key>, <default>). But if you're not setting anything, then it's 10ms. Your example seems to ultimately run the same loop, though, so it's weird that it behaves differently when called from a Qt thread.
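For reference, a sketch of how the work-stealing scheduler parameters could be tuned in a CAF 0.17-style INI config file; the exact section and key names may differ between versions, so check --dump-config on your build:

```ini
; caf-application.ini (CAF 0.17-style syntax; key names are an assumption,
; verify against --dump-config)
[work-stealing]
; sleep interval per iteration in the relaxed polling phase
; (the default of 10000us = 10ms matches the latency discussed above)
relaxed-sleep-duration-us=1000
```

Programmatically, the same key would be set on the actor_system_config before constructing the actor_system, e.g. via cfg.set("work-stealing.relaxed-sleep-duration-us", ...).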
Farafonov Alexey
@farafonov-alexey
And from beginning to end it takes 15 ms
(screenshot attached)

10-15ms seems like a lot. I don't really see what "Qt app" vs "console app" entails, though. Did you maybe reproduce this with some small-ish code that I could look at?

@Neverlord this is ok or "a lot"?

Dominik Charousset
@Neverlord
@farafonov-alexey 15ms is definitely more latency than I'd expect. Since you can reproduce this even with the hello world example, let me double-check this on my end to see if the 15ms may be related to OS/HW combinations.
Farafonov Alexey
@farafonov-alexey
I changed relaxed-sleep-duration, but without effect.
Dominik Charousset
@Neverlord
Ok. Let me play around with this a bit and get back to you. :)
Dominik Charousset
@Neverlord
@farafonov-alexey I can't speak to your HW setup, but at least on my machine, message passing performance looks the way it ought to. Here's the test code:
#include <string>
#include <iostream>

#include "caf/actor_ostream.hpp"
#include "caf/actor_system.hpp"
#include "caf/caf_main.hpp"
#include "caf/event_based_actor.hpp"
#include "caf/scoped_actor.hpp"
#include "caf/timestamp.hpp"

using namespace caf;

behavior testee() {
  return {
    [](timestamp t0) {
      auto t1 = make_timestamp();
      auto delta = t1 - t0;
      std::cout << "t0: " << caf::deep_to_string(t0) << '\n'
                << "t1: " << caf::deep_to_string(t1) << '\n'
                << "delta: " << caf::deep_to_string(delta) << '\n';
    },
  };
}

void caf_main(actor_system& sys) {
  scoped_actor self{sys};
  auto aut = self->spawn(testee);
  self->send(aut, make_timestamp());
}

CAF_MAIN()
And a couple of runs:
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
t0: "2022-05-09T19:13:01.901"
t1: "2022-05-09T19:13:01.901"
delta: 52us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
t0: "2022-05-09T19:14:22.067"
t1: "2022-05-09T19:14:22.067"
delta: 26us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
t0: "2022-05-09T19:14:22.945"
t1: "2022-05-09T19:14:22.945"
delta: 19us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
t0: "2022-05-09T19:14:23.442"
t1: "2022-05-09T19:14:23.442"
delta: 89us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
t0: "2022-05-09T19:14:23.921"
t1: "2022-05-09T19:14:23.922"
delta: 94us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
t0: "2022-05-09T19:14:24.403"
t1: "2022-05-09T19:14:24.403"
delta: 26us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
t0: "2022-05-09T19:14:24.827"
t1: "2022-05-09T19:14:24.827"
delta: 68us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
t0: "2022-05-09T19:14:25.242"
t1: "2022-05-09T19:14:25.242"
delta: 23us
Release build with current master.
Dominik Charousset
@Neverlord
And here's a version with request-response:
#include <string>
#include <iostream>

#include "caf/actor_ostream.hpp"
#include "caf/actor_system.hpp"
#include "caf/caf_main.hpp"
#include "caf/event_based_actor.hpp"
#include "caf/scoped_actor.hpp"
#include "caf/timestamp.hpp"

using namespace caf;

using namespace std::literals;

behavior clock_actor() {
  return {
    [](get_atom) {
      return make_timestamp();
    }
  };
}

behavior testee() {
  return {
    [](timestamp t0) {
      auto t1 = make_timestamp();
      auto delta = t1 - t0;
      std::cout << "async:\n"
                << "t0: " << caf::deep_to_string(t0) << '\n'
                << "t1: " << caf::deep_to_string(t1) << '\n'
                << "delta: " << caf::deep_to_string(delta) << '\n';
    },
  };
}

void caf_main(actor_system& sys) {
  scoped_actor self{sys};
  // request-response
  auto clk = self->spawn(clock_actor);
  auto t0 = make_timestamp();
  self->request(clk, 5s, get_atom_v)
    .receive(
      [&](timestamp t1) {
        auto t2 = make_timestamp();
        std::cout << "request-response:\n"
                  << "t0: " << caf::deep_to_string(t0) << '\n'
                  << "t1: " << caf::deep_to_string(t1) << '\n'
                  << "t2: " << caf::deep_to_string(t2) << '\n'
                  << "delta (full): " << caf::deep_to_string(t2 - t0) << '\n'
                  << "delta (self -> clock): " << caf::deep_to_string(t1 - t0) << '\n'
                  << "delta (clock -> self): " << caf::deep_to_string(t2 - t1) << '\n';
      },
      [](const error&) {});
  // async
  auto aut = self->spawn(testee);
  self->send(aut, make_timestamp());
}

CAF_MAIN()
The times also are in microsecond range:
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
request-response:
t0: "2022-05-09T19:24:10.369"
t1: "2022-05-09T19:24:10.369"
t2: "2022-05-09T19:24:10.369"
delta (full): 52us
delta (self -> clock): 43us
delta (clock -> self): 9us
async:
t0: "2022-05-09T19:24:10.370"
t1: "2022-05-09T19:24:10.370"
delta: 73us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
request-response:
t0: "2022-05-09T19:24:11.077"
t1: "2022-05-09T19:24:11.077"
t2: "2022-05-09T19:24:11.077"
delta (full): 51us
delta (self -> clock): 35us
delta (clock -> self): 16us
async:
t0: "2022-05-09T19:24:11.077"
t1: "2022-05-09T19:24:11.078"
delta: 70us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
request-response:
t0: "2022-05-09T19:24:11.693"
t1: "2022-05-09T19:24:11.693"
t2: "2022-05-09T19:24:11.693"
delta (full): 111us
delta (self -> clock): 20us
delta (clock -> self): 91us
async:
t0: "2022-05-09T19:24:11.694"
t1: "2022-05-09T19:24:11.694"
delta: 16us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
request-response:
t0: "2022-05-09T19:24:12.277"
t1: "2022-05-09T19:24:12.277"
t2: "2022-05-09T19:24:12.277"
delta (full): 67us
delta (self -> clock): 51us
delta (clock -> self): 16us
async:
t0: "2022-05-09T19:24:12.278"
t1: "2022-05-09T19:24:12.278"
delta: 61us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
request-response:
t0: "2022-05-09T19:24:12.847"
t1: "2022-05-09T19:24:12.847"
t2: "2022-05-09T19:24:12.847"
delta (full): 40us
delta (self -> clock): 30us
delta (clock -> self): 10us
async:
t0: "2022-05-09T19:24:12.848"
t1: "2022-05-09T19:24:12.848"
delta: 44us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
request-response:
t0: "2022-05-09T19:24:13.390"
t1: "2022-05-09T19:24:13.390"
t2: "2022-05-09T19:24:13.390"
delta (full): 56us
delta (self -> clock): 46us
delta (clock -> self): 10us
async:
t0: "2022-05-09T19:24:13.391"
t1: "2022-05-09T19:24:13.391"
delta: 38us
@iMacWork: ~/workspace # master $ ./build/caf/release/examples/hello_world
request-response:
t0: "2022-05-09T19:24:13.963"
t1: "2022-05-09T19:24:13.963"
t2: "2022-05-09T19:24:13.963"
delta (full): 128us
delta (self -> clock): 118us
delta (clock -> self): 10us
async:
t0: "2022-05-09T19:24:13.963"
t1: "2022-05-09T19:24:13.963"
delta: 15us
Btw, CAF timestamps internally store nanosecond resolution. The system clock seems to have microsecond resolution. The timestamps only show milliseconds in ISO format; that's why they all look the same.
Farafonov Alexey
@farafonov-alexey
@Neverlord Thanks for measuring!