@gottfrois my understanding of CQRS & Event Sourcing is:
let's try to apply this to your example:
We have a graph of tasks; after each task completes, the Task publishes a domain event TaskCompleted with attributes that allow deciding whether the next task should be run.
Everything is started by the first command, StartProcess (initiated by a user?), which starts the first task in the graph.
If you decide to model the graph of tasks as a saga, it should be created when the first task of the graph is started. It could then respond to TaskCompleted events and, if validation succeeds, send a command RunTask(X) where X is the next task in the graph to run. Of course, there could be more events that have an impact on the saga (graph) state, like TaskFailed, TaskTimeouted, etc.
TaskTimeouted in particular might be useful. The saga could send it to itself with some defined delay (Timeout) when it sends the RunTask command. If it gets TaskCompleted or TaskFailed before TaskTimeouted, it should act as usual (no timeout); if TaskTimeouted comes first, the saga should react to it, and all subsequent events (i.e. TaskCompleted) should be ignored.
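A minimal Ruby sketch of such a saga (all class, event and command names here are illustrative, not from any particular gem):

```ruby
# Minimal process manager (saga) for a task graph.
# TaskCompleted, TaskFailed, TaskTimeouted and RunTask are illustrative names.
class TaskGraphSaga
  def initialize(graph, command_bus)
    @graph       = graph        # e.g. { "task_1" => "task_2", "task_2" => nil }
    @command_bus = command_bus  # anything responding to #call
    @timed_out   = {}
  end

  def handle(event)
    case event[:type]
    when :task_completed
      return if @timed_out[event[:task_id]] # timeout won: ignore late events
      next_task = @graph[event[:task_id]]
      @command_bus.call(run_task: next_task) if next_task
    when :task_failed
      # compensate / stop the process here
    when :task_timeouted
      @timed_out[event[:task_id]] = true
    end
  end
end
```

When the saga sends RunTask it would also schedule a delayed TaskTimeouted to itself; once that timeout fires, late TaskCompleted events for that task are ignored.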
I hope that this will clarify some things, ping me if I could help more :)
@mpraglowski thanks for this great explanation. So far I see CQRS & Event Sourcing the same way as you do (awesome). Some of my commands are triggered by user actions and some of them will be triggered by internal processes (saga).
The idea behind my "graph of tasks" is to explicitly create the first one (the root node) by some command triggered by a user action (UI), execute it, and if the task gets "completed", call an internal process to build the next task if necessary. So in my use case, the saga is a "task manager" that determines if it has reached the end of the chain or if it should append a new task. But I think you got the basic idea right, and the timeout event is a good idea.
Now what I am not sure of is how I should implement the saga. What does it look like in terms of Ruby code? As far as I understand this, I think it should be a state machine that can transition from one state to another depending on some conditions. Each transition will trigger a specific command that will be run against an aggregate. But since all this CQRS & Event Sourcing journey is new to me and you guys seem to have more experience with it than me, it would be awesome to see a basic implementation of a saga in one of your blog posts :)
In the meantime, I'll give this a try and will come back to you.
Yes sure, i'll be happy to put something together that could represent a subset of what i am doing. From what I have seen already, my domain model is pretty big :-/ I am not sure if it's me that is missing an intermediate object or if it's ok to have quite big domain models.
Since events need to be stored at the aggregate root level, all my user actions translate to a method in the domain model in order to trigger an event. Correct me if i'm wrong, but that can lead to having a huge domain model (in terms of number of lines). Mine is already ~300 lines long.
Also, the second pain point I have is that I had to create a lot of custom exception classes. I use them to perform business validations in my domain model. Since the domain model internals are private, it makes it impossible to extract business validations into a PolicyObject, for example :-/ So I end up with code like this:
```ruby
def assign_author_task!(assignor_id, assignee_id)
  fail AuthorTaskNotCreated       if author_task.draft?
  fail AuthorTaskAlreadyCompleted if author_task.completed?
  fail AuthorTaskAlreadyAssigned  if author_task.assigned?
  apply Events::Document::AuthorTask::Assigned.create(id, assignor_id, assignee_id)
end
```
Whereas I would have preferred to do:
```ruby
def assign_author_task!(assignor_id, assignee_id)
  fail 'some custom class' unless PolicyObjects::AuthorTaskAssignable.new(self).valid?
  apply Events::Document::AuthorTask::Assigned.create(id, assignor_id, assignee_id)
end
```
But then I lose the specific custom exception classes, which was nice I guess. If you guys have thoughts and feedback on this, it would be welcomed :)
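One way to keep both, sketched here with illustrative names: let the policy object own the business rules and raise the specific exception classes itself, so the aggregate only calls `validate!`:

```ruby
# Illustrative sketch: the policy object owns the business rules *and*
# the specific exception classes, so the aggregate stays small.
AuthorTaskNotCreated       = Class.new(StandardError)
AuthorTaskAlreadyCompleted = Class.new(StandardError)
AuthorTaskAlreadyAssigned  = Class.new(StandardError)

module PolicyObjects
  class AuthorTaskAssignable
    def initialize(author_task)
      @author_task = author_task
    end

    def validate!
      raise AuthorTaskNotCreated       if @author_task.draft?
      raise AuthorTaskAlreadyCompleted if @author_task.completed?
      raise AuthorTaskAlreadyAssigned  if @author_task.assigned?
      true
    end
  end
end
```

Inside the aggregate, `assign_author_task!` would then shrink to a `validate!` call followed by `apply`.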
Well the issue with splitting is that generated events will not be stored under the same uuid anymore. What I want in the end, is to be able to look at all events applied to my domain model so i can understand what happened to it, what user changed during its life cycle. So with that in mind, the only way I see to achieve this is to put everything into that domain model.
However, there are things that in regular OO programming are generally a code smell, which makes me have serious doubts. For example, here is a subset of my "Document" domain model:
```ruby
def create
  # apply event here
end

def create_author_task!
  # apply event here
end

def assign_author_task!
  # apply event here
end

def reassign_author_task!
  # apply event here
end

def complete_author_task!
  # apply event here
end

def create_qc_task!
  # apply event here
end

def assign_qc_task!
  # apply event here
end

def reassign_qc_task!
  # apply event here
end

def approve_qc_task!
  # apply event here
end
```
Usually when you have methods with the same prefix/suffix, it's a hint that you have a hidden object, right? But here I really do want the applied events to be applied to the same aggregate root, because from the business perspective we want to follow the life cycle of that "Document", not only a particular "author_task" or "qc_task".
Assignment, approvals and rejections are user actions. That's why I want to manage them using events. A QcTask is just an admin task to review if the translation is correct and fix it otherwise before sending the translation to the client for final review. While I understand that the author task and the qc task are fundamentally the same, they have different business rules and side effects that would be easy to manage if they are 2 different tasks I think.
Right now I am moving the task events out of the Document domain model into their own domain models. Common task features (creation, assignment, completion) will trigger common events (TaskCreated, TaskAssigned, TaskCompleted). Specific task features will trigger their own set of events (there are special events for the author task that do not exist on the other tasks).
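A rough sketch of that split, with illustrative event classes; each task becomes its own small aggregate publishing the common events to its own stream:

```ruby
# Illustrative sketch: every task is its own aggregate with its own stream,
# sharing the common event types (specific tasks would add their own).
TaskCreated   = Struct.new(:task_id)
TaskAssigned  = Struct.new(:task_id, :assignee_id)
TaskCompleted = Struct.new(:task_id)

class Task
  attr_reader :unpublished_events

  def initialize(task_id)
    @task_id = task_id
    @unpublished_events = []
  end

  def create
    apply TaskCreated.new(@task_id)
  end

  def assign(assignee_id)
    apply TaskAssigned.new(@task_id, assignee_id)
  end

  def complete
    apply TaskCompleted.new(@task_id)
  end

  private

  # In a real aggregate, apply would also mutate state, and the events
  # would be stored under this task's own stream (uuid).
  def apply(event)
    @unpublished_events << event
  end
end
```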
I'll let you know if this solution fits better :) Thanks for the feedback by the way
Hi! Thanks for the great resources on Event Sourcing! I'm trying to get a grip on the concept and I was looking at your gem with the sample app (https://github.com/mpraglowski/cqrs-es-sample-with-res).
I have a question:
Is there a reason why loading events by EventRepository returns RailsEventStore::EventEntity instances instead of user-created events? Those are later passed to the domain objects' apply_<event_name> methods, and in the sample app an assumption is made that the event argument here:
```ruby
def apply_events_order_created(event)
  @customer_id = event.customer_id
  @number      = event.order_number
  @state       = :created
end
```
is an Events::OrderCreated instance, as it uses its accessors.
Is there a reason why loading events by EventRepository returns RailsEventStore::EventEntity instances instead of user-created events?
Because RES does not know about event classes. I need to check it, but IMHO it should return RailsEventStore::Event instead of RailsEventStore::EventEntity.
event_store_service_1 and the other one would hit event_store_service_2. Since we load past events in the aggregate root to build the domain's current state, we might apply the event generated by the command twice. How is this supposed to be handled? Simply by optimistic event versions? Would love to see a concrete example on your great blog at some point. Thx!
@gottfrois "Since we can have multiple instances of the event store (running on multiple machines for scalability)" - don't know if RES is the best thing to run on several machines ;) I would rather go with Greg Young's EventStore (http://geteventstore.com) and use our HttpEventStore client to connect to it (https://github.com/arkency/http_eventstore/). Also I'm not sure if I understand your problem here.
You said you have 2 commands - but are these 2 identical commands? Or is this the same command sent to 2 instances of ... what? The command should be sent to your write model - RES has nothing to do with handling commands (yep, I know AggregateRoot and all that stuff should really be moved out of RES).
Don't know if this will help, but there are several concepts described in the competing consumers section of the GetEventStore docs: http://docs.geteventstore.com/introduction/competing-consumers/
Could you write more about how you use, and why you need, 2 instances of RES?
@mpraglowski thanks for the answer. I'm talking about a multi-threaded (or simply load-balanced) Rails app running RES. Somehow 2 identical commands are issued (a poor user interface that allows a user to double-click the "create order" button, for example).
You end up with 2 events stored instead of one thread throwing the "AlreadyCreated" domain exception defined here.
A gist to illustrate https://gist.github.com/gottfrois/98acaa73e71c616c38b6.
That's why I'm asking about the optimistic locking. Currently I don't know how you can know the previous event id when you create your command, and then use that previous event id to perform the optimistic locking. Any guidelines? I'm talking about implementation details, like how/where can we get the previous event id? Am I missing something? Thanks.
@gottfrois it is missing the current version (last replayed event) in the aggregate. It should be added to publish_event when storing the aggregate state. Then you will get WrongExpectedEventVersion when trying to store an event to a stream (we have a stream per aggregate) that has been changed in the meantime.
It should be easy to implement - send us a PR ;) (or wait till next week when I will be back home).
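The mechanism, as a rough in-memory sketch (the real RES API differs; the store interface here is illustrative): the aggregate remembers the version of the last replayed event and passes it as the expected version on append; if another writer got there first, the append raises WrongExpectedEventVersion:

```ruby
WrongExpectedEventVersion = Class.new(StandardError)

# Illustrative in-memory store: one stream per aggregate, optimistic
# locking on the version of the last event the writer has seen.
class InMemoryEventStore
  def initialize
    @streams = Hash.new { |h, k| h[k] = [] }
  end

  # expected_version is the index of the last event the caller read
  # (-1 for a brand new stream).
  def append(stream, event, expected_version:)
    current = @streams[stream].size - 1
    raise WrongExpectedEventVersion unless current == expected_version
    @streams[stream] << event
  end

  def read(stream)
    @streams[stream]
  end
end
```

A second writer that read the stream at version 0 and appends with expected_version: 0 after someone else already has gets the exception, and can retry by reloading the aggregate.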
@mpraglowski I have already tried adding the last replayed event to the gem. It seems to work at first glance, but after several concurrent tests (using threads, like the code above) it turns out that 2 threads can "read" from the database at the exact same time, both figure that the expected version they have is correct, and order the database to "write" their events, which causes the issue of having the same event stored twice.
Anyway, the gem is missing that last event; I'll be working on it first thing on Monday and submit a PR so that we can discuss this further with concrete code and specs :) Thanks man, enjoy your weekend.
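One common way to close that read-check-write race is to make the check and the write atomic in the storage layer itself, e.g. a unique index on (stream, version) in a SQL store, so the second concurrent writer fails at insert time. Emulated in-process here (illustrative sketch, not the RES schema):

```ruby
require 'set'

# Sketch: emulate a database unique index on (stream, version).
# The membership check and the insert happen atomically under the mutex,
# just as a unique constraint makes them atomic inside the database, so
# two writers that both read version 0 cannot both insert version 1.
class UniqueVersionIndex
  DuplicateVersion = Class.new(StandardError)

  def initialize
    @mutex = Mutex.new
    @keys  = Set.new
  end

  def insert(stream, version)
    @mutex.synchronize do
      key = [stream, version]
      raise DuplicateVersion if @keys.include?(key)
      @keys << key
    end
  end
end
```

With such a constraint in place, the in-memory expected-version check becomes an early exit, while correctness is guaranteed by the database rejecting the duplicate write.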
Hello @mpraglowski_twitter, I have an issue with canceling events. I know there isn't a method to delete events, so what I do is delete my aggregate stream, link again only the events I want to keep, and then start a rebuild from the stream with RailsEventStore::Projection. But somehow after I add a new event to my aggregate, it duplicates it in the stream even though my aggregate looks fine. I put pry inside the gem files to find out where it is happening, and it seems to happen in append_to_stream_serialized_events. The input looks fine to me, but somehow the storage of events into the stream fails by adding the new event twice (once at the beginning of the stream and another time at the end).
I would like to join your slack in order to talk more about this or we can do it here, as you wish. Thanks in advance !