Localstack

The ability to provide a KCLConsumer that is compliant with Localstack

Installation

libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-kcl-localstack" % "0.0.28"

Usage

import cats.effect.IO
import cats.syntax.all._
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.kcl._
import kinesis4cats.kcl.localstack.LocalstackKCLConsumer
import kinesis4cats.syntax.bytebuffer._

val processRecords = (records: List[CommittableRecord[IO]]) => 
    records.traverse_(record => IO.println(record.data.asString))

// Builds a KCLConsumer as a Resource. 
LocalstackKCLConsumer.Builder.default[IO](
    new SingleStreamTracker("my-stream"), 
    "my-app-name"
).flatMap(_.withCallback(processRecords).build)

// Runs a KCLConsumer as a Resource. Resource contains a Deferred value, 
//which completes when the consumer has begun to process records.
LocalstackKCLConsumer.Builder.default[IO](
    new SingleStreamTracker("my-stream"), 
    "my-app-name"
).flatMap(_.withCallback(processRecords).build)

// Runs a KCLConsumer as a Resource. Resource contains 2 things: 
// - A Deferred value, which completes when the consumer has begun to process records. 
// - A results Queue, which contains records received by the consumer
LocalstackKCLConsumer.Builder.default[IO](
    new SingleStreamTracker("my-stream"), 
    "my-app-name"
).flatMap(_.withCallback(processRecords).build)

Usage - FS2

import cats.effect.IO
import software.amazon.kinesis.processor.SingleStreamTracker

import kinesis4cats.kcl.fs2.localstack.LocalstackKCLConsumerFS2

// Runs a KCLConsumerFS2 as a Resource, which contains FS2 Streaming methods.
LocalstackKCLConsumerFS2.Builder.default[IO](
    new SingleStreamTracker("my-stream"), 
    "my-app-name"
).flatMap(_.build)