Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
  • Dec 06 09:11

    mergify[bot] on 1.x

    Update lettuce-core to 6.2.2.RE… Merge pull request #755 from pr… (compare)

  • Dec 06 09:11

    mergify[bot] on lettuce-core-6.2.2.RELEASE

    (compare)

  • Dec 06 09:11
    mergify[bot] closed #755
  • Dec 06 09:03
    mergify[bot] labeled #755
  • Dec 06 09:02
    gvolpe opened #755
  • Dec 06 09:02

    gvolpe on lettuce-core-6.2.2.RELEASE

    Update lettuce-core to 6.2.2.RE… (compare)

  • Nov 30 09:12

    mergify[bot] on 1.x

    Update cats-effect, cats-effect… Merge pull request #754 from pr… (compare)

  • Nov 30 09:12

    mergify[bot] on cats-effect-3.4.2

    (compare)

  • Nov 30 09:12
    mergify[bot] closed #754
  • Nov 30 09:05
    mergify[bot] labeled #754
  • Nov 30 09:04
    gvolpe opened #754
  • Nov 30 09:04

    gvolpe on cats-effect-3.4.2

    Update cats-effect, cats-effect… (compare)

  • Nov 25 10:06

    gvolpe on v1.3.0

    (compare)

  • Nov 25 09:57

    mergify[bot] on cats-effect-3.4.1

    (compare)

  • Nov 25 09:57

    mergify[bot] on 1.x

    Update cats-effect, cats-effect… Merge pull request #751 from pr… (compare)

  • Nov 25 09:57
    mergify[bot] closed #751
  • Nov 25 09:51
    MamdouhAhmed synchronize #751
  • Nov 25 09:51

    MamdouhAhmed on cats-effect-3.4.1

    Update cats-effect, cats-effect… (compare)

  • Nov 25 09:41
    gvolpe commented #753
  • Nov 25 09:25
    MamdouhAhmed commented #753
kyri-petrou
@kyri-petrou:matrix.org
[m]
Hmmm I'm actually not sure. Might be too unlikely, but any chance it had to do with tagging a commit that wasn't done by you?
I'm not too sure how the workflow is set up, I can look at it tomorrow
Also it's 11pm here so I'm a bit braindead, feel free to ignore me if what I'm saying doesn't make sense 😄
gvolpe
@gvolpe:matrix.org
[m]
I did not have the energy to deal with that, ended up creating another tag: https://github.com/profunktor/redis4cats/releases/tag/v1.1.1
Publishing has been triggered at least
kyri-petrou
@kyri-petrou:matrix.org
[m]
2 releases in 1 day 🎉
gvolpe
@gvolpe:matrix.org
[m]
I need to go now, hope that works! Have a good night then :)
kyri-petrou
@kyri-petrou:matrix.org
[m]
I've done that one too many times myself I have to admit
gvolpe
@gvolpe:matrix.org
[m]
Hmm although the job failed, it seems the publishing to Maven Central was successful: https://repo1.maven.org/maven2/dev/profunktor/redis4cats-effects_3/1.1.1/redis4cats-effects_3-1.1.1.pom
kyri-petrou
@kyri-petrou:matrix.org
[m]
Yeah not sure exactly what failed, but it definitely failed after the package was pushed. So I guess that's fine? 🤷‍♂️
gvolpe
@gvolpe:matrix.org
[m]
Yes, seems to be some Sonatype hiccup but the version was correctly published to Maven Central, I'm already using it in other projects! :)
kyri-petrou
@kyri-petrou:matrix.org
[m]
Yeah started using it as well!
kyri-petrou
@kyri-petrou:matrix.org
[m]
gvolpe got your book(s) today - really looking forward to reading them :)
1 reply
Denis Savitsky
@desavitsky
Hi!
I have such transaction, that one of actions is List[F[Option[Long]]]
Could you help me to understand how I should compose it into one transaction in a such way that I will be able to do smth with the list of results
transaction.exec(rem :: counts :: add :: timeout :: HNil).map { // counts is the list
case ~: res2 ~: ~: _ ~: HNil => ???
}
Here res2 has a type of Option[Long] so I am obviously doing smth wrong
3 replies
Denis Savitsky
@desavitsky
Hi!
What does that error mean "dev.profunktor.redis4cats.transactions$TransactionDiscarded$: null"?
I get that transaction discarded but why?
Could someone help me to get it?
2 replies
gvolpe
@gvolpe:matrix.org
[m]
If you really need it, use it in the imperative way directly with Lettuce (the underlying Java library).
Denis Savitsky
@desavitsky
Do transactions use watch command somewhere? Because it looks like that because transactions fail if two start at the same time
gvolpe
@gvolpe:matrix.org
[m]
No, a transaction is simply modeled with MULTI and DISCARD. WATCH is only used for optimistic locking and it has to be explicitly called: https://github.com/profunktor/redis4cats/blob/series/1.x/modules/effects/src/main/scala/dev/profunktor/redis4cats/transactions.scala#L38
Anyway, as I previously mentioned, the implementation is currently broken and extremely hard to get right so I don't recommend its usage. At some point, I would like to delete all this code, which would be better than supporting broken code.
gvolpe
@gvolpe:matrix.org
[m]

I wrote about the issue with transactions / pipelining. If anyone would like to help, reach out on the issue.

profunktor/redis4cats#681

kyri-petrou
@kyri-petrou:matrix.org
[m]
Realised I posted in the wrong channel. Well done on the new release gvolpe ! Really great features
gvolpe
@gvolpe:matrix.org
[m]
kyri-petrou ha! don't know where you posted before, but thanks! :)
kyri-petrou
@kyri-petrou:matrix.org
[m]
By the way I think the same issue as before happened, the CI failed to release to MVC
kyri-petrou it failed yesterday due to some weird gpg stuff but I fixed it a few mins ago :)
the kind of stuff we all love to work on, of course 😏
kyri-petrou
@kyri-petrou:matrix.org
[m]
😄
javierg1975
@javierg1975

Hi all,
The following code simply pushes a JSON object to a Redis queue.

  def publish(msg: CDAPayload): IO[Unit] =
    (for {
      client <- clientStream
      publisher <- Stream.resource(
        PubSub.mkPublisherConnection[IO, String, CDAPayload](client, cdaCodec)
      )
      pub = publisher.publish(cdaChannel)
      _ <- Stream.emit(msg).through(pub)
    } yield ()).compile.drain

It works just fine as long as the Redis server is deployed independently. The moment we try to deploy it with Redis as a service alongside the container that holds the app it fails with the following

cdalistener.RedisStream | Releasing Redis connection: RedisURI(redis://default:*******@scds_redis%3A6379)
o.n.c.SecuredCDAHandler.$anonfun | Could not publish to Redis: Unable to connect to scds_redis:6379:6379

This is confusing, it seems it can connect to the server but then closes the connection? (it mentions "releasing" the connection, but maybe that's me not understanding the wording) And then fails while trying to publish (which, if it just released the connection I guess it makes sense, but then, why does it do that?).
Once more, this is not an issue if the Redis server has been deployed independently but our docker-compose is almost identical to dozens of other ones where the server is deployed alongside as a service (including gvolpe/pfps-shopping-cart).
While I understand we might be doing something wrong with Docker, there's very little there to get wrong and nothing is obviously suspicious so I would love to hear if it is somehow possible that we're not configuring redis4cats properly in this case.
Here's a sample of the docker-compose.yml we're using in case that helps.

version: '3.4'
services:
  scds_redis:
    restart: always
    image: redis:latest
    networks:
      - redisnet
    ports:
     - '6379:6379'

    command: --requirepass ${REDIS_PWD} # we've also tried without credentials, no difference 

  cdalistener:
    restart: always
    image: xxxxxxxx/cdalistener:latest  
    networks:
      - redisnet
    ports:
      - '5000:8080' # http4s server
    depends_on:
      -   scds_redis  # we've tried without this, makes no difference
    environment:
      - REDIS_CNL=cda  # redis chanel to publish into
      - REDIS_PWD=redispw
      - REDIS_USR=default
      - REDIS_URL=scds_redis
      - REDIS_PRT=6379

networks:
  redisnet:

Any help will be much appreciated.
Best,

Javier

5 replies
kyri-petrou
@kyri-petrou:matrix.org
[m]

Hmm, there’s something funky going on. My guess at this point is that the service name resolution of docker-compose might already include the port number or that something weird is happening with the string interpolation.

Try the following and let me know if any works:

  1. Remove “:$port”
  2. Start container without credentials and remove them from connection string
  3. Hardcode all values within the code instead of parsing them via envvars (just for debugging)
  4. Wrap all interpolated variables in the string with { } (long shot)
Tomasz
@Kurek_gitlab

Hey, I'm using the library in one of my services and recently I've spotted a strange issue.

In the beginning when I used a simple client only with the master uri - I see that StatefulRedisConnection was created under the hood and everything worked fine.

When I changed to the master replica setup (StatefulRedisMasterReplicaConnection) transaction stuck. I was trying to investigate but I can't see anything in jstack - it seems like NIO loop doesn't get the response from the Redis (not sure if I should see the main thread waiting for the transaction being executed but I can't see anything) or for some reason doesn't handle that - in AWS Redis logs I see that transaction commands have been executed.

Frankly speaking, I'm not sure how to investigate it further, after changing the logs to TRACE the last thing that I can see
lettuce-nioEventLoop-4-3 i.l.c.p.CommandHandler - [channel=0x4458a6c8, /127.0.0.1:51473 -> localhost/127.0.0.1:6379, epid=0x3, chid=0x3] Completing command AsyncCommand [type=EXEC, output=MultiOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.AsyncCommand] and the program hangs.

I'm also not sure how to use the client in regards to the transaction - am I supposed to create dedicated connection via RedisCommands and reuse the client or create dedicated client for transaction handling. The use case is that I have one flow changing data structure in transaction and the second one that based on this structure do some actions and I want to assure that the operations don't interfere - modifying the structure and reading the data. In redis doc I see that technically it is assured on the client level so I created the second client for each use case but I'm not sure that this is the best approach. I know that it is technically possible to do it synchronously on the application level but I'd like to avoid that

Could you please take a look and give me some advice, any help much appreciated

2 replies
gvolpe
@gvolpe:matrix.org
[m]

Kurek_gitlab (Tomasz) according to the Lettuce docs, transactions should work with a master-replicate setup. However, this scenario hasn't been tested in Redis4cats. We only have tests for a single node and for clusters, but for master-replica we only have an example that doesn't use transactions.

Unfortunately, I don't have any time or energy to dig into this right now, but you could try updating this example (and even better, add master-replica tests!) and submit a PR, from which we can raise further issues if we can't get this to work.

1 reply
kyri-petrou
@kyri-petrou:matrix.org
[m]

gvolpe: I've been thinking about the discussion with @Kurek_gitlab and I played around with something that would allow to have instantiated connections for transactions / pipelines. What are your thoughts on having something like this:

trait Transactor[F[_], K, V] {
  def transact_(ops: RedisCommands[F, K, V] => List[F[Unit]]): F[Unit]
  def transact[A](ops: RedisCommands[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]]
}

object Transactor {
  private type Cmd[F[_], K, V] = RedisCommands[F, K, V]

  def make[F[_], K, V](redisR: Resource[F, Cmd[F, K, V]], nTransactionPools: Int)(implicit
      F: Async[F]
  ): Resource[F, Transactor[F, K, V]] = {
    assert(nTransactionPools >= 1, "nTransactionPools must be positive")
    for {
      cmds   <- List.fill(nTransactionPools)(redisR).sequence
      queue <- Resource.eval(Queue.bounded[F, Cmd[F, K, V]](nTransactionPools))
      _          <- Resource.eval(cmds.traverse(queue.offer))
    } yield new Transactor[F, K, V] {

      override def transact_(ops: Cmd[F, K, V] => List[F[Unit]]): F[Unit] =
        queue.take.flatMap { cmd =>
          cmd.transact_(ops(cmd)).guarantee(queue.offer(cmd).uncancelable)
        }

      override def transact[A](ops: Cmd[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] =
        queue.take.flatMap { cmd =>
          cmd.transact(ops(cmd)).guarantee(queue.offer(cmd).uncancelable)
        }
    }
  }
}

Example usage (based on the one in the docs):

object Main extends IOApp.Simple {

  val key1 = "test1"
  val key2 = "test2"
  val key3 = "test3"
  val showResult: String => Option[String] => IO[Unit] = key =>
    _.fold(Log[IO].info(s"Key not found: $key"))(s => Log[IO].info(s"$key: $s"))

  override def run: IO[Unit] = {
    val redisR      = Redis[IO].utf8("redis://localhost")
    val transactorR = Transactor.make(redisR, 2)

    Resource.both(redisR, transactorR).use { case (redis, tx) =>
      val setters = redis.set(key2, "delete_me") >> redis.set(key3, "foo")

      val getters =
        redis.get(key1).flatTap(showResult(key1)) >>
          redis.get(key2).flatTap(showResult(key2))

      val ops = (cmd: RedisCommands[IO, String, String]) =>
        (store: TxStore[IO, String, Option[String]]) =>
          List(
            cmd.set(key1, "foo"),
            cmd.del(key2).void,
            cmd.get(key3).flatMap(store.set(key3))
          )

      val prog =
        tx
          .transact(ops(_))
          .flatMap { kv => IO.println(s"KV: ${kv}") }
          .recoverWith {
            case TransactionDiscarded => Log[IO].error("[Error] - Transaction Discarded")
            case e                    => Log[IO].error(s"[Error] - $e")
          }
      setters >> getters >> prog >> getters.void
    }
  }
}
kyri-petrou
@kyri-petrou:matrix.org
[m]

:point_up: Edit: gvolpe: I've been thinking about the discussion with @Kurek_gitlab and I played around with something that would allow to have instantiated connections for transactions / pipelines. What are your thoughts on having something like this:

trait Transactor[F[_], K, V] {
  def transact_(ops: RedisCommands[F, K, V] => List[F[Unit]]): F[Unit]
  def transact[A](ops: RedisCommands[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]]
}

object Transactor {
  private type Cmd[F[_], K, V] = RedisCommands[F, K, V]

  def make[F[_], K, V](redisR: Resource[F, Cmd[F, K, V]], nTransactionPools: Int)(implicit
      F: Async[F]
  ): Resource[F, Transactor[F, K, V]] = {
    assert(nTransactionPools >= 1, "nTransactionPools must be positive")
    for {
      cmds   <- List.fill(nTransactionPools)(redisR).sequence
      queue <- Resource.eval(Queue.bounded[F, Cmd[F, K, V]](nTransactionPools))
      _          <- Resource.eval(cmds.traverse(queue.offer))
    } yield new Transactor[F, K, V] {

      override def transact_(ops: Cmd[F, K, V] => List[F[Unit]]): F[Unit] =
        queue.take.flatMap { cmd =>
          cmd.transact_(ops(cmd)).guarantee(queue.offer(cmd))
        }

      override def transact[A](ops: Cmd[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] =
        queue.take.flatMap { cmd =>
          cmd.transact(ops(cmd)).guarantee(queue.offer(cmd))
        }
    }
  }
}

Example usage (based on the one in the docs):

object Main extends IOApp.Simple {

  val key1 = "test1"
  val key2 = "test2"
  val key3 = "test3"
  val showResult: String => Option[String] => IO[Unit] = key =>
    _.fold(Log[IO].info(s"Key not found: $key"))(s => Log[IO].info(s"$key: $s"))

  override def run: IO[Unit] = {
    val redisR      = Redis[IO].utf8("redis://localhost")
    val transactorR = Transactor.make(redisR, 2)

    Resource.both(redisR, transactorR).use { case (redis, tx) =>
      val setters = redis.set(key2, "delete_me") >> redis.set(key3, "foo")

      val getters =
        redis.get(key1).flatTap(showResult(key1)) >>
          redis.get(key2).flatTap(showResult(key2))

      val ops = (cmd: RedisCommands[IO, String, String]) =>
        (store: TxStore[IO, String, Option[String]]) =>
          List(
            cmd.set(key1, "foo"),
            cmd.del(key2).void,
            cmd.get(key3).flatMap(store.set(key3))
          )

      val prog =
        tx
          .transact(ops(_))
          .flatMap { kv => IO.println(s"KV: ${kv}") }
          .recoverWith {
            case TransactionDiscarded => Log[IO].error("[Error] - Transaction Discarded")
            case e                    => Log[IO].error(s"[Error] - $e")
          }
      setters >> getters >> prog >> getters.void
    }
  }
}

:point_up: Edit: gvolpe: I've been thinking about the discussion with @Kurek_gitlab and I played around with something that would allow to have instantiated connections for transactions / pipelines. What are your thoughts on having something like this:

trait Transactor[F[_], K, V] {
  def transact_(ops: RedisCommands[F, K, V] => List[F[Unit]]): F[Unit]
  def transact[A](ops: RedisCommands[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]]
}

object Transactor {
  private type Cmd[F[_], K, V] = RedisCommands[F, K, V]

  def make[F[_], K, V](redisR: Resource[F, Cmd[F, K, V]], nTransactionPools: Int)(implicit
      F: Async[F]
  ): Resource[F, Transactor[F, K, V]] = {
    assert(nTransactionPools >= 1, "nTransactionPools must be positive")
    for {
      cmds  <- List.fill(nTransactionPools)(redisR).sequence
      queue <- Resource.eval(Queue.bounded[F, Cmd[F, K, V]](nTransactionPools))
      _          <- Resource.eval(cmds.traverse(queue.offer))
    } yield new Transactor[F, K, V] {

      override def transact_(ops: Cmd[F, K, V] => List[F[Unit]]): F[Unit] =
        queue.take.flatMap { cmd =>
          cmd.transact_(ops(cmd)).guarantee(queue.offer(cmd))
        }

      override def transact[A](ops: Cmd[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] =
        queue.take.flatMap { cmd =>
          cmd.transact(ops(cmd)).guarantee(queue.offer(cmd))
        }
    }
  }
}

Example usage (based on the one in the docs):

object Main extends IOApp.Simple {

  val key1 = "test1"
  val key2 = "test2"
  val key3 = "test3"
  val showResult: String => Option[String] => IO[Unit] = key =>
    _.fold(Log[IO].info(s"Key not found: $key"))(s => Log[IO].info(s"$key: $s"))

  override def run: IO[Unit] = {
    val redisR      = Redis[IO].utf8("redis://localhost")
    val transactorR = Transactor.make(redisR, 2)

    Resource.both(redisR, transactorR).use { case (redis, tx) =>
      val setters = redis.set(key2, "delete_me") >> redis.set(key3, "foo")

      val getters =
        redis.get(key1).flatTap(showResult(key1)) >>
          redis.get(key2).flatTap(showResult(key2))

      val ops = (cmd: RedisCommands[IO, String, String]) =>
        (store: TxStore[IO, String, Option[String]]) =>
          List(
            cmd.set(key1, "foo"),
            cmd.del(key2).void,
            cmd.get(key3).flatMap(store.set(key3))
          )

      val prog =
        tx
          .transact(ops(_))
          .flatMap { kv => IO.println(s"KV: ${kv}") }
          .recoverWith {
            case TransactionDiscarded => Log[IO].error("[Error] - Transaction Discarded")
            case e                    => Log[IO].error(s"[Error] - $e")
          }
      setters >> getters >> prog >> getters.void
    }
  }
}

:point_up: Edit: gvolpe: I've been thinking about the discussion with @Kurek_gitlab and I played around with something that would allow to have instantiated connections for transactions / pipelines. What are your thoughts on having something like this:

trait Transactor[F[_], K, V] {
  def transact_(ops: RedisCommands[F, K, V] => List[F[Unit]]): F[Unit]
  def transact[A](ops: RedisCommands[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]]
}

object Transactor {
  private type Cmd[F[_], K, V] = RedisCommands[F, K, V]

  def make[F[_], K, V](redisR: Resource[F, Cmd[F, K, V]], nTransactionPools: Int)(implicit
      F: Async[F]
  ): Resource[F, Transactor[F, K, V]] = {
    assert(nTransactionPools >= 1, "nTransactionPools must be positive")
    for {
      cmds  <- List.fill(nTransactionPools)(redisR).sequence
      queue <- Resource.eval(Queue.bounded[F, Cmd[F, K, V]](nTransactionPools))
      _        <- Resource.eval(cmds.traverse(queue.offer))
    } yield new Transactor[F, K, V] {

      override def transact_(ops: Cmd[F, K, V] => List[F[Unit]]): F[Unit] =
        queue.take.flatMap { cmd =>
          cmd.transact_(ops(cmd)).guarantee(queue.offer(cmd))
        }

      override def transact[A](ops: Cmd[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] =
        queue.take.flatMap { cmd =>
          cmd.transact(ops(cmd)).guarantee(queue.offer(cmd))
        }
    }
  }
}

Example usage (based on the one in the docs):

object Main extends IOApp.Simple {

  val key1 = "test1"
  val key2 = "test2"
  val key3 = "test3"
  val showResult: String => Option[String] => IO[Unit] = key =>
    _.fold(Log[IO].info(s"Key not found: $key"))(s => Log[IO].info(s"$key: $s"))

  override def run: IO[Unit] = {
    val redisR      = Redis[IO].utf8("redis://localhost")
    val transactorR = Transactor.make(redisR, 2)

    Resource.both(redisR, transactorR).use { case (redis, tx) =>
      val setters = redis.set(key2, "delete_me") >> redis.set(key3, "foo")

      val getters =
        redis.get(key1).flatTap(showResult(key1)) >>
          redis.get(key2).flatTap(showResult(key2))

      val ops = (cmd: RedisCommands[IO, String, String]) =>
        (store: TxStore[IO, String, Option[String]]) =>
          List(
            cmd.set(key1, "foo"),
            cmd.del(key2).void,
            cmd.get(key3).flatMap(store.set(key3))
          )

      val prog =
        tx
          .transact(ops(_))
          .flatMap { kv => IO.println(s"KV: ${kv}") }
          .recoverWith {
            case TransactionDiscarded => Log[IO].error("[Error] - Transaction Discarded")
            case e                    => Log[IO].error(s"[Error] - $e")
          }
      setters >> getters >> prog >> getters.void
    }
  }
}

:point_up: Edit: gvolpe: I've been thinking about the discussion with @Kurek_gitlab and I played around with something that would allow to have instantiated connections for transactions / pipelines. What are your thoughts on having something like this:

trait Transactor[F[_], K, V] {
  def transact_(ops: RedisCommands[F, K, V] => List[F[Unit]]): F[Unit]
  def transact[A](ops: RedisCommands[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]]
}

object Transactor {
  private type Cmd[F[_], K, V] = RedisCommands[F, K, V]

  def make[F[_], K, V](redisR: Resource[F, Cmd[F, K, V]], nTransactionPools: Int)(implicit
      F: Async[F]
  ): Resource[F, Transactor[F, K, V]] = {
    assert(nTransactionPools >= 1, "nTransactionPools must be positive")
    for {
      cmds  <- List.fill(nTransactionPools)(redisR).sequence
      queue <- Resource.eval(Queue.bounded[F, Cmd[F, K, V]](nTransactionPools))
      _      <- Resource.eval(cmds.traverse(queue.offer))
    } yield new Transactor[F, K, V] {

      override def transact_(ops: Cmd[F, K, V] => List[F[Unit]]): F[Unit] =
        queue.take.flatMap { cmd =>
          cmd.transact_(ops(cmd)).guarantee(queue.offer(cmd))
        }

      override def transact[A](ops: Cmd[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] =
        queue.take.flatMap { cmd =>
          cmd.transact(ops(cmd)).guarantee(queue.offer(cmd))
        }
    }
  }
}

Example usage (based on the one in the docs):

object Main extends IOApp.Simple {

  val key1 = "test1"
  val key2 = "test2"
  val key3 = "test3"
  val showResult: String => Option[String] => IO[Unit] = key =>
    _.fold(Log[IO].info(s"Key not found: $key"))(s => Log[IO].info(s"$key: $s"))

  override def run: IO[Unit] = {
    val redisR      = Redis[IO].utf8("redis://localhost")
    val transactorR = Transactor.make(redisR, 2)

    Resource.both(redisR, transactorR).use { case (redis, tx) =>
      val setters = redis.set(key2, "delete_me") >> redis.set(key3, "foo")

      val getters =
        redis.get(key1).flatTap(showResult(key1)) >>
          redis.get(key2).flatTap(showResult(key2))

      val ops = (cmd: RedisCommands[IO, String, String]) =>
        (store: TxStore[IO, String, Option[String]]) =>
          List(
            cmd.set(key1, "foo"),
            cmd.del(key2).void,
            cmd.get(key3).flatMap(store.set(key3))
          )

      val prog =
        tx
          .transact(ops(_))
          .flatMap { kv => IO.println(s"KV: ${kv}") }
          .recoverWith {
            case TransactionDiscarded => Log[IO].error("[Error] - Transaction Discarded")
            case e                    => Log[IO].error(s"[Error] - $e")
          }
      setters >> getters >> prog >> getters.void
    }
  }
}

:point_up: Edit: gvolpe: I've been thinking about the discussion with @Kurek_gitlab and I played around with something that would allow to have instantiated connections for transactions / pipelines. What are your thoughts on having something like this:

trait Transactor[F[_], K, V] {
  def transact_(ops: RedisCommands[F, K, V] => List[F[Unit]]): F[Unit]
  def transact[A](ops: RedisCommands[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]]
}

object Transactor {
  private type Cmd[F[_], K, V] = RedisCommands[F, K, V]

  def make[F[_], K, V](redisR: Resource[F, Cmd[F, K, V]], nTransactionPools: Int)(implicit
      F: Async[F]
  ): Resource[F, Transactor[F, K, V]] = {
    assert(nTransactionPools >= 1, "nTransactionPools must be positive")
    for {
      cmds  <- List.fill(nTransactionPools)(redisR).sequence
      queue <- Resource.eval(Queue.bounded[F, Cmd[F, K, V]](nTransactionPools))
      _    <- Resource.eval(cmds.traverse(queue.offer))
    } yield new Transactor[F, K, V] {

      override def transact_(ops: Cmd[F, K, V] => List[F[Unit]]): F[Unit] =
        queue.take.flatMap { cmd =>
          cmd.transact_(ops(cmd)).guarantee(queue.offer(cmd))
        }

      override def transact[A](ops: Cmd[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] =
        queue.take.flatMap { cmd =>
          cmd.transact(ops(cmd)).guarantee(queue.offer(cmd))
        }
    }
  }
}

Example usage (based on the one in the docs):

object Main extends IOApp.Simple {

  val key1 = "test1"
  val key2 = "test2"
  val key3 = "test3"
  val showResult: String => Option[String] => IO[Unit] = key =>
    _.fold(Log[IO].info(s"Key not found: $key"))(s => Log[IO].info(s"$key: $s"))

  override def run: IO[Unit] = {
    val redisR      = Redis[IO].utf8("redis://localhost")
    val transactorR = Transactor.make(redisR, 2)

    Resource.both(redisR, transactorR).use { case (redis, tx) =>
      val setters = redis.set(key2, "delete_me") >> redis.set(key3, "foo")

      val getters =
        redis.get(key1).flatTap(showResult(key1)) >>
          redis.get(key2).flatTap(showResult(key2))

      val ops = (cmd: RedisCommands[IO, String, String]) =>
        (store: TxStore[IO, String, Option[String]]) =>
          List(
            cmd.set(key1, "foo"),
            cmd.del(key2).void,
            cmd.get(key3).flatMap(store.set(key3))
          )

      val prog =
        tx
          .transact(ops(_))
          .flatMap { kv => IO.println(s"KV: ${kv}") }
          .recoverWith {
            case TransactionDiscarded => Log[IO].error("[Error] - Transaction Discarded")
            case e                    => Log[IO].error(s"[Error] - $e")
          }
      setters >> getters >> prog >> getters.void
    }
  }
}

:point_up: Edit: gvolpe: I've been thinking about the discussion with @Kurek_gitlab and I played around with something that would allow to have instantiated connections for transactions / pipelines. What are your thoughts on having something like this:

trait Transactor[F[_], K, V] {
  def transact_(ops: RedisCommands[F, K, V] => List[F[Unit]]): F[Unit]
  def transact[A](ops: RedisCommands[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]]
}

object Transactor {
  private type Cmd[F[_], K, V] = RedisCommands[F, K, V]

  def make[F[_], K, V](redisR: Resource[F, Cmd[F, K, V]], nTransactionPools: Int)(implicit
      F: Async[F]
  ): Resource[F, Transactor[F, K, V]] = {
    assert(nTransactionPools >= 1, "nTransactionPools must be positive")
    for {
      cmds  <- List.fill(nTransactionPools)(redisR).sequence
      queue <- Resource.eval(Queue.bounded[F, Cmd[F, K, V]](nTransactionPools))
      _     <- Resource.eval(cmds.traverse(queue.offer))
    } yield new Transactor[F, K, V] {

      override def transact_(ops: Cmd[F, K, V] => List[F[Unit]]): F[Unit] =
        queue.take.flatMap { cmd =>
          cmd.transact_(ops(cmd)).guarantee(queue.offer(cmd))
        }

      override def transact[A](ops: Cmd[F, K, V] => TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] =
        queue.take.flatMap { cmd =>
          cmd.transact(ops(cmd)).guarantee(queue.offer(cmd))
        }
    }
  }
}

Example usage (based on the one in the docs):

object Main extends IOApp.Simple {

  val key1 = "test1"
  val key2 = "test2"
  val key3 = "test3"
  val showResult: String => Option[String] => IO[Unit] = key =>
    _.fold(Log[IO].info(s"Key not found: $key"))(s => Log[IO].info(s"$key: $s"))

  override def run: IO[Unit] = {
    val redisR      = Redis[IO].utf8("redis://localhost")
    val transactorR = Transactor.make(redisR, 2)

    Resource.both(redisR, transactorR).use { case (redis, tx) =>
      val setters = redis.set(key2, "delete_me") >> redis.set(key3, "foo")

      val getters =
        redis.get(key1).flatTap(showResult(key1)) >>
          redis.get(key2).flatTap(showResult(key2))

      val ops = (cmd: RedisCommands[IO, String, String]) =>
        (store: TxStore[IO, String, Option[String]]) =>
          List(
            cmd.set(key1, "foo"),
            cmd.del(key2).void,
            cmd.get(key3).flatMap(store.set(key3))
          )

      val prog =
        tx
          .transact(ops(_))
          .flatMap { kv => IO.println(s"KV: ${kv}") }
          .recoverWith {
            case TransactionDiscarded => Log[IO].error("[Error] - Transaction Discarded")
            case e                    => Log[IO].error(s"[Error] - $e")
          }
      setters >> getters >> prog >> getters.void
    }
  }
}
gvolpe
@gvolpe:matrix.org
[m]
kyri-petrou I'll get back to you next week, I'm off this one
kyri-petrou
@kyri-petrou:matrix.org
[m]
No worries, enjoy the time off!
gvolpe
@gvolpe:matrix.org
[m]
kyri-petrou didn't forget about your question and PR, I'll be back on the computer full-time tomorrow
kyri-petrou
@kyri-petrou:matrix.org
[m]
No stress!
gvolpe
@gvolpe:matrix.org
[m]
kyri-petrou can you elaborate a bit more on the idea for that internal queue and the number of transaction pools? Initially, I would not be opposed to that Transactor interface but I think we could improve the UX, probably with extension methods as we do with redis.transact { ... }
1 reply
Tobias Roland
@TobiasRoland

:wave: Howdy! I'm having some trouble understanding why I'm getting ERR MULTI calls can not be nested when I try to do:

cmds.transact_(
                 cmds.set(cacheKey, a).void :: cmds.expire(cacheKey, ttl).void :: Nil
               )

... is this not the correct way to set a key/value + its expiry?

Tobias Roland
@TobiasRoland
Ah, I see there's a .setEx... still, curious to know why the above didn't work if anyone knows?
gvolpe
@gvolpe:matrix.org
[m]

TobiasRoland (Tobias Roland) you're more likely running concurrent transactions using a single connection. See https://redis4cats.profunktor.dev/transactions.html#concurrent-transactions

Also note that a lot of work has been placed on transactions, it is recommended to upgrade to the latest version.