Transactions
Redis supports transactions via the MULTI, EXEC and DISCARD commands. redis4cats provides a RedisTx utility that models a transaction as a Resource.
Caveats
Note that every command has to be forked (.start) because the commands need to be sent to the server asynchronously, and no response will be received until either an EXEC or a DISCARD command is sent. Both forking and sending the final command are handled by RedisTx.
These are internals, though. All you need to care about is which commands you want to run as part of a transaction, and how to handle possible errors and retry logic.
Concurrent transactions
⚠️ In order to run transactions concurrently, you need to acquire a connection per transaction (RedisCommands), as MULTI cannot be called concurrently within the same connection. For such cases, it is recommended to share the same RedisClient. ⚠️
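The recommendation above can be sketched as follows: one RedisClient is shared, and each transaction gets its own RedisCommands connection. This is a minimal sketch rather than a definitive setup; the object name, the key names and the redis://localhost URI are placeholders, and it assumes a Redis server is reachable at that address.

```scala
import cats.effect.{ IO, IOApp, Resource }
import cats.implicits._
import dev.profunktor.redis4cats.{ Redis, RedisCommands }
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log.Stdout._

// Hypothetical demo object, not part of the library.
object ConcurrentTxSketch extends IOApp.Simple {

  type Cmd = RedisCommands[IO, String, String]

  // A single RedisClient is shared; each fromClient call acquires a separate connection.
  val connections: Resource[IO, (Cmd, Cmd)] =
    for {
      client <- RedisClient[IO].from("redis://localhost") // placeholder URI
      c1     <- Redis[IO].fromClient(client, RedisCodec.Utf8)
      c2     <- Redis[IO].fromClient(client, RedisCodec.Utf8)
    } yield (c1, c2)

  val run: IO[Unit] =
    connections.use { case (c1, c2) =>
      // Each transaction only uses commands from its own connection.
      val tx1 = c1.transact_(List(c1.set("k1", "a"), c1.set("k2", "b")))
      val tx2 = c2.transact_(List(c2.set("k3", "c"), c2.set("k4", "d")))
      // Run both transactions concurrently.
      (tx1, tx2).parTupled.void
    }
}
```

Mixing commands from c1 into a transaction started on c2 would defeat the purpose, since MULTI is tracked per connection.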
Working with transactions
There are two methods available in RedisCommands: transact and transact_. The former takes a function TxStore[F, K, V] => List[F[Unit]] whereas the latter only takes a List[F[Unit]].
TxStore provides a way to store transactional values for later retrieval, as we cannot chain transactional commands together via flatMap and friends (every command needs to be atomic and independent of previous Redis results).
trait TxStore[F[_], K, V] {
  def get: F[Map[K, V]]
  def set(key: K)(v: V): F[Unit]
}
The List[F[Unit]] represents a list of Redis operations you wish to run within a transaction, ordered from left to right.
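To make the role of TxStore more concrete, here is a hypothetical in-memory version backed by a cats-effect Ref. It is only an illustration of the interface and not necessarily how redis4cats implements it; the trait is copied locally so the sketch compiles without the library, and fakeGet is a stand-in for a command such as redis.get.

```scala
import cats.effect.{ IO, Ref }

// Hypothetical demo object, not part of the library.
object TxStoreSketch {

  // Local copy of the trait shown above.
  trait TxStore[F[_], K, V] {
    def get: F[Map[K, V]]
    def set(key: K)(v: V): F[Unit]
  }

  // Assumed implementation: a Ref holding an immutable Map.
  def inMemory[K, V]: IO[TxStore[IO, K, V]] =
    Ref.of[IO, Map[K, V]](Map.empty).map { ref =>
      new TxStore[IO, K, V] {
        def get: IO[Map[K, V]]          = ref.get
        def set(key: K)(v: V): IO[Unit] = ref.update(_.updated(key, v))
      }
    }

  // Stand-in for a Redis read whose result is only known once the effect runs.
  val fakeGet: IO[Option[String]] = IO.pure(Some("foo"))

  val demo: IO[Map[String, Option[String]]] =
    for {
      store <- inMemory[String, Option[String]]
      // The command stores its own result; no later command depends on it.
      _     <- fakeGet.flatMap(store.set("test3"))
      kv    <- store.get
    } yield kv
}
```

Each element of the list keeps the type F[Unit], while any value it produces is written to the store and read back once the whole transaction has completed.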
Below you can find a first example of transactional commands.
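import cats.effect.{ IO, Resource }
import cats.implicits._
import dev.profunktor.redis4cats._
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.effect.Log.Stdout._ // implicit Log[IO] instance
import dev.profunktor.redis4cats.tx.{ TransactionDiscarded, TxStore }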
val key1 = "test1"
val key2 = "test2"
val key3 = "test3"
val showResult: String => Option[String] => IO[Unit] = key =>
  _.fold(Log[IO].info(s"Key not found: $key"))(s => Log[IO].info(s"$key: $s"))
commandsApi.use { redis => // RedisCommands[IO, String, String]
  val setters = redis.set(key2, "delete_me") >> redis.set(key3, "foo")

  val getters =
    redis.get(key1).flatTap(showResult(key1)) >>
      redis.get(key2).flatTap(showResult(key2))

  val ops = (store: TxStore[IO, String, Option[String]]) =>
    List(
      redis.set(key1, "foo"),
      redis.del(key2).void,
      redis.get(key3).flatMap(store.set(key3))
    )

  val prog =
    redis.transact(ops)
      .flatMap { kv =>
        IO.println(s"KV: ${kv}")
      }
      .recoverWith {
        case TransactionDiscarded =>
          Log[IO].error("[Error] - Transaction Discarded")
        case e =>
          Log[IO].error(s"[Error] - $e")
      }

  setters >> getters >> prog >> getters.void
}
transact should be used exclusively to run Redis commands as part of a transaction, not any other computations. Failing to do so may result in unexpected behavior.
Transactional commands may be discarded if something went wrong in between.
The transact method returns the values stored in the given TxStore, which is used to save results of commands that run as part of the transaction for later retrieval.
If you are only writing values (e.g. only using set), you may prefer to use transact_ instead.
How NOT to use transactions
For example, the following transaction will result in a deadlock:
commandsApi.use { redis =>
  val getters =
    redis.get(key1).flatTap(showResult(key1)) *>
      redis.get(key2).flatTap(showResult(key2))

  // Do NOT do this: redis.discard is a transactional command
  val setters = redis.transact_(
    List(redis.set(key1, "foo"), redis.set(key2, "bar"), redis.discard)
  )

  getters *> setters.void *> getters.void
}
You should never pass a transactional command (MULTI, EXEC or DISCARD) as part of the list. These commands are made available in case you want to handle transactions manually, which you should do at your own risk.
The following example will result in a successful transaction on Redis. Yet, the operation will end up raising the error passed as a command.
commandsApi.use { redis =>
  val getters =
    redis.get(key1).flatTap(showResult(key1)) *>
      redis.get(key2).flatTap(showResult(key2))

  val failedTx = redis.transact_(
    List(redis.set(key1, "foo"), redis.set(key2, "bar"), IO.raiseError(new Exception("boom")))
  )

  getters *> failedTx.void *> getters.void
}
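One way to avoid this situation is to run any computation that may fail before the transaction starts, so that the list only contains Redis commands. The following is a minimal sketch under that assumption; the object name and the validate and validatedTx helpers are hypothetical and not part of the library.

```scala
import cats.effect.IO
import dev.profunktor.redis4cats.RedisCommands

// Hypothetical demo object, not part of the library.
object ValidatedTxSketch {

  // Hypothetical check that can fail; it runs before any command is sent.
  def validate(raw: String): IO[String] =
    if (raw.nonEmpty) IO.pure(raw)
    else IO.raiseError(new IllegalArgumentException("empty value"))

  // Only Redis commands end up inside the transaction.
  def validatedTx(redis: RedisCommands[IO, String, String], raw: String): IO[Unit] =
    validate(raw).flatMap { v =>
      redis.transact_(List(redis.set("test1", v), redis.set("test2", "bar")))
    }
}
```

If validate fails, no command reaches the server, so the error cannot coexist with a committed transaction.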
Optimistic locking
Redis provides a mechanism called optimistic locking using check-and-set via the WATCH command. Quoting the Redis documentation:
WATCHed keys are monitored in order to detect changes against them. If at least one watched key is modified before the EXEC command, the whole transaction aborts, and EXEC returns a Null reply to notify that the transaction failed.
This library translates the Null reply as a TransactionDiscarded error raised in the effect type. E.g.:
val mkRedis: Resource[IO, RedisCommands[IO, String, String]] =
  RedisClient[IO].from("redis://localhost").flatMap { cli =>
    Redis[IO].fromClient(cli, RedisCodec.Utf8)
  }

def txProgram(v1: String, v2: String) =
  mkRedis.use { redis =>
    val getters =
      redis.get(key1).flatTap(showResult(key1)) >>
        redis.get(key2).flatTap(showResult(key2)) >>
        redis.get(key3).flatTap(showResult(key3))

    val ops = List(redis.set(key1, v1), redis.set(key2, v2))

    val prog: IO[Unit] =
      redis.transact_(ops)
        .onError {
          case TransactionDiscarded =>
            Log[IO].error("[Error] - Transaction Discarded")
          case e =>
            Log[IO].error(s"[Error] - $e")
        }

    val watching = redis.watch(key1, key2)

    getters >> watching >> prog >> getters >> Log[IO].info("keep doing stuff...")
  }
Before executing the transaction, we invoke redis.watch(key1, key2). Next, let’s run two concurrent transactions:
IO.race(txProgram("osx", "linux"), txProgram("foo", "bar")).void
In this case, only the first transaction will be successful. The second one will be discarded. However, most of the time we want a transaction to succeed eventually, in which case we can retry it until it succeeds (optimistic locking).
def retriableTx: IO[Unit] =
  txProgram("foo", "bar").recoverWith {
    case TransactionDiscarded => retriableTx
  }.uncancelable

IO.race(txProgram("nix", "guix"), retriableTx).void
The first transaction will be successful, but ultimately, the second transaction will retry and set the values “foo” and “bar”.
All these examples can be found under RedisTxDemo.
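Retrying forever may not be desirable under heavy contention. A bounded variant with a growing delay could look like the sketch below; the object name, the retryDiscarded helper and its parameters are hypothetical and not part of the library.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import dev.profunktor.redis4cats.tx.TransactionDiscarded

// Hypothetical demo object, not part of the library.
object RetrySketch {

  // Re-runs tx when it fails with TransactionDiscarded, at most maxRetries more times,
  // doubling the delay between attempts. Any other error is propagated as is.
  def retryDiscarded[A](tx: IO[A], maxRetries: Int, delay: FiniteDuration): IO[A] =
    tx.recoverWith {
      case TransactionDiscarded if maxRetries > 0 =>
        IO.sleep(delay) >> retryDiscarded(tx, maxRetries - 1, delay * 2)
    }
}
```

With the txProgram defined earlier, it could be used as RetrySketch.retryDiscarded(txProgram("foo", "bar"), maxRetries = 5, delay = 50.millis). Since txProgram re-acquires its connection and calls WATCH on every run, each retry starts from a fresh view of the watched keys.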