We’ve already seen a few examples of how to use Red Hat Fuse, the American software company’s agile integration solution. (Head to our Blog to find out more!) And today, we’ll take things a step further by taking a look at how we can perform asynchronous integration in Red Hat.
In the real world, not all connections are synchronous, and applications often use some form of messaging system to communicate with each another. This may be to boost communication reliability, facilitate delivery to multiple recipients, or to accommodate the receiving of information at the desired pace of the user.
What are we going to use for our example?
- We’ll use ActiveMQ as our messaging system, which is arguably the most popular open-source broker. It’s also included in the solution provided by Red Hat Fuse. We’re now ready to perform the asynchronous integration in Red Hat.
- For this example, we’ll be using the following versions of integration microservices made with Spring Boot to perform the asynchronous integration:
- Red Hat Fuse 7.11.1
- Spring Boot 2.5.13
- ActiveMQ 5.11.0
1. Docker Compose and configuration
As we know, one of the advantages of Red Hat Fuse is its cloud and agile centric approach. For this, we’ll create a docker compose using the image generated from our integration and an instance of ActiveMQ. This last part is as follows:
image: rmohr/activemq mem_limit: 1G hostname: sandbox-activemq container_name: sandbox-activemq ports: - 8161:8161 - 61616:61616 volumes: - ./activemq.xml:/opt/apache-activemq-5.15.6/conf/activemq.xml
We can state the following with regard to this configuration:
- Port 8161 is the administration port, and will allow us to access the management console via the URL http://localhost:8161/admin/index.jsp.
- Port 61616 is the TCP port used for ActiveMQ
- The activemq.xml file, which can be found inside the same image, will allow us to configure the behavior of ActiveMQ. This includes, among other things, the management of users that have access to queues and/or topics.
If we require authentication in order to use the queues, we must include the following code snippet in the activemq.xml file:
<plugins> <simpleAuthenticationPlugin anonymousAccessAllowed="true"> <users> <authenticationUser username="privateUser" password="P#s5W0rd" groups="admins" /> <authenticationUser username="system" password="manager" groups="admins,publishers,consumers"/> </users> </simpleAuthenticationPlugin> <authorizationPlugin> <map> <authorizationMap> <authorizationEntries> <authorizationEntry queue="privateQueue" read="admins" write="admins" admin="admins" /> <authorizationEntry queue=">" write="anonymous,admins" read="anonymous,admins" admin="anonymous,admins" /> <authorizationEntry topic="ActiveMQ.Advisory.>" write="*" read="*" admin="*" /> </authorizationEntries> </authorizationMap> </map> </authorizationPlugin> </plugins>
With this code we’re configuring the following:
- simpleAuthenticationPlugin: This allows us to manually configure users, passwords, and the groups with which they are associated. The other option for securing access, and how it should be done in real-time environments, is via the JAAS plugin.
- anonymousAccessAllowed: This allows us to indicate whether or not we’ll allow anonymous access. This means some queues can be secured and others left open. The default user and associated group is anonymous.
- authorizationEntries: This is the default or specific configuration that we wish to apply to the queues.
- authenticationUser: The user system must be included to avoid errors when performing operations using the console.
For this example, we will create two types of queues:
- A private one, called privateQueue, that can only be created, written, or read by administrator users.
- And a public one, that can be written, read, or created by any user. Public queues are indicated by the ‘>’ symbol.
2. Steps for sending messages
Now that we’ve correctly configured it, we can create the resource that will allow us to send messages. The idea is to create a REST service that stores the information it receives in an ActiveMQ queue.
rest().post("activemq/public").produces(MediaType.APPLICATION_JSON_VALUE) .consumes(MediaType.APPLICATION_JSON_VALUE).route() .routeId("postPublicQueue").log("Message incoming - ${body}") .to("jmsComponent:queue:publicQueue?exchangePattern=InOnly") .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202));
1.As we can see from the configuration, we need to tell our Red Hat Fuse integration not to wait for a response, as it won’t receive any from ActiveMQ. This is done via the ‘exchangePattern=InOnly‘ configuration. If not, we’ll receive an error message similar to the following on every call:
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID
2. Now, if we were to make the following call, we could send a message to the public queue.
curl --location --request POST 'http://localhost:8080/camel/activemq/public' \ --header 'Content-Type: application/json' \ --data-raw '{"prop":"value"}'
3. If we wanted to write as an anonymous user in a private queue, as in the following example, we would trigger a security alert.
rest().post("activemq/public/notAllowed") .produces(MediaType.APPLICATION_JSON_VALUE) .consumes(MediaType.APPLICATION_JSON_VALUE).route() .routeId("postPublicNotAllowedQueue").log("Message incoming - ${body}") .to("jmsComponent:queue:privateQueue?exchangePattern=InOnly") .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202));
4. For the private queue this is a bit more complicated. First, we must create a ActiveMQConnectionFactory type connection that enables us to set up a username and password. This is because the default connection type uses SingleConnectionFactory and does not support authentication. The creation of this configuration bean, which will allow us to enter other values, such as the broker’s URL, will also be useful for passing it to the jms component constructor, that we are using for the cases in which no authentication-is required.
@Bean public ConnectionFactory activeMQConnectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(brokerUrl); return connectionFactory; } @Bean public JmsComponent jmsComponent() throws JMSException { JmsComponent jms = new JmsComponent(); jms.setConnectionFactory(activeMQConnectionFactory()); return jms; } }
And secondly, we must indicate the username and password in the connection string. If we want a default version of this, this can be configured at the application.properties file level. However, in this case, we simply want to use it for a resource. Therefore, we will include it as part of the connection string of the activemq component.
rest().post("activemq/private").produces(MediaType.APPLICATION_JSON_VALUE) .consumes(MediaType.APPLICATION_JSON_VALUE).route() .routeId("postPrivateQueue").log("Private message incoming - ${body}") .to("jmsComponent:queue:privateQueue?connectionFactory=activeMQConnectionFactory&exchangePattern=InOnly&username=privateUser&password={{activemq.privateQueue.password}}") .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202));
3. Reading messages in the queue
We’ll now take a look at how to create an API resource that allows us to read from the queue. This is the easiest part, since it simply involves replacing the to method, which allows us to produce, with the from method that allows us to consume. The configuration will remain the same.
// Consumers from("jmsComponent:queue:publicQueue").log("Reading public message incoming - ${body}").end(); from( "jmsComponent:queue:privateQueue?connectionFactory=activeMQConnectionFactory&exchangePattern=InOnly&username=privateUser&password={{activemq.privateQueue.password}}") .log("Reading private message incoming - ${body}").end();
Now, if we go back to previous methods, we’ll see how we can send a message that will be read by the consuming routes.
4. Conclusion
As you’ve seen, performing asynchronous integration in Red Hat across systems is easy and requires minimal configuration. Plus, we can create the image and deploy it conveniently for use on-premises or in the cloud. Need Red Hat support or help with your integrations? Don’t hesitate to contact us!