    Dan Di Spaltro
    @dispalt
    @notxcain you around by chance?
    Denis Mikhaylov
    @notxcain
    @dispalt Hi! I was on vacation :it: Do you have some questions?
    Dan Di Spaltro
    @dispalt
    welcome back
    Dan Di Spaltro
    @dispalt
    @notxcain hey bunch of new stuff from the akka typed team on interfacing with the untyped persistence/cluster/etc
    Dan Di Spaltro
    @dispalt
    @notxcain more of a design question of the example than a how to use it, but where would you put something like getTransactions (specifically with the s)
    Denis Mikhaylov
    @notxcain
    @dispalt You may use JDBC or Cassandra to store read models
    Denis Mikhaylov
    @notxcain
    So basically the algorithm is:
    1. Receive event (with sequence number == n)
    2. Fetch corresponding state from read-side store
    3. Check the state's version (v)
      3.1 If n <= v - ignore event as it is probably a duplicate
      3.2 If n == v + 1 - apply event to state and save it, incrementing version
      3.3 If n > v + 1 - terminate stream and report error, as you have missing events (should not happen normally)
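
A minimal sketch of step 3 above as a pure function, in Scala. All names here (`Versioned`, `Outcome`, `Projection.step`) are hypothetical and not part of Aecor; receiving the event and fetching or saving the state (steps 1 and 2) are left to the caller, and the sketch assumes a fresh view starts at version 0 so the first event has sequence number 1.

```scala
// Hypothetical types for illustration; none of these names come from Aecor.
final case class Versioned[S](version: Long, state: S)

sealed trait Outcome[+S]
object Outcome {
  // 3.1: n <= v, the event is probably a duplicate and is skipped
  case object Ignored extends Outcome[Nothing]
  // 3.2: n == v + 1, the event was applied and the version incremented
  final case class Updated[S](next: Versioned[S]) extends Outcome[S]
  // 3.3: n > v + 1, events are missing; the caller should stop the stream
  final case class Gap(expected: Long, got: Long) extends Outcome[Nothing]
}

object Projection {
  // Decides what to do with an event numbered `seqNr` given the stored state.
  def step[S, E](current: Versioned[S], seqNr: Long, event: E)(
      applyEvent: (S, E) => S
  ): Outcome[S] =
    if (seqNr <= current.version) Outcome.Ignored
    else if (seqNr == current.version + 1)
      Outcome.Updated(Versioned(seqNr, applyEvent(current.state, event)))
    else Outcome.Gap(expected = current.version + 1, got = seqNr)
}
```

The caller would persist the `Updated` state, skip on `Ignored`, and terminate the stream with an error on `Gap`.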
    Dan Di Spaltro
    @dispalt
    right, so in your system that's the distributedprocessing module? I guess I was wondering more about this: since it's unpredictable when the processing will finish, how would you reliably return a list that includes the new transactions? Say I have a REST call POST /transaction and then very quickly (maybe instantly) GET /transactions. Technically my POST doesn't create the read side (the distributed processing system does), so how do you typically handle that? Is it just accepting the eventual consistency from an API standpoint?
    Dan Di Spaltro
    @dispalt
    @notxcain ^
    Denis Mikhaylov
    @notxcain
    @dispalt you are right. Views would be eventually consistent.
    And yes, DistributedProcessing should be used to distribute your processes across the cluster.
    Dan Di Spaltro
    @dispalt
    @notxcain so back to getTransactions would you model that under a new aggregate?
    Dan Di Spaltro
    @dispalt
    @notxcain and by getTransactions I mean in regard to your example in the repo
    Denis Mikhaylov
    @notxcain
    I would model it as a read side projection. Just subscribe to the Transaction tag and fold the events.
    Denis Mikhaylov
    @notxcain
    I’ve merged a rework branch, updated readme and published snapshot version to sonatype.
    @all
    Dan Di Spaltro
    @dispalt
    jeebus you changed a lot
    Paweł Kaczor
    @pawelkaczor
    Hi @notxcain, any plans to update dependencies? That would make aecor even more attractive for newcomers (like me currently)
    Denis Mikhaylov
    @notxcain
    Hi @pawelkaczor! Yes, very soon ;)
    Paweł Kaczor
    @pawelkaczor
    awesome
    Dan Di Spaltro
    @dispalt
    akka persistence journal for fdb came out
    @notxcain ^
    Dan Di Spaltro
    @dispalt
    @notxcain just looking at your new code, and I especially like the MTL style with MonadActionLift. I can't tell from looking at it though: if you call read, append, then read, will the second read have the newest state?
    Denis Mikhaylov
    @notxcain
    Yes, you’re right.
    Dan Di Spaltro
    @dispalt
    oh man, that was the single reason I stopped using this
    I guess I should have brought that up earlier
    Dan Di Spaltro
    @dispalt
    anyways, the MTL style has way better ergonomics so kudos
    Denis Mikhaylov
    @notxcain
    Thanks, I enjoy it too :)
    Dan Di Spaltro
    @dispalt
    @notxcain in the example app, should "SubscriptionInvoicing" and "Test" be the same thing? In other words, the scheduleNames won't match and they won't talk to each other, right?
    Denis Mikhaylov
    @notxcain
    I’ll check :)
    Denis Mikhaylov
    @notxcain
    Yeah, right. They won’t match.
    Dan Di Spaltro
    @dispalt
    I mean I assume the intention was to match, correct?
    also do you put all the schedule biz logic in the mapAsync function of runEventWatch?
    Denis Mikhaylov
    @notxcain
    @dispalt after a lot of thought I consider Schedule in general, i.e. when it's not a part of your domain, an anti-pattern. It is here for the sake of backwards compatibility, because we use it in our projects at work. What I would suggest for a greenfield project is to use the view side of an aggregate to perform planned actions, e.g. periodically query all subscriptions where nextBillingDate < now and invoice them, instead of adding an entry to the schedule and reacting when it fires.
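
A rough sketch of the "query where nextBillingDate < now" idea in Scala. The `Subscription` view row and the `Billing.due` helper are hypothetical names for illustration; in practice the filter would be a query against the read-side store, run on a timer, rather than an in-memory `Seq`.

```scala
import java.time.Instant

// Hypothetical read-side row for a subscription; not an Aecor type.
final case class Subscription(id: String, nextBillingDate: Instant)

object Billing {
  // Selects subscriptions whose nextBillingDate is strictly before `now`.
  def due(view: Seq[Subscription], now: Instant): Seq[Subscription] =
    view.filter(_.nextBillingDate.isBefore(now))
}
```

A periodic job would call `Billing.due` with the current time and issue an invoice command for each result.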
    Jisoo Park
    @guersam
    Hi, I have a somewhat general CQRS/ES question. When an emitted event requires a non-idempotent external service call (e.g. sending a welcome email), how do you prevent it from happening twice with aecor?
    Milan van der Meer
    @milanvdm
    Question on decoding messages as I have a similar problem in Kafka.
    Kafka allows you to send multiple types of messages to one topic on which you can register a handler for example.
    The problem is, how do you know which message is of which type in your code base? With Avro you can get back to a correct GenericRecord with a schema, but then to go to the correct Scala class is tricky.
    It seems you guys do something here: https://github.com/notxcain/aecor/blob/master/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntimeActor.scala#L123.
    I expect Akka has a similar issue where you send certain messages using a schema to a certain Actor. How does it know how to decode it back to the correct Scala class?
    Denis Mikhaylov
    @notxcain
    @guersam it’s not possible to prevent. It’s not Aecor specific. It’s related to delivery guarantees in general. So you have to decide what is preferred from business standpoint. Send 0 or 1 email by committing before sending, or send 1 or possibly infinite emails by committing after sending? :)
    @milanvdm you need some kind of type tag as a hint that helps you decide which type to decode a message to
    Jisoo Park
    @guersam

    @notxcain Thanks for the explanation :) If I understood correctly,

    1) the former can be implemented with a command handler with side effect like this https://pavkin.ru/aecor-part-2/#gist93471703 or a Saga that orchestrates the consequent commands, and

    2) the latter can be implemented with another event handler that subscribes to the original journal, filters interesting events and calls the external service unless the correlation id is marked as 'done.'

    Are both ways possible with aecor, or am I missing something?

    Denis Mikhaylov
    @notxcain
    @guersam while technically you’re right I would implement both as a subscription to events from journal, the only difference is when you mark them as done, before or after the side effect. I wouldn’t recommend adding any side-effecting actions to command handler.
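
A small Scala sketch of the difference described above: the same event subscriber, with the "done" mark placed before or after the side effect. `DoneStore` is a hypothetical in-memory stand-in for a durable marker store, and `Delivery` is an illustrative name; neither comes from Aecor.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a durable "done" marker store.
final class DoneStore {
  private val done = mutable.Set.empty[String]
  def isDone(id: String): Boolean = done.contains(id)
  def markDone(id: String): Unit = { done += id; () }
}

object Delivery {
  // Mark first, then act: a crash between the two steps loses the email
  // (0 or 1 sends).
  def atMostOnce(store: DoneStore, id: String)(sendEmail: () => Unit): Unit =
    if (!store.isDone(id)) {
      store.markDone(id)
      sendEmail()
    }

  // Act first, then mark: a crash between the two steps repeats the email
  // when the event is replayed (1 or more sends).
  def atLeastOnce(store: DoneStore, id: String)(sendEmail: () => Unit): Unit =
    if (!store.isDone(id)) {
      sendEmail()
      store.markDone(id)
    }
}
```

Which variant to use is the business decision mentioned earlier: losing an email versus possibly sending it more than once.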
    Jisoo Park
    @guersam
    Thanks!
    Denis Mikhaylov
    @notxcain
    @guersam you're welcome
    Milan van der Meer
    @milanvdm
    @notxcain How would you distribute the different decoders though in a distributed context? Do you just simply keep a mapping between typetags and their respective decoders in a common library?
    How do you handle this issue in Aecor? Or does Akka Persistence handle this for you?
    Denis Mikhaylov
    @notxcain
    Please describe your problem. Where and what are you trying to encode/decode?
    Milan van der Meer
    @milanvdm
    I have UserEvent with different subtypes Created, ...
    So I send a Created event to kafka. It first gets encoded using Avro into a GenericRecord and the schema gets registered in the registry.
    When receiving the event in a different service, I can decode every event back to a GenericRecord with all the correct fields.
    But as one topic contains several subtypes, the other service cannot decode it back to a proper Scala class (to properly aggregate your state depending on each event).
    So one solution is to include a TypeTag in every Event sent, and use that information to pick the right Scala class decoder to get back into the Scala types.
    This feels very weird though to have some kind of Map[TypeTag, Decoder] flying around in every consumer.
    I assume a similar problem must have been solved in Aecor, but I can't find any specifics in the repo, which makes me think it is either hidden in the protobuf or handled by Akka Persistence.
    Am I making sense? :)
    Milan van der Meer
    @milanvdm
    Example:
    val event = consumer.poll[UserEvent](topicName) // Gives back GenericRecord 
    def fold(e: UserEvent, aggregate: State) = { e match { case Created => .... } }
    fold(event, currentState) // Need for the correct decoding of a GenericRecord back to the correct scala type
    Denis Mikhaylov
    @notxcain
    Yes you are :) The problem is not Aecor specific. You have to come up with some kind of type tag in your serialised representation. I don't know how to do it using Avro. Never used Avro. The mapping Map[TypeTag, Decoder] seems like a legit solution.
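
A minimal Scala sketch of the Map[TypeTag, Decoder] idea, assuming a plain string tag stored next to the payload. `Envelope`, `UserEventCodec`, the tag strings and the `Renamed` subtype are hypothetical, and the payload is a bare `String` rather than an Avro GenericRecord, so this shows only the tag-based dispatch, not Avro decoding.

```scala
// Hypothetical event model and wire format; not Aecor's or Avro's API.
sealed trait UserEvent
final case class Created(name: String) extends UserEvent
final case class Renamed(newName: String) extends UserEvent

// A serialized message carrying an explicit type tag next to its payload.
final case class Envelope(typeTag: String, payload: String)

object UserEventCodec {
  type Decoder = String => Either[String, UserEvent]

  private val decodeCreated: Decoder = p => Right(Created(p))
  private val decodeRenamed: Decoder = p => Right(Renamed(p))

  // The registry: one decoder per type tag.
  private val decoders: Map[String, Decoder] = Map(
    "UserCreated" -> decodeCreated,
    "UserRenamed" -> decodeRenamed
  )

  // Writes the tag that the consumer will later dispatch on.
  def encode(e: UserEvent): Envelope = e match {
    case Created(n) => Envelope("UserCreated", n)
    case Renamed(n) => Envelope("UserRenamed", n)
  }

  // Looks up the decoder by tag; unknown tags are reported, not thrown.
  def decode(env: Envelope): Either[String, UserEvent] =
    decoders.get(env.typeTag) match {
      case Some(d) => d(env.payload)
      case None    => Left(s"Unknown type tag: ${env.typeTag}")
    }
}
```

Keeping the registry next to the sealed trait, in a shared module, means every consumer gets the same mapping and the result can be pattern matched as a `UserEvent` in a fold.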