PubSub (experimental)

Simple, safe and pure functional streaming client to interact with Redis PubSub.

Establishing a connection

There are three options available in the PubSub interpreter:

  • mkPubSubConnection: Whenever you need one or more subscribers and publishers / stats.
  • mkSubscriberConnection: When all you need is one or more subscribers but no publishing / stats.
  • mkPublisherConnection: When all you need is to publish / stats.

Note: cluster support is not implemented yet.

Subscriber

trait SubscribeCommands[F[_], K, V] {
  def subscribe(channel: RedisChannel[K]): F[V]
  def unsubscribe(channel: RedisChannel[K]): F[Unit]
}

When using the PubSub interpreter the types will be Stream[F, V] and Stream[F, Unit] respectively.

Publisher / PubSubStats

trait PubSubStats[F[_], K] {
  def pubSubChannels: F[List[K]]
  def pubSubSubscriptions(channel: RedisChannel[K]): F[Subscription[K]]
  def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]]
}

trait PublishCommands[F[_], K, V] extends PubSubStats[F, K] {
  def publish(channel: RedisChannel[K]): F[V] => F[Unit]
}

When using the PubSub interpreter the publish function will be defined as a Sink[F, V] that can be connected to a Stream[F, ?] source.

PubSub example

import cats.effect._
import cats.syntax.all._
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.pubsub.PubSub
import dev.profunktor.redis4cats.log4cats._
import fs2.{Pipe, Stream}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration._
import scala.util.Random

object PubSubDemo extends IOApp.Simple {

  implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

  private val stringCodec = RedisCodec.Utf8

  private val eventsChannel = RedisChannel("events")
  private val gamesChannel  = RedisChannel("games")

  def sink(name: String): Pipe[IO, String, Unit] = _.evalMap(x => IO(println(s"Subscriber: $name >> $x")))

  val program: Stream[IO, Unit] =
    for {
      client <- Stream.resource(RedisClient[IO].from("redis://localhost"))
      pubSub <- Stream.resource(PubSub.mkPubSubConnection[IO, String, String](client, stringCodec))
      sub1   = pubSub.subscribe(eventsChannel)
      sub2   = pubSub.subscribe(gamesChannel)
      pub1   = pubSub.publish(eventsChannel)
      pub2   = pubSub.publish(gamesChannel)
      _  <- Stream(
             sub1.through(sink("#events")),
             sub2.through(sink("#games")),
             Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)).through(pub1),
             Stream.awakeEvery[IO](5.seconds) >> Stream.emit("Pac-Man!").through(pub2),
             Stream.awakeDelay[IO](11.seconds) >> pubSub.unsubscribe(gamesChannel),
             Stream.awakeEvery[IO](6.seconds) >> pubSub
               .pubSubSubscriptions(List(eventsChannel, gamesChannel))
               .evalMap(x => IO(println(x)))
           ).parJoin(6).void
    } yield ()

  def run: IO[Unit] = program.compile.drain

}