Spring Cloud Stream: Difference between revisions

From NovaOrdis Knowledge Base
Jump to navigation Jump to search
 
(31 intermediate revisions by the same user not shown)
Line 10: Line 10:


* [[Spring Cloud#Projects|Spring Cloud]]
* [[Spring Cloud#Projects|Spring Cloud]]
* [[Spring Integration]]


=Overview=
=Overview=
Line 19: Line 20:
=Spring Cloud Stream and Spring Integration=
=Spring Cloud Stream and Spring Integration=


Spring Cloud Stream is built on the concepts and patterns defined by [[EIP|Enterprise Integration Patterns]] and relies on the [[Spring Integration]] implementation to provide connectivity to message brokers, so it supports the foundation, semantics, and configuration options that are already established by [[Spring Integration]]. <font color=darkgray>TO DO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_spring_integration_support</font>
Spring Cloud Stream is built on the concepts and patterns defined by [[EIP|Enterprise Integration Patterns]] and relies on the [[Spring Integration]] implementation to provide connectivity to message brokers, so it supports the foundation, semantics, and configuration options that are already established by [[Spring Integration]].  
 
Also see: {{Internal|Spring Integration|Spring Integration}}
<font color=darkgray>TO DO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_spring_integration_support</font>


=Concepts=
=Concepts=
Line 30: Line 34:


Available binders:
Available binders:
* [[Spring Cloud Stream RabbitMQ Binder|RabbitMQ Binder]]
 
* [[Spring Cloud Stream Kafka Binder|Kafka Binder]]
====Rabbit MQ====
* [[Spring Cloud Stream AWS Kinesis Binder|AWS Kinesis Binder]]
{{Internal|Spring Cloud Stream RabbitMQ Binder|RabbitMQ Binder}}
* [[Spring Cloud Stream Google Cloud PubSub Binder|Google Cloud PubSub Binder]]
 
* [[Spring Cloud Stream TestSupportBinder|TestSupportBinder]]
====Kafka====
{{Internal|Spring Cloud Stream Kafka Binder|Kafka Binder}}
 
====AWS Kinesis====
{{Internal|Spring Cloud Stream AWS Kinesis Binder|AWS Kinesis Binder}}
 
====Google Cloud PubSub====
 
{{Internal|Spring Cloud Stream Google Cloud PubSub Binder|Google Cloud PubSub Binder}}
 
====TestSupportBinder====
{{Internal|Spring Cloud Stream TestSupportBinder|TestSupportBinder}}


==Channel==
==Channel==
Line 49: Line 64:
   some-output-channel
   some-output-channel
     destination: "Red-Destination"
     destination: "Red-Destination"
    ...
</syntaxhighlight>
</syntaxhighlight>


===Channel Name===
===Channel Name===


The channel name is used by the configuration to specify properties associated with the channel, and thus [[Spring_Cloud_Stream#Channel_Configuration|configure the channel]]. [[@Input]], [[@Output]] and [[#@StreamListener|@StreamListener]] are parameterized with it.
The channel name is used by the configuration to specify properties associated with the channel, and thus [[Spring_Cloud_Stream#Channel_Configuration|configure the channel]]. [[@Input]], [[@Output]] and [[#@StreamListener|@StreamListener]] are parameterized with it. In the example above, "some-input-channel" and "some-output-channel" are channel names.


==Persistent Publish/Subscribe==
==Persistent Publish/Subscribe==
Line 61: Line 77:
==Message==
==Message==


The canonical data structure used by producers and consumers to communicate with [[#Binder|binders]], and thus other applications via external messaging systems.
The canonical data structure used by producers and consumers to communicate with [[#Binder|binders]], and thus other applications via external messaging systems. Also see: {{Internal|Spring_Messaging#Message|Spring Messaging - Message}}


==Destination==
==Destination==
Line 72: Line 88:


Consumer group subscriptions are durable: a binder implementation ensures that group subscriptions are persistent and that, once at least one subscription for a group has been created, the group receives messages, even if they are sent while all applications in the group are stopped.
Consumer group subscriptions are durable: a binder implementation ensures that group subscriptions are persistent and that, once at least one subscription for a group has been created, the group receives messages, even if they are sent while all applications in the group are stopped.
Various binder implementations come with their own consumer group support peculiarities:
* [[Spring_Cloud_Stream_AWS_Kinesis_Binder#Consumer_Group_Support|AWS Kinesis Binder Consumer Groups]]
<font color=darkgray>TODO: working example, semantics and operations, I need it to spread the load across equivalent microservices.</font>


==Consumer Types==
==Consumer Types==


Spring Cloud Stream has <span id='Message-Driven'></span>'''message-driven''' consumers, which are also known as asynchronous consumers and <span id='Polled'></span>'''polled''' consumers, which are also known as synchronous consumers. A synchronous consumer allows controlling the rate at which messages are consumed. <font color=darkgray>TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-streams-overview-using-polled-consumers</font>
Spring Cloud Stream has <span id='Message-Driven'></span>'''message-driven''' consumers, which are also known as asynchronous consumers and <span id='Polled'></span>'''polled''' consumers, which are also known as synchronous consumers. A synchronous consumer allows controlling the rate at which messages are consumed.  
 
<font color=darkgray>TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-streams-overview-using-polled-consumers</font>


==Partition==
==Partition==


Spring Cloud Stream offers the possibility to partition the physical communication medium provided by a broker topic into multiple partitions. Partitioning can be used whether the broker itself is naturally partitioned (Kafka) or not (RabbitMQ). Partitioning is a critical concept in stateful processing, where it is critical to ensure that all related data is processed together, for either performance or consistency reasons. To set up a partition, both the data-producing and the data-consuming ends must be configured similarly. <font color=darkgray>TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-partitioning</font>
Spring Cloud Stream offers the possibility to partition the physical communication medium provided by a broker topic into multiple partitions. Partitioning can be used whether the broker itself is naturally partitioned (Kafka) or not (RabbitMQ). Partitioning is a critical concept in stateful processing, where it is critical to ensure that all related data is processed together, for either performance or consistency reasons. To set up a partition, both the data-producing and the data-consuming ends must be configured similarly.  
 
<font color=darkgray>TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-partitioning</font>


==Content Type Negotiation==
==Content Type Negotiation==
Line 109: Line 134:
====@EnableBinding====
====@EnableBinding====


@EnableBinding triggers immediate connectivity to a message broker, provided that the corresponding libraries are in the [[#Binder_Detection|classpath]]. The annotation is itself meta-annotated with [[@Configuration]] and triggers the configuration of the Spring Cloud Stream infrastructure. The annotation must be configured with one or more [[#Channel_Factory_Interfaces|channel factory interfaces]] that declare [[Spring_Cloud_Stream#Input_Channel|input]] and [[Spring_Cloud_Stream#Output_Channel|output]] channels, which will be initialized to be used by the application and exposed as components in the Spring application context. They are typically message channels for channel-based binders such as RabbitMQ or Kafka, but they may represent other concepts for other messaging technologies. Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface itself. You can have access to the interfaces representing the individual channels:
[[@EnableBinding]] triggers immediate connectivity to a message broker, provided that the corresponding libraries are in the [[#Binder_Detection|classpath]]. It is common practice to create a separate component annotated with [[@EnableBinding]], for example "KinesisBinder". The annotation is itself meta-annotated with [[@Configuration]] and triggers the configuration of the Spring Cloud Stream infrastructure. The annotation must be configured with one or more [[#Channel_Factory_Interfaces|channel factory interfaces]] that declare [[Spring_Cloud_Stream#Input_Channel|input]] and [[Spring_Cloud_Stream#Output_Channel|output]] channels, which will be initialized to be used by the application and exposed as components in the Spring application context. They are typically message channels for channel-based binders such as RabbitMQ or Kafka, but they may represent other concepts for other messaging technologies, such as Kinesis streams. Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface itself. You can have access to the interfaces representing the individual channels:


<syntaxhighlight lang='java'>
<syntaxhighlight lang='java'>
Line 166: Line 191:


     public void sendData(String payload) {
     public void sendData(String payload) {
         KinesisEvent e = new KinesisEvent(payload);
 
         outputChannelFactory.outputChannel().send(new GenericMessage<>(e));
         Message m = new GenericMessage<>(payload);
         outputChannelFactory.outputChannel().send(m);
     }
     }
}
}
Line 176: Line 202:
====@StreamListener====
====@StreamListener====


{{External|[https://docs.spring.io/spring-cloud-stream/docs/current/api/org/springframework/cloud/stream/annotation/StreamListener.html @StreamListener javadoc]}}
To receive data, we could declare a different component and annotate one of its methods with [[@StreamListener]].  @StreamListener designates a  <span id='Message_Handler'></span>'''message handler''' method for event handling. The annotation is parameterized with the [[#Channel_Name|name]] of the channel we are supposed to receive messages on. The class containing the @StreamListener-annotated handler method should be registered with the Spring application context to be available for handling, so it must be annotated with [[@Component]] or similar.


To receive data, we could declare a different component and annotate one of its methods with @StreamListener.  @StreamListener designates a  <span id='Message_Handler'></span>'''message handler''' method for event handling.  The annotation is parameterized with the [[#Channel_Name|name]] of the channel we are supposed to receive messages on. The class containing the @StreamListener-annotated handler method should be registered with the Spring application context to be available for handling, so it must be annotated with [[@Component]] or similar.
Note that the corresponding channel factory class must be configured on the [[#.40EnableBinding-annotated_Binder|@EnableBinding-annotated binder]], otherwise the bean corresponding to the inbound channel won't be available and @StreamListener annotation will not work.


This annotation is an alternative to [[Spring_Cloud_Stream#Spring_Cloud_Stream_and_Spring_Integration|Spring Integration patterns]] and it is modeled after [[Spring Messaging]] annotations [[@MessageMapping]], [[@JmsListener]], [[@RabbitListener]], etc.The handler method is invoked after each received message. The method has a flexible signature, as described by [[@MessageMapping]]. In the most generic case, the handler method receives a [[Spring_Messaging#Message_Interface|Message]] instance that gives access to headers and payload. The payload is a byte[] that can be deserialized by the handler method.
This annotation is an alternative to [[Spring_Cloud_Stream#Spring_Cloud_Stream_and_Spring_Integration|Spring Integration patterns]] and it is modeled after [[Spring Messaging]] annotations [[@MessageMapping]], [[@JmsListener]], [[@RabbitListener]], etc.The handler method is invoked after each received message. The method has a flexible signature, as described by [[@MessageMapping]]. In the most generic case, the handler method receives a [[Spring_Messaging#Message_Interface|Message]] instance that gives access to headers and payload. The payload is a byte[] that can be deserialized by the handler method.
Line 219: Line 245:
<font color=darkgray>Content-based routing with @StreamListener: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_using_streamlistener_for_content_based_routing</font>.
<font color=darkgray>Content-based routing with @StreamListener: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_using_streamlistener_for_content_based_routing</font>.


====Playground Example====
====<span id='Playground_Example'></span>Playground Examples====


{{External|[https://github.com/ovidiuf/playground/tree/master/spring/cloud/spring-cloud-stream/spring-cloud-stream-kinesis Playground Spring Cloud Stream Kinesis Example]}}
{{External|[https://github.com/ovidiuf/playground/tree/master/spring/cloud/spring-cloud-stream/01-kinesis-producer Playground Spring Cloud Stream Kinesis Producer]}}
{{External|[https://github.com/ovidiuf/playground/tree/master/spring/cloud/spring-cloud-stream/02-kinesis-consumer Playground Spring Cloud Stream Kinesis Consumer]}}


==Binder Detection==
==Binder Detection==
Line 301: Line 328:


{{External|[https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_configuration_options Configuration Options - Spring Documentation]}}
{{External|[https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_configuration_options Configuration Options - Spring Documentation]}}
{{External|[https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_configuration_options.html Configuration Options - Spring Documentation (2)]}}


==Channel Configuration==
==Channel Configuration==
Line 311: Line 339:


<syntaxhighlight lang='yaml'>
<syntaxhighlight lang='yaml'>
spring:
spring.cloud.stream.bindings:
  cloud:
  input-channel:
    stream:
    destination: test-kinesis-stream-1
      bindings:
    content-type: application/json
        input-channel:
    producer:
          destination: test-kinesis-stream-1
      partitionKeyExpression: "1"
          content-type: application/json
  output-channel:
          producer:
    destination: test-kinesis-stream-2
            partitionKeyExpression: "1"
    content-type: application/json
        output-channel:
          destination: test-kinesis-stream-2
          content-type: application/json
</syntaxhighlight>
</syntaxhighlight>


Line 329: Line 354:
* <span id='destination'></span>'''destination''' the name of the messaging broker destination (depending on dynamic binding at boot, the destination can be a Kafka topic, a RabbitMQ exchange, Kinesis stream, etc.) If not set, the [[#Channel_Name|channel name]] is used instead.  If the channel is bound as a consumer, it could be bound to multiple destinations, and the destination names can be specified as comma-separated String values. The default value of this property cannot be overridden.
* <span id='destination'></span>'''destination''' the name of the messaging broker destination (depending on dynamic binding at boot, the destination can be a Kafka topic, a RabbitMQ exchange, Kinesis stream, etc.) If not set, the [[#Channel_Name|channel name]] is used instead.  If the channel is bound as a consumer, it could be bound to multiple destinations, and the destination names can be specified as comma-separated String values. The default value of this property cannot be overridden.
* <span id='content-type'></span>'''content-type''' <font color=darkgray>What does this do, exactly?</font>
* <span id='content-type'></span>'''content-type''' <font color=darkgray>What does this do, exactly?</font>
===Producer Configuration===
====Header Handling====
The default behavior depends on the binder implementation. Some binders, such as [[Spring Cloud Stream AWS Kinesis Binder |AWS Kinesis Binder]], attempt to embed headers as "contentType". To suppress this behavior, use:
spring.cloud.stream.bindings:
  <''channel-name''>:
    destination: <''destination-name''>
    ...
    producer:
      '''headerMode''': '''none'''


==Binder Configuration==
==Binder Configuration==
Line 339: Line 377:
  org.springframework.cloud.stream.config.defaultCandidate
  org.springframework.cloud.stream.config.defaultCandidate
</syntaxhighlight>
</syntaxhighlight>
=Message Propagation=
* [[Spring_Cloud_Stream_AWS_Kinesis_Binder#Message_Propagation|Spring Cloud Stream AWS Kinesis Message Propagation]]


=Binder Visualization and Control, Health and Metrics=
=Binder Visualization and Control, Health and Metrics=


Spring Cloud Stream supports visualization and control of the Bindings through Actuator endpoints. <font color=darkgray>TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binding_visualization_and_control
Spring Cloud Stream supports visualization and control of the Bindings through Actuator endpoints.  
 
<font color=darkgray>TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binding_visualization_and_control


TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_health_indicator
TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_health_indicator

Latest revision as of 19:45, 3 May 2019

External

Internal

Overview

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems. Spring applications use Spring Cloud Stream libraries to bind to messaging middleware, using a middleware-specific-neutral API, Spring Messaging. The application uses input and output channel abstractions injected into it by Spring Cloud Stream runtime. Channels are connected to brokers through middleware-specific binder implementations.

Spring Cloud Stream builds upon Spring Boot to create standalone production-grade applications. Spring Initializr lists Spring Cloud Stream under "Cloud Messaging"

Spring Cloud Stream and Spring Integration

Spring Cloud Stream is built on the concepts and patterns defined by Enterprise Integration Patterns and relies on the Spring Integration implementation to provide connectivity to message brokers, so it supports the foundation, semantics, and configuration options that are already established by Spring Integration.

Also see:

Spring Integration

TO DO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_spring_integration_support

Concepts

SpringCloudStreamConcepts.png

Binder

The binder is the component providing integration with external messaging systems. Spring Cloud Stream automatically detects and uses a binder found on the classpath. For more details on binder detection see Binder Detection below. The actual destination, which can be a Kafka topic, a RabbitMQ exchange or Kinesis stream, it is selected at boot based on configuration read from application arguments, environment variables, application.yml or application.properties. The binder implementation is responsible for connectivity, message routing to and from producers and consumers, data type conversion, etc. While binders handle most of their responsibility in a transparent manner, they still require some minimal configuration. Spring documentation sometimes refer to the binder as "destination binder".

Available binders:

Rabbit MQ

RabbitMQ Binder

Kafka

Kafka Binder

AWS Kinesis

AWS Kinesis Binder

Google Cloud PubSub

Google Cloud PubSub Binder

TestSupportBinder

TestSupportBinder

Channel

An input channel is an object present in the memory of a Spring application whose implementation is transparently created by Spring Cloud Stream and injected int the application. The application uses input channel to receive messages. Similarly, an output channel is also an object created and injected by Spring Cloud Stream into the application. Application sends outbound messages via output channels. The Spring Cloud Stream API conveniently provides factory interfaces for producing input channels (Sink), output channels (Source) and both input and output channels (Processor). Once instantiated, the channels are available as beans, registered under their channel name. That is why is important to use unique names for channels within the same application, otherwise they'll collide and the application name won't be able to start. The channel attributes can be configured in application.yml as follows:

...
spring.cloud.stream.bindings:
  some-input-channel:
    destination: "Blue-Destination"
    content-type: "application/json"
    ...
  some-output-channel
    destination: "Red-Destination"
    ...

Channel Name

The channel name is used by the configuration to specify properties associated with the channel, and thus configure the channel. @Input, @Output and @StreamListener are parameterized with it. In the example above, "some-input-channel" and "some-output-channel" are channel names.

Persistent Publish/Subscribe

Communication between applications follows a publish-subscribe model, where data is broadcast through shared destinations (topics). Communicating through shared topics rather than point-to-point queues reduces coupling between microservices.

Message

The canonical data structure used by producers and consumers to communicate with binders, and thus other applications via external messaging systems. Also see:

Spring Messaging - Message

Destination

The destination can be a queue, topic, or others.

Consumer Group

The microservice model relies on the ability of an application to scale horizontally by increasing the number of "equivalent" components that are supposed to perform the same function and thus spread the load. However, when doing so, different instances of the service are placed in a competing consumer relationship, where only one of the instances is expected to handle a given message. Spring Cloud Stream models this behavior through the concept of a consumer group. Each consumer binding can use the spring.cloud.stream.bindings.<channelName>.group property to specify a group name. All groups that subscribe to a given destination receive a copy of published data, but only one member of each group receives a given message from that destination. By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups. In general, it is preferable to always specify a consumer group when binding an application to a given destination. When scaling up a Spring Cloud Stream application, you must specify a consumer group for each of its input bindings. Doing so prevents the application’s instances from receiving duplicate message.

Consumer group subscriptions are durable: a binder implementation ensures that group subscriptions are persistent and that, once at least one subscription for a group has been created, the group receives messages, even if they are sent while all applications in the group are stopped.

Various binder implementations come with their own consumer group support peculiarities:

TODO: working example, semantics and operations, I need it to spread the load across equivalent microservices.

Consumer Types

Spring Cloud Stream has message-driven consumers, which are also known as asynchronous consumers and polled consumers, which are also known as synchronous consumers. A synchronous consumer allows controlling the rate at which messages are consumed.

TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-streams-overview-using-polled-consumers

Partition

Spring Cloud Stream offers the possibility to partition the physical communication medium provided by a broker topic into multiple partitions. Partitioning can be used whether the broker itself is naturally partitioned (Kafka) or not (RabbitMQ). Partitioning is a critical concept in stateful processing, where it is critical to ensure that all related data is processed together, for either performance or consistency reasons. To set up a partition, both the data-producing and the data-consuming ends must be configured similarly.

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-partitioning

Content Type Negotiation

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#content-type-management

Schema Evolution Support

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#schema-evolution

Serialization

This mechanism is used while sending messages outside the current address space. Also see Deserialization below. The sequence of code that serializes the message is similar to:

Spring Messaging usage of Jackson to Serialize Outgoing Messages

The default format to send payload content out is byte[]. This is hardcoded in AbstractMessageConverter as serializedPayloadClass, which is byte[].

Deserialization

Also see Serialization above.

Programming Model

Writing Spring Cloud Stream Applications

The central element of a Spring Cloud Stream Applications is a binder implementation, typically a dedicated class annotated with @EnableBinding annotation.

@EnableBinding

@EnableBinding triggers immediate connectivity to a message broker, provided that the corresponding libraries are in the classpath. It is common practice to create a separate component annotated with @EnableBinding, for example "KinesisBinder". The annotation is itself meta-annotated with @Configuration and triggers the configuration of the Spring Cloud Stream infrastructure. The annotation must be configured with one or more channel factory interfaces that declare input and output channels, which will be initialized to be used by the application and exposed as components in the Spring application context. They are typically message channels for channel-based binders such as RabbitMQ or Kafka, but they may represent other concepts for other messaging technologies, such as Kinesis streams. Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface itself. You can have access to the interfaces representing the individual channels:

@Autowired private MessageChannel output;

or bindings:

@Autowired private Sink sink;

by auto-wiring either of them in your application.

@EnableBinding-annotated Binder

In the following example, we are building a Kinesis binder that bootstraps the Kinesis connection machinery, provided that the right dependencies are in the classpath, and exposes two channels - one input channel and one output channel to the application:

@EnableBinding({OutputChannelFactory.class, InputChannelFactory.class})
public class KinesisBinder {
}
public interface OutputChannelFactory {

  String OUTPUT_CHANNEL_NAME = "channel-to-kinesis";

  @Output(OUTPUT_CHANNEL_NAME)
  MessageChannel outputChannel();
}
public interface InputChannelFactory {

  String INPUT_CHANNEL_NAME = "channel-from-kinesis";

  @Input(INPUT_CHANNEL_NAME)
  SubscribableChannel inputChannel();
}

Mapping Channels to Destinations

Channels are mapped to destinations via configuration, see destination. If the destination does not exist on the broker, it is created, using the channel's name.

Send Events to Stream

If the application configuration correctly configures the "channel-to-kinesis" binding with the right destination, content type, etc. as shown below in the Configuration section, the classes declared above are sufficient to send events to the stream:

@Component
public class StreamEventProducer {

    @Autowired
    private OutputChannelFactory outputChannelFactory;

    public void sendData(String payload) {

        Message m = new GenericMessage<>(payload);
        outputChannelFactory.outputChannel().send(m);
    }
}

The Spring Could Stream layer performs the serialization necessary to send the message outside the current address space. For more details see:

Serialization

@StreamListener

To receive data, we could declare a different component and annotate one of its methods with @StreamListener. @StreamListener designates a message handler method for event handling. The annotation is parameterized with the name of the channel we are supposed to receive messages on. The class containing the @StreamListener-annotated handler method should be registered with the Spring application context to be available for handling, so it must be annotated with @Component or similar.

Note that the corresponding channel factory class must be configured on the @EnableBinding-annotated binder, otherwise the bean corresponding to the inbound channel won't be available and @StreamListener annotation will not work.

This annotation is an alternative to Spring Integration patterns and it is modeled after Spring Messaging annotations @MessageMapping, @JmsListener, @RabbitListener, etc.The handler method is invoked after each received message. The method has a flexible signature, as described by @MessageMapping. In the most generic case, the handler method receives a Message instance that gives access to headers and payload. The payload is a byte[] that can be deserialized by the handler method.

@Component
public class StreamEventConsumer {

    @StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
    public void handle(Message m) {

        // ...
    }
}
GenericMessage [payload=byte[8], headers={
     aws_shard=shardId-000000000000, 
     id=fdee96a5-a036-ec82-026e-9a2cb80b9240, 
     contentType=application/json, 
     aws_receivedStream=settlements, 
     aws_receivedPartitionKey=1, 
     aws_receivedSequenceNumber=49589493687900371459841566447413171213458080873777725442, 
     timestamp=1540413946289}]

Alternatively, the conversion can be made automatically. How?

@Component
public class StreamEventConsumer {

    @StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
    public void handle(KinesisEvent e) {

        // ...
    }
}

Content-based routing with @StreamListener: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_using_streamlistener_for_content_based_routing.

Playground Examples

Playground Spring Cloud Stream Kinesis Producer
Playground Spring Cloud Stream Kinesis Consumer

Binder Detection

Spring Cloud Stream automatically detects and uses a binder found on the classpath. More: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binder_detection, multiple binders on the classpath: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#multiple-binders

Channel Factory Interfaces

Sink is an input channel factory. It uses the @Input annotation to indicate that an input binding target will be created by the framework:

public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}

Source is an output channel factory. It uses the @Output annotation to indicate that an output binding target will be created by the framework:

public interface Source {
  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();
}

Processor is an factory for both input and output channels:

public interface Processor extends Source, Sink {
}

The @Input and @Output annotations can be used directly to mark input and output channel factory methods on a custom interface:

public interface MyCustomBidirectionalOutfit {

  String INPUT_CHANNEL_NAME = "input-channel";
  String OUTPUT_CHANNEL_NAME = "output-channel";

  @Input(INPUT_CHANNEL_NAME)
  SubscribableChannel inputChannel();

  @Output(OUTPUT_CHANNEL_NAME)
  MessageChannel outputChannel();
}

Sink, Source, Processor and custom interfaces annotated with @Input and @Output can be used to parameterize the @EnableBinding annotation.

Annotations

Error Handling

In presence of errors, Spring Cloud Stream attempts to retry message handling a number of times with Spring Retry RetryTemplate. Over a certain error threshold, the exceptions thrown by the message handlers are propagated back to the binder. The binder either forwards the error to the application, by invoking a custom error handler (application error handling), or to the messaging system (system error handling).

Application Error Handling

TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_application_error_handling

System Error Handling

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_system_error_handling

Testing

TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_testing

Configuration

Configuration Options - Spring Documentation
Configuration Options - Spring Documentation (2)

Channel Configuration

Common Binding Properties - Spring Documentation

Binding properties are supplied by using the format of spring.cloud.stream.bindings.<channel-name>.<property>=<value>. The <channel-name> represents the name of the channel specified in the @Input and @Output annotation configurations.

Example:

spring.cloud.stream.bindings:
  input-channel:
    destination: test-kinesis-stream-1
    content-type: application/json
    producer:
      partitionKeyExpression: "1"
  output-channel:
    destination: test-kinesis-stream-2
    content-type: application/json

Notable properties:

  • destination the name of the messaging broker destination (depending on dynamic binding at boot, the destination can be a Kafka topic, a RabbitMQ exchange, Kinesis stream, etc.) If not set, the channel name is used instead. If the channel is bound as a consumer, it could be bound to multiple destinations, and the destination names can be specified as comma-separated String values. The default value of this property cannot be overridden.
  • content-type What does this do, exactly?

Producer Configuration

Header Handling

The default behavior depends on the binder implementation. Some binders, such as AWS Kinesis Binder, attempt to embed headers as "contentType". To suppress this behavior, use:

spring.cloud.stream.bindings:
  <channel-name>:
    destination: <destination-name>
    ...
    producer:
      headerMode: none

Binder Configuration

Binder Configuration Properties - Spring Documentation
 org.springframework.cloud.stream.config.type
 org.springframework.cloud.stream.config.inheritEnvironment
 org.springframework.cloud.stream.config.environment
 org.springframework.cloud.stream.config.defaultCandidate

Message Propagation

Binder Visualization and Control, Health and Metrics

Spring Cloud Stream supports visualization and control of the Bindings through Actuator endpoints.

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binding_visualization_and_control

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_health_indicator

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-metrics-emitter

Reactive Programming Support

TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-reactive-programming-support