Neil J. L. Benn
@njlbenn
It looks to me like it reads the entire realtime Mongo collection on start up and then does the filtering in the application -- rather than sending the offset ID as part of the Mongo query.
Is that a deliberate design?
I was wondering because for the CurrentEventsByTag query we do send the offset ID as part of the Mongo query: https://github.com/scullxbones/akka-persistence-mongo/blob/master/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoReadJournaller.scala#L122
So I was wondering if the startup query behaviour should be the same for both the journal and the realtime collections.
Brian Scully
@scullxbones

Hi @njlbenn - I can't think of a good reason why these would be different, other than maybe one (current) got fixed and the other didn't.

it reads the entire realtime Mongo collection on start up

it does filter by tag, but you're right that it still reads documents below the offset off disk just to discard them on the Scala side

hi @rurikovich - are you still seeing this error? missed your chat message
Neil J. L. Benn
@njlbenn
@scullxbones thanks for the reply :thumbsup: . If I get a chance maybe I'll do a quick PR to fix that. Nothing urgent of course; it just might be a useful optimisation.
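
(For illustration, a minimal sketch of pushing the offset filter into the query selector with the reactivemongo BSON API rather than discarding documents on the Scala side; the "_tg" tag field name is an assumption, not necessarily the plugin's actual schema:)

import reactivemongo.bson.{BSONDocument, BSONObjectID}

// Sketch: filter by tag AND by offset on the database side,
// so documents below the offset are never read off disk.
def realtimeSelector(tag: String, offset: Option[BSONObjectID]): BSONDocument = {
  val byTag = BSONDocument("_tg" -> tag)
  offset.fold(byTag)(id => byTag ++ BSONDocument("_id" -> BSONDocument("$gt" -> id)))
}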
James Brems
@JamesBrems78_twitter
Hello,
@scullxbones: If I do not change the default setting for connection pool (https://github.com/scullxbones/akka-persistence-mongo/blob/master/scala/src/main/resources/reference.conf), it is max. 100 connections. As the driver is setting up 2 connections (event journal + snapshots), does it open max. 200 DB connections then? Or do journal and snapshots share the max DB connections?
Gaël Bréard
@gbrd
Hi, the scala driver seems not to use the "snapshot cache" optimisation. Is there a reason?
Gaël Bréard
@gbrd
(opened PR #261 for that)
Brian Scully
@scullxbones
@JamesBrems78_twitter - the pools should be shared between journal and snapshots due to the shared ScalaMongoDriver instance.
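
(For what it's worth, the pool ceiling is normally tuned through the standard maxPoolSize connection-string option rather than per collection; a sketch with a placeholder URI, assuming the driver in use honours that option:)

akka.contrib.persistence.mongodb.mongo {
  mongouri = "mongodb://localhost:27017/my_db?maxPoolSize=100"
}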
James Brems
@JamesBrems78_twitter
@scullxbones : ok thanks. Does akka-persistence-mongo write an MBean or a log where I could monitor the number of active DB connections?
Brian Scully
@scullxbones

@JamesBrems78_twitter sorry, I lost track of this.

No; if that capability existed, it would be inside the actual driver, I think. This plugin exposes some Dropwizard timing metrics, but the driver internals are a black box.

Fabian Page
@fabianpage
Hi
I'm trying to write an EventAdapter with akka-persistence-mongo. Does anybody have an example of how to do that? (My manifest, toJournal and fromJournal functions are never called.)
Fabian Page
@fabianpage
(Also, if I try to make a binding to "reactivemongo.bson.BSONDocument" I don't get any calls on my fromJournal or toJournal methods.)
akka-contrib-mongodb-persistence-journal {
  event-adapters {
    user-time-booking-edited = "models.UserTimeBookingEdited_V1_V2"
  }
  event-adapter-bindings {
    "models.UserTimeBookingEdited" = user-time-booking-edited
    "reactivemongo.bson.BSONDocument" = user-time-booking-edited
  }
}
Fabian Page
@fabianpage
I found my error: the event-adapter-bindings have to point to the sealed trait, not to the case class which extends this sealed trait.
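
(For reference, a minimal sketch of how the pieces fit together, using the adapter name from the config above; the event payloads and upcast logic are assumptions for illustration only:)

package models

import akka.persistence.journal.{EventAdapter, EventSeq}

// The event-adapter-bindings entry targets this sealed trait,
// not the individual case classes that extend it.
sealed trait UserTimeBookingEdited
final case class UserTimeBookingEditedV1(bookingId: String) extends UserTimeBookingEdited
final case class UserTimeBookingEditedV2(bookingId: String, comment: Option[String]) extends UserTimeBookingEdited

class UserTimeBookingEdited_V1_V2 extends EventAdapter {
  override def manifest(event: Any): String = event.getClass.getSimpleName

  // Writing side: persist events unchanged
  override def toJournal(event: Any): Any = event

  // Reading side: upcast V1 payloads to V2 before handing them to the actor
  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case UserTimeBookingEditedV1(id) => EventSeq.single(UserTimeBookingEditedV2(id, None))
    case other                       => EventSeq.single(other)
  }
}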
Scot Mcphee
@scotartt
Hello @scullxbones I want to switch my DynamoDB persistence in akka over to use MongoDB, using the reactive driver version. I'm running Akka 2.5.x on AWS Corretto JDK 8. But my MongoDB is MongoDB Atlas, which is usually a 4.0 or 4.2 version of Mongo. The documentation only seems to go up to v3.6 of Mongo. Is MongoDB 4.2 supported? And can I use the 'mongodb+srv://' format connection strings which Atlas requires?
Brian Scully
@scullxbones

Hi @scotartt - we currently verify the test suite on Mongo versions 3.0 to 3.6. I haven't had a chance to update the Travis configuration to run the suite against 4.0 or 4.2, but given the core subset of Mongo's APIs and query language we use, I'd be surprised if the plugin didn't work on those.

Atlas SNI URLs are supported as of PR #194; this should be available for any of the 3 drivers supported by the plugin.
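
(For reference, a sketch of an Atlas-style SRV connection string in the plugin config; host, credentials and database name are placeholders:)

akka.contrib.persistence.mongodb.mongo {
  mongouri = "mongodb+srv://user:password@cluster0.example.mongodb.net/my_db"
}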

Scot Mcphee
@scotartt
Thanks for the info, I'll report back my experiences setting it up with Atlas.
Vikram
@vikram_darsi_twitter
Hi Team, we currently use the default akka persistence plugin "leveldb" and are planning to migrate to mongodb. Can someone please provide a link to the akka.conf/reference.conf where the journal and snapshot plugins refer to mongodb's?
Brian Scully
@scullxbones
@vikram_darsi_twitter here's a link to the docs covering installation and configuration for akka 2.5
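
(For reference, the switch itself is the usual akka-persistence plugin selection; a minimal sketch of the relevant keys, with a placeholder URI:)

akka.persistence {
  journal.plugin = "akka-contrib-mongodb-persistence-journal"
  snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
}
akka.contrib.persistence.mongodb.mongo.mongouri = "mongodb://localhost:27017/my_db"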
Gaël Ferrachat
@gael-ft

Hello here,

I am trying to implement persistent actors (with akka typed) following the Akka Sample CQRS (https://github.com/akka/akka-samples/tree/2.6/akka-sample-cqrs-java) and the Rx implementation of the plugin.

I am facing an issue with the 'eventsByTag' query because the stream closes (it should remain open).
I opened an issue in the repo, but would like to make sure there are no tricks needed in the config or the mongouri for this to work properly? Thanks

Brian Scully
@scullxbones
hi @gael-ft - no, there shouldn't be. All of the "live" queries (the ones that don't begin with the word current) should remain open. You can try other queries as a test, e.g. eventsByPersistenceId, to see if those close.
Alternately you can try the official-scala driver, as it is possible the bug is just in the rxmongo flavor.
I'm assuming of course the client code isn't swallowing errors in the stream. If the mongo database connection is flaky and drops, that would cause the query to stop.
Gaël Ferrachat
@gael-ft

Thank you for your answer @scullxbones,

I tried with both driver implementations :/

For example, taking into consideration that I previously persisted an event, I then commented out my app code and wrote this quick test:

import akka.actor.typed.ActorSystem
import akka.contrib.persistence.mongodb.{MongoReadJournal, ScalaDslMongoReadJournal}
import akka.persistence.query.PersistenceQuery
import akka.stream.Attributes
import akka.stream.scaladsl.{RestartSource, Sink}

import scala.concurrent.duration._

object TestApp extends App {
  // Guardian is the application's own guardian behavior (not shown here)
  val system = ActorSystem[Guardian.Command](Guardian(), "TestApp")

  implicit val s = system

  RestartSource
    .withBackoff(minBackoff = 500.millis, maxBackoff = 5.seconds, randomFactor = 0.1, maxRestarts = -1) { () =>

      PersistenceQuery(system)
        .readJournalFor[ScalaDslMongoReadJournal](MongoReadJournal.Identifier)
        .allEvents()
        .log(name = "tester stream")
        .addAttributes(
          Attributes.logLevels(
            onElement = Attributes.LogLevels.Debug,
            onFinish = Attributes.LogLevels.Info,
            onFailure = Attributes.LogLevels.Error))
        .map { e =>
          system.log.debug("hello")
          e
        }

    }
    .runWith(Sink.ignore)

  // My app normally starts here, Guardian will just wait for messages, but nothing will happen except stream test above
  //system ! Guardian.DatabaseCheckup
}

Every 5 seconds (max) I can see a debug log "hello" followed by "[tester stream] Upstream finished".

with both drivers
Gaël Ferrachat
@gael-ft
running mongo-community v4.0 on macOS
Gaël Ferrachat
@gael-ft
Does the realtime collection have anything to do with this? Does it need to be enabled?
Brian Scully
@scullxbones
Ah of course. You were right after all. Without the realtime collection enabled, there are no live events. Isn't that enabled by default?
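
(For reference, the flag in question; the key name below is from memory and worth double-checking against the plugin's reference.conf:)

akka.contrib.persistence.mongodb.mongo.realtime-enable-persistence = true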
Gaël Ferrachat
@gael-ft

Ah ^^' finding out what this collection is for was on my todo list ...
Now that it is activated again (and yes, it is enabled by default) I am getting

reactivemongo.core.actors.Exceptions$PrimaryUnavailableException: MongoError['No primary node is available! (Supervisor-1/Connection-1)']

Do you have a clue about this? Google answers on this vary a lot from one to another

Gaël Ferrachat
@gael-ft

Ok, looks like I handled it by modifying some configuration :)
I'll close the opened issue after some testing on my side.

Thanks for your help, and your plugin :)

Brian Scully
@scullxbones
Sure thing, thanks for working through it!
Gaël Bréard
@gbrd
Hi @scullxbones, in this doc we can read that the metadata collection is emptied by the migration tool. Isn't it dangerous? Is it really rebuilt?
Gaël Bréard
@gbrd
Here is what I understood: metadata contains the maxSequenceNumber, which is used when there are no events (only snapshots)
Brian Scully
@scullxbones

metadata contains the maxSequenceNumber, which is used when there are no events (only snapshots)

Yes that's correct

we can read that the metadata collection is emptied by the migration tool. Isn't it dangerous? Is it really rebuilt?

This seems like a mistake in the documentation, but I have to take a look at the code

From the doc:

Additionally, we do the same for snapshots, and remove all records from "akka_persistence_metadata" capped collection. This capped collection will be built again through usual event sourcing process...

But realtime is the capped collection; metadata is, as you say, storage of the max sequence number

Brian Scully
@scullxbones
No, unfortunately, looking at the code it does delete from metadata. That seems like a bug.
Gaël Bréard
@gbrd
Yes, for the case where the migration tool is launched when there are no events in the journal collection (because it was cleaned up and only one snapshot was kept, for instance)
Gaël Bréard
@gbrd
Opened issue #338.
I don't remember the details of the code: can we keep maxSequenceNumber? Sequence numbers don't change when moving documents to another collection?
Could the fix be just to stop cleaning up the metadata collection?
Brian Scully
@scullxbones
@gbrd yes, that sounds right to me. Performing the migration should not change sequence numbers at all; it's purely about separating different pids into different collections.
Gaël Ferrachat
@gael-ft

Hello @scullxbones,

I saw that the Akka team now provides AkkaProjection to handle event-sourced processing. Did you have a look at it? If yes, do you plan to support it at some point?

Thanks,

Brian Scully
@scullxbones
hi @gael-ft - I hadn't seen that yet, thanks for bringing it to my attention. Seems like it formalizes read projections. Will have to research it more.
That said, I don't immediately see anything extra that the plugin would need to do. Beyond fixing the global sequence problem that we struggle with re: eventsByTag probably.
It would be useful to consolidate the DB, so a MongoDB offset store maybe? Again this is very new to me, so not sure. I don't see a clear "offset store" plugin SPI, so that could be a bit unstable.
Gaël Ferrachat
@gael-ft

@scullxbones Looking at the CassandraProjection implementation and the akka cqrs sample, it abstracts the offset store, how the commit is performed, and some stream management.

For example we could have MongoProjection.atLeastOnce(...) or MongoProjection.grouped(...) for batches...
The offset store could store a document such as:

{
  _id: ObjectId,
  projectionName: String,  // e.g. PersonProjection if we handle events about a Person entity
  projectionKey: String,   // e.g. personTag-1 for Person events tagged
  offset: ObjectIdOffset   // from your lib
}

The idea is that there is some boilerplate code around event processing, and with AkkaProjection we can focus on the event processing itself.
Of course, we could have our own implementation, but I think it is quite common code.

https://github.com/akka/akka-projection/blob/master/akka-projection-cassandra/src/main/scala/akka/projection/cassandra/internal/CassandraProjectionImpl.scala