Streams (experimental)
High-level, safe and pure functional API on top of Redis Streams.
Establishing a connection
There are two ways of establishing a connection using the RedisStream
interpreter:
Single connection
def mkStreamingConnection[F[_], K, V](
  client: RedisClient,
  codec: RedisCodec[K, V]
): Stream[F, Streaming[Stream[F, ?], K, V]]
Master / Replica connection
def mkMasterReplicaConnection[F[_], K, V](codec: RedisCodec[K, V], uris: RedisURI*)(
  readFrom: Option[ReadFrom] = None
): Stream[F, Streaming[Stream[F, ?], K, V]]
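As a rough illustration of the signature above, the sketch below opens a Master / Replica connection and prefers replicas for reads. It assumes that RedisURI.make is available to build the URIs, that the ReadFrom parameter accepts Lettuce's io.lettuce.core.ReadFrom constants, and that a master and a replica listen on the hypothetical ports 6379 and 6380; none of these details come from this page, so adjust them to your setup and library version.

```scala
import cats.effect.IO
import dev.profunktor.redis4cats.connection.RedisURI
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.log4cats._
import dev.profunktor.redis4cats.streams.RedisStream
import fs2.Stream
import io.lettuce.core.{ReadFrom => JReadFrom}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object MasterReplicaSketch {
  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  // Assumption: the ReadFrom parameter is Lettuce's ReadFrom type.
  val readFrom: Option[JReadFrom] = Some(JReadFrom.REPLICA_PREFERRED)

  // Nothing connects until the resulting stream is compiled and run.
  val connection =
    for {
      // Hypothetical addresses for one master and one replica.
      master  <- Stream.eval(RedisURI.make[IO]("redis://localhost:6379"))
      replica <- Stream.eval(RedisURI.make[IO]("redis://localhost:6380"))
      streaming <- RedisStream.mkMasterReplicaConnection[IO, String, String](
                     RedisCodec.Utf8,
                     master,
                     replica
                   )(readFrom)
    } yield streaming
}
```

Passing None as readFrom (the default) leaves the read preference to the underlying client.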
Cluster connection
Not implemented yet.
Streaming API
At the moment there are only two combinators:
trait Streaming[F[_], K, V] {
  def append: F[XAddMessage[K, V]] => F[MessageId]
  def read(keys: Set[K], chunkSize: Int, initialOffset: K => StreamingOffset[K] = StreamingOffset.All[K]): F[XReadMessage[K, V]]
}
With the connections above, append can be used as a Pipe[F, XAddMessage[K, V], MessageId] and read(keys) as a source Stream[F, XReadMessage[K, V]]. Note that Redis allows you to consume from multiple stream keys at the same time.
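The following is a minimal sketch of both combinators with a per-key initial offset. It follows the call forms used in the Streaming Example (two-argument mkStreamingConnection, read with chunkSize) and assumes a Redis server on localhost. StreamingOffset.Latest is assumed to exist alongside StreamingOffset.All; offsetFor and greeting are hypothetical names introduced only for illustration.

```scala
import cats.effect.{IO, IOApp}
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.log4cats._
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data._
import fs2.Stream
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object OffsetSketch extends IOApp.Simple {
  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  // Hypothetical policy: replay "demo" from its first entry, but only
  // receive entries added after subscribing for every other key.
  // Assumption: StreamingOffset.Latest is provided by the library.
  def offsetFor(key: String): StreamingOffset[String] =
    if (key == "demo") StreamingOffset.All(key) else StreamingOffset.Latest(key)

  // A single illustrative message for the "demo" stream.
  val greeting: XAddMessage[String, String] =
    XAddMessage("demo", Map("greeting" -> "hello"))

  def run: IO[Unit] =
    (for {
      client    <- Stream.resource(RedisClient[IO].from("redis://localhost"))
      streaming <- RedisStream.mkStreamingConnection[IO, String, String](client, RedisCodec.Utf8)
      // append is a function between streams, so it fits `through`.
      ids = Stream.emit(greeting).covary[IO].through(streaming.append)
      // A single read call consumes both keys as one source.
      source = streaming.read(Set("demo", "users"), chunkSize = 1, initialOffset = offsetFor)
      _ <- Stream(
             source.evalMap(msg => IO.println(msg)),
             ids.evalMap(id => IO.println(id))
           ).parJoin(2)
    } yield ()).compile.drain
}
```

The read source does not complete on its own, so this program keeps running until it is interrupted.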
Streaming Example
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.log4cats._
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data._
import fs2.Stream
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.concurrent.duration._
import scala.util.Random
object StreamingExample extends IOApp.Simple {

  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  val stringCodec = RedisCodec.Utf8

  def putStrLn[A](a: A): IO[Unit] = IO(println(a))

  val streamKey1 = "demo"
  val streamKey2 = "users"

  def randomMessage: Stream[IO, XAddMessage[String, String]] = Stream.eval {
    val rndKey = IO(Random.nextInt(1000).toString)
    val rndValue = IO(Random.nextString(10))
    (rndKey, rndValue).parMapN {
      case (k, v) =>
        XAddMessage(streamKey1, Map(k -> v))
    }
  }

  def run: IO[Unit] = {
    for {
      client <- Stream.resource(RedisClient[IO].from("redis://localhost"))
      streaming <- RedisStream.mkStreamingConnection[IO, String, String](client, stringCodec)
      source = streaming.read(Set(streamKey1, streamKey2), chunkSize = 1)
      appender = streaming.append
      _ <- Stream(
             source.evalMap(putStrLn(_)),
             Stream.awakeEvery[IO](3.seconds) >> randomMessage.through(appender)
           ).parJoin(2).void
    } yield ()
  }.compile.drain

}