Spring Cloud Stream

From NovaOrdis Knowledge Base
Jump to navigation Jump to search

External

Internal

Overview

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems. Spring applications use Spring Cloud Stream libraries to bind to messaging middleware, using a middleware-specific-neutral API. The application uses input and output channel channel abstractions injected into it by Spring Cloud Stream runtime. Channels are connected to brokers through middleware-specific binder implementations.

Spring Cloud Stream builds upon Spring Boot to create standalone production-grade applications. Spring Initializr lists Spring Cloud Stream under "Cloud Messaging"

Spring Cloud Stream and Spring Integration

Spring Cloud Stream is built on the concepts and patterns defined by Enterprise Integration Patterns and relies on the Spring Integration implementation to provide connectivity to message brokers, so it supports the foundation, semantics, and configuration options that are already established by Spring Integration. TO DO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_spring_integration_support

Concepts

SpringCloudStreamConcepts.png

Binder

The binder is the component providing integration with external messaging systems. Spring Cloud Stream automatically detects and uses a binder found on the classpath. For more details on binder detection see Binder Detection below. The actual destination, which can be a Kafka topic or a RabbitMQ exchange, it is selected at boot based on configuration read from application arguments, environment variables, application.yml or application.properties. The binder implementation is responsible for connectivity, message routing to and from producers and consumers, data type conversion, etc. While binders handle most of their responsibility in a transparent manner, they still require some minimal configuration. Spring documentation sometimes refer to the binder as "destination binder".

Available binders:

Channel

An input channel is an object present in the memory of a Spring application whose implementation is transparently created by Spring Cloud Stream and injected int the application. The application uses input channel to receive messages. Similarly, an output channel is also an object created and injected by Spring Cloud Stream into the application. Application sends messages via output channels. The Spring Cloud Stream API conveniently provides factory interfaces for producing input channels (Sink), output channels (Source) and both input and output channels (Processor).

Persistent Publish/Subscribe

Communication between applications follows a publish-subscribe model, where data is broadcast through shared destinations (topics). Communicating through shared topics rather than point-to-point queues reduces coupling between microservices.

Consumer Group

The microservice model relies on the ability of an application to scale horizontally by increasing the number of "equivalent" components that are supposed to perform the same function and thus spread the load. However, when doing so, different instances of the service are placed in a competing consumer relationship, where only one of the instances is expected to handle a given message. Spring Cloud Stream models this behavior through the concept of a consumer group. Each consumer binding can use the spring.cloud.stream.bindings.<channelName>.group property to specify a group name. All groups that subscribe to a given destination receive a copy of published data, but only one member of each group receives a given message from that destination. By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups. In general, it is preferable to always specify a consumer group when binding an application to a given destination. When scaling up a Spring Cloud Stream application, you must specify a consumer group for each of its input bindings. Doing so prevents the application’s instances from receiving duplicate message.

Consumer group subscriptions are durable: a binder implementation ensures that group subscriptions are persistent and that, once at least one subscription for a group has been created, the group receives messages, even if they are sent while all applications in the group are stopped.

Consumer Types

Spring Cloud Stream has message-driven consumers, which are also known as asynchronous consumers and polled consumers, which are also known as synchronous consumers. A synchronous consumer allows controlling the rate at which messages are consumed. TODO: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-streams-overview-using-polled-consumers

Partition

Spring Cloud Stream offers the possibility to partition the physical communication medium provided by a broker topic into multiple partitions. Partitioning can be used whether the broker itself is naturally partitioned (Kafka) or not (RabbitMQ). Partitioning is a critical concept in stateful processing, where it is critical to ensure that all related data is processed together, for either performance or consistency reasons. To set up a partition, both the data-producing and the data-consuming ends must be configured similarly. TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-partitioning

Content Type Negotiation

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#content-type-management

Schema Evolution Support

TODO https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#schema-evolution

Programming Model

Writing Spring Cloud Stream Applications

The central element of a Spring Cloud Stream Applications is a binder implementation, typically a dedicated class annotated with @EnableBinding annotation:

@EnableBinding triggers immediate connectivity to a message broker, provided that the corresponding libraries are in the classpath. The annotation is itself meta-annotated with @Configuration and triggers the configuration of the Spring Cloud Stream infrastructure. The annotation must be configured with one or more channel factory interfaces that declare input and output channels, which will be initialized to be used by the application and exposed as components in the Spring application context. They are typically message channels for channel-based binders such as RabbitMQ or Kafka, but they may represent other concepts for other messaging technologies. Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface itself. You can have access to the interfaces representing the individual channels:

@Autowired private MessageChannel output;

or bindings:

@Autowired private Sink sink;

by auto-wiring either of them in your application.

In the following example, we are building a Kinesis binder that bootstraps the Kinesis connection machinery, provided that the right dependencies are in the classpath, and exposes two channels - one input channel and one output channel to the application:

@EnableBinding({KinesisOutputChannelFactory.class, KinesisInputChannelFactory.class})
public class KinesisBinder {
}
public interface KinesisOutputChannelFactory {

  String OUTPUT_CHANNEL_BEAN_NAME = "channel-to-kinesis";

  @Output(OUTPUT_CHANNEL_BEAN_NAME)
  MessageChannel outputChannel();
}
public interface KinesisInputChannelFactory {

  String INPUT_CHANNEL_BEAN_NAME = "channel-from-kinesis";

  @Input(INPUT_CHANNEL_BEAN_NAME)
  SubscribableChannel inputChannel();
}

Provided that the right dependencies are in the classpath, and the application configuration correctly configures the "channel-to-kinesis" binding with the right destination, content type, etc. as shown below in the Configuration section, the classes declared above are sufficient to send events to the stream:

@Component
public class StreamEventProducer {

    @Autowired
    private KinesisOutputChannelFactory outputChannelFactory;

    public void sendData(String payload) {
        KinesisEvent e = new KinesisEvent(payload);
        outputChannelFactory.outputChannel().send(new GenericMessage<>(e));
    }
}

To receive data, we need to declare a different component and annotate one of its methods with @StreamListener:

   @StreamListener(GenericProcessor.INPUT)
   public void processData(GenericData data) {
       System.out.println(">>>>>>>>>>>> " + data.getText());
   }

Binder Detection

Spring Cloud Stream automatically detects and uses a binder found on the classpath. More: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binder_detection, multiple binders on the classpath: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#multiple-binders

Channel Factory Interfaces

Sink is an input channel factory. It uses the @Input annotation to indicate that an input binding target will be created by the framework:

public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}

Source is an output channel factory. It uses the @Output annotation to indicate that an output binding target will be created by the framework:

public interface Source {
  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();
}

Processor is an factory for both input and output channels:

public interface Processor extends Source, Sink {
}

The @Input and @Output annotations can be used directly to mark input and output channel factory methods on a custom interface:

public interface MyCustomBidirectionalOutfit {

  String INPUT_CHANNEL_BEAN_NAME = "input-channel";
  String OUTPUT_CHANNEL_BEAN_NAME = "output-channel";

  @Input(INPUT_CHANNEL_BEAN_NAME)
  SubscribableChannel inputChannel();

  @Output(OUTPUT_CHANNEL_BEAN_NAME)
  MessageChannel outputChannel();
}

Sink, Source, Processor and custom interfaces annotated with @Input and @Output can be used to parameterize the @EnableBinding annotation.

Annotations

Testing

Configuration