Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
  • Jul 30 16:06
    stale[bot] closed #732
  • Jul 13 18:43
    stale[bot] labeled #732
  • Jul 13 18:43
    stale[bot] commented #732
  • Jul 11 22:26
    simpadjo opened #735
  • Jun 10 19:37
    stale[bot] closed #706
  • Jun 10 19:37
    stale[bot] closed #668
  • May 31 15:15
    tyrcho closed #698
  • May 31 00:29
    stale[bot] labeled #668
  • May 31 00:29
    stale[bot] labeled #698
  • May 31 00:29
    stale[bot] labeled #706
  • May 31 00:29
    stale[bot] commented #668
  • May 31 00:29
    stale[bot] commented #698
  • May 31 00:29
    stale[bot] commented #706
  • May 21 08:40

    mergify[bot] on master

    Update cats-effect to 3.3.12 Merge pull request #734 from sc… (compare)

  • May 21 08:40
    mergify[bot] closed #734
  • May 21 08:34
    scala-steward opened #734
  • May 20 02:17

    mergify[bot] on master

    Update circe-core, circe-generi… Merge pull request #733 from sc… (compare)

  • May 20 02:17
    mergify[bot] closed #733
  • May 20 02:12
    scala-steward opened #733
  • May 14 10:30
    scala-steward opened #732
Adriani Furtado
@Adriani277
Hi all, I was looking at the Publishing Algebra and noticed that most return types
are F[A => F[Unit]]. Is it possible to have a SimplePublish that takes a payload as a parameter and simply publishes the message?
Something like
def simplePublish[A](
      payload: A,
      channel: AMQPChannel,
      exchangeName: ExchangeName,
      routingKey: RoutingKey
  )(implicit encoder: MessageEncoder[F, A]): F[Unit] =
FlatMap[F].flatMap(client.createPublisher(exchangeName, routingKey))(_.apply(payload))
Gavin Bisesi
@Daenyth
I mean it won't be broken
I don't recall off hand what effect is being captured in the "create publisher" method
better to reuse it if you can, but wouldn't be broken if you didn't, I think
Adriani Furtado
@Adriani277
Alright I see, I suspected there would be some performance hit by re-creating the publisher
Sebastian Voss
@sebastianvoss

I have an application which creates a publisher and consumer to interact with an AMQP server.

When the app receives an SIGINT it needs to send a logout message and wait for the response before
shutting down. I got some hint from @Daenyth to use Resource.makeCase but I'm failing to see where
to inject this. Maybe you can help me out again.

val resources = for {
  conn    <- R.createConnection
  resChan <- R.createChannel(conn)
  reqChan <- R.createChannel(conn)
} yield (resChan, reqChan)

resources.use {
  case (responseChannel, requestChannel) =>
    for {
      publisher <- createPublisher(R)(requestChannel)
      consumer  <- createConsumer(R)(responseChannel)
      _         <- program(consumer, publisher).compile.drain
    } yield ()
}
Gavin Bisesi
@Daenyth
below reqChan in resources
oh well no..
actually, onFinalize on program should be able to get you done
Sebastian Voss
@sebastianvoss

Thanks @Daenyth , I just tried this:

resources.use {
  case (responseChannel, requestChannel) =>
    for {
      publisher <- createPublisher(R)(requestChannel)
      consumer  <- createConsumer(R)(responseChannel)
      _         <- program(consumer.onFinalize(IO(println("finalize consumer"), publisher)
          .onFinalize(IO(println("finalize program")
          .compile
          .drain
    } yield ()
}

it prints:

finalize consumer
finalize program
Gavin Bisesi
@Daenyth
wait, what type is Consumer there?
not a Resource, is it?
or stream maybe
Sebastian Voss
@sebastianvoss
Stream[IO, AmqpEnvelope[String]]
Gavin Bisesi
@Daenyth
ah right
Sebastian Voss
@sebastianvoss
createConsumer has this signature: def createConsumer(R: RabbitClient[IO])(implicit channel: AMQPChannel): IO[Stream[IO, AmqpEnvelope[String]]]
Gavin Bisesi
@Daenyth
ok then it needs to happen in program
or alternately, you could have it on consumer
because that's inside the scope of publisher
Sebastian Voss
@sebastianvoss
the thing is I need to keep the consumer stream running to wait for the response. When I tried to put it there the consumer stream got closed before the response came in.
Gavin Bisesi
@Daenyth
You need the consumer to come before the publisher then
I see
mm
yeah so like consumer -> producer -> Stream.bracket and in the bracket, do your cleanup logic there
Then the closure of both consumer and producer waits for that
but in general I do recommend you rethink the pattern because you can't prevent or handle a kill -9
So have an idea of what cleanup you do when the process for this code just vanishes
Sebastian Voss
@sebastianvoss
OK cool, let me try this. In case the shutdown is not graceful the client will reconnect and continue where it left off. This is only needed for graceful shutdown/logout.
Sebastian Voss
@sebastianvoss
I set automaticRecovery = false. On connection error I'm seeing WARN com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Connection reset). As I have to send some messages to the broker after reconnect I would prefer to let the whole app stop and cleanly restart. Is there a way to do that?
Gabriel Volpe
@gvolpe
@bpholt I set up sbt-ci-release so now the publishing to Sonatype is automated. If you want to try out your changes, you can use this SNAPSHOT version: 0.0.0+1-0af65537-SNAPSHOT. I'll make a release in the next few days when I get some time...
Brian P. Holt
@bpholt
Oh, awesome! I'll give that a shot
Brian P. Holt
@bpholt
is there a specific resolver I need to use to pull that in? I haven't tried using published snapshots before
Brian P. Holt
@bpholt
found it:
resolvers += Resolver.sonatypeRepo("snapshots")
Brian P. Holt
@bpholt
Shoot, I forgot to add the new fs2-rabbit-testkit to the aggregated projects in fs2-rabbit-root, so I don't think it was published
Gabriel Volpe
@gvolpe
Huh! Well that's easy to do and when it gets merged into master you will have a SNAPSHOT ready to try out in a few mins
Gavin Bisesi
@Daenyth
Forgetting to aggregate in sbt is the worst :(
I've done that so many times
Brian P. Holt
@bpholt
Oh yeah, I'm more surprised when I remember to do it :joy:
Gavin Bisesi
@Daenyth
I hate that everything looks fine :(
CI just runs, everything green :tada:
because you didn't run the tests for the new module either
Brian P. Holt
@bpholt
🤦‍♂️
I ran them in IntelliJ… at least, I thought I did
yeah everything looks fine to me… the laws all pass and there aren't tests in the testkit module itself

Oh! you're saying typically, when forgetting to aggregate

:joy:

moment of panic passed

Gavin Bisesi
@Daenyth
yup
sorry, I meant the general case :)
Brian P. Holt
@bpholt
no worries :)
Victor Viale
@Koroeskohr
hello! does anyone have any tips to work with fs2-rabbit in a development environment? I just lost 3 hours trying to figure out a weird issue I had before realizing it was some old non-terminated version of the program that was running in the background
Gavin Bisesi
@Daenyth
Local development? I use the rabbit docker container