Getting Started
This module intends to be an enriched wrapper for the KCL, offered by AWS.
Installation
libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-kcl" % "0.0.32"
Usage
import cats.effect._
import cats.syntax.all._
import software.amazon.kinesis.processor.SingleStreamTracker
import kinesis4cats.kcl._
import kinesis4cats.syntax.bytebuffer._
object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = for {
consumer <- KCLConsumer.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name",
).withCallback(
(records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
).build
_ <- consumer.run()
} yield ()
}
Usage - Multi Stream
The KCL introduced the capability to consume from multiple streams within the same application on an experimental basis. This module offers some helpers for constructing consumers for this.
It is not recommended to use this in production as scaling the application becomes more difficult when you have to consider more than one stream.
import cats.effect._
import cats.syntax.all._
import software.amazon.kinesis.common._
import kinesis4cats.client._
import kinesis4cats.models._
import kinesis4cats.kcl._
import kinesis4cats.kcl.multistream._
import kinesis4cats.syntax.bytebuffer._
object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = {
val streamArn1 = StreamArn(AwsRegion.US_EAST_1, "my-stream-1", "123456789012")
val streamArn2 = StreamArn(AwsRegion.US_EAST_1, "my-stream-2", "123456789012")
val position = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
for {
kinesisClient <- KinesisClient.Builder.default[IO].build
tracker <- MultiStreamTracker.noLeaseDeletionFromArns[IO](
kinesisClient,
Map(streamArn1 -> position, streamArn2 -> position)
).toResource
consumer <- KCLConsumer.Builder
.default[IO](tracker, "my-app-name")
.withCallback(
(records: List[CommittableRecord[IO]]) =>
records.traverse_(r => IO.println(r.data.asString))
).build
_ <- consumer.run()
} yield ()
}
}
FS2
This package intends to be an enriched wrapper for the KCL, offered by AWS. This specific wrapper uses an FS2 Stream
Usage
import cats.effect._
import software.amazon.kinesis.processor.SingleStreamTracker
import kinesis4cats.kcl.fs2.KCLConsumerFS2
import kinesis4cats.syntax.bytebuffer._
object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = for {
consumer <- KCLConsumerFS2.Builder.default[IO](
new SingleStreamTracker("my-stream"),
"my-app-name",
).build
_ <- consumer
.stream()
.flatMap(stream =>
stream
.evalTap(x => IO.println(x.data.asString))
.through(consumer.commitRecords)
.compile
.resource
.drain
)
} yield ()
}