Spring Cloud Stream
External
- https://cloud.spring.io/spring-cloud-stream/
- https://cloud.spring.io/spring-cloud-stream-app-starters/
- Reference Spring Cloud Stream Core https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_spring_cloud_stream_core
- GitHub Cloud Stream Examples https://github.com/spring-cloud/spring-cloud-stream-samples
- JavaDoc https://docs.spring.io/spring-cloud-stream/docs/current/api/
Internal
Overview
Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems. Spring applications use Spring Cloud Stream libraries to bind to messaging middleware, using a middleware-neutral API, Spring Messaging. The application uses input and output channel abstractions injected into it by the Spring Cloud Stream runtime. Channels are connected to brokers through middleware-specific binder implementations.
Spring Cloud Stream builds upon Spring Boot to create standalone production-grade applications. Spring Initializr lists Spring Cloud Stream under "Cloud Messaging".
Spring Cloud Stream and Spring Integration
Spring Cloud Stream is built on the concepts and patterns defined by Enterprise Integration Patterns and relies on the Spring Integration implementation to provide connectivity to message brokers, so it supports the foundation, semantics, and configuration options that are already established by Spring Integration.
Concepts
Binder
The binder is the component providing integration with external messaging systems. Spring Cloud Stream automatically detects and uses a binder found on the classpath. For more details on binder detection see Binder Detection below. The actual destination, which can be a Kafka topic, a RabbitMQ exchange or a Kinesis stream, is selected at boot based on configuration read from application arguments, environment variables, application.yml or application.properties. The binder implementation is responsible for connectivity, message routing to and from producers and consumers, data type conversion, etc. While binders handle most of their responsibilities transparently, they still require some minimal configuration. Spring documentation sometimes refers to the binder as "destination binder".
Available binders:
RabbitMQ
Kafka
AWS Kinesis
Google Cloud PubSub
TestSupportBinder
Channel
An input channel is an object present in the memory of a Spring application whose implementation is transparently created by Spring Cloud Stream and injected into the application. The application uses input channels to receive messages. Similarly, an output channel is an object created and injected by Spring Cloud Stream into the application. The application sends outbound messages via output channels. The Spring Cloud Stream API conveniently provides factory interfaces for producing input channels (Sink), output channels (Source) and both input and output channels (Processor). Once instantiated, the channels are available as beans, registered under their channel name. That is why it is important to use unique names for channels within the same application, otherwise they'll collide and the application won't be able to start. The channel attributes can be configured in application.yml as follows:
...
spring.cloud.stream.bindings:
  some-input-channel:
    destination: "Blue-Destination"
    content-type: "application/json"
    ...
  some-output-channel:
    destination: "Red-Destination"
    ...
Channel Name
The channel name is used by the configuration to specify properties associated with the channel, and thus configure the channel. @Input, @Output and @StreamListener are parameterized with it. In the example above, "some-input-channel" and "some-output-channel" are channel names.
Persistent Publish/Subscribe
Communication between applications follows a publish-subscribe model, where data is broadcast through shared destinations (topics). Communicating through shared topics rather than point-to-point queues reduces coupling between microservices.
Message
A message is the canonical data structure used by producers and consumers to communicate with binders, and thus with other applications via external messaging systems.
Destination
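A destination is the broker-side entity that a channel is bound to. Depending on the binder, it can be a queue, a topic, or another construct such as a RabbitMQ exchange or a Kinesis stream.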
Consumer Group
The microservice model relies on the ability of an application to scale horizontally by increasing the number of "equivalent" components that are supposed to perform the same function and thus spread the load. However, when doing so, different instances of the service are placed in a competing consumer relationship, where only one of the instances is expected to handle a given message. Spring Cloud Stream models this behavior through the concept of a consumer group. Each consumer binding can use the spring.cloud.stream.bindings.<channelName>.group property to specify a group name. All groups that subscribe to a given destination receive a copy of published data, but only one member of each group receives a given message from that destination. By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups. In general, it is preferable to always specify a consumer group when binding an application to a given destination. When scaling up a Spring Cloud Stream application, you must specify a consumer group for each of its input bindings. Doing so prevents the application’s instances from receiving duplicate messages.
Consumer group subscriptions are durable: a binder implementation ensures that group subscriptions are persistent and that, once at least one subscription for a group has been created, the group receives messages, even if they are sent while all applications in the group are stopped.
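The delivery rule described above (every group gets a copy, exactly one member per group handles it) can be illustrated with a small in-memory model. This is only a sketch in plain Java, not the Spring Cloud Stream API: the class ConsumerGroupSketch, its methods and the round-robin choice of the receiving member are all assumptions made for illustration. Real binders decide which member receives a message in their own way.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one destination with consumer groups.
// It only mimics the delivery rules; it is not a binder.
class ConsumerGroupSketch {

    // group name -> inboxes of the group members
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> index of the member that receives the next message
    private final Map<String, Integer> nextMember = new LinkedHashMap<>();

    // Registers a new member in the given group and returns its inbox.
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        nextMember.putIfAbsent(group, 0);
        return inbox;
    }

    // Every group gets a copy of the message; within a group, one member gets it.
    // Round-robin selection is an assumption of this sketch.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> members = entry.getValue();
            int index = nextMember.get(entry.getKey()) % members.size();
            members.get(index).add(message);
            nextMember.put(entry.getKey(), (index + 1) % members.size());
        }
    }
}
```

With two members subscribed to a "billing" group and one member in an "audit" group, publishing two messages leaves one message in each billing inbox and both messages in the audit inbox, which is the load-spreading behavior expected from equivalent microservice instances.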
Various binder implementations come with their own consumer group support peculiarities:
TODO: working example, semantics and operations, I need it to spread the load across equivalent microservices.
Consumer Types
Spring Cloud Stream has message-driven consumers, also known as asynchronous consumers, and polled consumers, also known as synchronous consumers. A polled (synchronous) consumer lets the application control the rate at which messages are consumed.
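The difference between the two consumer types can be sketched in plain Java. This is not the Spring Cloud Stream API: ConsumerTypesSketch and its two methods are hypothetical names, and the in-memory list and queue stand in for a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration of the two consumption styles.
class ConsumerTypesSketch {

    // Message-driven style: the source invokes the handler once per message,
    // as messages arrive. The handler does not choose the pace.
    static void deliverPush(List<String> incoming, Consumer<String> handler) {
        for (String message : incoming) {
            handler.accept(message);
        }
    }

    // Polled style: the application asks for at most maxMessages
    // and decides by itself when to ask again.
    static List<String> poll(Queue<String> source, int maxMessages) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxMessages && !source.isEmpty()) {
            batch.add(source.remove());
        }
        return batch;
    }
}
```

In the polled style, anything the application has not asked for yet stays in the source, which is what gives it control over the consumption rate.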
Partition
Spring Cloud Stream offers the possibility to partition the physical communication medium provided by a broker topic into multiple partitions. Partitioning can be used whether the broker itself is naturally partitioned (Kafka) or not (RabbitMQ). Partitioning is a critical concept in stateful processing, where all related data must be processed together, for either performance or consistency reasons. To set up partitioning, both the data-producing and the data-consuming ends must be configured for it.
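To make "all related data is processed together" concrete, here is a minimal sketch of key-based partition selection. As far as I understand the reference documentation, the default calculation is based on key.hashCode() % partitionCount. The class PartitionSketch and the use of Math.floorMod (to keep the result non-negative for negative hash codes) are assumptions of this sketch, not the framework's code.

```java
// Hypothetical helper: maps a partition key to a partition index.
class PartitionSketch {

    // Returns an index in [0, partitionCount). The same key always maps to
    // the same partition, so related messages end up on the same consumer.
    static int selectPartition(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return Math.floorMod(key.hashCode(), partitionCount);
    }
}
```

For example, with three partitions the Integer key 5 maps to partition 2, and every later message carrying the same key maps there as well.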
Content Type Negotiation
Schema Evolution Support
Serialization
This mechanism is used while sending messages outside the current address space. Also see Deserialization below. The sequence of code that serializes the message is similar to:
The default format to send payload content out is byte[]. This is hardcoded in AbstractMessageConverter as serializedPayloadClass, which is byte[].
Deserialization
Also see Serialization above.
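Since the payload leaves the address space as byte[], a round trip can be sketched in plain Java. This is only an illustration under assumptions: PayloadBytesSketch is a hypothetical class, the payload is assumed to be a JSON string, and UTF-8 is assumed as the encoding. The framework performs this step through its own message converters.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing a String payload going to byte[] and back.
class PayloadBytesSketch {

    // Outbound direction: the payload becomes the bytes handed to the binder.
    static byte[] toWire(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    // Inbound direction: a handler receiving byte[] restores the text itself.
    static String fromWire(byte[] wire) {
        return new String(wire, StandardCharsets.UTF_8);
    }
}
```

A handler that receives a Message with a byte[] payload could apply the inbound step and then parse the resulting JSON text with the library of its choice.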
Programming Model
Writing Spring Cloud Stream Applications
The central element of a Spring Cloud Stream application is a binder implementation, typically a dedicated class annotated with the @EnableBinding annotation.
@EnableBinding
@EnableBinding triggers immediate connectivity to a message broker, provided that the corresponding libraries are in the classpath. It is common practice to create a separate component annotated with @EnableBinding, for example "KinesisBinder". The annotation is itself meta-annotated with @Configuration and triggers the configuration of the Spring Cloud Stream infrastructure. The annotation must be configured with one or more channel factory interfaces that declare input and output channels, which will be initialized to be used by the application and exposed as components in the Spring application context. They are typically message channels for channel-based binders such as RabbitMQ or Kafka, but they may represent other concepts for other messaging technologies, such as Kinesis streams. Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface itself. You can have access to the interfaces representing the individual channels:
@Autowired private MessageChannel output;
or bindings:
@Autowired private Sink sink;
by auto-wiring either of them in your application.
@EnableBinding-annotated Binder
In the following example, we are building a Kinesis binder that bootstraps the Kinesis connection machinery, provided that the right dependencies are in the classpath, and exposes two channels - one input channel and one output channel to the application:
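// Each type below normally lives in its own source file.
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding({OutputChannelFactory.class, InputChannelFactory.class})
public class KinesisBinder {
}

public interface OutputChannelFactory {
    String OUTPUT_CHANNEL_NAME = "channel-to-kinesis";

    @Output(OUTPUT_CHANNEL_NAME)
    MessageChannel outputChannel();
}

public interface InputChannelFactory {
    String INPUT_CHANNEL_NAME = "channel-from-kinesis";

    @Input(INPUT_CHANNEL_NAME)
    SubscribableChannel inputChannel();
}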
Mapping Channels to Destinations
Channels are mapped to destinations via configuration, see destination. If the destination does not exist on the broker, it is created, using the channel's name.
Send Events to Stream
If the application configuration correctly configures the "channel-to-kinesis" binding with the right destination, content type, etc. as shown below in the Configuration section, the classes declared above are sufficient to send events to the stream:
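import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
public class StreamEventProducer {
    @Autowired
    private OutputChannelFactory outputChannelFactory;

    // Wraps the payload in a Message and sends it through the bound output channel.
    public void sendData(String payload) {
        Message<String> m = new GenericMessage<>(payload);
        outputChannelFactory.outputChannel().send(m);
    }
}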
The Spring Cloud Stream layer performs the serialization necessary to send the message outside the current address space. For more details see Serialization above.
@StreamListener
To receive data, we could declare a different component and annotate one of its methods with @StreamListener. @StreamListener designates a message handler method for event handling. The annotation is parameterized with the name of the channel we are supposed to receive messages on. The class containing the @StreamListener-annotated handler method should be registered with the Spring application context to be available for handling, so it must be annotated with @Component or similar.
Note that the corresponding channel factory class must be configured on the @EnableBinding-annotated binder, otherwise the bean corresponding to the inbound channel won't be available and @StreamListener annotation will not work.
This annotation is an alternative to Spring Integration patterns and it is modeled after Spring Messaging annotations @MessageMapping, @JmsListener, @RabbitListener, etc. The handler method is invoked for each received message. The method has a flexible signature, as described by @MessageMapping. In the most generic case, the handler method receives a Message instance that gives access to headers and payload. The payload is a byte[] that can be deserialized by the handler method.
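import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class StreamEventConsumer {
    // Invoked for each message received on the input channel.
    @StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
    public void handle(Message<?> m) {
        // ...
    }
}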
GenericMessage [payload=byte[8], headers={ aws_shard=shardId-000000000000, id=fdee96a5-a036-ec82-026e-9a2cb80b9240, contentType=application/json, aws_receivedStream=settlements, aws_receivedPartitionKey=1, aws_receivedSequenceNumber=49589493687900371459841566447413171213458080873777725442, timestamp=1540413946289}]
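Alternatively, the conversion can be made automatically: when the handler method declares a typed argument instead of Message, the framework converts the payload to that type based on the content type of the message.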
@Component
public class StreamEventConsumer {
    @StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
    public void handle(KinesisEvent e) {
        // ...
    }
}
Content-based routing with @StreamListener: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_using_streamlistener_for_content_based_routing.
Playground Examples
Binder Detection
Spring Cloud Stream automatically detects and uses a binder found on the classpath. More: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binder_detection, multiple binders on the classpath: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#multiple-binders
Channel Factory Interfaces
Sink is an input channel factory. It uses the @Input annotation to indicate that an input binding target will be created by the framework:
public interface Sink {
    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}
Source is an output channel factory. It uses the @Output annotation to indicate that an output binding target will be created by the framework:
public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}
Processor is a factory for both input and output channels:
public interface Processor extends Source, Sink {
}
The @Input and @Output annotations can be used directly to mark input and output channel factory methods on a custom interface:
public interface MyCustomBidirectionalOutfit {
    String INPUT_CHANNEL_NAME = "input-channel";
    String OUTPUT_CHANNEL_NAME = "output-channel";

    @Input(INPUT_CHANNEL_NAME)
    SubscribableChannel inputChannel();

    @Output(OUTPUT_CHANNEL_NAME)
    MessageChannel outputChannel();
}
Sink, Source, Processor and custom interfaces annotated with @Input and @Output can be used to parameterize the @EnableBinding annotation.
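How a framework can turn an annotated factory interface into a working object, with one channel registered per name, can be sketched with a JDK dynamic proxy. Everything here is hypothetical and stands in for the real types: @ChannelName plays the role of @Input/@Output, RecordingChannel the role of a message channel, and the map the role of the bean registry. I am not claiming this is how Spring Cloud Stream implements it; it only shows the general technique and why duplicate channel names have to be rejected.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for @Input/@Output; not the Spring annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ChannelName {
    String value();
}

// Hypothetical minimal channel: it only records what was sent.
class RecordingChannel {
    final List<String> sent = new ArrayList<>();

    void send(String payload) {
        sent.add(payload);
    }
}

// Hypothetical channel factory interface, shaped like Source.
interface DemoSource {
    @ChannelName("output")
    RecordingChannel output();
}

class ChannelFactorySketch {

    // Creates one channel per annotated method, registers it under its name,
    // and returns a proxy whose methods hand the registered channels back.
    static <T> T bind(Class<T> factoryInterface, Map<String, RecordingChannel> registry) {
        for (Method declared : factoryInterface.getMethods()) {
            ChannelName name = declared.getAnnotation(ChannelName.class);
            if (name != null && registry.putIfAbsent(name.value(), new RecordingChannel()) != null) {
                throw new IllegalStateException("Duplicate channel name: " + name.value());
            }
        }
        Object proxy = Proxy.newProxyInstance(
                factoryInterface.getClassLoader(),
                new Class<?>[] {factoryInterface},
                (instance, method, args) -> {
                    ChannelName name = method.getAnnotation(ChannelName.class);
                    if (name == null) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    return registry.get(name.value());
                });
        return factoryInterface.cast(proxy);
    }
}
```

Calling bind(DemoSource.class, registry) returns an object whose output() method yields the channel registered as "output". Binding a second interface that reuses the same name fails, which mirrors the name-collision behavior mentioned in the Channel section.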
Annotations
Error Handling
In the presence of errors, Spring Cloud Stream attempts to retry message handling a number of times, using the Spring Retry RetryTemplate. Once the retries are exhausted, the exceptions thrown by the message handlers are propagated back to the binder. The binder either forwards the error to the application, by invoking a custom error handler (application error handling), or to the messaging system (system error handling).
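The retry-then-propagate flow can be sketched without any framework code. The class RetrySketch and its method are hypothetical, and the sketch retries immediately, with no back-off. The real behavior is implemented by the Spring Retry RetryTemplate and is configurable.

```java
import java.util.function.Consumer;

// Hypothetical illustration of "retry a few times, then hand the error over".
class RetrySketch {

    // Invokes the handler up to maxAttempts times. If every attempt fails,
    // the last exception is passed to errorHandler.
    // Returns the number of attempts that were made.
    static <T> int handleWithRetry(T message,
                                   Consumer<T> handler,
                                   int maxAttempts,
                                   Consumer<RuntimeException> errorHandler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt; // handled successfully, no error propagation
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        errorHandler.accept(last);
        return maxAttempts;
    }
}
```

In this model the errorHandler argument corresponds to whatever the binder does with the failure, either a custom application error handler or a hand-off to the messaging system.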
Application Error Handling
System Error Handling
Testing
TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_testing
Configuration
Channel Configuration
Binding properties are supplied by using the format of spring.cloud.stream.bindings.<channel-name>.<property>=<value>. The <channel-name> represents the name of the channel specified in the @Input and @Output annotation configurations.
Example:
spring.cloud.stream.bindings:
  input-channel:
    destination: test-kinesis-stream-1
    content-type: application/json
    producer:
      partitionKeyExpression: "1"
  output-channel:
    destination: test-kinesis-stream-2
    content-type: application/json
Notable properties:
- destination: the name of the messaging broker destination (depending on dynamic binding at boot, the destination can be a Kafka topic, a RabbitMQ exchange, a Kinesis stream, etc.). If not set, the channel name is used instead. If the channel is bound as a consumer, it can be bound to multiple destinations, and the destination names can be specified as comma-separated String values. The default value of this property cannot be overridden.
- content-type What does this do, exactly?
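The property naming scheme and the destination rules above (fall back to the channel name, comma-separated list for consumers) can be sketched as follows. BindingPropertiesSketch is a hypothetical helper written against java.util.Properties. The framework resolves these properties through Spring Boot's own configuration binding, not through code like this.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical helper that mimics how a binding's destination is looked up.
class BindingPropertiesSketch {

    static final String PREFIX = "spring.cloud.stream.bindings.";

    // Returns the destination names for a channel: the configured value split
    // on commas, or the channel name itself when nothing is configured.
    static List<String> destinationsFor(Properties properties, String channelName) {
        String configured = properties.getProperty(PREFIX + channelName + ".destination", channelName);
        return Arrays.stream(configured.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }
}
```

For a consumer channel configured with two comma-separated names, the helper returns both of them. For a channel with no destination property, it returns the channel name.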
Producer Configuration
Header Handling
The default behavior depends on the binder implementation. Some binders, such as the AWS Kinesis Binder, attempt to embed headers such as "contentType" into the message payload. To suppress this behavior, use:
spring.cloud.stream.bindings:
  <channel-name>:
    destination: <destination-name>
    ...
    producer:
      headerMode: none
Binder Configuration
org.springframework.cloud.stream.config.type
org.springframework.cloud.stream.config.inheritEnvironment
org.springframework.cloud.stream.config.environment
org.springframework.cloud.stream.config.defaultCandidate
Message Propagation
Binder Visualization and Control, Health and Metrics
Spring Cloud Stream supports visualization and control of the Bindings through Actuator endpoints.