Streams (experimental)

High-level, safe and pure functional API on top of Redis Streams.

Establishing a connection

There are two ways of establishing a connection using the RedisStream interpreter:

Single connection

def mkStreamingConnection[F[_], K, V](
  client: RedisClient,
  codec: RedisCodec[K, V],
  uri: RedisURI
): Stream[F, Streaming[Stream[F, ?], K, V]]

Master / Replica connection

def mkMasterReplicaConnection[F[_], K, V](codec: RedisCodec[K, V], uris: RedisURI*)(
  readFrom: Option[ReadFrom] = None): Stream[F, Streaming[Stream[F, ?], K, V]]

Cluster connection

Not implemented yet.

Streaming API

At the moment there’s only two combinators:

trait Streaming[F[_], K, V] {
  def append: F[XAddMessage[K, V]] => F[MessageId]
  def read(keys: Set[K], initialOffset: K => StreamingOffset[K] = StreamingOffset.All[K]): F[XReadMessage[K, V]]

append can be used as a Sink[F, StreamingMessage[K, V] and read(keys) as a source Stream[F, StreamingMessageWithId[K, V]. Note that Redis allows you to consume from multiple stream keys at the same time.

Streaming Example

import cats.effect.IO
import cats.syntax.all._
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.log4cats._
import dev.profunktor.redis4cats.streams.RedisStream
import fs2.Stream
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.concurrent.duration._
import scala.util.Random

implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

val stringCodec = RedisCodec.Utf8

def putStrLn[A](a: A): IO[Unit] = IO(println(a))

val streamKey1 = "demo"
val streamKey2 = "users"

def randomMessage: Stream[IO, XAddMessage[String, String]] = Stream.eval {
  val rndKey   = IO(Random.nextInt(1000).toString)
  val rndValue = IO(Random.nextString(10))
  (rndKey, rndValue).parMapN {
    case (k, v) =>
      XAddMessage(streamKey1, Map(k -> v))

for {
  client    <- Stream.resource(RedisClient[IO].from("redis://localhost"))
  streaming <- RedisStream.mkStreamingConnection[IO, String, String](client, stringCodec)
  source    =, streamKey2), chunkSize = 1)
  appender  = streaming.append
  _ <- Stream(
         Stream.awakeEvery[IO](3.seconds) >> randomMessage.through(appender)
} yield ()