by

Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Denis Mikhaylov
    @notxcain
    @dispalt after a lot of thoughts I consider Schedule in general, i.e if it's not a part of your domain, an anti-pattern, it is here for the sake of backwards compatibility, because we use it in our projects at work. What I would suggest for a greenfield project is to use a view side of an aggregate to perform planned actions, e.g. periodically query all subscription where nextBillingDate < now, and invoice them, instead of adding an entry to schedule and react when it fires.
    Jisoo Park
    @guersam
    Hi, I have somewhat general CQRS/ES question. When a non-idempotent exeternal service call is needed by an emitted event (e.g. sending a welcome email,) how do you prevent it from being happend twice with aecor?
    Milan van der Meer
    @milanvdm
    Question on decoding messages as I have a similar problem in Kafka.
    Kafka allows you to send multiple types of messages to one topic on which you can register a handler for example.
    The problem is, how do you know which message is of which type in your code base? With Avro you can get back to a correct GenericRecord with a schema, but then to go to the correct Scala class is tricky.
    It seems you guys do something here: https://github.com/notxcain/aecor/blob/master/modules/akka-persistence-runtime/src/main/scala/aecor/runtime/akkapersistence/AkkaPersistenceRuntimeActor.scala#L123.
    I expect Akka has a similar issue where you sent certain messages using a schema to a certain Actor. How does it know how to decode it back to the correct Scala class?
    Denis Mikhaylov
    @notxcain
    @guersam it’s not possible to prevent. It’s not Aecor specific. It’s related to delivery guarantees in general. So you have to decide what is preferred from business standpoint. Send 0 or 1 email by committing before sending, or send 1 or possibly infinite emails by committing after sending? :)
    @milanvdm you need some kind of a type tag as a hint which will help you to decide what type to use to decode a message to
    Jisoo Park
    @guersam

    @notxcain Thanks for the explanation :) If I understood correctly,

    1) the former can be implemented with a command handler with side effect like this https://pavkin.ru/aecor-part-2/#gist93471703 or a Saga that orchestrates the consequent commands, and

    2) the latter can be implemented with another event handler that subscribes the original journal, filters interesting events and calls external service unless the correlation id is marked as 'done.'

    Are both ways possible with aecor, or am I missing something?

    Denis Mikhaylov
    @notxcain
    @guersam while technically you’re right I would implement both as a subscription to events from journal, the only difference is when you mark them as done, before or after the side effect. I wouldn’t recommend adding any side-effecting actions to command handler.
    Jisoo Park
    @guersam
    Thanks!
    Denis Mikhaylov
    @notxcain
    @guersam you're welcome
    Milan van der Meer
    @milanvdm
    @notxcain How would you distribute the different decoders though in a distributed context? Do you just simply keep a mapping between typetags and their respective decoders in a common library?
    How do you handle this issue in Aecor? Or does Akka Persistence handle this for you?
    Denis Mikhaylov
    @notxcain
    Please describe your problem. Where and what are you trying to encode/decode?
    Milan van der Meer
    @milanvdm
    I have UserEvent with different subtypes Created, ...
    So I send a Created event to kafka. It first gets encoded using Avro into a GenericRecord and the schema gets registered in the registry.
    When receiving the event in a different service, I can decode every event back to a GenericRecord with all the correct fields.
    But as one topic, contains several subtypes, the other service cannot decode it back to a proper Scala class (to properly aggregate your state depending on each event).
    So one solution is to include an TypeTag in every Event sent, use that information to get the right Scala class decoder to get back into the Scala types.
    This feels very weird though to have some kind of Map[TypeTag, Decoder] flying around in every consumer.
    I assume a similar problem needs to have been solved with Aecor but I cant find any specifics in the repo. Which makes me think it is or hidden in the protobuf or handled by Akka Persistence.
    Am I making sense? :)
    Milan van der Meer
    @milanvdm
    Example:
    val event = consumer.poll[UserEvent](topicName) // Gives back GenericRecord 
    def fold(e: UserEvent, aggregate: State) = { e match { case Created => .... } }
    fold(event, currentState) // Need for the correct decoding of a GenericRecord back to the correct scala type
    Denis Mikhaylov
    @notxcain
    Yes you are :) The problem is not Aecor specific. You have to come up with some kind of type tag in your serialised representation. I don't know how to do it using Avro. Never used Avro. The mapping Map[TypeTag, Decoder] seems legit solution.
    Milan van der Meer
    @milanvdm
    I understand, that's why I was wondering if you could point me to the way Aecor handles this problem :)
    To get some inspiration
    Denis Mikhaylov
    @notxcain

    The only place where Aecor handles it is in boopickel-wire-protocol module. And it uses numeric type tags for. Let's say you have

    sealed trait FooEvent
    case class FooCreated(name: String) extends FooEvent
    case class FooDeleted(reason: String) extends FooEvent

    Let's say you have some encoder: FooEvent => Avro and decoder: Avro => Option[FooEvent]. In pseudo code it would be

    val encoder = {
      case FooCreated(name) => encodeInt(0) then encodeString(name)
      case FooDeleted(reason) => encodeInt(1) then encodeString(reason)
    }
    
    val decoder = decodeInt.flatMap {
      case 0 => decodeString.map(name => FooCreated(name))
      case 1 => decoderString.map(reason => FooDeleted(reason))
      case _ => Option.empty
    }
    Denis Mikhaylov
    @notxcain
    Maybe https://github.com/sksamuel/avro4s has some examples for sealed hierarchies
    Milan van der Meer
    @milanvdm
    Cool, thanks a lot for the pointers! :)
    Denis Mikhaylov
    @notxcain
    My pleasure :)
    Denis Mikhaylov
    @notxcain
    I released Aecor 0.18.0
    Jisoo Park
    @guersam
    :tada: Great! I want to evaluate it in my new project at work with the postgres journal, waiting for the next chapters from Mr. Pavkin :)
    Denis Mikhaylov
    @notxcain
    Feel free to ask questions! :)
    Jisoo Park
    @guersam
    Thanks :)
    Dan Di Spaltro
    @dispalt
    @notxcain hey quick question, why did you make the deploy function on akkaPerRT, basically produce a, "producer", aka F[K => M[F]]?
    I get why MVar is a F[MVar[F]] but I don't grok this one
    Denis Mikhaylov
    @notxcain
    @dispalt hi! It’s because underneath it starts ShardRegion which is considered an effect, hence it captured in type.
    Dan Di Spaltro
    @dispalt
    Right, yeah that's what I figured
    SemanticBeeng
    @SemanticBeeng

    Is there a logical or technical problem with doing doing both append and reject ?

    In Vladimir's example, expire does append https://github.com/SemanticBeeng/ticket-booking-aecor/blob/7e9be350d58fe59d87315603bcba702b21287a8d/booking/src/main/scala/ru/pavkin/booking/booking/entity/EventsourcedBooking.scala#L78.

    In my functionality, the "command" should be both SessionExpired if the time check fails (just like above) but also the reject-ed, just because it has expired.

    Does that make sense, @notxcain ?

    Vladimir Pavkin
    @vpavkin
    @SemanticBeeng IMO it should be handled on a different level.
    I would add a non-unit response to the command (Option[Expiration]) and then handle it at a higher level
    so command would succeed with a SessionExpired event, and the sender would then decide what to do based of the result
    but I'm also curious what @notxcain has to say.

    So it would be smth like:

    def expire: I[Option[Expiration]]

    SemanticBeeng
    @SemanticBeeng

    Hmm... then I would have to do the same on every command.
    Expiration is something that happens at the entity level.

    I really would not want to expose callers to this through something other than a reject - feels most natural.

    Saw the use of >> in your "booking app example" and figured I could extend its use to compose these two things: one is internal facing (event) and the other is external facing, the reject.

    The combination between internal "mutation" and external result seems quite applicable in other situations. The external result could be either of the kind you suggested or a reject.

    Anyway, will mull and wait for more.

    Vladimir Pavkin
    @vpavkin
    I haven't even tested that TBH, but it appears that you can pull that off with how Reject instances are defined ATM
    @SemanticBeeng have you tried it, does it work?
    SemanticBeeng
    @SemanticBeeng
    Yes. As per https://pavkin.ru/aecor-part-2/ "composable command handler" seems to hold in this sense also.
    Compiles. :-)
    Now wip on trying to figure how to write specs (trying to avoid the Akka runtime and seems not possible but still mulling on it)
    Vladimir Pavkin
    @vpavkin
    You can use smth like this
    
    import cats.arrow.FunctionK
    import cats.data.Chain
    import cats.effect.Sync
    import cats.implicits._
    import cats.tagless.implicits._
    import cats.mtl.MonadState
    import cats.tagless.FunctorK
    
    object StateRuntime {
    
      def single[M[_[_]]: FunctorK, F[_], S, E](
        behaviour: EventsourcedBehavior[M, F, S, E]
      )(implicit F: Sync[F], S: MonadState[F, Chain[E]]): M[F] = {
    
        val actionRunner: ActionRunner[F, S, E] =
          new FunctionK[ActionT[F, S, E, ?], F] {
            def apply[A](fa: ActionT[F, S, E, A]): F[A] =
              for {
                previousEvents <- S.get
                prevState <- previousEvents.foldM(behaviour.create)(behaviour.update).unwrap[F]
                foldedResult <- fa.run(prevState, behaviour.update)
                (events, res) <- foldedResult.unwrap[F]
                _ <- S.modify(_ ++ events)
              } yield res
          }
    
        behaviour.actions.mapK(actionRunner)
      }
    
    }
    SemanticBeeng
    @SemanticBeeng
    Hmm... sweet. Will work with that, thanks.
    Vladimir Pavkin
    @vpavkin

    IDK, I understand your motive now, but it still feels weird to me. I've never eventsourced a session btw, wonder what made you do so :)

    Although it's true that algebra doesn't prevent you from doing this trick. We lack explicit laws for MonadAction* family, these would at least answer your question

    SemanticBeeng
    @SemanticBeeng
    It is a long lived entity with a non-trivial lifecycle, shared between a few business processes.
    Vladimir Pavkin
    @vpavkin
    @SemanticBeeng You made me understand that my example repo lacks a proper test suite. We have some neat tools internally, will add a small chapter on the blog probably and add tests to example repo
    SemanticBeeng
    @SemanticBeeng

    Yes, that would be great, indeed.
    If they could show testing process managers composed it would be even better.

    These guys are implemented functionally but likely have side effects and testing those would make state management aspects more explicit.

    Anything of difference between StateRuntime and GenericAkkaRuntime that would matter for functional testing above?

    Usually mixing Akka in makes functional testing hard.
    If there was a way to separate that away for the purpose of aecor and the micro-architecture you evolved in the booking app it would make a good topic for a few blog posts.

    Vladimir Pavkin
    @vpavkin
    Actually there is E2eSupport in aecor-testkit, there's even an example - you can check it out.
    We don't usually test everything wired up for "real" (when you'd get a Concurrent instance) - we don't want to have concurrency in tests.
    It's probably a better idea to have external automation for these complex scenarios
    SemanticBeeng
    @SemanticBeeng
    Oki, thanks. will do,
    Agreed. about concurrency in functional tests.
    A bit fuzzy on "external automation" but have stuff to work with, thanks.