Dominik Charousset
@Neverlord
Rich Henry
@phaistos
fixed, thanks a lot
I didn't see any discussion of that method, or of the passing of arguments, in the docs, but I didn't read them very closely either
and on closer inspection I see it
Dominik Lohmann
@dominiklohmann
If I have an actor behavior that takes a parameter by reference, would it theoretically be safe to store that reference elsewhere if I also manually keep a reference count to the current message as long as I keep the reference? How would I go about getting a strong reference to the current message from a "self" pointer?
Dominik Charousset
@Neverlord
self->current_mailbox_element()->payload gives you the message in a handler. Keeping a reference/pointer into that message is only safe for read-only access, because otherwise you break COW (changing a tuple with a ref count > 1).
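A minimal sketch of that pattern, assuming the CAF 0.18 mailbox_element API mentioned above; copying the caf::message only bumps the COW reference count, which keeps the tuple alive for read-only access:

#include <string>

#include "caf/all.hpp"

// Sketch: hold on to the current message by keeping a copy of its payload.
caf::behavior reader(caf::event_based_actor* self) {
  return {
    [self](const std::string& line) {
      // Cheap copy: only increments the COW reference count of the tuple.
      caf::message keep_alive = self->current_mailbox_element()->payload;
      const std::string* view = &line; // read-only access only
      // *view stays valid for as long as keep_alive (or another copy) exists;
      // stash both together and never mutate through view, or COW breaks.
      static_cast<void>(keep_alive);
      static_cast<void>(view);
    },
  };
}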
Dominik Lohmann
@dominiklohmann
It is for read-only access purposes, thanks! Not sure if you've seen my discussion comment regarding coroutines w/ CAF result (open to chat about it any time!), but I kind of need to extend the current message lifetime until the coroutine frame is destroyed because by-reference function parameters are only stored by-reference in the coroutine frame.
Dominik Charousset
@Neverlord
I've seen the notification, I'll get back to that if I can make the time. Are you still on 0.17? In that case it's a bit more complicated, because in 0.17 CAF can fuse the content into the mailbox element itself. You could do something like self->current_mailbox_element()->move_content_to_message() to have something to hold onto.
But that of course invalidates the original reference.
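A hypothetical sketch of that 0.17 workaround (inside a message handler; as noted, it invalidates references into the original content):

#include "caf/all.hpp"

// CAF 0.17 only: detach the content into a standalone caf::message to keep.
caf::message detach_current_content(caf::event_based_actor* self) {
  // After this call, references into the original mailbox element are invalid.
  return self->current_mailbox_element()->move_content_to_message();
}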
Dominik Lohmann
@dominiklohmann
Yeah, we sadly still are. The upgrade had to be pushed back due to limited engineering resources. We still have some state serialized with caf::binary_serializer, and the output of that changed between 0.17 and 0.18, so we're in the process of transitioning our database state in a forward-compatible way.
Invalidating the original reference is not a problem, as I need a lifting function around every behavior callback anyway; I can move it in there. Thanks!
Dominik Charousset
@Neverlord
Depending on how long the migration goes, maybe you can skip 0.18 entirely and jump to 0.19. ;)
Good luck!
Dominik Lohmann
@dominiklohmann
Probably another 1-3 months, depending on customer needs :)
Dominik Lohmann
@dominiklohmann

Totally separate question:

self->set_down_handler(...);
auto handle = self->spawn<caf::monitored>(my_typed_actor, args...);

typed_actor<...>::behavior_type my_typed_actor(typed_actor<...>::pointer self, Args... args) {
  if (!validate(args...)) {
    self->quit(make_error(...));
    return typed_actor<...>::behavior_type::make_empty_behavior();
  }
  return {
    // ...
  };
}

If the validation inside the my_typed_actor function fails, the down handler is never triggered, even though the actor shuts down correctly. Is this a bug, or are we simply abusing the API here?

Dominik Charousset
@Neverlord
I'd say it should trigger down messages. Do you have a minimal example that reproduces this?
Dominik Lohmann
@dominiklohmann
I'll open a PR with a unit test, assuming it still fails with 0.18 you can take it from there.
Dominik Charousset
@Neverlord
👍
Yifei Yang
@Yifei-yang7
Hi, may I know if message communication between cross-node actors is able to saturate the network bandwidth? For example if I try to transfer a large Arrow table between cross-node actors. Thank you!
Dominik Charousset
@Neverlord
The general advice we give is to prefer small messages. Utilizing the bandwidth shouldn't be an issue, but if you force the middleman to serialize KBs or even MBs worth of data, it will 'block' it for quite a bit, so you could see latency spikes from such 'jumbo messages'. I'd say build a small prototype and measure how much latency you actually cause; the Prometheus metrics for the middleman should give you a good idea. Limiting the Arrow tables to some sensible size could also be a good idea if that works for your application.
Yifei Yang
@Yifei-yang7
Thanks for your advice. Regarding serialization in the middleman: since Arrow provides built-in serialization methods, I call them inside inspect. So I guess the middleman can just transfer the serialized bytes from the Arrow API without incurring the serialization latency inside the middleman itself. May I know if this is true? But yeah, for sure I will benchmark a bit to see the real effect. :)
Dominik Charousset
@Neverlord
Even if the serialization itself takes no time, you still have to copy data into the socket buffer. If that's a couple MB worth of data, it still can cause quite a bit of delay. Measurements are your friends. :)
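A rough sketch of the inspect-based approach described above, assuming CAF 0.18's inspector API; serialize_table and deserialize_table are hypothetical placeholders for whatever Arrow IPC helpers you use, not real Arrow or CAF functions:

#include <cstdint>
#include <memory>
#include <vector>

#include <arrow/api.h>

#include "caf/all.hpp"

// Hypothetical helpers around Arrow's IPC serialization (definitions omitted).
std::vector<std::uint8_t> serialize_table(const std::shared_ptr<arrow::Table>& table);
std::shared_ptr<arrow::Table> deserialize_table(const std::vector<std::uint8_t>& bytes);

struct table_wrapper {
  std::shared_ptr<arrow::Table> table;
};

// CAF only shuttles a byte vector; the Arrow (de)serialization happens here.
template <class Inspector>
bool inspect(Inspector& f, table_wrapper& x) {
  if constexpr (Inspector::is_loading) {
    std::vector<std::uint8_t> bytes;
    if (!f.object(x).fields(f.field("bytes", bytes)))
      return false;
    x.table = deserialize_table(bytes);
    return x.table != nullptr;
  } else {
    auto bytes = serialize_table(x.table);
    return f.object(x).fields(f.field("bytes", bytes));
  }
}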
Yifei Yang
@Yifei-yang7
got it, thanks! :-)
Kyle Klenk
@KyleKlenk
Hi, I would like to know if CAF can be configured to use green threads instead of hardware threads. We are running into issues where CAF spawns more threads than there are cores on some of our systems, and our admins are recommending that we consider using green threads.
Dominik Charousset
@Neverlord
CAF usually starts more threads than there are cores, since slightly "overbooking" the system usually results in better performance: some threads invariably have to wait at some point (e.g., due to memory access) and the extra threads can fill those gaps. The analogous abstraction to green threads in CAF are actors; they all share a single thread pool for execution. The size of that thread pool can be configured with the parameter caf.scheduler.max-threads (https://actor-framework.readthedocs.io/en/stable/ConfiguringActorApplications.html).
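A minimal sketch of capping that pool via the parameter above, e.g. in a caf-application.conf file (the value 4 is illustrative):

# caf-application.conf
caf {
  scheduler {
    max-threads = 4  # size of the shared worker pool
  }
}

The same value can typically also be set on the command line (e.g. --caf.scheduler.max-threads=4).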
Kyle Klenk
@KyleKlenk
Thanks for the explanation and pointing me to the configuration for specifying the number of threads.
Dominik Lohmann
@dominiklohmann
I've been looking at Flow recently, and was wondering how result types of observables integrate with typed actors? Is there an example for this, or are they just always lazily typed via caf::message?
Ah nvm, just came across caf::async::{consumer,producer}_resource<T>. That neatly solves what I wanted to do.
Dominik Charousset
@Neverlord
In general: an observable is private to an actor, so that wouldn't go into an actor interface. That's what the new stream<T> is going to be all about. The async building blocks only work for concurrency.
There's also some documentation available, btw.
Just do sphinx-build in the manual directory and you'll have the version that has some initial documentation for the new flow API. :)
Or take the zip file.
Dominik Lohmann
@dominiklohmann

I see. I can see how a new stream<T> built on top of data flows can work much better than the existing streaming API, which was a pain to debug and had lots of shutdown-order issues.

If I understand data flows correctly, they must be fully configured before their hosting actors launch. This is a limitation compared to the existing experimental streaming API, which was fully configurable externally and integrated into the typed actor API. Will this limitation also exist for the new stream<T>? I'm curious about your plans on the streaming front in general.

Dominik Charousset
@Neverlord
You can spin up flows at any time. You've probably seen the new spawn_inactive setups? You only need those if you're using observe_on, because the target actor for this operator must not be running yet.
And, yeah. The old streams are going away for a reason. ;)
Dominik Lohmann
@dominiklohmann
I think there's a bug in the image for the concat operator: The concatenated stream can only start after the first stream is done, right?
Nvm, it actually can do the first one eagerly; that issue only occurs with at least three observables
Dominik Lohmann
@dominiklohmann
Currently I'm just trying to get a feel for the API and what its use cases and limitations are, so I started by comparing it to how we use the current streaming APIs. Kinda excited to toy around with it. I assume the new streaming stuff on top of it is all targeted for CAF 0.19?
Dominik Charousset
@Neverlord

Let me know if you do find bugs. The current flow branch is pretty much on its way back to master.

For CAF 0.19 we'll have flows and websockets (with caf_net); I'd like to see the new streams make the cut as well.

Dominik Lohmann
@dominiklohmann

From my experiments, the new streams would likely be required for us, because flows cannot replace the old streams yet, unless the old streams are staying for 0.19. The lack of dynamic setup of flows between actors after they've started will be a major hassle for us, because it requires rewriting how we start our ingestion pipeline. I'll let you know if I encounter any major issues, but so far I've seen none in the playground I've set up for my experiments.

If you need any design input for the new streams, I'm always open to chat, since we're really deep into the streaming internals by now. I would be really interested in a design doc if you have something like that.

Dominik Charousset
@Neverlord

There's no design document, but here's a rough outline of how I picture the new API at the moment:

An actor creates a new stream simply from a factory object. Something like:

auto out = self->make_stream([self](observer<int> new_listener) {
  return self->make_observable().iota(1).take(100).subscribe(new_listener);
});
// decltype(out) = stream<int>

The stream handle itself is just a reference to the actor plus an ID. On the consumer side, you'd do self->observe_stream(hdl) to obtain an observable<T> and then you can apply all the operators you'd want.
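A matching consumer-side sketch of that outline (make_stream, observe_stream, and the stream<int> handle are part of the proposed API, not released CAF):

// On the consumer side, hdl is a stream<int> received from the producing actor.
auto obs = self->observe_stream(hdl);        // -> observable<int> (proposed API)
obs.map([](int x) { return x * 2; })         // regular flow operators from here
  .for_each([](int x) { /* consume x */ });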

Dominik Lohmann
@dominiklohmann
How does the stream handshake work in the new model? Mostly interested in how I would translate our current sources, stages, and sinks.
Also, this probably means that an actor can finally host multiple streams, right?
Dominik Charousset
@Neverlord
If you do observe_stream, CAF sends an open handshake. On the receiver side, this creates an observer<T> for the sender and calls the factory. There may be some optional arguments that allow you to fine-tune what the messaging after that looks like (micro-batch sizes, max. delay, etc.).
You can multiplex inside one actor however you choose. In practice, you'd probably first create a (hot) observable and then bind it to the make_stream factory, so that the factory just calls my_observable.subscribe(new_listener). You can also open up multiple streams and then merge or zip_with the inputs.
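A hypothetical multiplexing sketch under the same proposal (merge is an existing flow operator; observe_stream and the stream handles belong to the proposed API):

// Observe two incoming streams and merge them inside the same actor.
auto xs = self->observe_stream(stream_a); // observable<int> (proposed API)
auto ys = self->observe_stream(stream_b);
xs.merge(ys).for_each([](int x) { /* consume x */ });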
Dominik Lohmann
@dominiklohmann
Is the "open handshake" a special message again like for the current streaming similar to down or exit messages, or is it a regular message such that it can also be delegated? The latter would be really nice to have.
Dominik Charousset
@Neverlord
It needs to be a special message because otherwise the user would need to provide a handler.
Dominik Lohmann
@dominiklohmann
I'd actually prefer that, because that means it'd also be in the signature of typed actors and that the message could be delegated, responded to at a later point in time, or even rejected with an error.
Dominik Charousset
@Neverlord
I don't think that's feasible, though. It would force each actor that wants to use streams to add the same boilerplate over and over again.
Maybe overriding the default handler is an option.