Spring Boot Integration with RabbitMQ
First, the Messaging Middleware Application Scenario
In modern software development, messaging middleware applications have become increasingly popular for enhancing service capabilities and decoupling system components. In this article, we will explore the integration of RabbitMQ with Spring Boot.
Second, the News Service Middleware Overview
Most applications utilize asynchronous communication systems to improve service capabilities through message middleware, thereby enhancing decoupling ability. Messaging services rely on two key concepts: message brokers and destinations.
Message Broker
A message broker is responsible for receiving messages from producers and routing them to the designated destinations. There are two primary forms of message queues: queues and topics.
Queues (Queue)
Queues are point-to-point messaging systems where a producer sends a message to a queue, and the message is retrieved by a single consumer from the queue.
Topics (Topic)
Topics are publish-subscribe messaging systems where a producer sends a message to a topic, and multiple consumers subscribe to the topic to receive the message.
Third, RabbitMQ Profile
RabbitMQ is an open-source implementation of the Advanced Message Queuing Protocol (AMQP). It is developed by the Erlang programming language and is widely used in the industry.
Core Concepts
- Message: A message is anonymous and consists of a message header and a message body. The message body is opaque, and the message header contains optional attributes such as routing keys, priority, and delivery mode.
- Publisher: A publisher is a producer client application that sends messages to the message broker.
- Exchange: An exchange is a switch that receives messages from producers and routes them to the designated queues based on the exchange type.
- Queue: A queue is a container for messages that are waiting to be consumed by the consumer.
- Binding: A binding is a key-based routing rule that associates a queue with an exchange.
- Connection: A connection is an Internet connection, such as a TCP connection.
- Channel: A channel is an independent bi-directional data connection flow that multiplexes multiple channels over a single TCP connection.
- Consumer: A consumer is a client application that consumes messages from the queue.
- Virtual Host: A virtual host represents a group of exchanges, queues, and related objects.
Fourth, RabbitMQ Operating Mechanism
The message routing in RabbitMQ is based on the Advanced Message Queuing Protocol (AMQP). The message routing process is similar to JMS, but with additional features such as exchanges and bindings.
Exchange Types
There are four types of exchanges: direct, fanout, topic, and headers. Each exchange type has its own routing policy.
Fifth, RabbitMQ Installation
We will use Docker to install RabbitMQ. The latest version of RabbitMQ can be obtained from the official Docker Hub.
Sixth, Integration with RabbitMQ
We will create a Spring Boot project to demonstrate the integration with RabbitMQ.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gf</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
MyAMQPConfig
package com.gf.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
SpringbootRabbitmqApplicationTests
package com.gf;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void contextLoads() {}
@Test
public void create() {
// Create Exchange
amqpAdmin.declareExchange(new DirectExchange("exchange.direct"));
amqpAdmin.declareExchange(new FanoutExchange("exchange.fanout"));
amqpAdmin.declareExchange(new TopicExchange("exchange.topic"));
// Create Queue
amqpAdmin.declareQueue(new Queue("direct.queue", true));
amqpAdmin.declareQueue(new Queue("fanout.queue", true));
// Bind Queue
amqpAdmin.declareBinding(new Binding("direct.queue", Binding.DestinationType.QUEUE, "exchange.direct", "direct.queue", null));
amqpAdmin.declareBinding(new Binding("fanout.queue", Binding.DestinationType.QUEUE, "exchange.direct", "fanout.queue", null));
amqpAdmin.declareBinding(new Binding("direct.queue", Binding.DestinationType.QUEUE, "exchange.fanout", "", null));
amqpAdmin.declareBinding(new Binding("fanout.queue", Binding.DestinationType.QUEUE, "exchange.fanout", "", null));
amqpAdmin.declareBinding(new Binding("direct.queue", Binding.DestinationType.QUEUE, "exchange.topic", "direct.#.", null));
amqpAdmin.declareBinding(new Binding("fanout.queue", Binding.DestinationType.QUEUE, "exchange.topic", "direct.*.", null));
}
@Test
public void send2Direct() {
Map<String, Object> map = new HashMap<>();
map.put("msg", "which is a point to point message");
map.put("data", Arrays.asList("helloworld", 123, true));
rabbitTemplate.convertAndSend("exchange.direct", "direct.queue", map);
}
@Test
public void send2Topic() {
Map<String, Object> map = new HashMap<>();
map.put("msg", "This is a broadcast message");
map.put("data", Arrays.asList("topic message", 123, true));
rabbitTemplate.convertAndSend("exchange.fanout", "", map);
}
@Test
public void receive() {
Object o = rabbitTemplate.receiveAndConvert("direct.queue");
o.getClass();
System.out.println(o.getClass());
System.out.println(o);
}
}
SpringbootRabbitmqApplication
package com.gf;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableRabbit
public class SpringbootRabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqApplication.class, args);
}
}
MQService
package com.gf.service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class MQService {
@RabbitListener(queues = "fanout.queue")
public void receive(Message message) {
System.out.println("receive the message:" + new String(message.getBody()));
}
}
This is a basic example of integrating RabbitMQ with Spring Boot. You can customize the configuration and add more features as needed.