Getting Started

This module aims to be an enriched wrapper for the KinesisAsyncClient class offered by the AWS SDK for Java.

Installation

libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-client" % "0.0.32"

Usage

import cats.effect._
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.model._

import kinesis4cats.client.KinesisClient

object MyApp extends IOApp {
    override def run(args: List[String]) = 
        KinesisClient.Builder.default[IO].build.use(client => 
            for {
                _ <- client.createStream(
                    CreateStreamRequest
                        .builder()
                        .streamName("my-stream")
                        .shardCount(1)
                        .build()
                )
                _ <- client.putRecord(
                    PutRecordRequest
                        .builder()
                        .partitionKey("some-partition-key")
                        .streamName("my-stream")
                        .data(SdkBytes.fromUtf8String("my-data"))
                        .build()
                )
            } yield ExitCode.Success
        )
}

Producer

kinesis4cats offers a Producer interface that handles, among other things, batching, aggregation and retrying of records.

This module provides an implementation of that interface, backed by the KinesisClient.

import cats.data.NonEmptyList
import cats.effect._

import kinesis4cats.client.producer.KinesisProducer
import kinesis4cats.producer._
import kinesis4cats.models.StreamNameOrArn

object MyApp extends IOApp {
  override def run(args: List[String]) = 
    KinesisProducer.Builder
      .default[IO](StreamNameOrArn.Name("my-stream"))
      .build
      .use(producer =>
        for {
          _ <- producer.put(
            NonEmptyList.of(
              Record("my-data".getBytes(), "some-partition-key"),
              Record("my-data-2".getBytes(), "some-partition-key-2"),
              Record("my-data-3".getBytes(), "some-partition-key-3")
            )
          )
        } yield ExitCode.Success
      )
}
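To make the batching part of the Producer interface more concrete, here is a minimal sketch that uses only the Scala standard library to greedily group records into PutRecords-sized batches. The limits are plain parameters. Their defaults are the 500-records and 5 MiB-per-request values (partition keys included) from the AWS PutRecords documentation. `BatchingSketch`, `Rec` and `batch` are hypothetical names. This is not how kinesis4cats implements batching, and it ignores aggregation and per-record size limits.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical sketch (not kinesis4cats' implementation): greedily groups
// records into PutRecords-sized batches, preserving input order.
object BatchingSketch {
  final case class Rec(data: Array[Byte], partitionKey: String) {
    // Bytes counted against the request limit: payload plus partition key
    def size: Long =
      data.length.toLong + partitionKey.getBytes(StandardCharsets.UTF_8).length
  }

  def batch(
      records: List[Rec],
      maxRecords: Int = 500,            // records per PutRecords request
      maxBytes: Long = 5L * 1024 * 1024 // bytes per PutRecords request
  ): List[List[Rec]] = {
    // State: finished batches (newest first), open batch (reversed), its count and bytes
    val (done, open, _, _) =
      records.foldLeft((List.empty[List[Rec]], List.empty[Rec], 0, 0L)) {
        case ((acc, cur, count, bytes), r) =>
          val wouldOverflow = count + 1 > maxRecords || bytes + r.size > maxBytes
          // Close the open batch and start a new one with this record
          if (wouldOverflow && cur.nonEmpty) (cur.reverse :: acc, List(r), 1, r.size)
          else (acc, r :: cur, count + 1, bytes + r.size)
      }
    (if (open.nonEmpty) open.reverse :: done else done).reverse
  }
}
```

For example, 1200 tiny records split into batches of 500, 500 and 200. Six 1 MiB records split into batches of 4 and 2, because a fifth record would push the request just past 5 MiB.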

Failures vs Exceptions

The Producer interface works with the PutRecords API, which has two possible error paths:

  1. Exceptions: errors raised while calling the API. These are rare.
  2. Failures: the API call succeeds, but the response reports that some of the records failed.

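In the second case, the put method returns a result describing the partially-failed response instead of raising an error (unless configured otherwise, as described below).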

The producer in this module lets users handle these error paths in several ways.
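The two error paths can be told apart roughly as shown below. This is a hypothetical model built on the Scala standard library. `Try` stands in for an API call that may throw. `Entry` loosely mirrors a PutRecords response entry, which carries an error code (such as `InternalFailure` or `ProvisionedThroughputExceededException`) when that record failed. None of these names come from kinesis4cats or the AWS SDK.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical model of the two PutRecords error paths (not the SDK's types)
object ErrorPathsSketch {
  // One entry per request record; Some(code) means that record failed
  final case class Entry(errorCode: Option[String])

  sealed trait Outcome
  case object AllSucceeded extends Outcome
  final case class PartiallyFailed(failedIndexes: List[Int]) extends Outcome
  final case class Raised(error: Throwable) extends Outcome

  def classify(call: Try[List[Entry]]): Outcome = call match {
    // Path 1: the API call itself threw an exception
    case Failure(e) => Raised(e)
    // Path 2: the call returned, but some entries carry an error code
    case Success(entries) =>
      val failed = entries.zipWithIndex.collect { case (Entry(Some(_)), i) => i }
      if (failed.isEmpty) AllSucceeded else PartiallyFailed(failed)
  }
}
```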

Retrying failures

A user can supply a RetryPolicy that is used to retry both error paths until a fully successful response is received or the policy gives up.

In the event of a partially-failed response, the retry routine will only retry the failed records.

import cats.effect._

import kinesis4cats.client.producer.KinesisProducer
import kinesis4cats.compat.retry._
import kinesis4cats.models.StreamNameOrArn

// Retry 5 times with no delay between retries
val policy = RetryPolicies.limitRetries[IO](5)

KinesisProducer.Builder
  .default[IO](StreamNameOrArn.Name("my-stream"))
  .transformConfig(x => x.copy(retryPolicy = policy))
  .build
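Retrying only the failed records is possible because the PutRecords response contains one entry per request record, in the same order as the request. A minimal sketch of selecting the records for the next attempt follows. `RetryFailedSketch.recordsToRetry` is a hypothetical helper, not part of kinesis4cats.

```scala
// Hypothetical helper (not kinesis4cats' API): given the records that were sent
// and the error code of each response entry, return the records to resend.
// PutRecords responses are positional, so entry i describes request record i.
object RetryFailedSketch {
  def recordsToRetry[A](sent: List[A], errorCodes: List[Option[String]]): List[A] = {
    require(sent.length == errorCodes.length, "expected one response entry per record")
    sent.zip(errorCodes).collect { case (record, Some(_)) => record }
  }
}
```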

Raising exceptions

A user can configure the producer to raise an exception if either error path is detected, including partially-failed records.

import cats.effect._

import kinesis4cats.client.producer.KinesisProducer
import kinesis4cats.models.StreamNameOrArn

KinesisProducer.Builder
  .default[IO](StreamNameOrArn.Name("my-stream"))
  .transformConfig(x => x.copy(raiseOnFailures = true))
  .build

This can be combined with a retry policy, in which case the error is raised only after all retries are exhausted.

import cats.effect._

import kinesis4cats.client.producer.KinesisProducer
import kinesis4cats.compat.retry._
import kinesis4cats.models.StreamNameOrArn

// Retry 5 times with no delay between retries
val policy = RetryPolicies.limitRetries[IO](5)

KinesisProducer.Builder
  .default[IO](StreamNameOrArn.Name("my-stream"))
  .transformConfig(x => x.copy(retryPolicy = policy, raiseOnFailures = true))
  .build
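The combined behaviour can be sketched as a small loop. Here `maxRetries` plays the role of `RetryPolicies.limitRetries[IO](5)` above, without any delay, and `raiseOnFailures` mirrors the config flag. `put` is a hypothetical stand-in for one PutRecords call. This is an illustrative model under those assumptions, not kinesis4cats' retry routine. In particular, rethrowing exceptions once retries run out is an assumption of the sketch.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical sketch of "retry up to maxRetries times, then optionally raise".
// `put` stands in for one PutRecords call: it either throws (an exception) or
// returns one optional error code per record, in request order (a failure).
object RetryThenRaiseSketch {
  final case class RemainingFailures(count: Int)
      extends RuntimeException(s"$count record(s) still failing after retries")

  // Success(Nil): everything was put. Success(records): records still failing
  // when retries ran out (only when raiseOnFailures = false).
  def putWithRetries[A](records: List[A], maxRetries: Int, raiseOnFailures: Boolean)(
      put: List[A] => List[Option[String]]
  ): Try[List[A]] = {
    @tailrec
    def loop(pending: List[A], retriesLeft: Int): Try[List[A]] =
      Try(put(pending)) match {
        case Success(codes) =>
          // Keep only the records whose response entry carries an error code
          val failed = pending.zip(codes).collect { case (r, Some(_)) => r }
          if (failed.isEmpty) Success(Nil)
          else if (retriesLeft > 0) loop(failed, retriesLeft - 1)
          else if (raiseOnFailures) Failure(RemainingFailures(failed.size))
          else Success(failed)
        case Failure(e) =>
          // Assumption of this sketch: exceptions propagate once retries run out
          if (retriesLeft > 0) loop(pending, retriesLeft - 1) else Failure(e)
      }
    loop(records, maxRetries)
  }
}
```

With `raiseOnFailures = false` the caller gets the still-failing records back as data. With `true` the same situation surfaces as a `RemainingFailures` error, matching the "raise after all retries are exhausted" behaviour described above.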

FS2 Producer

This package provides a KPL-like producer by implementing the FS2Producer interface. It receives records from the user, enqueues them in a Queue, and puts them to Kinesis in batches on a configured interval. It builds on all of the functionality of the Producer interface, including batching, aggregation and retries.

import cats.effect._

import kinesis4cats.client.producer.fs2.FS2KinesisProducer
import kinesis4cats.models.StreamNameOrArn
import kinesis4cats.producer.Record

object MyApp extends IOApp {
    override def run(args: List[String]) = 
        FS2KinesisProducer.Builder
          .default[IO](StreamNameOrArn.Name("my-stream"))
          .build
          .use(producer =>
            for {
              _ <- producer.put(
                  Record("my-data".getBytes(), "some-partition-key")
                )
              _ <- producer.put(
                  Record("my-data-2".getBytes(), "some-partition-key-2")
                )
              _ <- producer.put(
                  Record("my-data-3".getBytes(), "some-partition-key-3")
                )
            } yield ExitCode.Success
        )
}
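The enqueue-then-flush flow can be pictured with a simplified, single-threaded model built on `java.util.concurrent.LinkedBlockingQueue`. In this sketch `put` only enqueues. A hypothetical `tick` method stands in for the configured interval: it drains the queue in FIFO batches of at most `maxBatchSize` and hands each batch to a `putBatch` function. `QueueingProducerSketch` is an illustrative name and this is not the FS2KinesisProducer implementation, which runs the flushing on its own schedule.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.jdk.CollectionConverters._

// Hypothetical, simplified model of a KPL-like producer (not FS2KinesisProducer):
// `put` only enqueues; `tick`, standing in for the configured flush interval,
// drains the queue and hands the records to `putBatch` in FIFO batches.
final class QueueingProducerSketch[A](maxBatchSize: Int)(putBatch: List[A] => Unit) {
  private val queue = new LinkedBlockingQueue[A]()

  def put(record: A): Unit = queue.put(record)

  // Drains everything currently queued and returns the number of batches sent
  def tick(): Int = {
    val buffer = new java.util.ArrayList[A]()
    var batches = 0
    while (queue.drainTo(buffer, maxBatchSize) > 0) {
      putBatch(buffer.asScala.toList) // copy before clearing the buffer
      buffer.clear()
      batches += 1
    }
    batches
  }
}
```

Five records put with a batch size of 2 are sent as three batches, and nothing is sent until the tick.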

Callbacks

The FS2Producer's put method returns a nested effect. The inner effect serves as a callback that lets users inspect the put result and any errors that occurred while putting the record. For example:

import cats.effect._
import cats.syntax.all._

import kinesis4cats.client.producer.fs2.FS2KinesisProducer
import kinesis4cats.models.StreamNameOrArn
import kinesis4cats.producer.Record

object MyApp extends IOApp {
    override def run(args: List[String]) = 
        FS2KinesisProducer.Builder
          .default[IO](StreamNameOrArn.Name("my-stream"))
          .build
          .use { producer =>
            val records = List(
              Record("my-data".getBytes(), "some-partition-key"),
              Record("my-data-2".getBytes(), "some-partition-key-2"),
              Record("my-data-3".getBytes(), "some-partition-key-3")
            )
            for {
              callbacks <- records.traverse(producer.put)
              _ <- callbacks.traverse(cb => cb.attempt.flatMap(x => IO.println(s"Response: $x")))
            } yield ExitCode.Success
        }
}
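The callback pattern can be modeled with the standard library's `Promise`. In this sketch, `put` registers the record and returns a handle that completes once the record's batch has been attempted. Only the inner layer is asynchronous here, whereas in kinesis4cats both layers are effects. Names such as `CallbackProducerSketch` and `flush` are hypothetical.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}

// Hypothetical model of the nested-effect callback shape (not kinesis4cats' API):
// put enqueues the record and returns a Future that completes only when the
// batch containing that record is flushed.
final class CallbackProducerSketch[A] {
  private val pending = ListBuffer.empty[(A, Promise[String])]

  def put(record: A): Future[String] = {
    val p = Promise[String]()
    pending += ((record, p))
    p.future
  }

  // Stand-in for a batch put: records rejected by `accept` fail their callback
  def flush(accept: A => Boolean): Unit = {
    pending.foreach { case (record, p) =>
      if (accept(record)) p.success(s"put $record")
      else p.failure(new RuntimeException(s"failed to put $record"))
    }
    pending.clear()
  }
}
```

Unlike `IO`, a `Future` is eager, so this sketch only mimics the shape of the API (enqueue now, observe the outcome later), not its laziness.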