Spring Cloud Stream AWS Kinesis Binder: Difference between revisions
No edit summary |
|||
(44 intermediate revisions by the same user not shown) | |||
Line 1: | Line 1: | ||
=External= | =External= | ||
* GitHub https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis | * Binder Implementation GitHub https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis | ||
* AWS Kinesis Binder Reference Manual https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc | * AWS Kinesis Binder Reference Manual https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc | ||
* Example GitHub https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kinesis-samples | |||
=Internal= | =Internal= | ||
* [[Spring_Cloud_Stream# | * [[Spring_Cloud_Stream#Binder|Spring Cloud Stream]] | ||
* [[Amazon Kinesis# | * [[Amazon_Kinesis_Streams#Amazon_Kinesis_Stream_Access_with_Spring_Data_Cloud|Amazon Kinesis Streams]] | ||
=Overview= | |||
=Dependencies= | |||
<syntaxhighlight lang='groovy'> | |||
dependencies { | |||
implementation('org.springframework.cloud:spring-cloud-stream-binder-kinesis:1.0.0.RELEASE') | |||
} | |||
</syntaxhighlight> | |||
=<span id='Playground_Example'></span>Playground Examples= | |||
{{External|[https://github.com/ovidiuf/playground/tree/master/spring/cloud/spring-cloud-stream/01-kinesis-producer Playground Kinesis Producer]}} | |||
{{External|[https://github.com/ovidiuf/playground/tree/master/spring/cloud/spring-cloud-stream/02-kinesis-consumer Playground Kinesis Consumer]}} | |||
=Message Propagation= | |||
:::[[Image:SpringCloudStreamAmazonKinesisBinder.png]] | |||
==Sending== | |||
The payload passed to the message as follows: | |||
<syntaxhighlight lang='java'> | |||
MessageChannel outputChannel = ...; | |||
String payload = "..." | |||
Message m = new GenericMessage<>(payload); | |||
outputChannel.send(m); | |||
</syntaxhighlight> | |||
is recovered at the other end as follows: | |||
<syntaxhighlight lang='java'> | |||
@StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME) | |||
public void handle(Message<String> e) { | |||
String payload = e.getPayload(); | |||
} | |||
</syntaxhighlight> | |||
For unclear reasons, custom headers are not propagated. Probably configuration of "embeddedHeaders", somewhere; by default it only contains "correlationId", "sequenceSize", "sequenceNumber", "contentType", "originalContentType". | |||
As a side effect of simply creating the message, the "id" and "timestamp" headers are added by the [[Spring Messaging]] layer: | |||
"id" -> "4a96cc04-1116-94de-268b-d442b0e86683" | |||
"timestamp" -> "1545163131036" | |||
The "contentType" header is added by interceptor, in pre-send. | |||
As part of the sending sequence, the headers are (by default) "embedded" and serialized together with the payload. Other header modes are possible (see org.springframework.cloud.stream.binder.HeaderMode). See org.springframework.cloud.stream.binder.AbstractMessageChannelBinder serializeAndEmbedHeadersIfApplicable(). | |||
At the Kinesis layer (org.springframework.integration.aws.outbound.KinesisMessageHandler) a new PutRecordRequest instance is created. This is where the following headers are added: | |||
* "aws_stream" (AwsHeaders.STREAM) | |||
* "aws_partitionKey" (AwsHeaders.PARTITION_KEY) | |||
* "aws_sequenceNumber" (AwsHeaders.SEQUENCE_NUMBER) | |||
==Receiving== | |||
It seems that KinesisMessageDrivenChannelAdapter is polling (org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processTask()). | |||
If there are "records", those records turned into messages "sent" to the @StreamListener. | |||
A "raw" record contains a sequence number, an approximate arrival timestamp, the partitioin key and the data. This layer adds the following headers: | |||
* "aws_receivedStream" (AwsHeaders.RECEIVED_STREAM) | |||
* "aws_shard" (AwsHeaders.SHARD) | |||
* "aws_receivedPartitionKey" (AwsHeaders.RECEIVED_PARTITION_KEY) | |||
* "aws_receivedSequenceNumber" (AwsHeaders.RECEIVED_SEQUENCE_NUMBER) | |||
=Consumer Group Support= | |||
Also see: {{Internal|Spring_Cloud_Stream#Consumer_Group|Spring Cloud Stream Consumer Groups}} | |||
=Configuration= | |||
{{External|[https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc Amazon Kinesis Binder Manual]}} | |||
Also see: | |||
{{Internal|Spring Cloud Stream#Configuration|Spring Cloud Stream Configuration}} | |||
{{Internal|Spring_Integration_Configuration|Spring Integration Configuration}} | |||
{{Warn|The Kinesis binder configuration namespace is shifted with one position to the right relative to "spring.cloud.stream". Configure carefully, otherwise you may run into hard-to-debug configuration errors.}} | |||
spring: | |||
cloud: | |||
stream: | |||
'''bindings''': | |||
<''channel-name''>: | |||
destination: "..." | |||
<font color='SandyBrown'>kinesis</font>: | |||
'''bindings:''' | |||
<''channel-name''>: | |||
consumer: | |||
shardIteratorType: TRIM_HORIZON | |||
==Stream Auto-Creation== | |||
1.0.0 cannot be configured to fail on inexistent stream, instead of creating it. See org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner line 135. | |||
Also see: | |||
{{Internal|Spring Cloud Stream#Configuration|Spring Cloud Stream Configuration}} | |||
{{Internal|Spring_Integration_Configuration#Channel_Auto-Creation|Spring Integration Configuration}} | |||
==Consumer Configuration== | |||
===Shard Iterator=== | |||
The default value is LATEST, and it may sometimes miss records that were sent by a different channel in the same JVM. TRIM_HORIZON seems to work better. | |||
<syntaxhighlight lang='yaml'> | |||
spring: | |||
cloud: | |||
stream: | |||
bindings: | |||
my-channel: | |||
destination: "my-channel" | |||
kinesis: | |||
bindings: | |||
my-channel: | |||
consumer: | |||
shardIteratorType: TRIM_HORIZON | |||
</syntaxhighlight> | |||
See: {{Internal|Amazon_Kinesis#Shard_Iterator|Shard Iterator}} | |||
==Producer Configuration== | |||
===Header Handling=== | |||
By default., the Spring Cloud Stream AWS Kinesis Binders embeds headers (such as contentType). To suppress this, configure the producer's "headerMode" using Spring Cloud Stream configuration: | |||
{{Internal|Spring_Cloud_Stream#Header_Handling|Spring Cloud Stream Configuration - Producer Header Handling}} |
Latest revision as of 00:01, 20 December 2018
External
- Binder Implementation GitHub https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis
- AWS Kinesis Binder Reference Manual https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc
- Example GitHub https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kinesis-samples
Internal
Overview
Dependencies
dependencies {
implementation('org.springframework.cloud:spring-cloud-stream-binder-kinesis:1.0.0.RELEASE')
}
Playground Examples
Message Propagation
Sending
The payload passed to the message as follows:
MessageChannel outputChannel = ...;
String payload = "..."
Message m = new GenericMessage<>(payload);
outputChannel.send(m);
is recovered at the other end as follows:
@StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
public void handle(Message<String> e) {
String payload = e.getPayload();
}
For unclear reasons, custom headers are not propagated. Probably configuration of "embeddedHeaders", somewhere; by default it only contains "correlationId", "sequenceSize", "sequenceNumber", "contentType", "originalContentType".
As a side effect of simply creating the message, the "id" and "timestamp" headers are added by the Spring Messaging layer:
"id" -> "4a96cc04-1116-94de-268b-d442b0e86683" "timestamp" -> "1545163131036"
The "contentType" header is added by interceptor, in pre-send.
As part of the sending sequence, the headers are (by default) "embedded" and serialized together with the payload. Other header modes are possible (see org.springframework.cloud.stream.binder.HeaderMode). See org.springframework.cloud.stream.binder.AbstractMessageChannelBinder serializeAndEmbedHeadersIfApplicable().
At the Kinesis layer (org.springframework.integration.aws.outbound.KinesisMessageHandler) a new PutRecordRequest instance is created. This is where the following headers are added:
- "aws_stream" (AwsHeaders.STREAM)
- "aws_partitionKey" (AwsHeaders.PARTITION_KEY)
- "aws_sequenceNumber" (AwsHeaders.SEQUENCE_NUMBER)
Receiving
It seems that KinesisMessageDrivenChannelAdapter is polling (org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processTask()).
If there are "records", those records turned into messages "sent" to the @StreamListener.
A "raw" record contains a sequence number, an approximate arrival timestamp, the partitioin key and the data. This layer adds the following headers:
- "aws_receivedStream" (AwsHeaders.RECEIVED_STREAM)
- "aws_shard" (AwsHeaders.SHARD)
- "aws_receivedPartitionKey" (AwsHeaders.RECEIVED_PARTITION_KEY)
- "aws_receivedSequenceNumber" (AwsHeaders.RECEIVED_SEQUENCE_NUMBER)
Consumer Group Support
Also see:
Configuration
Also see:
The Kinesis binder configuration namespace is shifted with one position to the right relative to "spring.cloud.stream". Configure carefully, otherwise you may run into hard-to-debug configuration errors.
spring: cloud: stream: bindings: <channel-name>: destination: "..." kinesis: bindings: <channel-name>: consumer: shardIteratorType: TRIM_HORIZON
Stream Auto-Creation
1.0.0 cannot be configured to fail on inexistent stream, instead of creating it. See org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner line 135.
Also see:
Consumer Configuration
Shard Iterator
The default value is LATEST, and it may sometimes miss records that were sent by a different channel in the same JVM. TRIM_HORIZON seems to work better.
spring:
cloud:
stream:
bindings:
my-channel:
destination: "my-channel"
kinesis:
bindings:
my-channel:
consumer:
shardIteratorType: TRIM_HORIZON
See:
Producer Configuration
Header Handling
By default., the Spring Cloud Stream AWS Kinesis Binders embeds headers (such as contentType). To suppress this, configure the producer's "headerMode" using Spring Cloud Stream configuration: