Spring Cloud Stream AWS Kinesis Binder

From NovaOrdis Knowledge Base
Revision as of 21:01, 18 December 2018 by Ovidiu (talk | contribs) (→‎Sending)
Jump to navigation Jump to search

External

Internal

Overview

Dependencies

dependencies {
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kinesis:1.0.0.RELEASE')
}

Playground Examples

Playground Kinesis Producer
Playground Kinesis Consumer

Message Propagation

SpringCloudStreamAmazonKinesisBinder.png

Sending

The payload passed to the message as follows:

MessageChannel outputChannel = ...;

String payload = "..."
Message m = new GenericMessage<>(payload);
outputChannel.send(m);

is recovered at the other end as follows:

@StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
public void handle(Message<String> e) {
  String payload = e.getPayload();
}

For unclear reasons, custom headers are not propagated. Probably configuration, somewhere.

As a side effect of simply creating the message, the "id" and "timestamp" headers are added by the Spring Messaging layer:

"id" -> "4a96cc04-1116-94de-268b-d442b0e86683"
"timestamp" -> "1545163131036"

The "contentType" header is added by interceptor, in pre-send.

As part of the sending sequence, the headers are (by default) "embedded" and serialized together with the payload. Other header modes are possible (see org.springframework.cloud.stream.binder.HeaderMode). See org.springframework.cloud.stream.binder.AbstractMessageChannelBinder serializeAndEmbedHeadersIfApplicable().

At the Kinesis layer (org.springframework.integration.aws.outbound.KinesisMessageHandler) a new PutRecordRequest instance is created. This is where the following headers are added:

  • "aws_stream" (AwsHeaders.STREAM)
  • "aws_partitionKey" (AwsHeaders.PARTITION_KEY)
  • "aws_sequenceNumber" (AwsHeaders.SEQUENCE_NUMBER)

Receiving

It seems that KinesisMessageDrivenChannelAdapter is polling (org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processTask()).

If there are "records", those records turned into messages "sent" to the @StreamListener.

A "raw" record contains a sequence number, an approximate arrival timestamp, the partitioin key and the data. This layer adds the following headers:

  • "aws_receivedStream" (AwsHeaders.RECEIVED_STREAM)
  • "aws_shard" (AwsHeaders.SHARD)
  • "aws_receivedPartitionKey" (AwsHeaders.RECEIVED_PARTITION_KEY)
  • "aws_receivedSequenceNumber" (AwsHeaders.RECEIVED_SEQUENCE_NUMBER)

Consumer Group Support

Also see:

Spring Cloud Stream Consumer Groups

Configuration

Stream Auto-Creation

1.0.0 cannot be configured to fail on inexistent stream, instead of creating it. See org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner line 135.

Also see

Spring Integration Configuration