Spring Cloud Stream AWS Kinesis Binder: Difference between revisions

From NovaOrdis Knowledge Base
Jump to navigation Jump to search
 
(40 intermediate revisions by the same user not shown)
Line 10: Line 10:
* [[Amazon_Kinesis_Streams#Amazon_Kinesis_Stream_Access_with_Spring_Data_Cloud|Amazon Kinesis Streams]]
* [[Amazon_Kinesis_Streams#Amazon_Kinesis_Stream_Access_with_Spring_Data_Cloud|Amazon Kinesis Streams]]


=Playground Example=
=Overview=


{{External|https://github.com/ovidiuf/playground/tree/master/spring/cloud/spring-cloud-stream/spring-cloud-stream-kinesis}}
=Dependencies=
 
<syntaxhighlight lang='groovy'>
dependencies {
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kinesis:1.0.0.RELEASE')
}
</syntaxhighlight>
 
=<span id='Playground_Example'></span>Playground Examples=
 
{{External|[https://github.com/ovidiuf/playground/tree/master/spring/cloud/spring-cloud-stream/01-kinesis-producer Playground Kinesis Producer]}}
{{External|[https://github.com/ovidiuf/playground/tree/master/spring/cloud/spring-cloud-stream/02-kinesis-consumer Playground Kinesis Consumer]}}
 
=Message Propagation=
 
:::[[Image:SpringCloudStreamAmazonKinesisBinder.png]]
 
==Sending==
 
The payload passed to the message as follows:
 
<syntaxhighlight lang='java'>
MessageChannel outputChannel = ...;
 
String payload = "..."
Message m = new GenericMessage<>(payload);
outputChannel.send(m);
</syntaxhighlight>
 
is recovered at the other end as follows:
 
<syntaxhighlight lang='java'>
@StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
public void handle(Message<String> e) {
  String payload = e.getPayload();
}
</syntaxhighlight>
 
For unclear reasons, custom headers are not propagated. Probably configuration of "embeddedHeaders", somewhere; by default it only contains "correlationId", "sequenceSize", "sequenceNumber", "contentType", "originalContentType".
 
As a side effect of simply creating the message, the "id" and "timestamp" headers are added by the [[Spring Messaging]] layer:
 
"id" -> "4a96cc04-1116-94de-268b-d442b0e86683"
"timestamp" -> "1545163131036"
 
The "contentType" header is added by interceptor, in pre-send.
 
As part of the sending sequence, the headers are (by default) "embedded" and serialized together with the payload. Other header modes are possible (see org.springframework.cloud.stream.binder.HeaderMode). See org.springframework.cloud.stream.binder.AbstractMessageChannelBinder serializeAndEmbedHeadersIfApplicable().
 
At the Kinesis layer (org.springframework.integration.aws.outbound.KinesisMessageHandler) a new PutRecordRequest instance is created. This is where the following headers are added:
* "aws_stream" (AwsHeaders.STREAM)
* "aws_partitionKey" (AwsHeaders.PARTITION_KEY)
* "aws_sequenceNumber" (AwsHeaders.SEQUENCE_NUMBER)
 
==Receiving==
 
It seems that KinesisMessageDrivenChannelAdapter is polling (org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processTask()).
 
If there are "records", those records turned into messages "sent" to the @StreamListener.
 
A "raw" record contains a sequence number, an approximate arrival timestamp, the partitioin key and the data. This layer adds the following headers:
* "aws_receivedStream" (AwsHeaders.RECEIVED_STREAM)
* "aws_shard" (AwsHeaders.SHARD)
* "aws_receivedPartitionKey" (AwsHeaders.RECEIVED_PARTITION_KEY)
* "aws_receivedSequenceNumber" (AwsHeaders.RECEIVED_SEQUENCE_NUMBER)
 
=Consumer Group Support=
 
Also see: {{Internal|Spring_Cloud_Stream#Consumer_Group|Spring Cloud Stream Consumer Groups}}
 
=Configuration=
 
{{External|[https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc Amazon Kinesis Binder Manual]}}
 
Also see:
{{Internal|Spring Cloud Stream#Configuration|Spring Cloud Stream Configuration}}
{{Internal|Spring_Integration_Configuration|Spring Integration Configuration}}
 
 
{{Warn|The Kinesis binder configuration namespace is shifted with one position to the right relative to "spring.cloud.stream". Configure carefully, otherwise you may run into hard-to-debug configuration errors.}}
 
spring:
  cloud:
    stream:
      '''bindings''':
        <''channel-name''>:
          destination: "..."
      <font color='SandyBrown'>kinesis</font>:
        '''bindings:'''
          <''channel-name''>:
            consumer:
              shardIteratorType: TRIM_HORIZON
 
==Stream Auto-Creation==
 
1.0.0 cannot be configured to fail on inexistent stream, instead of creating it. See org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner line 135.
 
Also see:
{{Internal|Spring Cloud Stream#Configuration|Spring Cloud Stream Configuration}}
{{Internal|Spring_Integration_Configuration#Channel_Auto-Creation|Spring Integration Configuration}}
 
==Consumer Configuration==
 
===Shard Iterator===
 
The default value is LATEST, and it may sometimes miss records that were sent by a different channel in the same JVM. TRIM_HORIZON seems to work better.
 
<syntaxhighlight lang='yaml'>
spring:
  cloud:
    stream:
      bindings:
        my-channel:
          destination: "my-channel"
      kinesis:
        bindings:
          my-channel:
            consumer:
              shardIteratorType: TRIM_HORIZON
</syntaxhighlight>
 
See: {{Internal|Amazon_Kinesis#Shard_Iterator|Shard Iterator}}
 
==Producer Configuration==
 
===Header Handling===
 
By default., the Spring Cloud Stream AWS Kinesis Binders embeds headers (such as contentType). To suppress this, configure the producer's "headerMode" using Spring Cloud Stream configuration:
 
{{Internal|Spring_Cloud_Stream#Header_Handling|Spring Cloud Stream Configuration - Producer Header Handling}}

Latest revision as of 00:01, 20 December 2018

External

Internal

Overview

Dependencies

dependencies {
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kinesis:1.0.0.RELEASE')
}

Playground Examples

Playground Kinesis Producer
Playground Kinesis Consumer

Message Propagation

SpringCloudStreamAmazonKinesisBinder.png

Sending

The payload passed to the message as follows:

MessageChannel outputChannel = ...;

String payload = "..."
Message m = new GenericMessage<>(payload);
outputChannel.send(m);

is recovered at the other end as follows:

@StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
public void handle(Message<String> e) {
  String payload = e.getPayload();
}

For unclear reasons, custom headers are not propagated. Probably configuration of "embeddedHeaders", somewhere; by default it only contains "correlationId", "sequenceSize", "sequenceNumber", "contentType", "originalContentType".

As a side effect of simply creating the message, the "id" and "timestamp" headers are added by the Spring Messaging layer:

"id" -> "4a96cc04-1116-94de-268b-d442b0e86683"
"timestamp" -> "1545163131036"

The "contentType" header is added by interceptor, in pre-send.

As part of the sending sequence, the headers are (by default) "embedded" and serialized together with the payload. Other header modes are possible (see org.springframework.cloud.stream.binder.HeaderMode). See org.springframework.cloud.stream.binder.AbstractMessageChannelBinder serializeAndEmbedHeadersIfApplicable().

At the Kinesis layer (org.springframework.integration.aws.outbound.KinesisMessageHandler) a new PutRecordRequest instance is created. This is where the following headers are added:

  • "aws_stream" (AwsHeaders.STREAM)
  • "aws_partitionKey" (AwsHeaders.PARTITION_KEY)
  • "aws_sequenceNumber" (AwsHeaders.SEQUENCE_NUMBER)

Receiving

It seems that KinesisMessageDrivenChannelAdapter is polling (org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processTask()).

If there are "records", those records turned into messages "sent" to the @StreamListener.

A "raw" record contains a sequence number, an approximate arrival timestamp, the partitioin key and the data. This layer adds the following headers:

  • "aws_receivedStream" (AwsHeaders.RECEIVED_STREAM)
  • "aws_shard" (AwsHeaders.SHARD)
  • "aws_receivedPartitionKey" (AwsHeaders.RECEIVED_PARTITION_KEY)
  • "aws_receivedSequenceNumber" (AwsHeaders.RECEIVED_SEQUENCE_NUMBER)

Consumer Group Support

Also see:

Spring Cloud Stream Consumer Groups

Configuration

Amazon Kinesis Binder Manual

Also see:

Spring Cloud Stream Configuration
Spring Integration Configuration



The Kinesis binder configuration namespace is shifted with one position to the right relative to "spring.cloud.stream". Configure carefully, otherwise you may run into hard-to-debug configuration errors.

spring:
  cloud:
    stream:
      bindings:
       <channel-name>:
         destination: "..."
      kinesis:
        bindings:
         <channel-name>:
           consumer:
             shardIteratorType: TRIM_HORIZON

Stream Auto-Creation

1.0.0 cannot be configured to fail on inexistent stream, instead of creating it. See org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner line 135.

Also see:

Spring Cloud Stream Configuration
Spring Integration Configuration

Consumer Configuration

Shard Iterator

The default value is LATEST, and it may sometimes miss records that were sent by a different channel in the same JVM. TRIM_HORIZON seems to work better.

spring:
  cloud:
    stream:
      bindings:
        my-channel:
          destination: "my-channel"
      kinesis:
         bindings:
           my-channel:
             consumer:
               shardIteratorType: TRIM_HORIZON

See:

Shard Iterator

Producer Configuration

Header Handling

By default., the Spring Cloud Stream AWS Kinesis Binders embeds headers (such as contentType). To suppress this, configure the producer's "headerMode" using Spring Cloud Stream configuration:

Spring Cloud Stream Configuration - Producer Header Handling