Localstack
This module provides a KCLConsumer that is compatible with Localstack.
Installation
libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-kcl-localstack" % "0.0.32"
Usage
import cats.effect.IO
import cats.syntax.all._
import software.amazon.kinesis.processor.SingleStreamTracker
import kinesis4cats.kcl._
import kinesis4cats.kcl.localstack.LocalstackKCLConsumer
import kinesis4cats.syntax.bytebuffer._
// Record-processing callback handed to the consumer via `withCallback`:
// prints the payload of every record in the batch as a string.
val processRecords = (records: List[CommittableRecord[IO]]) =>
  records.traverse_ { record =>
    IO.println(record.data.asString)
  }
// Builds a KCLConsumer as a Resource, using `processRecords` as the callback.
LocalstackKCLConsumer.Builder
  .default[IO](
    new SingleStreamTracker("my-stream"),
    "my-app-name"
  )
  .flatMap(builder => builder.withCallback(processRecords).build)
// Runs a KCLConsumer as a Resource. Resource contains a Deferred value,
// which completes when the consumer has begun to process records.
// NOTE(review): as written, this snippet only calls `.build`; no call that
// runs the consumer or returns a Deferred is visible here. Confirm the
// intended method and update the example accordingly.
LocalstackKCLConsumer.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name"
).flatMap(_.withCallback(processRecords).build)
// Runs a KCLConsumer as a Resource. Resource contains 2 things:
// - A Deferred value, which completes when the consumer has begun to process records.
// - A results Queue, which contains records received by the consumer
// NOTE(review): as written, this snippet only calls `.build`; no call that
// runs the consumer or exposes a Deferred/results Queue is visible here.
// Confirm the intended method and update the example accordingly.
LocalstackKCLConsumer.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name"
).flatMap(_.withCallback(processRecords).build)
Usage - FS2
import cats.effect.IO
import software.amazon.kinesis.processor.SingleStreamTracker
import kinesis4cats.kcl.fs2.localstack.LocalstackKCLConsumerFS2
// Builds a KCLConsumerFS2 as a Resource, which contains FS2 Streaming methods.
LocalstackKCLConsumerFS2.Builder
  .default[IO](
    new SingleStreamTracker("my-stream"),
    "my-app-name"
  )
  .flatMap(builder => builder.build)