WildFly HornetQ-Based Messaging Subsystem Concepts
External
- Messaging chapter in "EAP 6.4 Administration and Configuration Guide" https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/6.4/html/Administration_and_Configuration_Guide/chap-Messaging.html
- EAP 5 HornetQ User Guide (there isn't a newer one in EAP 6) https://access.redhat.com/site/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/HornetQ_User_Guide/index.html
- HornetQ User Manual http://docs.jboss.org/hornetq/2.4.0.Final/docs/user-manual/html/index.html
Internal
Overview
This page describes the concepts behind the WildFly HornetQ-based messaging subsystem.
Address
A server maintains a mapping between an address and a set of queues. Zero or more queues can be bound to a single address. Each queue can be bound with an optional message filter. When a message is routed, it is routed to the set of queues bound to the message's address. If any of the queues are bound with a filter expression, then the message will only be routed to the subset of bound queues which match that filter expression. Other entities, such as diverts, can also be bound to an address, and messages will also be routed there.
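The routing rule described above can be sketched as a small stand-alone model. This is not the HornetQ API: the class and method names (RoutingSketch, Binding, bind, route) are hypothetical, and filters are modeled as plain Java predicates over message properties rather than as HornetQ filter expressions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Toy model of address-based routing; illustrative only, not the HornetQ API. */
final class RoutingSketch {

    /** A queue binding: the queue name plus an optional filter (null means "accept every message"). */
    static final class Binding {
        final String queueName;
        final Predicate<Map<String, String>> filter;

        Binding(String queueName, Predicate<Map<String, String>> filter) {
            this.queueName = queueName;
            this.filter = filter;
        }
    }

    // address -> bindings, kept in the order they were added
    private final Map<String, List<Binding>> bindings = new LinkedHashMap<>();

    /** Binds a queue to an address; many queues may share one address. */
    void bind(String address, String queueName, Predicate<Map<String, String>> filter) {
        bindings.computeIfAbsent(address, a -> new ArrayList<>()).add(new Binding(queueName, filter));
    }

    /** Returns the names of the queues that would receive a message sent to the given address. */
    List<String> route(String address, Map<String, String> messageProperties) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings.getOrDefault(address, new ArrayList<>())) {
            // A queue without a filter always matches; otherwise the filter decides.
            if (b.filter == null || b.filter.test(messageProperties)) {
                targets.add(b.queueName);
            }
        }
        return targets;
    }
}
```

With two queues bound to the same address, one of them filtered, a message that fails the filter reaches only the unfiltered queue, and a message sent to an address with no bindings reaches no queue at all.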
Addresses can be configured in:
- cluster connections
- topics (implicitly, by the name of the topic)
Configuration applying generically to a class of messages can be specified by associating configuration to an address. See <address-settings>.
Queue
A queue is the primary message handling mechanism at core level. Queues may be exposed as JMS queues. Queues can be bound with an optional filter expression. If a filter expression is supplied then the server will only route messages that match that filter expression to any queues bound to the address.
Many queues can be bound to a single address. A particular queue is only bound to a maximum of one address.
Durable Queue
If a queue is durable, the persistent messages transiting the queue are stored on persistent storage, so they survive a server crash or restart.
A non-durable queue does not ensure message survivability in case of crash or restart, even if the message is declared persistent.
Temporary Queue
A temporary queue is automatically deleted when the client connection is closed, if it was not explicitly deleted before that.
JMS Queue
HornetQ core is JMS agnostic. It does not have the concept of a JMS queue. A JMS queue is implemented as a single address (the JMS queue name) to which one queue is bound: there is a one-to-one mapping between the core queue and the JMS queue.
By convention, all JMS queues map to core queues where the core queue name has the string "jms.queue." prepended to it. For example, the JMS queue named "novaordis" would map to the core queue with the name "jms.queue.novaordis". The address to which the core queue is bound is the same as the core queue name, so it follows the same pattern: "jms.queue.<jms-queue-name>".
Settings for a specific JMS queue can be configured by adding an "address-setting" that matches the queue name. For more details see Configure Settings for a JMS Queue.
Topic
The HornetQ core has no concept of a topic. Topics are available only at JMS level. A JMS topic is implemented as a single address to which many queues are bound. Each queue represents a subscription of the topic.
For details on how to deploy a JMS topic see:
By convention, all core queues that make up a JMS topic are bound under the core address "jms.topic.<topic-name>". If a JMS message is sent to a JMS topic with the name "novaordis.announcements", it will get routed on the server to any core queues bound to the address "jms.topic.novaordis.announcements".
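The "jms.queue." and "jms.topic." naming conventions amount to simple prefixing. A minimal sketch follows; the helper class JmsCoreNames and its methods are hypothetical names introduced for illustration and are not part of HornetQ.

```java
/** Name mapping between JMS destinations and core addresses, following the prefix convention. */
final class JmsCoreNames {
    static final String QUEUE_PREFIX = "jms.queue.";
    static final String TOPIC_PREFIX = "jms.topic.";

    /** Core queue name (and address) for a JMS queue. */
    static String coreQueueAddress(String jmsQueueName) {
        return QUEUE_PREFIX + jmsQueueName;
    }

    /** Core address under which the subscription queues of a JMS topic are bound. */
    static String coreTopicAddress(String jmsTopicName) {
        return TOPIC_PREFIX + jmsTopicName;
    }

    /** Reverse mapping: strips the prefix, or returns null if the address follows neither convention. */
    static String jmsName(String coreAddress) {
        if (coreAddress.startsWith(QUEUE_PREFIX)) {
            return coreAddress.substring(QUEUE_PREFIX.length());
        }
        if (coreAddress.startsWith(TOPIC_PREFIX)) {
            return coreAddress.substring(TOPIC_PREFIX.length());
        }
        return null;
    }
}
```

The reverse mapping can be handy when reading core addresses in logs or management output and translating them back to JMS destination names.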
Acceptors and Connectors
Acceptor
An acceptor is a HornetQ mechanism that defines which types of connections are accepted by a HornetQ server. A specific acceptor matches a specific connector that initiates the connection. The connectors can be thought of as "the initiating end of the connection" and they are either clients or other server nodes that attempt to cluster.
Connector
A connector is a HornetQ mechanism that defines how a client or another server can connect to a server. The connector information is used by the HornetQ clients. A specific connector matches a specific acceptor. A server may broadcast its connectors over the network, as a way to make itself known and allow clients (and other servers in the same cluster) to connect to it.
Persistence
Paging
Security
HornetQ does not allow creation of unauthenticated connections, unless the security is explicitly disabled. The connection's user name and password are authenticated against the "other" security domain, and the user must belong to the "guest" role. The security domain name is not explicitly specified in the configuration, but the default value is "other", and the following declaration has the same effect as the default configuration:
<subsystem xmlns="urn:jboss:domain:messaging:1.2">
    <hornetq-server>
        ...
        <security-domain>other</security-domain>
        ...
For more details about the relationship between security domains and security realms, see Relationship between a Security Realm and a Security Domain.
Users can be added to the ApplicationRealm with add-user.sh. The user can be added to a certain role by assigning it to a "group" with the same name when adding with add-user.sh.
For general security configuration details, see:
For details on how to secure destinations, see:
For details on how to secure cluster connections, see:
Connection Factory
HornetQ exposes two different types of ConnectionFactories: simple ConnectionFactories and pooled ConnectionFactories.
Simple ConnectionFactory
<connection-factory name="InVmConnectionFactory">
    <connectors>
        <connector-ref connector-name="in-vm"/>
    </connectors>
    <entries>
        <entry name="java:/ConnectionFactory"/>
    </entries>
</connection-factory>
<connection-factory name="RemoteConnectionFactory">
    <connectors>
        <connector-ref connector-name="netty"/>
    </connectors>
    <entries>
        <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
    </entries>
    <ha>true</ha>
    <block-on-acknowledge>true</block-on-acknowledge>
    <retry-interval>1000</retry-interval>
    <retry-interval-multiplier>1.0</retry-interval-multiplier>
    <reconnect-attempts>-1</reconnect-attempts>
</connection-factory>
Pooled ConnectionFactory
<pooled-connection-factory name="hornetq-ra">
    <transaction mode="xa"/>
    <connectors>
        <connector-ref connector-name="in-vm"/>
    </connectors>
    <entries>
        <entry name="java:/JmsXA"/>
    </entries>
</pooled-connection-factory>
The pooled connection factory is used to deploy resource adapters.
Sending Messages
send()
By default, non-transactional persistent messages are sent in blocking mode: the sending thread blocks until the acknowledgement for the message is received from the server. The configuration elements that control this behavior, for durable and non-durable messages respectively, are block-on-durable-send and block-on-non-durable-send. The actual values a server is configured with can be obtained via CLI:
/subsystem=messaging/hornetq-server=active-node-pair-A/connection-factory=HighlyAvailableLoadBalancedConnectionFactory:read-attribute(name=block-on-durable-send)
The relationship between block-on-durable-send and block-on-acknowledge is not yet clear; it is probably related to "asynchronous send acknowledgements". More documentation: http://docs.jboss.org/hornetq/2.4.0.Final/docs/user-manual/html_single/#non-transactional-sends, http://docs.jboss.org/hornetq/2.4.0.Final/docs/user-manual/html_single/#send-guarantees.nontrans.acks, http://docs.jboss.org/hornetq/2.4.0.Final/docs/user-manual/html_single/#asynchronous-send-acknowledgements
HornetQ allows for an optimization called "Asynchronous Send Acknowledgements": the acknowledgment is sent back on a separate stream. The send() method still blocks until it receives the acknowledgment.
Writing to Journal
Once a persistent message reaches the server, it must be written to the journal. Writing to the journal is controlled by the journal-sync-transactional and journal-sync-non-transactional attributes. Both are true by default.
The actual value can be read with CLI:
/subsystem=messaging/hornetq-server=active-node-pair-A:read-attribute(name=journal-sync-non-transactional)
Also see:
Replication
This section only applies if the server is in HA mode and message replication is enabled. Replication is asynchronous, but it is initiated before the message is written to persistent storage. Full explanation:
ConnectionFactory Load Balancing
See below
Connection Factory High Availability
It is turned on with:
<connection-factory name="RemoteConnectionFactory">
...
<ha>true</ha>
...
</connection-factory>
Pooled Connection Factory High Availability
Note that for pooled connection factories, the <ha>true</ha> setting only applies to the outbound configuration, when the connection is used by message producers. When the inbound functionality of the pooled connection factory is used, as is the case with MDB containers, the container must specify that it wants HA functionality on the pooled connection by turning on specific activation configuration properties:
Resource Adapter
The HornetQ JMS provider is exposed as a resource adapter to the applications deployed within the application server. The relevant configuration is:
<jms-connection-factories>
    <pooled-connection-factory name="hornetq-ra">
        <transaction mode="xa"/>
        <connectors>
            <connector-ref connector-name="in-vm"/>
        </connectors>
        <entries>
            <entry name="java:/JmsXA"/>
        </entries>
    </pooled-connection-factory>
</jms-connection-factories>
The pooled connection factory associated with the resource adapter is normally used by MDB containers to receive messages from a remote HornetQ instance.
These are examples of how a regular JEE component (a servlet in this case) can use the pooled connection factory associated with the resource adapter to send messages to the remote HornetQ instance: https://github.com/NovaOrdis/playground/tree/master/jboss/messaging/sending-and-receiving-from-remote-destinations and https://github.com/NovaOrdis/playground/tree/master/jboss/messaging/sending-and-receiving-from-injected-remote-destinations. Note that the pooled connection factory must be configured as described in Configuring a Remote HornetQ JMS Server as a Resource Adapter.
This is an example of how to configure an EAP instance so MDB containers can use a remote HornetQ cluster: Configuring a Remote HornetQ JMS Server as a Resource Adapter. More details are provided there, but essentially, the pooled connection factory declares a number of connectors, each connector to a remote node in the cluster:
<connectors>
    <netty-connector name="remote-hornetq-node-1-connection" socket-binding="remote-hornetq-node-1-socket-binding"/>
    <netty-connector name="remote-hornetq-node-2-connection" socket-binding="remote-hornetq-node-2-socket-binding"/>
    <netty-connector name="remote-hornetq-node-3-connection" socket-binding="remote-hornetq-node-3-socket-binding"/>
</connectors>
...
<jms-connection-factories>
    <pooled-connection-factory name="hornetq-remote-ra">
        ...
        <connectors>
            <connector-ref connector-name="remote-hornetq-node-1-connection"/>
            <connector-ref connector-name="remote-hornetq-node-2-connection"/>
            <connector-ref connector-name="remote-hornetq-node-3-connection"/>
        </connectors>
        ...
    </pooled-connection-factory>
</jms-connection-factories>
Clustering
- EAP 6.4 Administration and Configuration Manual - HornetQ Clustering https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/6.4/html/Administration_and_Configuration_Guide/sect-HornetQ_Clustering.html
Clustering in this context means establishing a mesh of HornetQ nodes. The main purpose of creating a cluster is to spread message processing load across more than one node, via load balancing.
Load Balancing
Load balancing is the capability of a system to distribute load among different equivalent processing nodes. A HornetQ cluster can distribute messages among its component nodes. HornetQ implements load balancing at two different levels:
ConnectionFactory (Client-Side) Load Balancing
A connection factory can establish connections to more than one HornetQ node, if it was configured to do so, and load balance between those connections, effectively load balancing messages between nodes. This capability is activated by declaring more than one connector to be associated with the connection factory.
TODO: example fragment.
Even after load balancing was configured as described above, it is, by default, not round robin, but dictated by internal implementation details. The load balancing policy can be explicitly configured, though, in the connection factory declaration, as shown below:
<connection-factory|pooled-connection-factory ...>
...
<connection-load-balancing-policy-class-name>
org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
</connection-load-balancing-policy-class-name>
</connection-factory|pooled-connection-factory>
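To make the idea of a round robin policy concrete, here is a minimal stand-alone sketch of a selector that cycles through connector indexes. It is a model only, not the HornetQ class named in the fragment above: the class name RoundRobinSketch is hypothetical, and the assumption that the cycle starts at index 0 is made purely for illustration (the real policy may start elsewhere).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-alone round-robin selector; a model for illustration, not the HornetQ policy class. */
final class RoundRobinSketch {
    // Shared counter so that concurrent callers still get consecutive positions.
    private final AtomicInteger next = new AtomicInteger(0);

    /** Returns an index in [0, max), advancing by one position on every call. */
    int select(int max) {
        // floorMod keeps the result non-negative even after the counter overflows.
        return Math.floorMod(next.getAndIncrement(), max);
    }
}
```

With three connectors, successive calls to select(3) yield 0, 1, 2 and then wrap around to 0.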
Load balancing among nodes will only work if the target nodes are clustered, meaning there is a cluster connection, and implicitly a core bridge, between them. If a core bridge does not exist and the nodes are independent of each other, the first connection created to one of the nodes, chosen non-deterministically, will be used by the connection factory to send all the messages, and the load balancing policy, even if explicitly configured, will not be used. This holds irrespective of whether the ConnectionFactory creates and uses connections sequentially or concurrently.
The configured load balancing policy is effective only if the connection factory (and respectively the resource adapter, if the connection factory was used to back a resource adapter) is accessed and used concurrently: connections are created and used to send messages from multiple threads. If the connection factory is accessed sequentially in a loop, the messages are not load balanced among nodes; they will all go to the node that first established a connection. In other words, if one connection is capable of handling the load, traffic is not load balanced.
All a client needs to do in order to load balance messages across a number of HornetQ nodes is to look up a connection factory that was configured to load balance amongst those nodes. As explained above, the nodes need to be clustered for the load balancing to be available, and the client must use the connection factory concurrently. For an example of how such a connection factory is configured see
For more details about ConnectionFactory configuration see
Symmetric Cluster
The most common cluster topology is a symmetric cluster. In a symmetric cluster, each HornetQ node connects directly over dedicated cluster connections to every other node in the cluster, and each cluster connection defines its max-hops attribute to be equal to 1. Each active node in the cluster acts as an independent HornetQ server and manages its own connections and messages. HornetQ ensures that messages can be intelligently load balanced between the servers in the cluster, according to the number of consumers on each node, and whether they are ready for messages. HornetQ also has the ability to automatically redistribute messages between nodes of a cluster to prevent starvation on any particular node. HornetQ achieves this by ensuring that each node knows about all the queues that exist on all the other nodes and what consumers they have.
Clustering does not automatically ensure high availability. For more details, see the High Availability section.
Cluster Connection
What turns a HornetQ instance into a clustered HornetQ instance is the presence of one or more cluster connections in the configuration, and the configuration setting that tells the instance that it is clustered: <clustered>true</clustered>. Since EAP 6.1, the <clustered> element is redundant: the simple presence of a cluster connection turns on clustering. For more details see <clustered> post EAP 6.1.
A cluster connection represents a communication channel between the cluster connection origin and its target.
The cluster connection is unidirectional. It allows the flow of payload and control messages from the origin to the target only. If a cluster connection is established between A -> B and messages arrive at node B, they will stay on node B even if the same destination is deployed on both nodes.
An origin node can open one or more cluster connections. Each cluster connection is established to one target node only, by explicitly declaring the cluster connection in the origin's configuration. Note that even if more than one target connector is listed in a cluster connection configuration, only the first one will be used; the others will be ignored. See A Cluster Connection has One Target Only.
The cluster connection is then used to send payload and traffic control messages into the target node. A core bridge is automatically created internally when a cluster connection is declared to another node.
Core Bridge
A core bridge is a construct that consumes messages from a source queue and forwards them to a target queue.
A core bridge is automatically created when a cluster connection is declared. Messages are passed between nodes over core bridges. Core bridges consume messages from a source queue and forward them to a target queue deployed on a HornetQ node which may or may not be in the same cluster.
Cluster Connection Configuration
For an example on how to configure a WildFly HornetQ-based messaging cluster see:
Cluster Connection Load Balancing
A cluster connection load balances messages amongst its target and its originator, according to a round robin algorithm, provided that the destination is deployed on both nodes. This feature is called message redistribution and it is subject to configuration (redistribution-delay, etc.). This capability can be combined with Connection Factory Load Balancing features.
Cluster Connections in Log Messages
The cluster connections appear in log messages as follows:
sf.<cluster-connection-name-as-declared-on-the-initiator-node>.<target-hornetq-node-UUID>
Example:
sf.load-balancing.ec55aece-195d-11e6-9d57-c3467f86a8cb
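When scanning logs, it can help to split such a name back into its parts. The sketch below is a hypothetical helper (ClusterConnectionLogName is not a HornetQ class). It assumes that the node UUID never contains a dot, so the last dot separates the cluster connection name from the UUID, even if the cluster connection name itself contains dots.

```java
/** Parses "sf.<cluster-connection-name>.<target-node-UUID>" as it appears in log messages. */
final class ClusterConnectionLogName {
    private static final String PREFIX = "sf.";

    final String clusterConnectionName;
    final String targetNodeId;

    private ClusterConnectionLogName(String clusterConnectionName, String targetNodeId) {
        this.clusterConnectionName = clusterConnectionName;
        this.targetNodeId = targetNodeId;
    }

    static ClusterConnectionLogName parse(String s) {
        if (!s.startsWith(PREFIX)) {
            throw new IllegalArgumentException("missing 'sf.' prefix: " + s);
        }
        // Assumption: the UUID has no dots, so everything after the last dot is the node ID.
        int lastDot = s.lastIndexOf('.');
        if (lastDot <= PREFIX.length() || lastDot == s.length() - 1) {
            throw new IllegalArgumentException("expected sf.<name>.<uuid>: " + s);
        }
        return new ClusterConnectionLogName(
                s.substring(PREFIX.length(), lastDot),
                s.substring(lastDot + 1));
    }
}
```

Applied to the example above, the parsed cluster connection name is "load-balancing" and the target node ID is "ec55aece-195d-11e6-9d57-c3467f86a8cb".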
Broadcast and Discovery Groups
Broadcast Group
A broadcast group is a HornetQ mechanism used to advertise connector information over the network. If the HornetQ server is configured for high availability, thus has an active and a stand-by node, the broadcast group advertises connector pairs: a live server connector and a stand-by server connector.
Broadcast groups use internally either UDP multicast or JGroups channels.
Discovery Group
A discovery group defines how connector information broadcast over a broadcast group is received from the broadcast endpoint. Discovery groups are used by JMS clients and cluster connections to obtain the initial connection information in order to download the actual topology.
A discovery group maintains lists of connectors (or connector pairs, in case the server is HA), one connector per server. Each broadcast updates the connector information.
A discovery group implies a broadcast group, so they require either UDP or JGroups.
Does "discovery group" intrinsically mean multicast, or can we have "static" discovery groups where the connectors are statically declared?
High Availability
- EAP 6.4 Administration and Configuration Guide - HornetQ High Availability https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/6.4/html/Administration_and_Configuration_Guide/sect-High_Availability.html
- Configure high-availability and fail-over for HornetQ in JBoss EAP 6 https://access.redhat.com/solutions/169683
High Availability in this context means the ability of HornetQ to continue functioning after the failure of one or more nodes. High availability is implemented on the server side by using a pair of active/stand-by (backup) broker nodes, and on the client side by logic that allows client connections to automatically migrate from the active server to the stand-by server in the event of active server failure.
High Availability does not necessarily mean that the load is spread across more than one active node. For more details about HornetQ clustering for load balancing, see the Clustering section.
Replication Types
For step-by-step instruction on how to configure such a topology see:
For an example of a shared filesystem-based replication diagram, see Collocated Topology below.
Message Replication
In a message replication configuration, the active and stand-by nodes do not share filesystem-based data stores. The message replication is done via network traffic over cluster connections. A backup server will only replicate with a live server within the same group name (<backup-group-name>). Note that a backup group has only two nodes: the active and the stand-by.
For message replication, the journals, and not individual messages, are replicated. Since only the persistent messages make it to the journal, only the persistent messages are replicated.
When the backup server comes on-line, it attempts to connect to its live server to attempt synchronization. While the synchronization is going on, the node is unavailable as backup server. If the backup server comes online and no live server is available, the backup server will wait until the live server is available in the cluster.
If the live server fails, the fully synchronized backup server attempts to become active. The backup server will activate only if the live server has failed and the backup server is able to connect to more than half of the servers in the cluster. If more than half of the other servers in the cluster also fail to respond, it would indicate a general network failure, and the backup server will wait to retry the connection to the live server.
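The activation rule can be written down as a simple predicate. This is a sketch of the rule as stated here, not HornetQ code: the class and method names are hypothetical, and how HornetQ actually counts "the servers in the cluster" is treated as an assumption (the caller supplies both counts).

```java
/** Majority check a backup would apply before activating; illustrative model only. */
final class BackupActivationSketch {
    /**
     * @param liveServerFailed whether the backup has lost its live server
     * @param reachableServers number of cluster servers the backup can currently connect to
     * @param totalServers     number of cluster servers the backup knows about (assumed given)
     */
    static boolean shouldActivate(boolean liveServerFailed, int reachableServers, int totalServers) {
        // "More than half" is a strict majority: exactly half is not enough.
        return liveServerFailed && 2 * reachableServers > totalServers;
    }
}
```

For instance, with three known servers the backup activates when it reaches two of them, but not when it reaches only one, since that situation looks like a network failure on the backup's side.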
Fail-back: To get to the original state after failover, it is necessary to start the live server and wait until it is fully synchronized with the backup server. When this has been achieved, the original live server will become active only after the backup server is shut down. This happens automatically if the <allow-failback> attribute is set to true.
How is the Replication Done
Configuration
For step-by-step instruction on how to configure such a topology see:
Dedicated Topology
For step-by-step instruction on how to configure such a topology see:
Collocated Topology
The diagram shows an example of a collocated topology where high availability is ensured by writing journals on a shared file system.
For step-by-step instruction on how to configure such a topology see:
Collocated Topology with Message Replication, no Load Balancing
The diagram shows an example of collocated topology where the high availability is achieved by message replication. No shared filesystem is required.
For step-by-step instruction on how to configure such a topology see:
Collocated Topology with Load Balancing
Collocated Topology with Message Replication and Load Balancing
Server State Replication
TODO Explain this:
HornetQ does not replicate full server state between live and backup servers. When the new session is automatically recreated on the backup it will not have any knowledge of messages already sent or acknowledged in that session. Any in-flight sends or acknowledgments at the time of fail-over might also be lost.
Client-Side Failover
Playground Example
Basics
In order for the failover to occur, the ConnectionFactory must be declared to be <ha>true</ha> (see below) and upon encountering an exception, the client must recreate the connection.
<connection-factory name="RemoteConnectionFactory">
    <connectors>
        <connector-ref connector-name="netty"/>
    </connectors>
    <entries>
        <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
    </entries>
    <ha>true</ha>
    <block-on-acknowledge>true</block-on-acknowledge>
    <retry-interval>1000</retry-interval>
    <retry-interval-multiplier>1.0</retry-interval-multiplier>
    <reconnect-attempts>1</reconnect-attempts>
</connection-factory>
Failover Limitations
Due to the way HornetQ was designed, failover is not fully transparent and it requires the application's cooperation.
There are two notable situations when the application will be notified of live server failure:
- The application performs a blocking operation (for example a message send()). In this situation, if a live server failure occurs, the client-side messaging runtime will interrupt the send operation and make it throw a JMSException.
- The live server failure occurs during a transaction. In this case, the client-side messaging runtime rolls back the transaction.
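Since the application has to cooperate by recreating the connection after such a failure, the recovery logic usually takes the shape of a bounded retry loop. The following is a generic, JMS-free sketch of that shape. ReconnectSketch, Attempt and withRetry are hypothetical names, and the Attempt callback stands in for whatever the application does per attempt (for example: create a connection, send, close).

```java
/** Generic "recreate and retry" helper; plain Java, deliberately free of JMS types. */
final class ReconnectSketch {

    /** Hypothetical unit of work that opens its own connection each time it runs. */
    interface Attempt<T> {
        T run() throws Exception;
    }

    /**
     * Runs the attempt up to maxAttempts times, pausing between failures.
     * Rethrows the last failure if every attempt fails.
     */
    static <T> T withRetry(Attempt<T> attempt, int maxAttempts, long pauseMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                // Each run is expected to build a fresh connection rather than reuse a failed one.
                return attempt.run();
            } catch (Exception e) {
                last = e;
                if (i < maxAttempts - 1) {
                    Thread.sleep(pauseMillis);
                }
            }
        }
        throw last;
    }
}
```

Whether a failed send can be retried safely depends on the application: a message may have reached the server before the failure was reported, so retrying can produce duplicates unless the application guards against them.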
Automatic Client Fail-Over
- EAP 6.4 Administration and Configuration Guide - Automatic Client Failover https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/6.4/html/Administration_and_Configuration_Guide/sect-High_Availability.html#Automatic_Client_Failover
- EAP 6.4 Administration and Configuration Guide - Application-Level Failover https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/6.4/html/Administration_and_Configuration_Guide/sect-High_Availability.html#Application-Level_Failover
HornetQ clients can be configured with knowledge of live and backup servers, so that in the event of live server failure, the client will detect this and reconnect to the backup server. The backup server will then automatically recreate any sessions and consumers that existed on each connection before fail-over, thus saving the user from having to hand-code manual reconnection logic. HornetQ clients detect connection failure when they have not received packets from the server within the time given by 'client-failure-check-period'.
Client code example:
import java.util.Properties;
import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;

// JNDI environment for a remote lookup; both hosts are listed in the provider URL
final Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
env.put("jboss.naming.client.connect.timeout", "10000");
env.put(Context.PROVIDER_URL, "remote://<host1>:4447,remote://<host2>:4447");
env.put(Context.SECURITY_PRINCIPAL, "username");
env.put(Context.SECURITY_CREDENTIALS, "password");
Context context = new InitialContext(env);
// Look up the remotely exposed connection factory
ConnectionFactory cf = (ConnectionFactory) context.lookup("jms/RemoteConnectionFactory");
If <host1> (i.e. your "live" server) is down, it will automatically try <host2> (i.e. your "backup" server).
If you want the JMS connections to move back to the live server when it comes back, then you should set <allow-failback> to "true" on both servers.
Failover in Case of Administrative Shutdown of the Live Server
HornetQ allows the possibility to specify the client-side failover behavior in case of administrative shutdown of the live server. There are two options:
- Client does not fail over to the backup server on administrative shutdown of the live server. If the connection factory is configured to contain other live server connectors, the client will reconnect to those; if not, it will issue a warning log entry and close the connection.
- Client does fail over to the backup server on administrative shutdown of the live server. If there are no other live servers available, this is probably a sensible option.
WildFly Clustering and HornetQ High Availability
HornetQ High Availability is configured independently of WildFly Clustering, so a configuration in which WildFly nodes are running in a non-clustered configuration but the embedded HornetQ instances are configured for High Availability is entirely possible.
Generic JMS Client with HornetQ
Related:
Pre-Conditions for a JMS Client to Work
No JNDI Authentication
WildFly requires invocations into the remoting subsystems to be authenticated. JNDI uses remoting, so in order to make JNDI calls we either need to enable remoting authentication and make remoting client calls under an authenticated identity, or disable remoting authentication. See:
Authenticated User Present
Add an "ApplicationRealm" user that belongs to the "guest" role. That identity will be used to create JMS connections.
For more details on using add-user.sh, see:
Playground Example
Threading Model
External: HornetQ Thread Management https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/6.4/html/Administration_and_Configuration_Guide/sect-Thread_Management.html
Also see:
MDB Support
Large Messages
A message is considered "large" if its size exceeds the "min-large-message-size" configuration element. By default, messages over 100KiB are considered large. While being handled by the broker, large messages are stored individually in the "largemessage" folder inside the server's data directory.
The size of a "large" message is configured on the connection factory:
<connection-factory name="...">
    ...
    <min-large-message-size>3145728</min-large-message-size>
</connection-factory>
Note that 'journal-buffer-size' must be adjusted to be greater than or equal to 'min-large-message-size', otherwise the broker will fail when it handles large messages with:
HQ224016: Caught exception: java.lang.IllegalStateException: Can't write records bigger than the bufferSize(501760) on the journal
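The two size rules in this section can be expressed as simple checks, which may be useful as a sanity test when reviewing a configuration. The sketch below is illustrative: LargeMessageSketch and its methods are hypothetical names, and the values have to be supplied by the caller (for example, read from the configuration via CLI).

```java
/** Size rules for large messages, as described in this section; illustrative helper only. */
final class LargeMessageSketch {
    /** Default threshold mentioned above: 100 KiB. */
    static final long DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100L * 1024L;

    /** A message is "large" when its size exceeds min-large-message-size. */
    static boolean isLarge(long messageSizeBytes, long minLargeMessageSize) {
        return messageSizeBytes > minLargeMessageSize;
    }

    /** journal-buffer-size must be greater than or equal to min-large-message-size. */
    static boolean journalBufferLargeEnough(long journalBufferSize, long minLargeMessageSize) {
        return journalBufferSize >= minLargeMessageSize;
    }
}
```

With the buffer size reported in the error above (501760) and the min-large-message-size from the example (3145728), journalBufferLargeEnough returns false, which matches the failure shown.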