Streams

High-level, safe and pure functional API on top of Redis Streams.

Establishing a connection

There are two ways of establishing a connection using the RedisStream interpreter:

Single connection

def mkStreamingConnection[F[_], K, V](
  client: RedisClient,
  codec: RedisCodec[K, V],
  uri: RedisURI
): Stream[F, Streaming[Stream[F, ?], K, V]]

Master / Replica connection

def mkMasterReplicaConnection[F[_], K, V](codec: RedisCodec[K, V], uris: RedisURI*)(
  readFrom: Option[ReadFrom] = None): Stream[F, Streaming[Stream[F, ?], K, V]]

Cluster connection

Not implemented yet.

Custom connection

Redis[IO].simple("redis://localhost", RedisCodec.Utf8).map { redis =>
  // Wrap any RedisCommands with a RedisStream layer
  RedisStream[IO, String, String](redis)
}

Streaming API

See https://redis.io/docs/latest/develop/data-types/streams/

trait Streaming[F[_], S[_], K, V] {
  def append: S[XAddMessage[K, V]] => S[MessageId]
  def append(msg: XAddMessage[K, V]): F[MessageId]
  def read(
    streams: Set[XReadOffsets[K]],
    chunkSize: Int,
    block: Option[Duration] = Some(Duration.Zero),
    count: Option[Long] = None,
    restartOnTimeout: RestartOnTimeout = RestartOnTimeout.always
  ): S[StreamMessage[K, V]]
}

S[_] represents a Stream[F, *] or any other effectful type constructor used to model the streaming operations.

See https://redis.io/docs/latest/commands/xadd. append can be used to add a single message or as a fs2.Pipe[F, XAddMessage[K, V], MessageId] to a Stream of Stream[F, XAddMessage[K, V]].

See https://redis.io/docs/latest/commands/xread. read(...) is a source stream, Stream[F, StreamMessage[K, V], from multiple Redis stream’s.

It’s recommend to use a separate connection for reading and writing to the stream. Read operations may block if the block duration is set.

Streaming Example

import cats.effect.{ IO, IOApp }
import cats.syntax.all._
import dev.profunktor.redis4cats.Redis
import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.log4cats._
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.effects.XReadOffsets
import dev.profunktor.redis4cats.streams.data._
import fs2.Stream
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration._
import scala.util.Random

object StreamingDemo2 extends IOApp.Simple {

  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  val stringCodec = RedisCodec.Utf8
  val redisURI    = "redis://localhost"

  val streamKey1 = "demo"
  val streamKey2 = "users"

  def randomMessage: Stream[IO, XAddMessage[String, String]] = Stream.evals {
    val rndKey   = IO(Random.nextInt(1000).toString)
    val rndValue = IO(Random.nextString(10))
    (rndKey, rndValue).parMapN { case (k, v) =>
      List(
        XAddMessage(streamKey1, Map(k -> v)),
        XAddMessage(streamKey2, Map(k -> v))
      )
    }
  }

  private val readStream: Stream[IO, Unit] =
    for {
      redis <- Stream.resource(Redis[IO].simple(redisURI, stringCodec))
      streaming = RedisStream[IO, String, String](redis)
      message <- streaming.read(XReadOffsets.all(streamKey1, streamKey2))
      _ <- Stream.eval(IO.println(message))
    } yield ()

  private val writeStream: Stream[IO, Unit] =
    for {
      redis <- Stream.resource(Redis[IO].simple(redisURI, stringCodec))
      streaming = RedisStream[IO, String, String](redis)
      _ <- Stream.awakeEvery[IO](2.seconds)
      _ <- randomMessage.through(streaming.append)
    } yield ()

  def run: IO[Unit] =
    readStream
      .concurrently(writeStream)
      .interruptAfter(5.seconds)
      .compile
      .drain

}