Transactions

Redis supports transactions via the MULTI, EXEC and DISCARD commands. redis4cats provides a RedisTx utility that models a transaction as a Resource.

Caveats

Note that every command has to be forked (.start) because the commands need to be sent to the server asynchronously, and no response will be received until either an EXEC or a DISCARD command is sent. Both forking and sending the final command are handled by RedisTx.

These are internals, though. All you need to care about is which commands you want to run as part of a transaction, and how to handle possible errors and retry logic.

Concurrent transactions

⚠️ In order to run transactions concurrently, you need to acquire a connection per transaction (RedisCommands), as MULTI cannot be called concurrently within the same connection. For such cases, it is recommended to share the same RedisClient. ⚠️
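As a rough illustration of the advice above, the following sketch acquires a single RedisClient and derives two RedisCommands connections from it, one per transaction. The URI and the key names are placeholders, and the Stdout logger import is only one possible way to provide the Log[IO] instance; treat this as a sketch under those assumptions rather than a recommended layout.

```scala
import cats.effect.{ IO, Resource }
import cats.syntax.all._
import dev.profunktor.redis4cats.{ Redis, RedisCommands }
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log.Stdout._

type Cmd = RedisCommands[IO, String, String]

// One shared client, two independent connections (one per transaction).
// "redis://localhost" and the key names below are placeholders.
val connections: Resource[IO, (Cmd, Cmd)] =
  for {
    client <- RedisClient[IO].from("redis://localhost")
    redis1 <- Redis[IO].fromClient(client, RedisCodec.Utf8)
    redis2 <- Redis[IO].fromClient(client, RedisCodec.Utf8)
  } yield (redis1, redis2)

val program: IO[Unit] =
  connections.use { case (redis1, redis2) =>
    val tx1 = redis1.transact_(List(redis1.set("tx1-key", "a")))
    val tx2 = redis2.transact_(List(redis2.set("tx2-key", "b")))
    // Each transaction issues MULTI on its own connection,
    // so the two can be started in parallel.
    (tx1, tx2).parTupled.void
  }
```

Running program requires a reachable Redis server; both connections are released, followed by the client, when the Resource is closed.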

Working with transactions

There are two methods available in RedisCommands: transact and transact_. The former takes a function TxStore[F, K, V] => List[F[Unit]] whereas the latter only takes a List[F[Unit]].

TxStore provides a way to store transactional values for later retrieval, as we cannot chain transactional commands together via flatMap and friends (every command needs to be atomic and independent of previous Redis results).

trait TxStore[F[_], K, V] {
  def get: F[Map[K, V]]
  def set(key: K)(v: V): F[Unit]
}
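To make the role of TxStore more concrete, here is a minimal in-memory sketch backed by a cats-effect Ref. The mkStore helper is hypothetical and is not necessarily how redis4cats builds its own store; the trait is redeclared only so the snippet is self-contained.

```scala
import cats.effect.{ IO, Ref }

// Same shape as the TxStore trait shown above, redeclared for self-containment.
trait TxStore[F[_], K, V] {
  def get: F[Map[K, V]]
  def set(key: K)(v: V): F[Unit]
}

// Hypothetical helper: an in-memory store backed by a Ref.
def mkStore[K, V]: IO[TxStore[IO, K, V]] =
  Ref.of[IO, Map[K, V]](Map.empty).map { ref =>
    new TxStore[IO, K, V] {
      def get: IO[Map[K, V]]          = ref.get
      def set(key: K)(v: V): IO[Unit] = ref.update(_.updated(key, v))
    }
  }

// Each "command" writes its own result into the store;
// none of them depends on the result of another.
val collected: IO[Map[String, Option[String]]] =
  for {
    store <- mkStore[String, Option[String]]
    _     <- IO.pure(Option("foo")).flatMap(store.set("test3"))
    _     <- IO.pure(Option.empty[String]).flatMap(store.set("missing"))
    kv    <- store.get
  } yield kv
```

The IO.pure values stand in for Redis replies. The point is that results are collected on the side and read back once, after all commands have run, instead of being threaded from one command into the next.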

The List[F[Unit]] represents a list of Redis operations you wish to run within a transaction, ordered from left to right.

Below you can find a first example of transactional commands.

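import cats.effect.IO
import cats.implicits._
import dev.profunktor.redis4cats._
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.tx.{ TransactionDiscarded, TxStore }

// Assumed to be defined elsewhere: an implicit Log[IO] instance and
// commandsApi: Resource[IO, RedisCommands[IO, String, String]]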

val key1 = "test1"
val key2 = "test2"
val key3 = "test3"

val showResult: String => Option[String] => IO[Unit] = key =>
  _.fold(Log[IO].info(s"Key not found: $key"))(s => Log[IO].info(s"$key: $s"))

commandsApi.use { redis => // RedisCommands[IO, String, String]
  val setters = redis.set(key2, "delete_me") >> redis.set(key3, "foo")

  val getters =
    redis.get(key1).flatTap(showResult(key1)) >>
      redis.get(key2).flatTap(showResult(key2))

  val ops = (store: TxStore[IO, String, Option[String]]) =>
    List(
      redis.set(key1, "foo"),
      redis.del(key2).void,
      redis.get(key3).flatMap(store.set(key3))
    )

  val prog =
    redis.transact(ops)
      .flatMap { kv =>
        IO.println(s"KV: ${kv}")
      }
      .recoverWith {
        case TransactionDiscarded =>
          Log[IO].error("[Error] - Transaction Discarded")
        case e =>
          Log[IO].error(s"[Error] - $e")
      }

  setters >> getters >> prog >> getters.void
}

The transact method should be used exclusively to run Redis commands as part of a transaction, not any other computations. Failing to do so may result in unexpected behavior.

Transactional commands may be discarded if something goes wrong in between, which is why the example above recovers from TransactionDiscarded.

The transact method returns the values stored in the given TxStore, which is used to save results of commands that run as part of the transaction for later retrieval.

If you are only writing values (e.g. only using set), you may prefer to use transact_ instead.

How NOT to use transactions

For example, the following transaction will result in a deadlock:

commandsApi.use { redis =>
  val getters =
    redis.get(key1).flatTap(showResult(key1)) *>
      redis.get(key2).flatTap(showResult(key2))

  val setters = redis.transact_(
    List(redis.set(key1, "foo"), redis.set(key2, "bar"), redis.discard)
  )

  getters *> setters.void *> getters.void
}

You should never pass a transactional command (MULTI, EXEC or DISCARD) to transact or transact_. These commands are made available in case you want to handle transactions manually, which you should do at your own risk.

The following example will result in a successful transaction on Redis. Yet, the operation will end up raising the error passed as a command.

commandsApi.use { redis =>
  val getters =
    redis.get(key1).flatTap(showResult(key1)) *>
      redis.get(key2).flatTap(showResult(key2))

  val failedTx = redis.transact_(
    List(redis.set(key1, "foo"), redis.set(key2, "bar"), IO.raiseError(new Exception("boom")))
  )

  getters *> failedTx.void *> getters.void
}

Optimistic locking

Redis provides a mechanism called optimistic locking using check-and-set via the WATCH command. Quoting the Redis documentation:

WATCHed keys are monitored in order to detect changes against them. If at least one watched key is modified before the EXEC command, the whole transaction aborts, and EXEC returns a Null reply to notify that the transaction failed.
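The check-and-set behavior quoted above can be pictured with a toy, single-threaded model in plain Scala. Everything in this sketch (ToyRedis, the per-key version counters, the Option result) is hypothetical and only mirrors the described semantics; it is neither Redis nor redis4cats code.

```scala
// Toy in-memory model of WATCH / EXEC. All names are hypothetical.
final class ToyRedis {
  private var data     = Map.empty[String, String]
  private var versions = Map.empty[String, Long].withDefaultValue(0L)

  def get(key: String): Option[String] = data.get(key)

  // Every write bumps the version of the key it touches.
  def set(key: String, value: String): Unit = {
    data = data.updated(key, value)
    versions = versions.updated(key, versions(key) + 1)
  }

  // WATCH: remember the current version of each watched key.
  def watch(keys: String*): Map[String, Long] =
    keys.map(k => k -> versions(k)).toMap

  // EXEC: apply the queued writes only if no watched key has changed.
  // None models the Null reply of an aborted transaction.
  def exec(watched: Map[String, Long], writes: List[(String, String)]): Option[Unit] =
    if (watched.forall { case (k, v) => versions(k) == v }) {
      writes.foreach { case (k, v) => set(k, v) }
      Some(())
    } else None
}

val toy     = new ToyRedis
val watched = toy.watch("test1")
toy.set("test1", "changed-by-another-client") // modification between WATCH and EXEC
val aborted = toy.exec(watched, List("test1" -> "foo"))            // None: aborted
val retried = toy.exec(toy.watch("test1"), List("test1" -> "foo")) // Some(()): applied
```

The first exec is rejected because the watched key changed after it was watched; watching again and retrying succeeds, which is the essence of optimistic locking.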

This library translates the Null reply into a TransactionDiscarded error raised in the effect type. E.g.:

val mkRedis: Resource[IO, RedisCommands[IO, String, String]] =
  RedisClient[IO].from("redis://localhost").flatMap { cli =>
    Redis[IO].fromClient(cli, RedisCodec.Utf8)
  }

def txProgram(v1: String, v2: String) =
  mkRedis.use { redis =>
    val getters =
      redis.get(key1).flatTap(showResult(key1)) >>
        redis.get(key2).flatTap(showResult(key2)) >>
        redis.get(key3).flatTap(showResult(key3))

    val ops = List(redis.set(key1, v1), redis.set(key2, v2))

    val prog: IO[Unit] =
      redis.transact_(ops)
        .onError {
          case TransactionDiscarded =>
            Log[IO].error("[Error] - Transaction Discarded")
          case e =>
            Log[IO].error(s"[Error] - $e")
        }

    val watching = redis.watch(key1, key2)

    getters >> watching >> prog >> getters >> Log[IO].info("keep doing stuff...")
  }

Before executing the transaction, we invoke redis.watch(key1, key2). Next, let’s run two concurrent transactions:

IO.race(txProgram("osx", "linux"), txProgram("foo", "bar")).void

In this case, only the first transaction will succeed; the second one will be discarded. However, most of the time we want the transaction to succeed eventually, so we can retry it until it does (optimistic locking).

def retriableTx: IO[Unit] =
  txProgram("foo", "bar").recoverWith {
    case TransactionDiscarded => retriableTx
  }.uncancelable

IO.race(txProgram("nix", "guix"), retriableTx).void

The first transaction will be successful, but ultimately, the second transaction will retry and set the values “foo” and “bar”.
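Retrying forever may not always be desirable under heavy contention. As a variation on the idea above, the following sketch bounds the number of attempts and waits between them. The retryDiscarded helper and its parameters are hypothetical and not part of redis4cats; only TransactionDiscarded comes from the library.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.syntax.all._
import dev.profunktor.redis4cats.tx.TransactionDiscarded

// Hypothetical helper: re-run `tx` when it is discarded, at most `maxRetries`
// more times, sleeping `delay` between attempts. Any other error, or a discard
// once the retries are exhausted, is propagated to the caller.
def retryDiscarded[A](tx: IO[A], maxRetries: Int, delay: FiniteDuration): IO[A] =
  tx.recoverWith {
    case TransactionDiscarded if maxRetries > 0 =>
      IO.sleep(delay) >> retryDiscarded(tx, maxRetries - 1, delay)
  }
```

With the txProgram defined earlier, this could be used as retryDiscarded(txProgram("foo", "bar"), maxRetries = 5, delay = 100.millis). Because txProgram performs the WATCH itself, every retry watches the keys again before running the transaction.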

All these examples can be found under RedisTxDemo.