Ir al contenido principal

Cómo hacer una integración asíncrona utilizando Red Hat Fuse

Red Hat Fuse

Ya hemos visto varios ejemplos de cómo utilizar Red Hat Fuse, la solución para realizar integraciones ágiles de Red Hat. ¡Búscalos en nuestro Blog! Y hoy daremos un paso más y veremos cómo podemos realizar una integración asíncrona en Red Hat

En el mundo real, no todas las conexiones se realizan de forma síncrona, y es muy habitual que distintas aplicaciones utilicen un sistema de mensajería para la comunicación entre ellas. Ya sea para aumentar la fiabilidad en la comunicación, para permitir el envío a varios destinatarios o para acomodar la recepción de información al ritmo deseado por el consumidor. 

¿Qué utilizaremos para nuestro ejemplo?

  • Utilizaremos ActiveMQ como sistema de mensajería, posiblemente el broker de código abierto más popular. Que además también se encuentra incluido en la solución aportada por Red Hat Fuse. Ahora, ya estamos apunto de realizar la integración asíncrona en Red Hat.
  • Para el ejemplo nos basaremos en microservicios de integración realizados con Spring Boot y en el cual utilizaremos las siguientes versiones para la realización de la integración asíncrona:
    • Red Hat Fuse 7.11.1
    • Spring Boot 2.5.13
    • ActiveMQ 5.11.0

1. Docker Compose y configuración

Como sabemos, uno de los puntos fuertes de Red Hat Fuse es su clara orientación hacia la nube y agile. Por lo que prepararemos un docker compose con la imagen creada de nuestra integración y una instancia de ActiveMQ. Esta última parte, queda de la siguiente forma:

image: rmohr/activemq
mem_limit: 1G
hostname: sandbox-activemq
container_name: sandbox-activemq
ports:
  - 8161:8161
  - 61616:61616
volumes:
  - ./activemq.xml:/opt/apache-activemq-5.15.6/conf/activemq.xml

Sobre esta configuración podemos indicar lo siguiente:

  • El puerto 8161 es el de administración y el que nos permitirá acceder a la consola de gestión a través de la URL http://localhost:8161/admin/index.jsp. 
  • El puerto 61616 es el puerto TCP utilizado para la comunicación con ActiveMQ
  • El fichero activemq.xml, el cual se puede encontrar dentro de la misma imagen, es el fichero que nos permitirá configurar el comportamiento de ActiveMQ. Y entre otras cosas, la gestión de los usuarios que tengan acceso a las colas y/o topics. 

Si queremos autenticación para hacer uso de las colas deberemos incluir el siguiente trozo de código en el fichero activemq.xml:

<plugins>
    <simpleAuthenticationPlugin anonymousAccessAllowed="true">
        <users>
            <authenticationUser username="privateUser" password="P#s5W0rd" groups="admins" />
            <authenticationUser username="system" password="manager" groups="admins,publishers,consumers"/>
        </users>
    </simpleAuthenticationPlugin>
    <authorizationPlugin>
        <map>
            <authorizationMap>
                <authorizationEntries>
                    <authorizationEntry queue="privateQueue" read="admins" write="admins" admin="admins" /> 
                    <authorizationEntry queue=">" write="anonymous,admins" read="anonymous,admins" admin="anonymous,admins" />
                    <authorizationEntry topic="ActiveMQ.Advisory.>" write="*" read="*" admin="*" />
                </authorizationEntries>
            </authorizationMap>
        </map>
     </authorizationPlugin>
</plugins>

Con este código estamos configurando lo siguiente:

  • simpleAuthenticationPlugin: Nos permite configurar manualmente usuarios, contraseñas y los grupos a los que están asociados. La otra opción para securizar el acceso, y cómo debe hacerse en entornos reales, sería a través del plugin JAAS. 
  • anonymousAccessAllowed: Nos permite indicar que también permitiremos el acceso anónimo. De esta forma podemos tener algunas colas securizadas y otras no. El usuario y grupo asociado por defecto es anonymous
  • authorizationEntries: Es la configuración que queremos aplicar a las colas de forma genérica o específica. 
  • authenticationUser: Hay que añadir al usuario system para evitar errores al realizar operaciones desde la consola. 

Para nuestro ejemplo crearemos dos tipos de colas:

  • Una privada, denominada privateQueue, en la cual solo puedan crear, escribir o leer los usuarios administradores. 
  • Y el resto, pública donde podrán escribir, leer o crear cualquier usuario. Indicado por el símbolo ‘>’. 

2. Pasos para el envío de mensajes

Ahora que lo tenemos correctamente configurado podemos proceder a crear el recurso que nos permita realizar el envío de mensajes. La idea es crear un servicio REST que almacene la información que recibe en una cola de ActiveMQ. 

rest().post("activemq/public").produces(MediaType.APPLICATION_JSON_VALUE)
  .consumes(MediaType.APPLICATION_JSON_VALUE).route()
 .routeId("postPublicQueue").log("Message incoming - ${body}")
  .to("jmsComponent:queue:publicQueue?exchangePattern=InOnly")
  .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202));

1.Como vemos en la configuración, necesitamos indicar a nuestra integración con Red Hat Fuse que no se quede esperando por respuesta, ya que no recibirá ninguna del ActiveMQ. Esto lo podemos hacer a través de la configuración ‘exchangePattern=InOnly‘. De otra forma, en cada llamada, recibiremos un mensaje de error similar al siguiente:

org.apache.camel.ExchangeTimedOutException: The OUT message was not received
within: 20000 millis due reply message with correlationID

2. Ahora sí hiciéramos la siguiente invocación podríamos mandar un mensaje a la cola pública. 

curl --location --request POST 'http://localhost:8080/camel/activemq/public' \
--header 'Content-Type: application/json' \
--data-raw '{"prop":"value"}'

3. Si quisiéramos escribir con un usuario anónimo en una cola privada, como en el siguiente ejemplo, nos saltará una excepción de seguridad. 

rest().post("activemq/public/notAllowed")
  .produces(MediaType.APPLICATION_JSON_VALUE)
  .consumes(MediaType.APPLICATION_JSON_VALUE).route()
  .routeId("postPublicNotAllowedQueue").log("Message incoming - ${body}")
  .to("jmsComponent:queue:privateQueue?exchangePattern=InOnly")
  .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202));

4. Y para la cola privada será un poco más complicado. Primero creamos una conexión de tipo ActiveMQConnectionFactory que permita configurar un usuario y contraseña. Ya que el tipo de conexión por defecto usa SingleConnectionFactory y este no admite la autenticación. La creación de este bean de configuración, que nos permitirá indicar otros valores como la URl del broker, también nos servirá para pasárselo al constructor del componente jms, que estamos utilizando para los ejemplos sin autenticación. 

 @Bean
  public ConnectionFactory activeMQConnectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(brokerUrl);
    return connectionFactory;
  }

  @Bean
  public JmsComponent jmsComponent() throws JMSException {
    JmsComponent jms = new JmsComponent();
    jms.setConnectionFactory(activeMQConnectionFactory());
    return jms;
  }
}

Y segundo, será indicar en la cadena de conexión el usuario y contraseña. Si lo quisiéramos de forma generalizada podríamos configurarlo a nivel del fichero application.properties. Pero en este caso solo se lo queremos aplicar a un recurso. Y por tanto lo pondremos como parte de la cadena de conexión del componente activemq

rest().post("activemq/private").produces(MediaType.APPLICATION_JSON_VALUE)
  .consumes(MediaType.APPLICATION_JSON_VALUE).route()
  .routeId("postPrivateQueue").log("Private message incoming - ${body}")
.to("jmsComponent:queue:privateQueue?connectionFactory=activeMQConnectionFactory&exchangePattern=InOnly&username=privateUser&password={{activemq.privateQueue.password}}")
  .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202));

3. Lectura de mensajes en la cola

Ahora veremos cómo hacer un recurso de la API que nos permita leer de la cola. Esta es la parte más sencilla, pues simplemente será cambiar el método to que nos permite producir, por el método from que nos permite consumir. La configuración será la misma. 

// Consumers
from("jmsComponent:queue:publicQueue").log("Reading public message incoming - ${body}").end();

from(
    "jmsComponent:queue:privateQueue?connectionFactory=activeMQConnectionFactory&exchangePattern=InOnly&username=privateUser&password={{activemq.privateQueue.password}}")
  .log("Reading private message incoming - ${body}").end();

Ahora si volvemos a utilizar los métodos anteriores, veremos cómo enviamos un mensaje y es leído por las rutas consumidoras. 

4. Conclusión

Como habrás observado, realizar una integración asíncrona en Red Hat entre sistemas es fácil y con apenas configuración. Además podemos crear la imagen y desplegarla cómodamente para su utilización en local o en la nube. ¿Necesitas ayuda en tus integraciones? ¿Soporte con Red Hat? ¡No dudes en contactarnos!