Stephan Emmerich
@StephanEmmerich
I am still integrating a Kafka topic, but now I want to test the publish method. I followed the documentation, but the ServiceTest server fails at server.client(MyService.class).myTopic().subscribe(). It gives an 'Attempt to get a topic, but there is no TopicFactory provided to implement it.' exception. When I debug the code, the JavadslServiceClientImplementor doesn't get a TopicFactoryProvider. Did I miss something? The documentation doesn't mention anything about a TopicFactory.
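
(For reference, a minimal sketch of what such a topic test looks like with the scaladsl testkit, where TestTopicComponents supplies the TopicFactory; Stephan is on javadsl, where ServiceTest plays the same role. The application and service names here are assumptions, not the poster's code.)

  import akka.stream.scaladsl.Sink
  import com.lightbend.lagom.scaladsl.server.LocalServiceLocator
  import com.lightbend.lagom.scaladsl.testkit.{ ServiceTest, TestTopicComponents }

  // Mixing in TestTopicComponents provides the TopicFactory that backs
  // subscribe() in tests.
  val server = ServiceTest.startServer(ServiceTest.defaultSetup.withCluster(true)) { ctx =>
    new MyApplication(ctx) with LocalServiceLocator with TestTopicComponents
  }

  implicit val mat = server.materializer

  // Subscribe to the topic under test and take the first published message.
  val firstMessage = server.serviceClient
    .implement[MyService]
    .myTopic
    .subscribe
    .atMostOnceSource
    .runWith(Sink.head)
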
Indefinite Integral
@catchergeese_twitter
Hi! Does anyone have any clues what might cause the following error or how to debug it? It happens during development.

ERROR org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=/system/sharding/kafkaProducer-drafts/singleton/singleton/producer] Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for connection 0
Stephan Emmerich
@StephanEmmerich
After deep debugging I figured out the problem with my 'no TopicFactory provided' issue. First of all, if I run my service normally (mvn lagom:runAll), it works as expected. But when running as a test with the test server, I found out I have a circular dependency issue; Guice detects this when running the test. It occurred due to the fact that I injected my API in the impl module, because I need to do some migration. When I removed the injection of the service, it worked normally.
Now, the only thing left is: is this correct behaviour by the testkit/test server, given that it isn't an issue when running in 'normal' mode?
In addition, perhaps this issue could be documented somewhere, so that other people hitting the same issue know where to look.
On second thought, my synchronizer should not have been injected. I will try to work around this.
nikhilaroratgo
@nikhilaroratgo
@ihostage I managed to test by using the embedded DB and starting it up before the test cases.
@ihostage How do you suggest I write tests for EventProcessors?
Sergey Morgunov
@ihostage
@nikhilaroratgo :+1:
nikhilaroratgo
@nikhilaroratgo
@ihostage In the event handlers of an EventProcessor there is some logic which has to be tested, but testing this class in isolation is not possible. The handlers only get called once a command arrives, so I am wondering if the only way to test it is the approach mentioned in https://github.com/lagom/lagom/blob/master/docs/src/test/java/docs/home/persistence/CqrsIntegrationTest.java
lynxpluto
@lynxpluto
@nikhilaroratgo What do you mean by EventProcessor: a topic publisher or a ReadSideProcessor?
nikhilaroratgo
@nikhilaroratgo
@lynxpluto I mean ReadSideProcessor
@lynxpluto Any class which extends ReadSideProcessor<Event>()
lynxpluto
@lynxpluto
@nikhilaroratgo OK. You could create a repository class with singleton scope that injects, for example, com.lightbend.lagom.javadsl.persistence.jpa.JpaReadSide. In this class you can implement methods that access the read-side data, so you are able to get records from the read-side DB and assert on them in your tests.
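
(A rough sketch of that idea; lynxpluto's example is javadsl JPA, but for consistency with the other snippets here it is shown in scaladsl with Slick. The table and all names are assumptions.)

  import scala.concurrent.Future
  import slick.jdbc.PostgresProfile.api._

  // Hypothetical read-side repository: owns the table definition and exposes
  // query methods that tests can assert against.
  class WalletRepository(database: Database) {

    class WalletTable(tag: Tag) extends Table[(String, Long)](tag, "wallets") {
      def id      = column[String]("id", O.PrimaryKey)
      def balance = column[Long]("balance")
      def *       = (id, balance)
    }

    val wallets = TableQuery[WalletTable]

    // Used by tests to read back what the ReadSideProcessor wrote.
    def findBalance(id: String): Future[Option[Long]] =
      database.run(wallets.filter(_.id === id).map(_.balance).result.headOption)
  }
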
nikhilaroratgo
@nikhilaroratgo
@lynxpluto Yes, I am doing something like this. But nice idea to reuse the already existing repository :-) Thanks.
lynxpluto
@lynxpluto
In my case the singleton scope is only necessary to register the ReadSideProcessor once; the repository class is initialized eagerly on startup.
nikhilaroratgo
@nikhilaroratgo
@lynxpluto Exactly, I already ran into this problem yesterday, where I found that I had to make the repository a singleton because I register the ReadSideProcessor in the constructor of the repository.
@lynxpluto So to summarise: in order to test the ReadSideProcessor I will issue the command from within the test and then, using the repository, read the data from the DB and assert on it. That's all I could think of to test the ReadSideProcessor.
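
(That flow as a rough sketch, using ScalaTest's eventually to absorb the read side's asynchronous lag; walletService and walletRepository are assumed to come from the test server setup, and every name here is hypothetical.)

  import org.scalatest.concurrent.{ Eventually, ScalaFutures }
  import org.scalatest.matchers.should.Matchers
  import org.scalatest.time.{ Seconds, Span }
  import org.scalatest.wordspec.AnyWordSpec

  class WalletReadSideSpec extends AnyWordSpec with Matchers with ScalaFutures with Eventually {

    // In a real test these would be wired from the ServiceTest server:
    private def walletService: WalletService = ???    // e.g. server.serviceClient.implement[WalletService]
    private def walletRepository: WalletRepository = ??? // from the application under test

    "the wallet read side" should {
      "project WalletCreated into the wallets table" in {
        // 1. Issue the command through the service call.
        walletService.createWallet("w-1").invoke().futureValue

        // 2. The ReadSideProcessor runs asynchronously, so poll the
        //    repository until the projection catches up.
        eventually(timeout(Span(10, Seconds))) {
          walletRepository.findBalance("w-1").futureValue shouldBe Some(0L)
        }
      }
    }
  }
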
Sergey Morgunov
@ihostage
@nikhilaroratgo Yep :wink:
Brian Miller
@bkmdev
Does anyone know what's up with prod-logback.xml and the bash-like env vars in there, i.e. ${application.home:-.}? Does that actually work? We're seeing that when we point to it on service startup, no log file is actually created.
Tomasz Pasternak
@tpasternak
Hello, just a question: is it safe to produce multiple message broker messages from a single event and concat them with mapConcat(identity)?
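
(A sketch of the pattern being asked about, in scaladsl with assumed names: each event maps to a list of messages, every message paired with the offset of the event that produced it, and the lists flattened with mapConcat(identity).)

  import com.lightbend.lagom.scaladsl.api.broker.Topic
  import com.lightbend.lagom.scaladsl.broker.TopicProducer

  override def walletTopic: Topic[WalletMessage] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(WalletEvent.Tag, fromOffset)
        // one event may fan out into several broker messages, all carrying
        // the same source offset (messagesFor returns an immutable List)
        .map(el => messagesFor(el.event).map(msg => (msg, el.offset)))
        .mapConcat(identity)
    }
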
Adam Ritter
@adamgerst
Does anyone know how to configure how often a ReadSideProcessor (JDBC/SLICK) polls the tables?
Tim Moore
@TimMoore
@adamgerst jdbc-read-journal.refresh-interval = 5s or whatever value you like in application.conf: https://github.com/dnvriend/akka-persistence-jdbc/blob/afdc79cd349734852863f2e3c49dad3335540d90/src/main/resources/reference.conf#L334-L335
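
(Spelled out, that is a plain entry in the service's application.conf; the 1s value is just an example.)

  # application.conf
  # polling interval of akka-persistence-jdbc's read journal, which drives
  # how often a JDBC/Slick ReadSideProcessor sees new events
  jdbc-read-journal {
    refresh-interval = 1s
  }
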
Michael
@grouzen

Hi guys.

I'm trying to emulate a synchronous API for C(R)UD operations.
I found that there is the persistentEntityRegistry.eventStream mechanism.

Here's an example of my current code:

for {
  // first, send the command to my entity
  view <- refFor[WalletEntity](walletId)
            .ask(command)
            .map { ev =>
              ev.into[WalletView]
                .withFieldConst(_.balance, 0L)
                .withFieldConst(_.units, ev.units)
                .transform
            }
            .recoverWith {
              case WalletCreateRejectedException(message) => Future.failed(BadRequest(message))
            }
  // then, wait for a particular event that will be generated by the entity
  ev <- persistentEntityRegistry
          .eventStream(WalletEvent.tag, Offset.noOffset)
          .collect {
            case el: EventStreamElement[WalletEvent] if el.entityId === walletId.toString => el.event
          }
          .runWith(Sink.head)
} yield (view, ev)

The question is how to get the current offset. Is there a way to do this?
Also, I'm not sure this is a correct way to solve the problem. My first thought was to use some message broker, but then I found this functionality inside Lagom.
I don't know what restrictions and performance issues this approach has.

I will appreciate any help with my problem.
Thanks!

Sergey Morgunov
@ihostage

@grouzen Hi, Michael! What you are doing is very strange :scream: Why?
PersistentEntity is an implementation of the CQRS pattern, where the entity is updated by a Command (on the write side) and its state is obtained by a Query (on the read side).

CRUD is a very different pattern. Are you sure that you need PersistentEntity?

If you need the CRUD pattern for a service, I highly recommend not using PersistentEntity for it.
Michael
@grouzen
Hm, okay. I need to clarify this ;)
Lagom implies that create/update/delete service calls return the response asynchronously.
I need to make them synchronous somehow; that's a requirement of the architecture we have at our company.
Sergey Morgunov
@ihostage
Do you need a history of changes to your entity?
Michael
@grouzen
For example, I have an API endpoint that creates a long-running transaction which involves several entities in the background. Lagom returns the answer immediately, but I have to wait until the transaction finishes and only then return the response to the caller.
Sergey Morgunov
@ihostage
Maybe you really need to implement a "classical" CRUD service, without CQRS and PersistentEntity?
Carl
@carlpayne
Is it considered good or bad practice to include the entityId on (internal) events in Lagom? E.g. if my entityId is "auctionId" and I have an event case class AuctionCreated(auctionId: String), is this redundant? On the API (external) side we could add the entityId to external events for outside consumption, but I can't think why it would be needed internally.
Michael
@grouzen
One possible solution is to use Kafka:
  1. send the command to the entity
  2. subscribe to a Kafka topic and wait for a particular event inside the ServiceCall; the HTTP call will hang for a while
  3. when the work is done, the corresponding event processor will push the event into Kafka
  4. profit
@ihostage No, I need CQRS/ES. I already have the complete system. I just need to emulate these synchronous CRUD-style HTTP calls, that's it!
Tim Moore
@TimMoore
@grouzen you might be able to use the distributed pub-sub API for this https://www.lagomframework.com/documentation/1.5.x/scala/PubSub.html
Create a Promise that is completed by a subscriber to the pub-sub topic, return its Future from the service call. Then, in your persistent entity after-persist callback or read side processor, publish the event to the topic.
This is based on Akka clustering and doesn't require Kafka
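
(A rough sketch of that shape in scaladsl, using Sink.head in place of an explicit Promise; every name here is assumed, and timeouts and error handling are omitted.)

  import akka.stream.Materializer
  import akka.stream.scaladsl.Sink
  import com.lightbend.lagom.scaladsl.api.ServiceCall
  import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
  import com.lightbend.lagom.scaladsl.pubsub.{ PubSubRegistry, TopicId }
  import scala.concurrent.{ ExecutionContext, Future }

  class WalletServiceImpl(registry: PersistentEntityRegistry, pubSub: PubSubRegistry)(
      implicit ec: ExecutionContext, mat: Materializer
  ) extends WalletService {

    override def createWallet(walletId: String) = ServiceCall { request =>
      val topic = pubSub.refFor(TopicId[WalletView](walletId))
      // Subscribe *before* sending the command so the published view
      // cannot be missed; Sink.head completes with the first element.
      val firstView: Future[WalletView] = topic.subscriber.runWith(Sink.head)
      for {
        _    <- registry.refFor[WalletEntity](walletId).ask(CreateWallet(request))
        view <- firstView
      } yield view
    }
  }

  // ...and in the read-side processor (or after-persist callback), publish the
  // view for that wallet: pubSub.refFor(TopicId[WalletView](walletId)).publish(view)
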
Michael
@grouzen
@TimMoore That's a cool idea! Thanks a lot! I will look into it.
Sergey Morgunov
@ihostage
Or just use a read-only command, that’s it :joy:
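
(ihostage's alternative as a scaladsl sketch with assumed names: a read-only command replies with the current state and persists nothing.)

  import com.lightbend.lagom.scaladsl.persistence.PersistentEntity.ReplyType

  // Hypothetical read-only command for the wallet entity.
  case object GetWallet extends WalletCommand with ReplyType[WalletState]

  // Inside the PersistentEntity's behavior:
  Actions()
    .onReadOnlyCommand[GetWallet.type, WalletState] {
      case (GetWallet, ctx, state) => ctx.reply(state)
    }
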
Tim Moore
@TimMoore
That's fine if the response only depends on the one entity.
Or it doesn't have to be read-only; you could have your write command return the response... however, "long running transaction which involves several entities in the background" led me to believe that isn't an option.
Sergey Morgunov
@ihostage
@TimMoore can you look at my question on contributors channel? :smile:
kotdv
@kotdv
"long running transaction which involves several entities in the background" led me to believe that isn't an option
side effecting in PE again
recipe for disaster
lynxpluto
@lynxpluto
@carlpayne I think it is normal to include the entityId on those events where it is necessary. For example, it may be needed in read-side processor event handlers to identify a DB record to update.
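
(For instance, a hypothetical scaladsl Cassandra read-side handler along these lines; note that EventStreamElement.entityId is also available when the event itself doesn't carry the id.)

  import com.datastax.driver.core.BoundStatement
  import com.lightbend.lagom.scaladsl.persistence.EventStreamElement
  import scala.concurrent.Future

  // insertAuction is an assumed PreparedStatement:
  //   INSERT INTO auctions (auction_id, starting_price) VALUES (?, ?)
  private def processAuctionCreated(el: EventStreamElement[AuctionCreated]): Future[List[BoundStatement]] =
    Future.successful(
      // the event carries auctionId, so the handler can bind the key directly
      List(insertAuction.bind(el.event.auctionId, Long.box(el.event.startingPrice)))
    )
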
Adam Ritter
@adamgerst
@TimMoore Ah thanks. Also, can you explain what the differences are for the akka-persistence jars/libraries and the lagom-persistence jars/libraries?
Sergey Morgunov
@ihostage
@adamgerst The akka-persistence-* libraries provide the persistence mechanics of Akka's PersistentActor. The lagom-persistence-* libraries provide the persistence mechanics of Lagom's PersistentEntity (both write and read side). PersistentEntity is not the same as PersistentActor.
Adam Ritter
@adamgerst
What is the difference between the two actors, i.e. PersistentEntity and PersistentActor?
Carl
@carlpayne
Thanks @lynxpluto, I hadn't thought of that one :-)