Getting Started

This module aims to be a native Scala implementation of a Kinesis client, built with Smithy4s. A native implementation lets us provide interfaces that are more comfortable for everyday Scala users.

Experimental Project Warning

WARNING: This project is experimental. It should not be expected to be production ready at this time.

Some known issues:

Installation

libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-smithy4s-client" % "0.0.28"

Usage

import cats.effect._
import com.amazonaws.kinesis._
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws._
import smithy4s.Blob

import kinesis4cats.smithy4s.client.KinesisClient

object MyApp extends IOApp {
    override def run(args: List[String]) = (for {
        underlying <- BlazeClientBuilder[IO].resource
        client <- KinesisClient.Builder
            .default[IO](underlying, AwsRegion.US_EAST_1)
            .withLogger(Slf4jLogger.getLogger)
            .build
    } yield client).use(client =>
        for {
            _ <- client.createStream(StreamName("my-stream"), Some(1))
            _ <- client.putRecord(
                Data(Blob("my-data".getBytes())),
                PartitionKey("some-partition-key"),
                Some(StreamName("my-stream"))
            )
        } yield ExitCode.Success
    )
}

Producer

kinesis4cats offers a Producer interface that handles the following:

This module provides an implementation of that interface, backed by the KinesisClient.

import cats.data.NonEmptyList
import cats.effect._
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws._

import kinesis4cats.smithy4s.client.producer.KinesisProducer
import kinesis4cats.producer._
import kinesis4cats.models.StreamNameOrArn

object MyApp extends IOApp {
    override def run(args: List[String]) =
        BlazeClientBuilder[IO].resource.flatMap(client =>
            KinesisProducer.Builder
                .default[IO](
                    StreamNameOrArn.Name("my-stream"),
                    client,
                    AwsRegion.US_EAST_1
                )
                .withLogger(Slf4jLogger.getLogger)
                .build
        ).use(producer =>
            for {
                _ <- producer.put(
                    NonEmptyList.of(
                        Record("my-data".getBytes(), "some-partition-key"),
                        Record("my-data-2".getBytes(), "some-partition-key-2"),
                        Record("my-data-3".getBytes(), "some-partition-key-3")
                    )
                )
            } yield ExitCode.Success
        )
}

Failures vs Exceptions

The Producer interface works with the PutRecords API. This API can yield two possible error paths:

  1. Exceptions: Exceptions raised when interacting with the API. These are rare.
  2. Failures: The API call itself succeeds, but the response reports that some of the records failed.

In the 2nd case, the put method will return a result that contains:

The Producer offering here allows users to handle these failure paths in multiple ways.
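
To make the distinction between the two error paths concrete, here is a minimal, dependency-free sketch. The names `PutOutcome`, `FailedRecord` and `describe` are hypothetical and are not kinesis4cats types; the sketch only models the idea that an exception replaces the response entirely, while a failure arrives inside an otherwise successful response.

```scala
object ErrorPathsSketch {
  // Hypothetical stand-ins for illustration; not the kinesis4cats result types.
  final case class FailedRecord(partitionKey: String, errorCode: String)

  final case class PutOutcome(successful: List[String], failed: List[FailedRecord]) {
    def isPartialFailure: Boolean = failed.nonEmpty
  }

  // Left  = an exception was raised while calling the API (error path 1).
  // Right = the API call succeeded; the outcome may still list failed records (error path 2).
  def describe(result: Either[Throwable, PutOutcome]): String = result match {
    case Left(e) =>
      s"exception: ${e.getMessage}"
    case Right(o) if o.isPartialFailure =>
      val total = o.successful.size + o.failed.size
      s"partial failure: ${o.failed.size} of $total records failed"
    case Right(o) =>
      s"success: ${o.successful.size} records written"
  }
}
```

A caller that only handles the `Left` case would silently miss the partially failed records, which is why the two paths need separate handling.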

Retrying failures

A user can supply a RetryPolicy that can be used to retry both error paths until a fully successful response is received.

In the event of a partially-failed response, the retry routine will only retry the failed records.

import cats.effect._
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws._

import kinesis4cats.compat.retry._
import kinesis4cats.smithy4s.client.producer.KinesisProducer
import kinesis4cats.models.StreamNameOrArn

// Retry 5 times with no delay between retries
val policy = RetryPolicies.limitRetries[IO](5)

BlazeClientBuilder[IO].resource.flatMap(client =>
    KinesisProducer.Builder
        .default[IO](
            StreamNameOrArn.Name("my-stream"),
            client,
            AwsRegion.US_EAST_1
        )
        .transformConfig(x => x.copy(retryPolicy = policy))
        .withLogger(Slf4jLogger.getLogger)
        .build
)
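
The "retry only the failed records" behaviour can be illustrated without any library code. The sketch below is an assumption-laden model, not the producer's implementation: `BatchResult` and `putWithRetries` are hypothetical names, the put function is synchronous, and there is no delay between attempts.

```scala
import scala.annotation.tailrec

object RetryFailedOnlySketch {
  // Hypothetical result of one batch put: which records were written and which failed.
  final case class BatchResult[A](successful: List[A], failed: List[A])

  // Calls `putBatch` once, then resubmits only the records that failed,
  // for at most `maxRetries` additional attempts.
  def putWithRetries[A](records: List[A], maxRetries: Int)(
      putBatch: List[A] => BatchResult[A]
  ): BatchResult[A] = {
    @tailrec
    def loop(pending: List[A], done: List[A], retriesLeft: Int): BatchResult[A] = {
      val res = putBatch(pending)
      val allDone = done ++ res.successful
      if (res.failed.isEmpty || retriesLeft <= 0) BatchResult(allDone, res.failed)
      else loop(res.failed, allDone, retriesLeft - 1) // successful records are never resent
    }
    loop(records, Nil, maxRetries)
  }
}
```

Records that were already written are carried along in `done` and never passed to `putBatch` again, so a retry cannot duplicate them.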

Raising exceptions

A user can configure the producer to raise an exception if any of the error paths are detected (including partially failed records).

import cats.effect._
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws._

import kinesis4cats.smithy4s.client.producer.KinesisProducer
import kinesis4cats.models.StreamNameOrArn

BlazeClientBuilder[IO].resource.flatMap(client =>
    KinesisProducer.Builder
        .default[IO](
            StreamNameOrArn.Name("my-stream"),
            client,
            AwsRegion.US_EAST_1
        )
        .transformConfig(x => x.copy(raiseOnFailures = true))
        .withLogger(Slf4jLogger.getLogger)
        .build
)

This can be done along with configuring a retry policy, in which case an error would be raised after all retries are exhausted.

import cats.effect._
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws._

import kinesis4cats.compat.retry._
import kinesis4cats.smithy4s.client.producer.KinesisProducer
import kinesis4cats.models.StreamNameOrArn

// Retry 5 times with no delay between retries
val policy = RetryPolicies.limitRetries[IO](5)

BlazeClientBuilder[IO].resource.flatMap(client =>
    KinesisProducer.Builder
        .default[IO](
            StreamNameOrArn.Name("my-stream"),
            client,
            AwsRegion.US_EAST_1
        )
        .transformConfig(x => x.copy(retryPolicy = policy, raiseOnFailures = true))
        .withLogger(Slf4jLogger.getLogger)
        .build
)
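
How the two settings interact can be sketched as a single decision applied to whatever result remains once retries are exhausted. This is a hypothetical model: `BatchResult`, `RecordsFailed` and `finish` are illustrative names, and only the `raiseOnFailures` flag name is taken from the configuration shown above.

```scala
object RaiseOnFailuresSketch {
  // Hypothetical stand-ins for illustration; not the kinesis4cats types.
  final case class BatchResult(successful: List[String], failed: List[String])

  final case class RecordsFailed(failed: List[String])
      extends RuntimeException(s"${failed.size} record(s) failed after retries")

  // Applied to the result left over after the last retry.
  // With raiseOnFailures = false, the failed records are returned to the caller instead.
  def finish(result: BatchResult, raiseOnFailures: Boolean): Either[Throwable, BatchResult] =
    if (raiseOnFailures && result.failed.nonEmpty) Left(RecordsFailed(result.failed))
    else Right(result)
}
```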

FS2

This package provides a KPL-like producer by implementing FS2Producer. This interface receives records from a user, enqueues them into a Queue, and puts them to Kinesis in batches on a configured interval. It leverages all of the functionality of the Producer interface, including batching, aggregation and retries.
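
The enqueue-then-batch idea can be modelled in a few lines of plain Scala. The sketch below is not how FS2Producer is implemented (it uses neither FS2 nor cats-effect); `Batcher`, `enqueue` and `flush` are hypothetical names, and `flush` stands in for whatever a timer would trigger on each interval.

```scala
import scala.collection.mutable

object IntervalBatcherSketch {
  // Hypothetical, dependency-free model of a buffering producer.
  final class Batcher[A](maxBatchSize: Int, send: List[A] => Unit) {
    private val queue = mutable.Queue.empty[A]

    // Buffers a record; nothing is sent yet.
    def enqueue(record: A): Unit = synchronized {
      queue += record
      ()
    }

    // What a timer would call on each interval: drains the queue and
    // sends the drained records in batches of at most `maxBatchSize`.
    def flush(): Unit = {
      val drained = synchronized { queue.dequeueAll(_ => true).toList }
      drained.grouped(maxBatchSize).foreach(send)
    }
  }
}
```

Decoupling `enqueue` from `send` is what lets callers submit records one at a time while the backend still receives full batches.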

Usage

import cats.effect._
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws._

import kinesis4cats.smithy4s.client.producer.fs2.FS2KinesisProducer
import kinesis4cats.models.StreamNameOrArn
import kinesis4cats.producer.Record

object MyApp extends IOApp {
    override def run(args: List[String]) =
        BlazeClientBuilder[IO].resource.flatMap(client =>
            FS2KinesisProducer.Builder
                .default[IO](
                    StreamNameOrArn.Name("my-stream"),
                    client,
                    AwsRegion.US_EAST_1
                )
                .withLogger(Slf4jLogger.getLogger)
                .build
        ).use(producer =>
            for {
                _ <- producer.put(
                    Record("my-data".getBytes(), "some-partition-key")
                )
                _ <- producer.put(
                    Record("my-data-2".getBytes(), "some-partition-key-2")
                )
                _ <- producer.put(
                    Record("my-data-3".getBytes(), "some-partition-key-3")
                )
            } yield ExitCode.Success
        )
}

Callbacks

Kinesis4Cats offers a nested effect as the return type for the FS2Producer's put method. This nested effect serves as a callback for users to interact with the put result and any errors that occurred with the puts. For example:

import cats.effect._
import cats.syntax.all._
import org.http4s.blaze.client.BlazeClientBuilder
import org.typelevel.log4cats.slf4j.Slf4jLogger
import smithy4s.aws._

import kinesis4cats.smithy4s.client.producer.fs2.FS2KinesisProducer
import kinesis4cats.models.StreamNameOrArn
import kinesis4cats.producer.Record

object MyApp extends IOApp {
    override def run(args: List[String]) =
        BlazeClientBuilder[IO].resource.flatMap(client =>
            FS2KinesisProducer.Builder
                .default[IO](
                    StreamNameOrArn.Name("my-stream"),
                    client,
                    AwsRegion.US_EAST_1
                )
                .withLogger(Slf4jLogger.getLogger)
                .build
        ).use { producer =>
            val records = List(
                Record("my-data".getBytes(), "some-partition-key"),
                Record("my-data-2".getBytes(), "some-partition-key-2"),
                Record("my-data-3".getBytes(), "some-partition-key-3")
            )
            for {
                callbacks <- records.traverse(producer.put)
                _ <- callbacks.traverse(cb => cb.attempt.flatMap(x => IO.println(s"Response: $x")))
            } yield ExitCode.Success
        }
}
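
The shape of a nested-effect callback can be shown with the standard library alone. The sketch below uses `Future` and `Promise` in place of a cats-effect `F[F[_]]`, so it is only an analogy: `Future` is eager where `IO` is lazy, and `BufferedProducer` with its `put` and `flush` methods is a hypothetical class, not the FS2Producer API.

```scala
import scala.concurrent.{Future, Promise}

object CallbackSketch {
  // Hypothetical producer that buffers records until `flush` is called.
  final class BufferedProducer {
    private var pending = List.empty[(String, Promise[String])]

    // Outer Future: the record was accepted into the buffer.
    // Inner Future: the callback, completed once the record has been flushed.
    def put(record: String): Future[Future[String]] = synchronized {
      val promise = Promise[String]()
      pending = (record, promise) :: pending
      Future.successful(promise.future)
    }

    // Completes the callback of every buffered record, in insertion order.
    def flush(): Unit = {
      val batch = synchronized {
        val b = pending.reverse
        pending = Nil
        b
      }
      batch.foreach { case (record, promise) => promise.success(s"written: $record") }
    }
  }
}
```

Callers that do not care about the outcome can discard the inner value; callers that do can wait on it without blocking the enqueue step.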