    Gabriele Favaretto
    @gabriele83
    @aludwiko I'm trying to deserialize the snapshot in the following way:
    val deliverySnapshot: AtLeastOnceDeliverySnapshot =
      SerializationExtension(system)
        .findSerializerFor(AtLeastOnceDeliverySnapshot.getClass)
        .fromBinary(snapshot.delivery.toByteArray)
        .asInstanceOf[AtLeastOnceDeliverySnapshot]
    aludwiko
    @aludwiko
    @gabriele83 exactly :)
    Gabriele Favaretto
    @gabriele83
    @aludwiko but doing so I have the following error: Attempted to deserialize message using Java serialization while akka.actor.allow-java-serialization was disabled. Check WARNING logs for more details.
    aludwiko
    @aludwiko
    @gabriele83 can you print what is exactly inside AtLeastOnceDeliverySnapshot before serialization?
    Gabriele Favaretto
    @gabriele83
    @aludwiko ok… wait a second… :-)
    Gabriele Favaretto
    @gabriele83
    @aludwiko Meanwhile, is it correct that I use akka.persistence.AtLeastOnceDeliverySnapshot for findSerializerFor?
    Gabriele Favaretto
    @gabriele83
    private lazy val deliverySnapshotSerializer = SerializationExtension(system).findSerializerFor(getDeliverySnapshot)
    
      override def receiveRecover: Receive = LoggingReceive {
        case RecoveryCompleted ⇒ log.info("Recovery completed!")
        case event: ProductDetailsEvent ⇒ state = update(state, event)
        case event: AtLeastOnceEvent ⇒
          log.debug(s"Update delivery state in receiveRecover:$event")
          updateDeliveryState(event)
        case SnapshotOffer(_, snapshot: ProductDetailsWriteActorStateMessage) ⇒
          val deliverySnapshot: AtLeastOnceDeliverySnapshot =
            deliverySnapshotSerializer.fromBinary(snapshot.delivery.toByteArray).asInstanceOf[AtLeastOnceDeliverySnapshot]
          state = snapshot.state.get
          setDeliverySnapshot(deliverySnapshot)
        case unknown => log.error(s"Received unknown message in receiveRecover:$unknown")
      }
    
      override protected def saveSnapshotF(state: ProductDetails): Unit =
        if (lastSequenceNr % snapshotInterval == 0 && lastSequenceNr != 0) {
          val binary: Array[Byte] = deliverySnapshotSerializer.toBinary(getDeliverySnapshot)
          saveSnapshot(ProductDetailsWriteActorStateMessage(Some(state), ByteString.copyFrom(binary)))
        }
    aludwiko
    @aludwiko
    in my case, I serialized simply by:
    private lazy val akkaMessageSerializer  = new MessageSerializer(extendedSystem)
    ....
    val atLeastOnceDeliverySnapshotBytes = akkaMessageSerializer.toBinary(atLeastOnceDeliverySnapshot)
    Gabriele Favaretto
    @gabriele83
    @aludwiko how do you get the extended system?
    aludwiko
    @aludwiko
    :D
    class StateProtobufSerializer(val extendedSystem: ExtendedActorSystem) extends SerializerWithStringManifest will do the trick
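    A minimal sketch of that approach, assuming a SerializerWithStringManifest; the identifier value and manifest string below are made up for illustration, and Akka injects the ExtendedActorSystem into the constructor when the serializer is registered in config:

    import akka.actor.ExtendedActorSystem
    import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
    import akka.persistence.serialization.MessageSerializer
    import akka.serialization.SerializerWithStringManifest

    class StateProtobufSerializer(val extendedSystem: ExtendedActorSystem) extends SerializerWithStringManifest {
      // Hypothetical, application-chosen serializer id and manifest.
      override val identifier: Int = 9001
      private val DeliverySnapshotManifest = "AtLeastOnceDeliverySnapshot"

      // Reuse Akka's own protobuf serializer for akka-persistence messages,
      // which also covers AtLeastOnceDeliverySnapshot.
      private lazy val akkaMessageSerializer = new MessageSerializer(extendedSystem)

      override def manifest(o: AnyRef): String = o match {
        case _: AtLeastOnceDeliverySnapshot => DeliverySnapshotManifest
        case other => throw new IllegalArgumentException(s"Unsupported type: ${other.getClass}")
      }

      override def toBinary(o: AnyRef): Array[Byte] = o match {
        case s: AtLeastOnceDeliverySnapshot => akkaMessageSerializer.toBinary(s)
        case other => throw new IllegalArgumentException(s"Unsupported type: ${other.getClass}")
      }

      override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
        case DeliverySnapshotManifest =>
          akkaMessageSerializer.fromBinary(bytes, classOf[AtLeastOnceDeliverySnapshot])
        case other => throw new IllegalArgumentException(s"Unknown manifest: $other")
      }
    }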
    aludwiko
    @aludwiko
    anyway, this will not help you with the issue, because you probably have something in AtLeastOnceDS that doesn't have a proper serializer
    that's why I asked for the print
    Gabriele Favaretto
    @gabriele83
    @aludwiko your solution works, unlike mine; I don't know why, since they seem to me like two ways of getting the same thing.
    Great, I don't understand why, but that's okay! :-D
    aludwiko
    @aludwiko
    @gabriele83 I think I wasn't precise enough. I asked you to print what is currently inside AtLeastOnceDeliverySnapshot just before serialization :) not the code, but the actual object, i.e. println(atLeastOnceDeliverySnapshot)
    Gabriele Favaretto
    @gabriele83
    @aludwiko LOG -----> AtLeastOnceDeliverySnapshot(1,Vector())
    aludwiko
    @aludwiko
    hmmm that's strange... I assume that you registered a serializer for ProductDetailsWriteActorStateMessage?
    Gabriele Favaretto
    @gabriele83
    yes correct
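    For reference, "registered a serializer" here typically means bindings along these lines in application.conf; the package and serializer names below are placeholders:

    akka.actor {
      serializers {
        state-protobuf = "com.example.StateProtobufSerializer"
      }
      serialization-bindings {
        "com.example.ProductDetailsWriteActorStateMessage" = state-protobuf
      }
    }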
    aludwiko
    @aludwiko
    ok, it will be hard to help without the codebase
    Gabriele Favaretto
    @gabriele83
    @aludwiko But your solution solved the problem for me; now everything runs.
      // needs: import akka.actor.ExtendedActorSystem
      //        import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
      //        import akka.persistence.serialization.MessageSerializer
      private lazy val deliverySnapshotSerializer = new MessageSerializer(system.asInstanceOf[ExtendedActorSystem])
    
      override def receiveRecover: Receive = LoggingReceive {
        case RecoveryCompleted ⇒ log.info("Recovery completed!")
        case event: ProductDetailsEvent ⇒ state = update(state, event)
        case event: AtLeastOnceEvent ⇒
          log.debug(s"Update delivery state in receiveRecover:$event")
          updateDeliveryState(event)
        case SnapshotOffer(_, snapshot: ProductDetailsWriteActorStateMessage) ⇒
          val deliverySnapshot: AtLeastOnceDeliverySnapshot =
            deliverySnapshotSerializer
              .fromBinary(snapshot.delivery.toByteArray, classOf[AtLeastOnceDeliverySnapshot])
              .asInstanceOf[AtLeastOnceDeliverySnapshot]
          state = snapshot.state.get
          setDeliverySnapshot(deliverySnapshot)
        case unknown => log.error(s"Received unknown message in receiveRecover:$unknown")
      }
    
      override protected def saveSnapshotF(state: ProductDetails): Unit =
        if (lastSequenceNr % snapshotInterval == 0 && lastSequenceNr != 0) {
          val binary: Array[Byte] = deliverySnapshotSerializer.toBinary(getDeliverySnapshot)
          saveSnapshot(ProductDetailsWriteActorStateMessage(Some(state), ByteString.copyFrom(binary)))
        }
    thank you
    aludwiko
    @aludwiko
    @gabriele83 ok awesome
    Bhavya Aggarwal
    @bhavya411
    Hi guys, we are seeing a problem in our dev environment where a lot of reads are happening from Cassandra even when there are no active users of the service.
    The number of reads has gone up from 200-300 per second to 15,000 reads per second.
    The only change that we made was to set the replication factor to 3.
    DEBUG akka.persistence.cassandra.query.EventsByTagPublisher - backtracking delayed query for tag [com.yukon.ngh.circles.impl.events.CircleEvent9] from [20180305] [b67b4000-db52-11e5-8080-808080808080 (2016-02-25 00:00:00)] limit [500]
    These logs keep coming repeatedly.
    We cleared the keyspaces as well, but the problem still persists.
    It's related to Lagom, and we are using version 0.60.
    Gabriele Favaretto
    @gabriele83

    Hi guys,
    what is the correct place to put this configuration: repair-by-discard-old?
    The Akka documentation says it should go here: akka.persistence.journal.cassandra-journal.replay-filter.mode = repair-by-discard-old
    while the Cassandra plugin tests put the configuration here: cassandra-journal.replay-filter.mode = repair-by-discard-old

    Is there a way to verify that the configuration is actually being picked up and working correctly?
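    A sketch of the second variant, assuming the standalone plugin is enabled under its default id "cassandra-journal"; the replay filter is read from the journal plugin's own config section, so it sits next to the plugin's other settings rather than under akka.persistence.journal:

    akka.persistence.journal.plugin = "cassandra-journal"

    cassandra-journal {
      replay-filter {
        # other modes: off, fail, warn
        mode = repair-by-discard-old
      }
    }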

    Tuhin Dey
    @Heisenbug-16
    Hello, we are using Lagom, which uses Akka. As I was going through the internals of the Lagom persistent entity, which uses the Akka persistent actor, I found that the Akka persistent actor uses a single actor to write events to the journal. This means all write events of every persistent entity have to go through this single actor.
    I have also validated this using breakpoints. I had a breakpoint on the AsyncWriteJournal - WriteMessage case. I made multiple requests (API hits) for two different entities, each leading to a state change of its respective entity (PersistentEntity) via an event. Although these requests were processed in parallel, while writing the messages the events were processed sequentially irrespective of the entity. I also checked the actorRef id used to write these events, and it was the same for all events and all entities (I tried at least 15-20 different entity ids).
    Actually, I expected that Akka would use one actor per entity type per shard, since beyond that we do not need to care about the ordering of events (I think). Using a single actor to write all the events should become a bottleneck under high load. Am I missing something here?
    Sven Ludwig
    @sourcekick

    Hi all, these days, do Cassandra client-side sessions need to be closed explicitly, i.e. by code written by the user of Akka Persistence?

    Also, has anyone here tried running Akka Persistence Cassandra on top of Microsoft Azure CosmosDB? If so, what were your experiences; did it work smoothly?

    Jan Ypma
    @jypma
    1) no, the session is open throughout the ActorSystem lifecycle, afaik.
    2) I think it has trouble with ScyllaDB, but I haven't heard of anyone trying CosmosDB yet. Why not give it a spin? :)
    struijenID
    @struijenID
    @Heisenbug-16 No experience with Lagom, but the persistence ID of the event matters. If several entities want to write events with the same persistence ID, it stands to reason that only one entity would be responsible for the actual persisting. Otherwise you would get stream corruption or out-of-order events.
    Matheus Hoffmann
    @Hoffmannxd
    Can anyone help me with using two different keyspaces in an akka-persistence project?
    In my project I would like to create a keyspace for each entity (account, banks, transactions, etc.)
    Roy de Bokx
    @roy-tc

    @Hoffmannxd I faced the same issue and managed to implement it using these docs:
    https://doc.akka.io/docs/akka/current/persistence.html#multiple-persistence-plugin-configurations and

    I was able to select my persistence plugin configuration (with a separate keyspace) for each EntityType by using override final def journalPluginId = "user-actor-journal" and providing something like this in my application.conf:

    {
      user-actor-journal = ${cassandra-journal} {
        keyspace = "user"
        query-plugin = "user-actor-journal.cassandra-query-plugin"
        cassandra-query-plugin = ${cassandra-query-journal} {
          write-plugin = "user-actor-journal"
        }
      }
    }
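    A minimal sketch of the actor side, assuming the "user-actor-journal" configuration above; UserActor and its messages are placeholders:

    import akka.persistence.PersistentActor

    class UserActor extends PersistentActor {
      override def persistenceId: String = s"user-${self.path.name}"

      // Point this entity at the journal (and therefore keyspace) defined above.
      override def journalPluginId: String = "user-actor-journal"
      // A dedicated snapshot store could be selected the same way:
      // override def snapshotPluginId: String = "user-actor-snapshot-store"

      override def receiveCommand: Receive = {
        case cmd: String => persist(cmd) { evt => sender() ! s"persisted $evt" }
      }

      override def receiveRecover: Receive = {
        case _ => // rebuild state from replayed events
      }
    }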
    Daulet Kabdiyev
    @SirMullich

    @roy-tc thanks for the hint. I tried this approach in one of our services and it worked great. However, when I tried to do it in another service I kept getting errors (when cluster sharding initializes and creates the Cassandra keyspaces):

    [ERROR] [29/07/2019 18:09:22] [akka.actor.ActorSystemImpl(DisbursalSystem)] a.a.ActorSystemImpl | Uncaught error from thread [DisbursalSystem-cassandra-plugin-default-dispatcher-37]: akka/util/ccompat/package$, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[DisbursalSystem]
    java.lang.NoClassDefFoundError: akka/util/ccompat/package$
        at akka.stream.impl.SeqStage$$anon$3.<init>(Sinks.scala:305)
        at akka.stream.impl.SeqStage.createLogicAndMaterializedValue(Sinks.scala:304)
        at akka.stream.stage.GraphStageWithMaterializedValue.createLogicAndMaterializedValue(GraphStage.scala:47)
        at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:674)
        at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:491)
        at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
        at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:443)
        at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:620)
        at akka.stream.scaladsl.Source.runWith(Source.scala:106)
        at akka.persistence.cassandra.snapshot.CassandraSnapshotStore.metadata(CassandraSnapshotStore.scala:262)
        at akka.persistence.cassandra.snapshot.CassandraSnapshotStore.$anonfun$loadAsync$1(CassandraSnapshotStore.scala:108)
        at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:303)
        at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:37)

    The default keyspace from the cassandra-journal config entry is created in Cassandra, but the keyspace from the custom config entry is not created.

    I believe it must be some config problem. Here is my application.conf:
    cassandra-journal {
      contact-points = ${CASSANDRA}
      keyspace = ${CASSANDRA_KEYSPACE}
      table = "events"
    }
    
    cassandra-snapshot-store {
      contact-points = ${CASSANDRA}
      keyspace = ${CASSANDRA_SNAPSHOTS_KEYSPACE}
      table = "snapshots"
    }
    
    cassandra-reference-entity-journal = ${cassandra-journal} {
      keyspace = ${CASSANDRA_REFERENCE_ENTITY_KEYSPACE}
      query-plugin = "cassandra-reference-entity-journal.cassandra-query-plugin"
      cassandra-query-plugin = ${cassandra-query-journal} {
        write-plugin = "cassandra-reference-entity-journal"
      }
    }
    
    cassandra-reference-entity-snapshot-store = ${cassandra-snapshot-store} {
      keyspace = ${CASSANDRA_REFERENCE_ENTITY_SNAPSHOTS_KEYSPACE}
    }
    Christopher Batey
    @chbatey
    Looks like mixed Akka versions. There should be a warning logged on startup.
    Daulet Kabdiyev
    @SirMullich
    It was mixed akka versions indeed, thanks :+1:
    Dragos Manolescu
    @polymorphic
    Hi there, I've built v0.99 locally but the tests start failing right away with com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1:9042] Cannot connect. This is on a Mac w/ Oracle JDK 1.8.0_181. Has anybody else run into this problem?
    Tim Moore
    @TimMoore
    @polymorphic the tests assume that you're already running Cassandra somehow, listening on localhost:9042. See https://github.com/akka/akka-persistence-cassandra/blob/master/CONTRIBUTING.md#running-the-tests
    Dragos Manolescu
    @polymorphic
    Thanks @TimMoore, that did it! I missed the change from 5/17; previous versions were using the CassandraLauncher for that.
    Dragos Manolescu
    @polymorphic
    This prompts the question, @TimMoore: are the CassandraLauncher and the start methods in CassandraLifecycle dead code? If so, perhaps that's something I could contribute ;)
    Tim Moore
    @TimMoore
    @polymorphic other people do use it, but the team has considered whether to deprecate it akka/akka-persistence-cassandra#536
    Tomasz Pasternak
    @tpasternak
    Hello, I have a question about eventual-consistency-delay and the assumption that after five seconds all Cassandra nodes are synchronized. Is it generally recommended to use akka-persistence-cassandra in a critical system where no event may be lost? If yes, is there any way to guarantee that other than just putting a large value into the delay setting?
    Carlos Angeles
    @cdelosangeles80_twitter
    Hello all, are there best practices for using deleteMessages() and deleteSnapshot()? We currently receive the SaveSnapshotSuccess event and issue deleteMessages(), then receive the DeleteMessagesSuccess event and issue deleteSnapshot(). We derive the sequence number from SaveSnapshotSuccess.metadata().sequenceNr(), subtracting (snapshot interval * number of snapshots to keep). We are seeing Cassandra slowdowns. Is anyone doing deletes as part of a batch process? Any ideas would be great. Thanks!
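    A rough sketch of the flow described above; snapshotInterval and keepNrOfSnapshots are assumed values, and this variant uses deleteSnapshots with a criteria instead of deleteSnapshot for a single sequence number:

    import akka.actor.ActorLogging
    import akka.persistence._

    trait SnapshotHousekeeping { this: PersistentActor with ActorLogging =>
      // Assumed, illustrative retention settings.
      def snapshotInterval: Long = 100L
      def keepNrOfSnapshots: Long = 2L

      def handleHousekeeping: Receive = {
        case SaveSnapshotSuccess(metadata) =>
          // Keep the last keepNrOfSnapshots snapshots' worth of events.
          val upToSeqNr = metadata.sequenceNr - snapshotInterval * keepNrOfSnapshots
          if (upToSeqNr > 0) deleteMessages(upToSeqNr)

        case DeleteMessagesSuccess(toSequenceNr) =>
          // Only after the journal delete succeeds, drop the older snapshots.
          deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = toSequenceNr))

        case DeleteMessagesFailure(cause, toSequenceNr) =>
          log.warning("Failed to delete messages up to {}: {}", toSequenceNr, cause)
      }
    }

    In the actor, handleHousekeeping would then be composed into receiveCommand with orElse.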
    William Kinaan
    @WilliamKinaan
    Hello all, I am new to Akka and Akka Persistence. We decided to use the Cassandra journal plugin, and I was able to insert some events with it. My question is: what does each field in the akka.messages table mean, and how do I populate them from Akka? Using persist(event ...) populates only some of the fields, not all of them. #cassandra #akka-persistence
    Artsiom Miklushou
    @mikla
    Hi,
    Why does EventsByTagMigration try to create the keyspace even if keyspace-autocreate = false?
    def createTables(): Future[Done] = {
        log.info("Creating keyspace {} and new tag tables", config.keyspace)
        for {
          _ <- session.executeWrite(createKeyspace)
    Is it done intentionally?
    Artsiom Miklushou
    @mikla
    Isn't this migration supposed to be run on an existing schema?
    Artsiom Miklushou
    @mikla

    Also,

    ```
    2019-10-17 15:07:04,969 - [WARN] - com.datastax.driver.core.RequestHandler : Query '[17 statements, 187 bound values] BEGIN UNLOGGED BATCH INSERT INTO nevos.tag_views(
        tag_name,
        timebucket,
        timestamp,
        tag_pid_sequence_nr,
        event,
        event_manifest,
        persistence_id,
        sequence_nr,
        ser_id,
        ser_manifest,
        writer_uuid
    ) VALUES (?,?,?,?,?,?,?,?,?,?,?); INSERT INTO nevos.tag_views(
        tag_name,
        timebucket,
        timestamp,
        tag_pid_sequence_nr,
        event,
        event_manifest,
        ... [truncated output]' generated server side warning(s): Batch for [nevos.tag_views] is of size 8340, exceeding specified threshold of 5120 by 3220.
    ```

    This batch insert fails for me even with quite a small amount of data.