Ciris
Standard environment variables and system properties for configuring a KCLConsumer, via Ciris
Installation
libraryDependencies += "io.github.etspaceman" %% "kinesis4cats-kcl-ciris" % "0.0.33"
Usage
import cats.effect._
import cats.syntax.all._
import kinesis4cats.kcl._
import kinesis4cats.kcl.ciris.KCLCiris
import kinesis4cats.syntax.bytebuffer._
object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = for {
consumer <- KCLCiris.consumer[IO](){
case records: List[CommittableRecord[IO]] =>
records.traverse_(r => IO.println(r.data.asString))
}
_ <- consumer.run()
} yield ()
}
Configuration
The configuration below is intended to only work with single-stream consumers at this time.
Common
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_APP_NAME |
kcl.app.name |
Yes | A unique name to give your consumer application. | |
KCL_STREAM_NAME |
kcl.stream.name |
Yes | The name of the stream to consume | |
KPL_INITIAL_POSITION |
kcl.initial.position |
No | LATEST |
The position to start stream consumption. Valid values are TRIM_HORIZON , LATEST and AT_TIMESTAMP:mytimestamp (timestamp must be parsable by java.time.Instant.parse ) |
Checkpoint
Currently the CheckpointConfig does not have any configuration, but this section is here in case it is provided at a later date.
Environment Variable | System Property | Required | Default | Description |
---|
Coordinator
CoordinatorConfig values.
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_COORDINATOR_MAX_INITIALIZATION_ATTEMPTS |
kcl.coordinator.max.initialization.attempts |
No | 20 | The maximum number of attempts to initialize the Scheduler |
KCL_COORDINATOR_PARENT_SHARD_POLL_INTERVAL |
kcl.coordinator.parent.shard.poll.interval |
No | 10 seconds | Interval between polling to check for parent shard completion |
KCL_COORDINATOR_SKIP_SHARD_SYNC_AT_INITIALIZATION_IF_LEASES_EXIST |
kcl.coordinator.skip.shard.sync.at.initialization.if.leases.exist |
No | false | If true, the Scheduler will skip shard sync during initialization if there are one or more leases in the lease table. |
KCL_COORDINATOR_SHARD_CONSUMER_DISPATCH_POLL_INTERVAL |
kcl.coordinator.shard.consumer.dispatch.poll.interval |
No | 1 second | The duration between polling of the shard consumer for triggering state changes, and health checks. |
KCL_COORDINATOR_SCHEDULER_INITIALIZATION_BACKOFF_TIME |
kcl.coordinator.scheduler.initialization.backoff.time |
No | 1 second | Interval between retrying the scheduler initialization. |
Lease
LeaseManagementConfig values.
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_LEASE_TABLE_NAME |
kcl.lease.table.name |
No | KCL_APP_NAME /kcl.app.name |
Name of the table to use in DynamoDB |
KCL_LEASE_WORKER_ID |
kcl.lease.worker.id |
No | Utils.randomUUIDString |
Used to distinguish different workers/processes of a KCL application. |
KCL_LEASE_FAILOVER_TIME |
kcl.lease.failover.time |
No | 10 seconds |
A worker which does not renew it's lease within this time interval will be regarded as having problems and it's shards will be assigned to other workers. |
KCL_LEASE_SHARD_SYNC_INTERVAL |
kcl.lease.shard.sync.interval |
No | 60 seconds |
Shard sync interval |
KCL_LEASE_CLEANUP_LEASES_UPON_SHARD_COMPLETION |
kcl.lease.cleanup.leases.upon.shard.completion |
No | true | Cleanup leases upon shards completion. |
KCL_LEASE_MAX_LEASES_FOR_WORKER |
kcl.lease.max.leases.for.worker |
No | Integer.MAX_VALUE |
The max number of leases (shards) this worker should process. |
KCL_LEASE_MAX_LEASES_TO_STEAL_AT_ONE_TIME |
kcl.lease.max.leases.to.steal.at.one.time |
No | 1 |
Max leases to steal from another worker at one time (for load balancing). |
KCL_LEASE_INITIAL_LEASE_TABLE_READ_CAPACITY |
kcl.lease.initial.lease.table.read.capacity |
No | 10 |
The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity if the billing mode is set to PROVISIONED |
KCL_LEASE_INITIAL_LEASE_TABLE_WRITE_CAPACITY |
kcl.lease.initial.lease.table.write.capacity |
No | 10 |
The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity if the billing mode is set to PROVISIONED |
KCL_LEASE_MAX_LEASE_RENEWAL_THREADS |
kcl.lease.max.lease.renewal.threads |
No | 20 |
The size of the thread pool to create for the lease renewer to use. |
KCL_LEASE_IGNORE_UNEXPECTED_CHILD_SHARDS |
kcl.lease.ignore.unexpected.child.shards |
No | false |
If true, ignores child shards that are received unexpectedly. |
KCL_LEASE_CONSISTENT_READS |
kcl.lease.consistent.reads |
No | false |
If true, ensures that reads from dynamo are consistent. |
KCL_LEASE_LIST_SHARDS_BACKOFF_TIME |
kcl.lease.list.shards.backoff.time |
No | 1.5 seconds |
Duration to wait between list-shards calls. |
KCL_LEASE_MAX_LIST_SHARDS_RETRY_ATTEMPTS |
kcl.lease.max.list.shards.retry.attempts |
No | 50 |
Amount of retries possible for list-shards calls. |
KCL_LEASE_EPSILON |
kcl.lease.epsilon |
No | 25 ms |
Applies variance when calculating lease expirations |
KCL_LEASE_DYNAMO_REQUEST_TIMEOUT |
kcl.lease.dynamo.request.timeout |
No | 1 minute |
Timeout for requests to dynamo |
KCL_LEASE_BILLING_MODE |
kcl.lease.billing.mode |
No | PAY_PER_REQUEST |
Billing mode to use for the dynamo table upon its creation. Valid values are PAY_PER_REQUEST and PROVISIONED |
KCL_LEASE_LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY |
kcl.lease.leases.auditor.execution.frequency |
No | 2 minutes |
Frequency of the auditor job to scan for partial leases in the lease table. |
KCL_LEASE_LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD |
kcl.lease.leases.auditor.inconsistency.confidence.threshold |
No | 3 |
Confidence threshold for the periodic auditor job to determine if leases for a stream in the lease table is inconsistent. |
KCL_LEASE_MAX_CACHE_MISSES_BEFORE_RELOAD |
kcl.lease.max.cache.misses.before.reload |
No | 1000 |
Amount of cache misses before reloading the lease cache. |
KCL_LEASE_LIST_SHARDS_CACHE_ALLOWED_AGE |
kcl.lease.list.shards.cache.allowed.age |
No | 30 seconds |
Duration that the list shards cache is valid. |
KCL_LEASE_CACHE_MISS_WARNING_MODULUS |
kcl.lease.cache.miss.warning.modulus |
No | 250 |
Modulus to use against the number of cache misses before a warning is logged. |
KCL_LEASE_TABLE_DELETION_PROTECTION_ENABLED |
kcl.lease.table.deletion.protection.enabled |
No | false |
Whether to enable deletion protection on the DynamoDB lease table created by KCL. This does not update already existing tables. |
KCL_LEASE_TABLE_PITR_ENABLED |
kcl.lease.table.pitr.enabled |
No | false |
Whether to enable PITR (point in time recovery) on the DynamoDB lease table created by KCL. If true, this can update existing table's PITR. |
KCL_LEASE_IN_MEMORY_WORKER_METRICS_CAPTURE_FREQUENCY |
kcl.lease.in.memory.worker.metrics.capture.frequency |
No | 1 second |
This defines the frequency of capturing worker metric stats in memory. |
KCL_LEASE_WORKER_METRICS_REPORTER_FREQ |
kcl.lease.in.memory.worker.metrics.reporter.freq |
No | 30 seconds |
This defines the frequency of reporting worker metric stats to storage. |
KCL_LEASE_NO_OF_PERSISTED_METRICS_PER_WORKER_METRICS |
kcl.lease.no.of.persisted.metrics.per.worker.metrics |
No | 10 |
These are the no. of metrics that are persisted in storage in WorkerMetricStats ddb table. |
KCL_LEASE_DISABLE_WORKER_METRICS |
kcl.lease.disable.worker.metrics |
No | false |
Option to disable workerMetrics to use in lease balancing. |
KCL_LEASE_MAX_THROUGHPUT_PER_HOST_KBPS |
kcl.lease.max.throughput.per.host.kbps |
No | Double.MAX_VALUE |
Max throughput per host KBps, default is unlimited. |
KCL_LEASE_DAMPENING_PERCENTAGE |
kcl.lease.dampening.percentage |
No | 60 |
Percentage of value to achieve critical dampening during this case |
KCL_LEASE_REBALANCE_THRESHOLD_PERCENTAGE |
kcl.lease.rebalance.threshold.percentage |
No | 10 |
Percentage value used to trigger reBalance. If fleet has workers which are have metrics value more or less than 10% of fleet level average then reBalance is triggered. Leases are taken from workers with metrics value more than fleet level average. The load to take from these workers is determined by evaluating how far they are with respect to fleet level average. |
KCL_LEASE_ALLOW_THROUGHPUT_OVERSHOOT |
kcl.lease.allow.throughput.overshoot |
No | true |
The allowThroughputOvershoot flag determines whether leases should still be taken even if it causes the total assigned throughput to exceed the desired throughput to take for re-balance. Enabling this flag provides more flexibility for the LeaseAssignmentManager to explore additional assignment possibilities, which can lead to faster throughput convergence. |
KCL_LEASE_STABLE_WORKER_METRICS_ENTRY_CLEANUP_DURATION |
kcl.lease.stable.worker.metrics.entry.cleanup.duration |
No | 1 day |
Duration after which workerMetricStats entry from WorkerMetricStats table will be cleaned up. When an entry's lastUpdateTime is older than staleWorkerMetricsEntryCleanupDuration from current time, entry will be removed from the table. |
KCL_LEASE_VARIANCE_BALANCING_FREQUENCY |
kcl.lease.variance.balancing.frequency |
No | 3 |
Frequency to perform worker variance balancing. This value is used with respect to the LAM frequency, that is every third (as default) iteration of LAM the worker variance balancing will be performed. Setting it to 1 will make varianceBalancing run on every iteration of LAM and 2 on every 2nd iteration and so on. |
KCL_LEASE_WORKER_METRICS_EMA_ALPHA |
kcl.lease.worker.metrics.ema.alpha |
No | 0.5 |
Alpha value used for calculating exponential moving average of worker's metricStats. Selecting higher alpha value gives more weightage to recent value and thus low smoothing effect on computed average and selecting smaller alpha values gives more weightage to past value and high smoothing effect. |
KCL_LEASE_WORKER_METRICS_TABLE_BILLING_MODE |
kcl.lease.worker.metrics.table.billing.mode |
No | PAY_PER_REQUEST |
Billing mode used to create the DDB table for worker metrics |
KCL_LEASE_WORKER_METRICS_TABLE_READ_CAPACITY |
kcl.lease.worker.metrics.table.read.capacity |
No | Read capacity to provision during DDB table creation for worker metrics, if billing mode is PROVISIONED.DDB | |
KCL_LEASE_WORKER_METRICS_TABLE_WRITE_CAPACITY |
kcl.lease.worker.metrics.table.write.capacity |
No | Write capacity to provision during DDB table creation for worker metrics, if billing mode is PROVISIONED.DDB | |
KCL_LEASE_WORKER_METRICS_TABLE_PITR_ENABLED |
kcl.lease.worker.metrics.table.pitr.enabled |
No | false | Flag to enable Point in Time Recovery on the DDB table for worker metrics. |
KCL_LEASE_WORKER_METRICS_TABLE_DELETION_PROTECTION_ENABLED |
kcl.lease.worker.metrics.table.deletion.protection.enabled |
No | false | Flag to enable deletion protection on the DDB table for worker metrics. |
KCL_LEASE_TABLE_TAGS |
kcl.lease.table.tags |
No | Tags to add to the DDB table. In the format of key:value,key2:value2 |
|
KCL_LEASE_WORKER_METRICS_TABLE_TAGS |
kcl.lease.worker.metrics.table.tags |
No | Tags to add to the DDB table for worker metrics. In the format of key:value,key2:value2 |
Lifecycle
LifecycleConfig values.
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_LIFECYCLE_LOG_WARNING_FOR_TASK_AFTER |
kcl.lifecycle.log.warning.for.task.after |
No | None | Logs warn message if as task is held in a task for more than the set time. |
KCL_LIFECYCLE_TASK_BACKOFF_TIME |
kcl.lifecycle.task.backoff.time |
No | 500 ms |
Backoff time for Amazon Kinesis Client Library tasks (in the event of failures). |
KCL_LIFECYCLE_READ_TIMEOUTS_TO_IGNORE_BEFORE_WARNING |
kcl.lifecycle.read.timeouts.to.ignore.before.warning |
No | 0 |
Number of consecutive ReadTimeouts to ignore before logging warning messages. |
Metrics
MetricsConfig values.
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_METRICS_NAMESPACE |
kcl.metrics.namespace |
No | KCL_APP_NAME /kcl.app.name |
Namespace for KCL metrics. |
KCL_METRICS_BUFFER_TIME |
kcl.metrics.buffer.time |
No | 10 seconds |
Buffer metrics for at most this long before publishing to CloudWatch. |
KCL_METRICS_MAX_QUEUE_SIZE |
kcl.metrics.max.queue.size |
No | 10000 |
Buffer at most this many metrics before publishing to CloudWatch. |
KCL_METRICS_LEVEL |
kcl.metrics.level |
No | DETAILED |
Metrics level for which to enable CloudWatch metrics. Valid values are DETAILED , SUMMARY and NONE |
KCL_METRICS_ENABLED_DIMENSIONS |
kcl.metrics.enabled.dimensions |
No | Set("ALL") |
Allowed dimensions for CloudWatchMetrics. |
KCL_METRICS_PUBLISHER_FLUSH_BUFFER |
kcl.metrics.publisher.flush.buffer |
No | 200 |
Buffer size for MetricDatums before publishing. |
Retrieval
RetrievalConfig values.
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_RETRIEVAL_TYPE |
kcl.retrieval.type |
Yes | fanout |
Type of retrieval for the KCL to use. Valid values are fanout and polling |
KCL_RETRIEVAL_LIST_SHARDS_BACKOFF_TIME |
kcl.retrieval.list.shards.backoff.time |
No | 1.5 seconds |
Backoff time between consecutive ListShards calls. |
KCL_RETRIEVAL_MAX_LIST_SHARDS_RETRY_ATTEMPTS |
kcl.retrieval.max.list.shards.retry.attempts |
No | 50 |
Max number of retries for ListShards when throttled/exception is thrown. |
FanOut
Configuration values if KCL_RETRIEVAL_TYPE
/kcl.retrieval.type
is fanout
. Used for constructing the FanOutConfig
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_RETRIEVAL_FANOUT_CONSUMER_ARN |
kcl.retrieval.fanout.consumer.arn |
No | The ARN of an already created consumer, if this is set no automatic consumer creation will be attempted. | |
KCL_RETRIEVAL_FANOUT_CONSUMER_NAME |
kcl.retrieval.fanout.consumer.name |
No | KCL_APP_NAME /kcl.app.name |
The name of the consumer to create. |
KCL_RETRIEVAL_FANOUT_MAX_DESCRIBE_STREAM_SUMMARY_RETRIES |
kcl.retrieval.fanout.max.describe.stream.summary.retries |
No | 10 |
The maximum number of retries for calling describe stream summary. |
KCL_RETRIEVAL_FANOUT_MAX_DESCRIBE_STREAM_CONSUMER_RETRIES |
kcl.retrieval.fanout.max.describe.stream.consumer.retries |
No | 10 |
The maximum number of retries for calling DescribeStreamConsumer. |
KCL_RETRIEVAL_FANOUT_REGISTER_STREAM_CONSUMER_RETRIES |
kcl.retrieval.fanout.register.stream.consumer.retries |
No | 10 |
The maximum number of retries for calling DescribeStreamConsumer. |
KCL_RETRIEVAL_FANOUT_RETRY_BACKOFF |
kcl.retrieval.fanout.retry.backoff |
No | 1 second |
The maximum amount of time that will be made between failed calls. |
Polling
Configuration values if KCL_RETRIEVAL_TYPE
/kcl.retrieval.type
is polling
. Used for constructing the PollingConfig
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_RETRIEVAL_POLLING_MAX_RECORDS |
kcl.retrieval.polling.max.records |
No | 10000 |
Max records to fetch from Kinesis in a single GetRecords call. |
KCL_RETRIEVAL_POLLING_IDLE_TIME_BETWEEN_READS |
kcl.retrieval.polling.idle.time.between.reads |
No | 1 second |
The value for how long the ShardConsumer should sleep in between calls to GetRecords |
KCL_RETRIEVAL_POLLING_RETRY_GET_RECORDS_INTERVAL |
kcl.retrieval.polling.retry.get.records.interval |
No | None |
Time to wait in seconds before the worker retries to get a record. If None, retries immediately |
KCL_RETRIEVAL_POLLING_MAX_GET_RECORDS_THREAD_POOL |
kcl.retrieval.polling.max.get.records.thread.pool |
No | None |
The max number of threads in the records thread pool. If None, there is no limit. |
Processor
ProcessorConfig values.
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_PROCESSOR_CALL_PROCESS_RECORDS_EVEN_FOR_EMPTY_LIST |
kcl.processor.call.process.records.even.for.empty.list |
No | false |
If true, process records will be invoked on empty batches. |
KCL_PROCESSOR_RAISE_ON_ERROR |
kcl.processor.raise.on.error |
No | true |
If true, the application will raise an error and be killed if processRecords also throws an error. Otherwise, the error will be logged but the consumer will receive a new batch of records. |
KCL_PROCESSOR_CHECKPOINT_RETRIES |
kcl.processor.checkpoint.retries |
No | 5 |
Max retries for retrying commits if autoCommit is true |
KCL_PROCESSOR_CHECKPOINT_RETRY_INTERVAL |
kcl.processor.checkpoint.retry.interval |
No | 0 seconds |
Checkpoint retry interval for retrying commits if autoCommit is true |
KCL_PROCESSOR_AUTO_COMMIT |
kcl.processor.auto.commit |
No | 0 seconds |
If true, the record processor will automatically commit records after the user defined callback is complete. |
Duration
Some of these environment variables are loaded as a Scala Duration, and are parsed through its apply method.
For example, if you wanted to set the duration to a day, you could use:
- 1d
- 1 day
- 24 hours
- 24h
FS2 Configuration
Standard environment variables and system properties for configuring a KCLConsumerFS2, via Ciris
Usage
import cats.effect._
import kinesis4cats.kcl.fs2.ciris.KCLCirisFS2
import kinesis4cats.syntax.bytebuffer._
object MyApp extends ResourceApp.Forever {
override def run(args: List[String]) = for {
consumer <- KCLCirisFS2.consumer[IO]()
_ <- consumer.stream()
.flatMap(stream =>
stream
.evalTap(x => IO.println(x.data.asString))
.through(consumer.commitRecords)
.compile
.resource
.drain
)
} yield ()
}
Configuration
The configuration below is intended to only work with single-stream consumers at this time.
Environment Variable | System Property | Required | Default | Description |
---|---|---|---|---|
KCL_FS2_QUEUE_SIZE |
kcl.fs2.queue.size |
No | 100 | Size of the global records queue. If full, backpressure will occur. |
KCL_FS2_COMMIT_MAX_CHUNK |
kcl.fs2.commit.max.chunk |
No | 1000 | Max size of records in the commit queue before commits are made. |
KCL_FS2_COMMIT_MAX_WAIT |
kcl.fs2.commit.max.wait |
No | 10 seconds | Max interval between commit batch evaluation. |
KCL_FS2_COMMIT_MAX_RETRIES |
kcl.fs2.commit.max.retries |
No | 5 | Max retries for running commits |
KCL_FS2_COMMIT_RETRY_INTERVAL |
kcl.fs2.commit.retry.interval |
No | 0 seconds | Interval between commit retries |