RabbitMQ stream-based client for Scala built on top of Fs2 and Cats Effect -> https://fs2-rabbit.profunktor.dev/
I have an application which creates a publisher and a consumer to interact with an AMQP server.
When the app receives a SIGINT it needs to send a logout message and wait for the response before
shutting down. I got a hint from @Daenyth to use Resource.makeCase, but I'm failing to see where
to inject it. Maybe you can help me out again.
val resources = for {
  conn <- R.createConnection
  resChan <- R.createChannel(conn)
  reqChan <- R.createChannel(conn)
} yield (resChan, reqChan)

resources.use {
  case (responseChannel, requestChannel) =>
    for {
      publisher <- createPublisher(R)(requestChannel)
      consumer <- createConsumer(R)(responseChannel)
      _ <- program(consumer, publisher).compile.drain
    } yield ()
}
onFinalize on program should be able to get that done
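A minimal sketch of that suggestion, assuming cats-effect 3 and fs2 3. The names `program`, `logout` and `withLogout` are hypothetical stand-ins, not part of fs2-rabbit: `program` plays the role of `program(consumer, publisher)`, and `logout` plays the role of "publish the logout message and wait for the reply".

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object OnFinalizeSketch extends IOApp.Simple {
  // Hypothetical stand-in for program(consumer, publisher)
  val program: Stream[IO, Unit] =
    Stream.awakeEvery[IO](1.second).evalMap(_ => IO.println("working..."))

  // Hypothetical stand-in for "send logout, wait for the response"
  val logout: IO[Unit] =
    IO.println("sending logout") *> IO.sleep(200.millis) *> IO.println("logout confirmed")

  // Attaches the logout effect as a finalizer. It runs when the stream
  // completes, fails, or is cancelled.
  def withLogout[A](s: Stream[IO, A], onStop: IO[Unit]): Stream[IO, A] =
    s.onFinalize(onStop)

  // IOApp cancels the main fiber on SIGINT, which triggers the finalizer.
  def run: IO[Unit] = withLogout(program, logout).compile.drain
}
```

Because the finalizer is attached inside `resources.use`, the channels should still be open while it runs.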
Thanks @Daenyth, I just tried this:
resources.use {
  case (responseChannel, requestChannel) =>
    for {
      publisher <- createPublisher(R)(requestChannel)
      consumer <- createConsumer(R)(responseChannel)
      _ <- program(consumer.onFinalize(IO(println("finalize consumer"))), publisher)
        .onFinalize(IO(println("finalize program")))
        .compile
        .drain
    } yield ()
}
it prints:
finalize consumer
finalize program
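The order of those two lines is what nested `onFinalize` calls would be expected to produce: the finalizer attached to the inner stream runs before the one attached to the outer stream. A small sketch under that assumption, where `Stream(1, 2, 3)` and `.map` are hypothetical stand-ins for the consumer and for `program`:

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object FinalizerOrderSketch {
  // Records each finalizer as it runs and returns the log in order.
  def run: IO[List[String]] =
    Ref.of[IO, List[String]](Nil).flatMap { log =>
      def record(msg: String): IO[Unit] = log.update(_ :+ msg)

      Stream(1, 2, 3)
        .covary[IO]
        .onFinalize(record("finalize consumer")) // inner stream
        .map(_ + 1)                              // stands in for program(...)
        .onFinalize(record("finalize program"))  // outer stream
        .compile
        .drain *> log.get
    }

  def main(args: Array[String]): Unit =
    println(run.unsafeRunSync())
}
```

One consequence worth checking in the real app: by the time the outer finalizer runs, the inner stream has already been finalized, so a logout that needs a reply from the consumer may have to be sent before the consumer stream is torn down.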
consumer automaticRecovery = false. On connection error I'm seeing WARN com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Connection reset). As I have to send some messages to the broker after reconnect, I would prefer to let the whole app stop and cleanly restart. Is there a way to do that?
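One possible shape for this, sketched with plain cats-effect 3 and fs2 3 rather than any fs2-rabbit API: fail the stream when a `Deferred` is completed with an error, let the failure propagate out of `run`, and leave the restart to an external supervisor. How the `Deferred` gets completed is an assumption here. In a real app some connection shutdown callback would do it; in this sketch a timer simulates the lost connection. The names `program`, `failOnLoss` and `lost` are hypothetical.

```scala
import cats.effect.{Deferred, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._

object FailFastSketch extends IOApp.Simple {
  // Hypothetical stand-in for the consumer/publisher program
  val program: Stream[IO, Unit] =
    Stream.awakeEvery[IO](1.second).evalMap(_ => IO.println("consuming..."))

  // Fails the stream as soon as `connectionLost` is completed with a Left.
  def failOnLoss[A](
      s: Stream[IO, A],
      connectionLost: Deferred[IO, Either[Throwable, Unit]]
  ): Stream[IO, A] =
    s.interruptWhen(connectionLost)

  def run: IO[Unit] =
    for {
      lost <- Deferred[IO, Either[Throwable, Unit]]
      // Assumption: a timer simulates the connection reset after 3 seconds.
      _ <- (IO.sleep(3.seconds) *>
             lost.complete(Left(new RuntimeException("connection reset")))).start
      // The error surfaces here, finalizers run, and the app exits with a failure.
      _ <- failOnLoss(program, lost).compile.drain
    } yield ()
}
```

Since the error is raised inside the stream, any `onFinalize` hooks and surrounding `Resource`s are still released before the process exits.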
Oh! you're saying typically, when forgetting to aggregate
:joy:
moment of panic passed