Spring Cloud Stream AWS Kinesis Binder

From NovaOrdis Knowledge Base

External

Internal

Overview

Dependencies

dependencies {
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kinesis:1.0.0.RELEASE')
}

Playground Examples

Playground Kinesis Producer
Playground Kinesis Consumer

Message Propagation

[Figure: SpringCloudStreamAmazonKinesisBinder.png]

Sending

A payload attached to a message and sent as follows:

MessageChannel outputChannel = ...;

String payload = "...";
Message<String> m = new GenericMessage<>(payload);
outputChannel.send(m);

is recovered at the other end as follows:

@StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
public void handle(Message<String> e) {
  String payload = e.getPayload();
}

Custom headers are not propagated by default, for reasons that are not yet clear. The likely cause is the "embeddedHeaders" configuration, which by default contains only "correlationId", "sequenceSize", "sequenceNumber", "contentType" and "originalContentType".

As a side effect of simply creating the message, the "id" and "timestamp" headers are added by the Spring Messaging layer:

"id" -> "4a96cc04-1116-94de-268b-d442b0e86683"
"timestamp" -> "1545163131036"

The "contentType" header is added by a channel interceptor, in pre-send.

As part of the sending sequence, the headers are (by default) "embedded" and serialized together with the payload. Other header modes are possible (see org.springframework.cloud.stream.binder.HeaderMode). See org.springframework.cloud.stream.binder.AbstractMessageChannelBinder serializeAndEmbedHeadersIfApplicable().
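
To make "embedded" concrete: the headers and the payload travel together as a single byte array in the record's data. The sketch below is a simplified illustration of that idea and is not the binder's actual wire format; the class name EmbeddedHeadersSketch and its layout (header count, name/value pairs, then the payload) are assumptions made for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: NOT the binder's real serialization format.
class EmbeddedHeadersSketch {

    // Writes the header count, each (name, value) pair, then the payload length and bytes.
    static byte[] embed(Map<String, String> headers, byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(headers.size());
            for (Map.Entry<String, String> header : headers.entrySet()) {
                out.writeUTF(header.getKey());
                out.writeUTF(header.getValue());
            }
            out.writeInt(payload.length);
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the headers back into headersOut and returns the original payload.
    static byte[] extract(byte[] data, Map<String, String> headersOut) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                String value = in.readUTF();
                headersOut.put(name, value);
            }
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("contentType", "text/plain");
        byte[] data = embed(headers, "hello".getBytes(StandardCharsets.UTF_8));

        Map<String, String> received = new LinkedHashMap<>();
        byte[] payload = extract(data, received);
        System.out.println(received + " " + new String(payload, StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is that a header which is not written into the byte array on the sending side cannot reappear on the receiving side, which is consistent with the custom-header behavior described above.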

At the Kinesis layer (org.springframework.integration.aws.outbound.KinesisMessageHandler) a new PutRecordRequest instance is created. This is where the following headers are added:

  • "aws_stream" (AwsHeaders.STREAM)
  • "aws_partitionKey" (AwsHeaders.PARTITION_KEY)
  • "aws_sequenceNumber" (AwsHeaders.SEQUENCE_NUMBER)

Receiving

It seems that KinesisMessageDrivenChannelAdapter is polling (org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processTask()).

If there are records, they are turned into messages and "sent" to the @StreamListener.

A "raw" record contains a sequence number, an approximate arrival timestamp, the partition key and the data. This layer adds the following headers:

  • "aws_receivedStream" (AwsHeaders.RECEIVED_STREAM)
  • "aws_shard" (AwsHeaders.SHARD)
  • "aws_receivedPartitionKey" (AwsHeaders.RECEIVED_PARTITION_KEY)
  • "aws_receivedSequenceNumber" (AwsHeaders.RECEIVED_SEQUENCE_NUMBER)
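
The mapping from a raw record to these headers can be sketched in plain Java. This is only an illustration of which value ends up under which header name; RawRecord and headersFor are hypothetical stand-ins, not the AWS SDK record type or the adapter's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: models the record-to-headers mapping, not the adapter's implementation.
class ReceivedHeadersSketch {

    // Stand-in for a raw Kinesis record with the four fields listed above.
    static class RawRecord {
        final String sequenceNumber;
        final long approximateArrivalTimestamp;
        final String partitionKey;
        final byte[] data;

        RawRecord(String sequenceNumber, long approximateArrivalTimestamp,
                  String partitionKey, byte[] data) {
            this.sequenceNumber = sequenceNumber;
            this.approximateArrivalTimestamp = approximateArrivalTimestamp;
            this.partitionKey = partitionKey;
            this.data = data;
        }
    }

    // Builds the headers added on the receiving side, using the header names listed above.
    static Map<String, Object> headersFor(String stream, String shardId, RawRecord record) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("aws_receivedStream", stream);
        headers.put("aws_shard", shardId);
        headers.put("aws_receivedPartitionKey", record.partitionKey);
        headers.put("aws_receivedSequenceNumber", record.sequenceNumber);
        return headers;
    }

    public static void main(String[] args) {
        // All values here are made up for the example.
        RawRecord record = new RawRecord("1", System.currentTimeMillis(), "key-1", new byte[0]);
        System.out.println(headersFor("my-stream", "shard-0", record));
    }
}
```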

Consumer Group Support

Also see:

Spring Cloud Stream Consumer Groups

Configuration

Amazon Kinesis Binder Manual

Also see:

Spring Cloud Stream Configuration
Spring Integration Configuration



Kinesis-specific binding properties live under "spring.cloud.stream.kinesis.bindings", one level deeper than the generic "spring.cloud.stream.bindings" properties. Keep the two namespaces apart: a property placed at the wrong level can lead to hard-to-debug configuration errors.

spring:
  cloud:
    stream:
      bindings:
        <channel-name>:
          destination: "..."
      kinesis:
        bindings:
          <channel-name>:
            consumer:
              shardIteratorType: TRIM_HORIZON
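
As a rough illustration of the two namespaces, the sketch below builds the flat property keys that correspond to the YAML above. The class name KinesisBindingKeys, the helpers genericKey and kinesisKey, and the "my-channel" and "my-stream" values are hypothetical and exist only for this example.

```java
import java.util.Properties;

// Hypothetical helper: shows where generic and Kinesis-specific binding keys differ.
class KinesisBindingKeys {

    // Generic Spring Cloud Stream binding property, e.g. "destination".
    static String genericKey(String channel, String property) {
        return "spring.cloud.stream.bindings." + channel + "." + property;
    }

    // Kinesis-specific binding property: note the extra "kinesis" segment.
    static String kinesisKey(String channel, String property) {
        return "spring.cloud.stream.kinesis.bindings." + channel + "." + property;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(genericKey("my-channel", "destination"), "my-stream");
        properties.setProperty(kinesisKey("my-channel", "consumer.shardIteratorType"), "TRIM_HORIZON");
        properties.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Seeing the keys side by side makes the difference easy to spot: the only change is the "kinesis" segment inserted before "bindings".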

Stream Auto-Creation

Version 1.0.0 cannot be configured to fail on a nonexistent stream: a missing stream is always created. See org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner, line 135.

Consumer Configuration

Shard Iterator

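The default value is LATEST, which starts reading just after the most recent record in the shard, so records sent before the consumer obtains its iterator are not delivered. In practice this may sometimes miss records that were sent by a different channel in the same JVM. TRIM_HORIZON, which starts from the oldest record still available in the shard, seems to work better.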

spring:
  cloud:
    stream:
      bindings:
        my-channel:
          destination: "my-channel"
      kinesis:
        bindings:
          my-channel:
            consumer:
              shardIteratorType: TRIM_HORIZON
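
The difference between the two iterator types can be illustrated with a small in-memory model. This is a sketch under simplifying assumptions (one shard modeled as a list, no checkpointing); ShardIteratorSketch and its methods are hypothetical and do not use the Kinesis API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified in-memory model of a single shard; not the Kinesis API.
class ShardIteratorSketch {

    enum IteratorType { TRIM_HORIZON, LATEST }

    // Index of the first record a newly created iterator of the given type will read.
    static int startPosition(List<String> shard, IteratorType type) {
        return type == IteratorType.TRIM_HORIZON ? 0 : shard.size();
    }

    // Reads every record from the given position to the current end of the shard.
    static List<String> readFrom(List<String> shard, int position) {
        return new ArrayList<>(shard.subList(position, shard.size()));
    }

    public static void main(String[] args) {
        // r1 and r2 are already in the shard when the consumer starts.
        List<String> shard = new ArrayList<>(Arrays.asList("r1", "r2"));
        int latest = startPosition(shard, IteratorType.LATEST);
        int trimHorizon = startPosition(shard, IteratorType.TRIM_HORIZON);

        // r3 arrives after the iterators were created.
        shard.add("r3");

        System.out.println("LATEST reads       " + readFrom(shard, latest));
        System.out.println("TRIM_HORIZON reads " + readFrom(shard, trimHorizon));
    }
}
```

In this model the LATEST iterator only sees r3, while the TRIM_HORIZON iterator sees all three records, which matches the missed-record behavior described above when a producer sends before the consumer is ready.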

See:

Shard Iterator

Producer Configuration

Header Handling

By default, the Spring Cloud Stream AWS Kinesis Binder embeds headers (such as contentType). To suppress this, configure the producer's "headerMode" using Spring Cloud Stream configuration:

Spring Cloud Stream Configuration - Producer Header Handling