Ch03 AMQP
Yang Haoran 11/30/2022 RabbitMQMQ

# Spring AMQP
# Simple queue
application.yml
spring:
rabbitmq:
host: 124.220.33.202
port: 30608
virtual-host: vhost
username: admin
password: 123456
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# Provider
TestClass
package com.example.consumer;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
class ConsumerApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
// @Test
// void contextLoads() {
// }
@Test
public void textMq(){
String queueName = "simple.queue";
String message = "hello from yhr";
rabbitTemplate.convertAndSend(queueName, message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
添加test方法快捷键 alt+insert
# Consumer
package com.example.provider.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
/**
* @author YHR
* @date 30/11/2022 29:13:46
* @description
*/
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("get the message: " + msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# Work queue
多个消费者同时处理消息

如果不配置消息预取消息会平分,不管处理能力如何


# Publish/Subscribe
# Fanout



package com.example.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author YHR
* @date 30/11/2022 08:14:19
* @description
*/
@Configuration
public class FanoutConfig {
//fanout
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("yhr.fanout");
}
//queue1
@Bean
public Queue queue1(){
return new Queue("yhr.queue1");
}
//queue2
@Bean
public Queue queue2(){
return new Queue("yhr.queue2");
}
//bind fanout with queue1 and queue2
@Bean
public Binding binding1(Queue queue1, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(queue1)
.to(fanoutExchange);
}
@Bean
public Binding binding2(Queue queue2, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(queue2)
.to(fanoutExchange);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
Listener:
@RabbitListener(queues = "yhr.queue1")
public void listenFanoutQueue1(String msg){
System.out.println("get the message from yhr.queue1 : " + msg);
}
@RabbitListener(queues = "yhr.queue2")
public void listenFanoutQueue2(String msg){
System.out.println("get the message from yhr.queue2 : " + msg);
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
publisher:
@Test
void testFanoutPublish() {
String exchangeName = "yhr.fanout";
String msg = "Hello, everyone! I'm bubu";
rabbitTemplate.convertAndSend(exchangeName, "", msg);
}
1
2
3
4
5
6
7
2
3
4
5
6
7

# DirectExchange

# 接受
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "yhr.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}))
public void directQueue1(String msg){
System.out.println("DirectQueue1: " + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "yhr.direct", type = ExchangeTypes.DIRECT),
key = {"red","yellow"}))
public void directQueue2(String msg){
System.out.println("DirectQueue2: " + msg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 发送
@Test
void testDirectPublish() {
String exchangeName = "yhr.direct";
String msg = "Hello, everyone! I'm red";
rabbitTemplate.convertAndSend(exchangeName, "red", msg);
}
1
2
3
4
5
6
7
2
3
4
5
6
7

# TopicExchange(在listener中声明queue)


- 代码提示ctrl+p
# 发送
@Test
void testTopicPublish() {
String exchangeName = "yhr.topicExchange";
String msg = "Hello, everyone! I'm man.yang";
rabbitTemplate.convertAndSend(exchangeName, "man.hao", msg);
}
1
2
3
4
5
6
2
3
4
5
6
# 接受
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topicExchange.queue1"),
exchange = @Exchange(name = "yhr.topicExchange", type = ExchangeTypes.TOPIC),
key = "man.#"))
public void topicExchangeQueue1(String msg){
System.out.println("TopicExchangeQueue1: " + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topicExchange.queue2"),
exchange = @Exchange(name = "yhr.topicExchange", type = ExchangeTypes.TOPIC),
key = "man.hao"))
public void topicExchangeQueue2(String msg){
System.out.println("TopicExchangeQueue2: " + msg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Send Object to queue


<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.1</version>
</dependency>
1
2
3
4
5
2
3
4
5
# 发送
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class, args);
}
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
@Test
void testObjectPublish(){
Map<String, Object> msg = new HashMap<>();
msg.put("name", "yang");
msg.put("age", 23);
rabbitTemplate.convertAndSend("ObjectQueue", msg);
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 接受
@RabbitListener(queues = "ObjectQueue")
public void objectListener(Map<String, Object> msg){
System.out.println("ObjectMessage: " + msg);
}
1
2
3
4
5
2
3
4
5

https://github.com/0YHR0/SpringAMQP-rabbitmq-template