Spring Cloud Stream

From NovaOrdis Knowledge Base

External

Internal

Overview

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems. Spring applications use Spring Cloud Stream libraries to bind to messaging middleware through a middleware-neutral API. The application uses input and output channel abstractions injected into it by the Spring Cloud Stream runtime. Channels are connected to brokers through middleware-specific binder implementations.

Spring Cloud Stream builds upon Spring Boot to create standalone production-grade applications. Spring Initializr lists Spring Cloud Stream under "Cloud Messaging".

Spring Cloud Stream and Spring Integration

Spring Cloud Stream is built on the concepts and patterns defined by Enterprise Integration Patterns and relies on the Spring Integration implementation to provide connectivity to message brokers, so it supports the foundation, semantics, and configuration options that are already established by Spring Integration. TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_spring_integration_support

Concepts

Figure: Spring Cloud Stream concepts (SpringCloudStreamConcepts.png)

Binder

The binder is the component that provides integration with external messaging systems. Spring Cloud Stream automatically detects and uses a binder found on the classpath; for more details, see Binder Detection below. The actual destination, which can be a Kafka topic or a RabbitMQ exchange, is selected at boot based on configuration read from application arguments, environment variables, application.yml or application.properties. The binder implementation is responsible for connectivity, message routing to and from producers and consumers, data type conversion, etc. While binders handle most of their responsibilities transparently, they still require some minimal configuration. The Spring documentation sometimes refers to the binder as the "destination binder".

Available binders:

Channel

An input channel is an object present in the memory of a Spring application whose implementation is transparently created by Spring Cloud Stream and injected into the application. The application uses input channels to receive messages. Similarly, an output channel is an object created by Spring Cloud Stream and injected into the application; the application sends messages via output channels. The Spring Cloud Stream API conveniently provides factory interfaces for producing input channels (Sink), output channels (Source), and both input and output channels (Processor). Once instantiated, the channels are available as beans, registered under their channel names. That is why it is important to use unique channel names within the same application: otherwise the names collide and the application won't be able to start.
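The name collision described above can be pictured with a small plain-Java model. The ChannelRegistry class below is hypothetical and is not part of Spring Cloud Stream; it simply keys channel objects by channel name, as the application context does with channel beans, and rejects a second registration under a name that is already taken.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model, not Spring code: channel objects registered under their
// channel name, the way Spring Cloud Stream registers channels as beans.
class ChannelRegistry {
    private final Map<String, Object> channels = new LinkedHashMap<>();

    // Rejects a second registration under the same name, standing in for the
    // startup failure caused by colliding channel names.
    void register(String channelName, Object channel) {
        Objects.requireNonNull(channel, "channel");
        if (channels.putIfAbsent(channelName, channel) != null) {
            throw new IllegalStateException("Duplicate channel name: " + channelName);
        }
    }

    Object lookup(String channelName) {
        return channels.get(channelName);
    }

    public static void main(String[] args) {
        ChannelRegistry registry = new ChannelRegistry();
        registry.register("input", new Object());  // e.g. the channel declared by Sink
        registry.register("output", new Object()); // e.g. the channel declared by Source
        try {
            // A custom interface that also declares a channel named "input"
            registry.register("input", new Object());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Duplicate channel name: input"
        }
    }
}
```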

Channel Name

The channel name is used by the configuration to specify properties associated with the channel, and thus configure the channel. @Input, @Output and @StreamListener are parameterized with it.

Persistent Publish/Subscribe

Communication between applications follows a publish-subscribe model, where data is broadcast through shared destinations (topics). Communicating through shared topics rather than point-to-point queues reduces coupling between microservices.

Message

The canonical data structure used by producers and consumers to communicate with binders, and thus other applications via external messaging systems.

Destination

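The destination is the broker-side entity a channel is bound to: a queue, a topic, or another middleware-specific construct (a Kafka topic, a RabbitMQ exchange, a Kinesis stream, etc.).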

Consumer Group

The microservice model relies on the ability of an application to scale horizontally by increasing the number of "equivalent" components that perform the same function and thus spread the load. However, when doing so, different instances of the service are placed in a competing consumer relationship, where only one of the instances is expected to handle a given message. Spring Cloud Stream models this behavior through the concept of a consumer group. Each consumer binding can use the spring.cloud.stream.bindings.<channelName>.group property to specify a group name. All groups that subscribe to a given destination receive a copy of published data, but only one member of each group receives a given message from that destination. By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups. In general, it is preferable to always specify a consumer group when binding an application to a given destination. When scaling up a Spring Cloud Stream application, you must specify a consumer group for each of its input bindings. Doing so prevents the application’s instances from receiving duplicate messages.

Consumer group subscriptions are durable: a binder implementation ensures that group subscriptions are persistent and that, once at least one subscription for a group has been created, the group receives messages, even if they are sent while all applications in the group are stopped.
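The following plain-Java sketch models the group semantics described above on a single destination: every subscribed group gets its own copy of each message, each message goes to exactly one running member of a group, and messages published while a group has no running members stay queued for it. GroupedDestination is a hypothetical class, not a Spring Cloud Stream API, and the round-robin choice of member is an assumption made for illustration; the actual distribution within a group is up to the binder and the broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of consumer-group semantics on one destination.
// Not the Spring Cloud Stream API: in practice the binder and broker provide this.
class GroupedDestination {
    // One durable backlog per group: messages accumulate even when no member runs.
    private final Map<String, Deque<String>> backlogs = new LinkedHashMap<>();
    // Round-robin cursor per group (an assumption), used to pick the receiving member.
    private final Map<String, Integer> cursors = new LinkedHashMap<>();

    // Creating the group subscription is what makes the group durable.
    void subscribe(String group) {
        backlogs.putIfAbsent(group, new ArrayDeque<>());
        cursors.putIfAbsent(group, 0);
    }

    // Every subscribed group receives its own copy of each published message.
    void publish(String message) {
        for (Deque<String> backlog : backlogs.values()) {
            backlog.addLast(message);
        }
    }

    // Drains a group's backlog across its running members; each message goes to exactly one member.
    Map<String, List<String>> deliver(String group, List<String> runningMembers) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String member : runningMembers) {
            received.put(member, new ArrayList<>());
        }
        Deque<String> backlog = backlogs.get(group);
        if (backlog == null || runningMembers.isEmpty()) {
            return received; // unknown group, or all members stopped: messages stay queued
        }
        int cursor = cursors.get(group);
        while (!backlog.isEmpty()) {
            String member = runningMembers.get(cursor % runningMembers.size());
            received.get(member).add(backlog.pollFirst());
            cursor++;
        }
        cursors.put(group, cursor);
        return received;
    }

    public static void main(String[] args) {
        GroupedDestination orders = new GroupedDestination();
        orders.subscribe("billing");
        orders.subscribe("audit");

        // Published while every instance is stopped: retained for each group.
        orders.publish("m1");
        orders.publish("m2");

        // Two billing instances compete; each message reaches only one of them.
        System.out.println(orders.deliver("billing", List.of("billing-0", "billing-1"))); // {billing-0=[m1], billing-1=[m2]}
        // The audit group still gets its own copy of both messages.
        System.out.println(orders.deliver("audit", List.of("audit-0"))); // {audit-0=[m1, m2]}
    }
}
```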

Consumer Types

Spring Cloud Stream supports two types of consumers: message-driven consumers, also known as asynchronous consumers, and polled consumers, also known as synchronous consumers. A polled consumer lets the application control the rate at which messages are consumed. TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-streams-overview-using-polled-consumers
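A rough plain-Java sketch of the difference, assuming a simple in-memory queue stands in for the broker: a message-driven consumer registers a handler that is invoked on arrival, while a polled consumer calls poll() when it is ready and handles at most one message per call. ConsumerStyles is hypothetical. In Spring Cloud Stream 2.0 (Elmhurst), polled consumers are exposed through a PollableMessageSource binding whose poll(...) method plays a similar role; check the reference documentation linked above for its exact contract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical model contrasting the two consumer styles; not the Spring API.
class ConsumerStyles {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private Consumer<String> listener; // set only for a message-driven consumer

    // Message-driven (asynchronous): the handler is registered once and
    // invoked by the infrastructure whenever a message arrives.
    void subscribe(Consumer<String> handler) {
        this.listener = handler;
    }

    // Called from the "broker" side when a message arrives.
    void arrive(String message) {
        if (listener != null) {
            listener.accept(message); // pushed immediately
        } else {
            pending.add(message);     // waits until the application polls
        }
    }

    // Polled (synchronous): the application decides when to take the next message.
    // Handles at most one message and returns false when none is available.
    boolean poll(Consumer<String> handler) {
        String message = pending.poll();
        if (message == null) {
            return false;
        }
        handler.accept(message);
        return true;
    }

    public static void main(String[] args) {
        ConsumerStyles polled = new ConsumerStyles();
        polled.arrive("m1");
        polled.arrive("m2");
        polled.arrive("m3");

        // Rate control: this "tick" processes at most two messages.
        List<String> handled = new ArrayList<>();
        int budget = 2;
        while (budget-- > 0 && polled.poll(handled::add)) {
            // each successful poll handled exactly one message
        }
        System.out.println(handled); // prints [m1, m2]; m3 is still pending
    }
}
```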

Partition

Spring Cloud Stream can partition the physical communication medium provided by a broker topic into multiple partitions. Partitioning can be used whether the broker itself is naturally partitioned (Kafka) or not (RabbitMQ). Partitioning is essential in stateful processing, where all related data must be processed together, for either performance or consistency reasons. To set up partitioning, both the data-producing and the data-consuming ends must be configured consistently: the producer binding declares how the partition key is computed and how many partitions exist (producer.partitionKeyExpression, producer.partitionCount), while the consumer binding is marked as partitioned (consumer.partitioned) and each consumer instance is given the total instance count and its own index (spring.cloud.stream.instanceCount, spring.cloud.stream.instanceIndex). TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-partitioning
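The producer-side idea can be sketched in plain Java: a partition key is computed for each message, and a selector maps the key to one of partitionCount partitions, so that messages with equal keys always end up in the same partition. PartitionSketch is hypothetical; it uses Math.floorMod over the key's hashCode(), which is in the spirit of Spring Cloud Stream's default hash-based selection but is not claimed to be its exact formula.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of key-based partition selection. It illustrates the idea
// only and does not reproduce Spring Cloud Stream's exact selection formula.
class PartitionSketch {
    // Equal keys always map to the same partition (for a fixed partitionCount).
    static int selectPartition(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result in [0, partitionCount) even for negative hash codes
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 3;
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String event : List.of("customer-1:created", "customer-2:created", "customer-1:updated")) {
            // The key plays the role of the partition key expression: here, the customer id
            String key = event.substring(0, event.indexOf(':'));
            byPartition.computeIfAbsent(selectPartition(key, partitionCount), p -> new ArrayList<>())
                       .add(event);
        }
        // Both customer-1 events share a partition, so the consumer instance
        // that owns that partition sees all of that customer's events.
        System.out.println(byPartition);
    }
}
```

Because the mapping depends on the partition count as well as the key, changing partitionCount moves keys to different partitions.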

Content Type Negotiation

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#content-type-management

Schema Evolution Support

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#schema-evolution

Programming Model

Writing Spring Cloud Stream Applications

The central element of a Spring Cloud Stream application is its binding configuration, typically a dedicated class annotated with @EnableBinding, which binds the application's channels to the broker through the binder found on the classpath.

@EnableBinding

@EnableBinding triggers immediate connectivity to a message broker, provided that the corresponding binder libraries are on the classpath. The annotation is itself meta-annotated with @Configuration and triggers the configuration of the Spring Cloud Stream infrastructure. It must be parameterized with one or more channel factory interfaces that declare input and output channels; these channels are initialized for use by the application and exposed as beans in the Spring application context. They are typically message channels for channel-based binders such as RabbitMQ or Kafka, but they may represent other concepts for other messaging technologies. Aside from generating a channel for each binding and registering it as a Spring bean, Spring Cloud Stream generates, for each bound interface, a bean that implements the interface itself. You can therefore access either the individual channels:

@Autowired private MessageChannel output;

or bindings:

@Autowired private Sink sink;

by auto-wiring either of them in your application.

@EnableBinding-annotated Binder

In the following example, the @EnableBinding-annotated KinesisBinder class bootstraps the Kinesis connection machinery, provided that the Kinesis binder dependencies are on the classpath, and exposes two channels to the application, one input channel and one output channel:

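// KinesisBinder.java
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding({OutputChannelFactory.class, InputChannelFactory.class})
public class KinesisBinder {
}

// OutputChannelFactory.java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OutputChannelFactory {

  String OUTPUT_CHANNEL_NAME = "channel-to-kinesis";

  @Output(OUTPUT_CHANNEL_NAME)
  MessageChannel outputChannel();
}

// InputChannelFactory.java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface InputChannelFactory {

  String INPUT_CHANNEL_NAME = "channel-from-kinesis";

  @Input(INPUT_CHANNEL_NAME)
  SubscribableChannel inputChannel();
}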

Mapping Channels to Destinations

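Channels are mapped to destinations via configuration; see the destination property in the Configuration section. If no destination is configured, the channel name is used as the destination name. If the destination does not exist on the broker, the binder typically creates it.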

Send Events to Stream

If the application configuration correctly configures the "channel-to-kinesis" binding with the right destination, content type, etc. as shown below in the Configuration section, the classes declared above are sufficient to send events to the stream:

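import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
public class StreamEventProducer {

    // The bean generated by Spring Cloud Stream for the bound interface
    @Autowired
    private OutputChannelFactory outputChannelFactory;

    public void sendData(String payload) {
        // KinesisEvent is an application-defined payload class (not shown)
        KinesisEvent e = new KinesisEvent(payload);
        outputChannelFactory.outputChannel().send(new GenericMessage<>(e));
    }
}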

@StreamListener

@StreamListener javadoc

To receive data, we could declare a different component and annotate one of its methods with @StreamListener. @StreamListener designates a message handler method for event handling. The annotation is parameterized with the name of the channel we are supposed to receive messages on. The class containing the @StreamListener-annotated handler method should be registered with the Spring application context to be available for handling, so it must be annotated with @Component or similar.

This annotation is an alternative to Spring Integration patterns and is modeled after Spring Messaging annotations such as @MessageMapping, @JmsListener and @RabbitListener. The handler method is invoked for each received message. The method has a flexible signature, as described by @MessageMapping. In the most generic case, the handler method receives a Message instance that gives access to headers and payload. In the example below, the payload is a byte[] that the handler method can deserialize itself.

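import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class StreamEventConsumer {

    @StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
    public void handle(Message<?> m) {
        // m.getHeaders() exposes the headers, m.getPayload() the raw payload (a byte[] here)
        // ...
    }
}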
When printed, a Message received this way from a Kinesis stream looks like this:

GenericMessage [payload=byte[8], headers={
     aws_shard=shardId-000000000000, 
     id=fdee96a5-a036-ec82-026e-9a2cb80b9240, 
     contentType=application/json, 
     aws_receivedStream=settlements, 
     aws_receivedPartitionKey=1, 
     aws_receivedSequenceNumber=49589493687900371459841566447413171213458080873777725442, 
     timestamp=1540413946289}]

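Alternatively, the handler method can declare the payload type directly, and Spring Cloud Stream converts the incoming payload to that type using a message converter selected according to the message's content type (application/json in the example above). See Content Type Negotiation.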

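import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
public class StreamEventConsumer {

    @StreamListener(InputChannelFactory.INPUT_CHANNEL_NAME)
    public void handle(KinesisEvent e) {
        // the payload arrives already converted to the declared argument type
        // ...
    }
}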

Content-based routing with @StreamListener: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_using_streamlistener_for_content_based_routing.

Playground Example

Playground Spring Cloud Stream Kinesis Example

Binder Detection

Spring Cloud Stream automatically detects and uses a binder found on the classpath. More: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binder_detection, multiple binders on the classpath: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#multiple-binders

Channel Factory Interfaces

Sink is an input channel factory. It uses the @Input annotation to indicate that an input binding target will be created by the framework:

public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}

Source is an output channel factory. It uses the @Output annotation to indicate that an output binding target will be created by the framework:

public interface Source {
  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();
}

Processor is a factory for both input and output channels:

public interface Processor extends Source, Sink {
}

The @Input and @Output annotations can be used directly to mark input and output channel factory methods on a custom interface:

public interface MyCustomBidirectionalOutfit {

  String INPUT_CHANNEL_NAME = "input-channel";
  String OUTPUT_CHANNEL_NAME = "output-channel";

  @Input(INPUT_CHANNEL_NAME)
  SubscribableChannel inputChannel();

  @Output(OUTPUT_CHANNEL_NAME)
  MessageChannel outputChannel();
}

Sink, Source, Processor and custom interfaces annotated with @Input and @Output can be used to parameterize the @EnableBinding annotation.

Annotations

Error Handling

In the presence of errors, Spring Cloud Stream retries message handling a number of times, using a Spring Retry RetryTemplate. Once the retry attempts are exhausted, the exception thrown by the message handler is propagated back to the binder. The binder then either forwards the error to the application, by invoking a custom error handler (application error handling), or to the messaging system (system error handling).
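The retry part can be sketched in plain Java. RetrySketch is a hypothetical stand-in for what Spring Retry's RetryTemplate does on behalf of Spring Cloud Stream, not its API. The values in main() mirror the consumer binding defaults as documented for this release (maxAttempts 3, backOffInitialInterval 1000 ms, backOffMultiplier 2.0, backOffMaxInterval 10000 ms); verify them against the version in use. The sleeper parameter is an assumption added so that delays can be recorded instead of actually waited.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

// Hypothetical plain-Java stand-in for the retry behavior that Spring Cloud Stream
// configures through Spring Retry's RetryTemplate; this is not the RetryTemplate API.
class RetrySketch {
    // Calls the handler up to maxAttempts times, backing off exponentially between
    // attempts. When attempts are exhausted, the last exception is rethrown; in
    // Spring Cloud Stream, that is the point where the binder's error handling starts.
    static <T> void handleWithRetry(T message, Consumer<T> handler, int maxAttempts,
                                    long initialIntervalMs, double multiplier, long maxIntervalMs,
                                    LongConsumer sleeper) {
        long interval = initialIntervalMs;
        for (int attempt = 1; ; attempt++) {
            try {
                handler.accept(message);
                return; // handled successfully: no more attempts
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // retries exhausted: propagate to the caller
                }
                sleeper.accept(interval); // back off before the next attempt
                interval = Math.min((long) (interval * multiplier), maxIntervalMs);
            }
        }
    }

    public static void main(String[] args) {
        List<Long> waits = new ArrayList<>();
        int[] calls = {0};
        try {
            // 3 attempts, 1000 ms initial interval, x2.0 multiplier, 10000 ms cap;
            // the sleeper only records the delays instead of sleeping
            handleWithRetry("payload", m -> {
                calls[0]++;
                throw new IllegalStateException("boom");
            }, 3, 1000, 2.0, 10000, ms -> waits.add(ms));
        } catch (IllegalStateException e) {
            // prints "3 attempts, waits [1000, 2000], then: boom"
            System.out.println(calls[0] + " attempts, waits " + waits + ", then: " + e.getMessage());
        }
    }
}
```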

Application Error Handling

TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_application_error_handling

System Error Handling

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_system_error_handling

Testing

TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_testing

Configuration

Configuration Options - Spring Documentation

Channel Configuration

Common Binding Properties - Spring Documentation

Binding properties are supplied by using the format of spring.cloud.stream.bindings.<channel-name>.<property>=<value>. The <channel-name> represents the name of the channel specified in the @Input and @Output annotation configurations.
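A minimal sketch of how such keys are formed and resolved, assuming a plain Map in place of the Spring Environment. Besides the per-channel form, Spring Cloud Stream documents a spring.cloud.stream.default.<property> form for values shared by all channels; the sketch falls back to it when the per-channel key is absent. BindingPropertyLookup is hypothetical, and it ignores Spring Boot relaxed binding, so content-type and contentType are treated as different keys here.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not Spring code: a Map stands in for the Spring Environment.
// Relaxed binding (content-type vs contentType) is deliberately not modeled.
class BindingPropertyLookup {
    // Builds spring.cloud.stream.bindings.<channel-name>.<property>
    static String bindingKey(String channelName, String property) {
        return "spring.cloud.stream.bindings." + channelName + "." + property;
    }

    // Per-channel value first, then the spring.cloud.stream.default.<property> fallback.
    static Optional<String> lookup(Map<String, String> env, String channelName, String property) {
        String perChannel = env.get(bindingKey(channelName, property));
        if (perChannel != null) {
            return Optional.of(perChannel);
        }
        return Optional.ofNullable(env.get("spring.cloud.stream.default." + property));
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of(
                "spring.cloud.stream.bindings.output-channel.destination", "test-kinesis-stream-2",
                "spring.cloud.stream.default.contentType", "application/json");
        // prints "test-kinesis-stream-2" (set on the binding itself)
        System.out.println(lookup(env, "output-channel", "destination").orElse("<unset>"));
        // prints "application/json" (inherited from the default)
        System.out.println(lookup(env, "output-channel", "contentType").orElse("<unset>"));
    }
}
```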

Example:

spring:
  cloud:
    stream:
      bindings:
        input-channel:
          destination: test-kinesis-stream-1
          content-type: application/json
        output-channel:
          destination: test-kinesis-stream-2
          content-type: application/json
          # producer properties apply to output bindings
          producer:
            partitionKeyExpression: "1"

Notable properties:

  • destination the name of the messaging broker destination (depending on the binder selected at boot, the destination can be a Kafka topic, a RabbitMQ exchange, a Kinesis stream, etc.). If not set, the channel name is used instead. If the channel is bound as a consumer, it can be bound to multiple destinations, and the destination names can be specified as comma-separated String values. The default value of this property cannot be overridden.
  • content-type the content type of the channel (for example application/json). Spring Cloud Stream uses it to select the message converter applied to the payload; see Content Type Negotiation.
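The destination rules above can be summarized in a few lines of plain Java. DestinationResolver is a hypothetical helper, not Spring Cloud Stream code; it takes the configured destination value (or null when it is not set) and returns the destination names a binding would use.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the destination rules listed above; not Spring Cloud Stream code.
class DestinationResolver {
    // configured: value of spring.cloud.stream.bindings.<channel-name>.destination, or null when unset
    static List<String> resolve(String channelName, String configured, boolean consumer) {
        if (configured == null || configured.trim().isEmpty()) {
            return List.of(channelName); // not set: the channel name is used instead
        }
        if (!consumer) {
            // the comma-separated form is described for consumer bindings only
            return List.of(configured.trim());
        }
        // A consumer binding may list several destinations, comma-separated
        return Arrays.stream(configured.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(resolve("input-channel", null, true));                      // [input-channel]
        System.out.println(resolve("input-channel", "stream-a, stream-b", true));      // [stream-a, stream-b]
        System.out.println(resolve("output-channel", "test-kinesis-stream-2", false)); // [test-kinesis-stream-2]
    }
}
```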

Binder Configuration

Binder Configuration Properties - Spring Documentation
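 spring.cloud.stream.binders.<configurationName>.type: the binder type, typically referencing one of the binders found on the classpath (for example rabbit or kafka).
 spring.cloud.stream.binders.<configurationName>.inheritEnvironment: whether the configuration inherits the environment of the application itself (default true).
 spring.cloud.stream.binders.<configurationName>.environment: root for a set of properties that customize the environment of the binder.
 spring.cloud.stream.binders.<configurationName>.defaultCandidate: whether the binder configuration is a candidate for the default binder, or can be used only when explicitly referenced (default true).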

Binder Visualization and Control, Health and Metrics

Spring Cloud Stream supports visualization and control of the Bindings through Actuator endpoints. TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binding_visualization_and_control

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_health_indicator

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-metrics-emitter

Reactive Programming Support

TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-reactive-programming-support