RabbitMQ Spring tutorial

На сайті rabbitmq.com вже є докладні приклади і клієнт для java. Однак якщо в проекті використовується спрінг, то набагато зручніше використовувати бібліотеку Spring AMQP. Ця стаття містить реалізацію всіх шести офіційних прикладів роботи з RabbitMQ.

Відразу посилання на проекти на GitHub.

Для прикладів я буду використовувати найпростіше додаток на спринге. Після того, як користувач перейде по опредленной засланні, в RabbitMQ буде надсилатися повідомлення яке буде відправлятися в один з листенеров. Листенер в свою чергу буде просто виводити повідомлення в лог. На хабре вже були переклади офіційних туториалов на php і python, і я думаю багато хто вже знайомі з принципами роботи rabbitmq, тому я сконцентруюся на роботі саме з Spring AMQP.

Підготовка
Установка RabbitMQ
Установка RabbitMQ детально описана на офіційному сайті. Тут проблем виникнути не повинно.

Налаштування Spring
Для простоти я використовував Spring Boot. Він відмінно підходить, щоб швидко розгорнути програми на спринге і не займатися його довгим кофигурированием. При цьому сам Spring AMQP я буду конфігурувати «класичним способом» — тобто так, як я конфигурировал в реальному проекті без Spring Boot (хіба що в ConnectionFactory не описані деякі специфічні для heroku речі).

Вміст мінімального pom.xml необхідного нам для запуску. Тут вже є Spring boot Spring AMQP.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>rabbitmq</groupId>
<artifactId>example-1</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.2.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>

Основний файл конфігурації. Крім імені класу, його вміст буде однаковим для всіх наших прикладів.

package com.rabbitmq.example1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;

@EnableAutoConfiguration
@ComponentScan
@Import(RabbitConfiguration.class)
public class Example1Configuration {
public static void main(String[] args) throws Exception {
SpringApplication.run(Example1Configuration.class, args);
}
}


Приклад 1. «Hello World!»


Для роботи з RabbitMQ нам потрібні такі бины:
— сonnectionFactory — для з'єднання з RabbitMQ;
— rabbitAdmin — для реєстрації/скасування реєстрації черг і т. п.;
— rabbitTemplate — для відправки повідомлень (producer);
— myQueue1 — власне чергу куди надсилаємо повідомлення;
— messageListenerContainer — приймає повідомлення (consumer).

Код конфігурації для цих бинов
package com.rabbitmq.example1;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;

public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);

//налаштовуємо з'єднання з RabbitMQ
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}

//оголошуємо чергу з ім'ям queue1
@Bean
public Queue myQueue1() {
return new Queue("queue1");
}

//оголошуємо контейнер, який буде містити листенер для повідомлень
@Bean
public SimpleMessageListenerContainer messageListenerContainer1() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames("queue1");
container.setMessageListener(new MessageListener() {
//тут ловимо повідомлення з queue1
public void onMessage(Message message) {
logger.info("received from queue1 : " + new String(message.getBody()));
}
});
return container;
}
}


В цьому і наступних прикладах в якості продюсера буде контролер, який буде посилати повідомлення в rabbitmq.

package com.rabbitmq.example1;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);

@Autowired
AmqpTemplate template;

@RequestMapping("/emit")
@ResponseBody
String queue1() {
logger.info("Emit to queue1");
template.convertAndSend("queue1","Message to queue");
return "Emit to queue";
}
}

Тепер, якщо запустити Example1Configuration і перейти у браузері за адресою http://localhost:8080/emit, то в консолі ми побачимо щось типу:

2015-06-23 21:16:26.250 INFO 6460 — [nio-8080-exec-2] com.rabbitmq.example1.SampleController: Emit to queue1
2015-06-23 21:16:26.252 INFO 6460 — [cTaskExecutor-1] c.rabbitmq.example1.RabbitConfiguration: received from queue 1: Message to queue

Розглянемо докладніше отриманий результат. Тут ми SampleController.java відправляємо повідомлення:

template.convertAndSend("queue1","Message to queue");

А тут ми його отримуємо:

public void onMessage(Message message) {
logger.info("received from queue 1: " + new String(message.getBody()));
}

Нічого складного, але описувати всі листенеры в конфігурації не зовсім зручно, особливо коли їх стає багато. Набагато зручніше конфигуририровать листенеры анотаціями.

Приклад 1.1. «Hello World!» на анотаціях
Замість листенера в конфігурації додамо в проект клас RabbitMqListener, в який будуть приходити повідомлення. Відповідно messageListenerContainer1 вже не потрібен.

RabbitMqListener — це звичайний компонент(@Component) спринга з методом, позначеним анотацией @RabbitListener. В цьому метод будуть приходити повідомлення. При цьому ми можемо отримувати як повне повідомлення Message заголовками і тілом як масив байт, так і просто сконвертоване тіло у тому вигляді, в якому ми його відправляли.

@RabbitListener(queues = "queue1")
public void processQueue1(String message) {
logger.info("Received from queue 1: " + message);
}

Вихідний код RabbitMqListener.java і оновленого RabbitConfiguration.java
package com.rabbitmq.example1annotated;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@EnableRabbit //потрібно для активації обробки анотацій @RabbitListener
@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);

@RabbitListener(queues = "queue1")
public void processQueue1(String message) {
logger.info("Received from queue 1: " + message);
}
}


package com.rabbitmq.example1annotated;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}

@Bean
public Queue myQueue1() {
return new Queue("queue1");
}

}


Приклад 2. Work Queues


В даному прикладі одну чергу слухають вже два листенера. Для емуляції корисної роботи використовуємо Thread.sleep. Важливо, що листенеры однієї черги можуть бути і на різних інстансах програми. Так можна розпаралелити чергу на кілька комп'ютерів або нод в хмарі.

@RabbitListener(queues = "query-example-2")
public void worker1(String message) throws InterruptedException {
logger.info("worker 1 : " + message);
Thread.sleep(100 * random.nextInt(20));
}

@RabbitListener(queues = "query-example-2")
public void worker2(String message) throws InterruptedException {
logger.info("worker 2 : " + message);
Thread.sleep(100 * random.nextInt(20));
}

Результат:

2015-06-23 22:03:48.018 INFO 6784 — [nio-8080-exec-1] com.rabbitmq.example2.SampleController: Emit to queue
2015-06-23 22:03:48.029 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 2: Message 1
2015-06-23 22:03:48.029 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 1: Message 0
2015-06-23 22:03:48.830 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 2: 2 Message
2015-06-23 22:03:49.331 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 2: Message 3
2015-06-23 22:03:49.432 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 2: Message 4
2015-06-23 22:03:49.634 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 1: Message 5
2015-06-23 22:03:49.733 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 2: Message 6
2015-06-23 22:03:49.735 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 1: Message 7
2015-06-23 22:03:50.236 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 1: Message 8
2015-06-23 22:03:50.537 INFO 6784 — [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener: worker 1: Message 9

Вихідний код RabbitMqListener.java і оновленого SampleController.java
package com.rabbitmq.example2;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
Random random = new Random();

@RabbitListener(queues = "query-example-2")
public void worker1(String message) throws InterruptedException {
logger.info("worker 1 : " + message);
Thread.sleep(100 * random.nextInt(20));
}

@RabbitListener(queues = "query-example-2")
public void worker2(String message) throws InterruptedException {
logger.info("worker 2 : " + message);
Thread.sleep(100 * random.nextInt(20));
}

} 

package com.rabbitmq.example2;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);

@Autowired
AmqpTemplate template;

@RequestMapping("/queue")
@ResponseBody
String queue1() {
logger.info("Emit to queue");
for(int i = 0;i < 10;i++)
template.convertAndSend("query-example-2","Message " + i);
return "Emit to queue";
}
}


Приклад 3. Publish/Subscribe


Тут одне і те ж повідомлення приходить відразу двом консьюмерам.

2015-06-23 22:12:24.669 INFO 1664 — [nio-8080-exec-1] com.rabbitmq.приклад 3.SampleController: Emit to exchange-example-3
2015-06-23 22:12:24.684 INFO 1664 — [cTaskExecutor-1] com.rabbitmq.приклад 3.RabbitMqListener: accepted on worker 1: Fanout message
2015-06-23 22:12:24.684 INFO 1664 — [cTaskExecutor-1] com.rabbitmq.приклад 3.RabbitMqListener: accepted on worker 2: Fanout message

Для цього, підключимо обидві черги до FanoutExchange:

@Bean
public FanoutExchange fanoutExchangeA(){
return new FanoutExchange("exchange-example-3");
}

@Bean
public Binding binding1(){
return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());
}

@Bean
public Binding binding2(){
return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());
}

І будемо відправляти не в чергу, а в exchange exchange-example-3:

template.setExchange("exchange-example-3");
template.convertAndSend("Fanout message");

Кожен раз вказувати exchange необов'язково. Його можна вказати і один раз при створенні RabbitTemplate.

Повні вихідні коди
package com.rabbitmq.приклад 3;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
return rabbitAdmin;
}

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
return rabbitTemplate;
}


@Bean
public Queue myQueue1() {
return new Queue("query-example-3-1");
}

@Bean
public Queue myQueue2() {
return new Queue("query-example-3-2");
}

@Bean
public FanoutExchange fanoutExchangeA(){
return new FanoutExchange("exchange-example-3");
}

@Bean
public Binding binding1(){
return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());
}

@Bean
public Binding binding2(){
return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());
}

}

package com.rabbitmq.приклад 3;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
Random random = new Random();

@RabbitListener(queues = "query-example-3-1")
public void worker1(String message) {
logger.info("accepted on worker 1 : " + message);
}

@RabbitListener(queues = "query-example-3-2")
public void worker2(String message) {
logger.info("accepted on worker 2 : " + message);
}

}

package com.rabbitmq.приклад 3;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);

@Autowired
RabbitTemplate template;

@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}

@RequestMapping("/emit")
@ResponseBody
String emit() {
logger.info("Emit to exchange-example-3");
template.setExchange("exchange-example-3");
template.convertAndSend("Fanout message");
return "Emit to exchange-example-3";
}
}



Приклад 4. Routing


Тут використовується routing key, в залежності від якого повідомлення може потрапити в одну з черг або відразу в обидві. Для цього замість FanoutExchange використовуємо DirectExchange:

@Bean
public DirectExchange directExchange(){
return new DirectExchange("exchange-example-4");
}

@Bean
public Binding errorBinding1(){
return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");
}

@Bean
public Binding errorBinding2(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");
}

@Bean
public Binding infoBinding(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");
}

@Bean
public Binding warningBinding(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");
}


І при відправленні використовуємо вказуємо Routing key, наприклад, так:

template.convertAndSend("info", "Info");

В результаті отримуємо:

2015-06-23 22:29:24.480 INFO 5820 — [nio-8080-exec-2] com.rabbitmq.приклад 4.SampleController: Emit as info
2015-06-23 22:29:24.483 INFO 5820 — [cTaskExecutor-1] com.rabbitmq.приклад 4.RabbitMqListener: accepted on worker 2: Info
2015-06-23 22:29:29.721 INFO 5820 — [nio-8080-exec-4] com.rabbitmq.приклад 4.SampleController: Emit as error
2015-06-23 22:29:29.727 INFO 5820 — [cTaskExecutor-1] com.rabbitmq.приклад 4.RabbitMqListener: accepted on worker 2: Error
2015-06-23 22:29:29.731 INFO 5820 — [cTaskExecutor-1] com.rabbitmq.приклад 4.RabbitMqListener: accepted on worker 1: Error
2015-06-23 22:29:36.779 INFO 5820 — [nio-8080-exec-5] com.rabbitmq.приклад 4.SampleController: Emit as warning
2015-06-23 22:29:36.781 INFO 5820 — [cTaskExecutor-1] com.rabbitmq.приклад 4.RabbitMqListener: accepted on worker 2: Warning

Повні вихідні коди
package com.rabbitmq.приклад 4;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
return rabbitAdmin;
}

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setExchange("exchange-example-4");
return rabbitTemplate;
}

@Bean
public Queue myQueue1() {
return new Queue("query-example-4-1");
}

@Bean
public Queue myQueue2() {
return new Queue("query-example-4-2");
}

@Bean
public DirectExchange directExchange(){
return new DirectExchange("exchange-example-4");
}

@Bean
public Binding errorBinding1(){
return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");
}

@Bean
public Binding errorBinding2(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");
}

@Bean
public Binding infoBinding(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");
}

@Bean
public Binding warningBinding(){
return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");
}

}

package com.rabbitmq.приклад 4;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
Random random = new Random();

@RabbitListener(queues = "query-example-4-1")
public void worker1(String message) {
logger.info("accepted on worker 1 : " + message);
}

@RabbitListener(queues = "query-example-4-2")
public void worker2(String message) {
logger.info("accepted on worker 2 : " + message);
}

}

package com.rabbitmq.приклад 4;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);

@Autowired
RabbitTemplate template;

@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}

@RequestMapping("/emit/error")
@ResponseBody
String error() {
logger.info("Emit as error");
template.convertAndSend("error", "Error");
return "Emit as error";
}

@RequestMapping("/emit/info")
@ResponseBody
String info() {
logger.info("Emit as info");
template.convertAndSend("info", "Info");
return "Emit as info";
}

@RequestMapping("/emit/warning")
@ResponseBody
String warning() {
logger.info("Emit as warning");
template.convertAndSend("warning", "Warning");
return "Emit as warning";
}
}


Приклад 5. Topics


Тут замість DirectExchange використовуємо TopicExchange
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("exchange-example-5");
}

@Bean
public Binding binding1(){
return BindingBuilder.bind(myQueue1()).to(topicExchange()).with(".*.orange.*");
}

@Bean
public Binding binding2(){
return BindingBuilder.bind(myQueue2()).to(topicExchange()).with(".*.*.rabbit");
}

@Bean
public Binding binding3(){
return BindingBuilder.bind(myQueue2()).to(topicExchange()).with("lazy.#");
}

В результаті отримуємо:

2015-06-23 22:42:28.414 INFO 6560 — [nio-8080-exec-1] com.rabbitmq.example5.SampleController: Emit 'to 1 and 2' to 'quick.orange.rabbit'
2015-06-23 22:42:28.428 INFO 6560 — [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener: accepted on worker 2: to 1 and 2
2015-06-23 22:42:28.428 INFO 6560 — [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener: accepted on worker 1: to 1 and 2
2015-06-23 22:42:55.802 INFO 6560 — [nio-8080-exec-2] com.rabbitmq.example5.SampleController: Emit 'to 2' to 'lazy.black.cat'
2015-06-23 22:42:55.805 INFO 6560 — [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener: accepted on worker 2: to 2

Повні вихідні коди
package com.rabbitmq.example5;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);

@Autowired
RabbitTemplate template;

@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}

@RequestMapping("/emit/{key}/{message}")
@ResponseBody
String error(@PathVariable("key") String key, @PathVariable("message") String message) {
logger.info(String.format("Emit '%s' to '%s'",message,key));
template.convertAndSend(key, message);
return String.format("Emit '%s' to '%s'",message,key);
}
}

package com.rabbitmq.example5;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);
Random random = new Random();

@RabbitListener(queues = "query-example-5-1")
public void worker1(String message) {
logger.info("accepted on worker 1 : " + message);
}

@RabbitListener(queues = "query-example-5-2")
public void worker2(String message) {
logger.info("accepted on worker 2 : " + message);
}

}

package com.rabbitmq.example5;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);

@Autowired
RabbitTemplate template;

@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}

@RequestMapping("/emit/{key}/{message}")
@ResponseBody
String error(@PathVariable("key") String key, @PathVariable("message") String message) {
logger.info(String.format("Emit '%s' to '%s'",message,key));
template.convertAndSend(key, message);
return String.format("Emit '%s' to '%s'",message,key);
}
}



Приклад 6. Remote procedure call (RPC)


Spring AMQP дозволяє використовувати convertSendAndReceive, щоб отримати відповідь на відправлене повідомлення. При цьому, при дефолтні налаштування, у разі якщо у нас RabbitMQ версії до 3.4.0, то для відповідного повідомлення буде створена тимчасова чергу. Цей спосіб не дуже продуктивний, тому його використовувати не рукомендуется і слід створити самому також чергу для зворотних повідомлень і вказати її як ReplyQueue у RabbitTemplate. Якщо ж у нас RabbitMQ версії 3.4.0 і вище, то буде використаний механізм Direct reply-to, який набагато швидше. Детальніше документації по Spring AMQP.

Таким чином здійснити віддалений виклик процедури можна лише одним рядком:

String response = (String) template.convertSendAndReceive("query-example-6",message);

А так процедура обробляється на консьюмере:

@RabbitListener(queues = "query-example-6")
public String worker1(String message) throws InterruptedException {
logger.info("received on worker : " + message);
Thread.sleep(3000); //емуляціях корисну роботу
return "received on worker : " + message;
}

В результаті отримуємо:

2015-06-23 23:12:36.677 INFO 6536 — [nio-8080-exec-5] com.rabbitmq.example6.SampleController: Emit 'Hello world'
2015-06-23 23:12:36.679 INFO 6536 — [cTaskExecutor-1] com.rabbitmq.example6.RabbitMqListener: Received on worker: Hello world
2015-06-23 23:12:39.681 INFO 6536 — [nio-8080-exec-5] com.rabbitmq.example6.SampleController: Received on producer 'Received on worker: Hello world'

Повні вихідні коди
package com.rabbitmq.example6;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
Logger logger = Logger.getLogger(RabbitConfiguration.class);

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
return rabbitAdmin;
}

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setQueue("query-example-6");
rabbitTemplate.setReplyTimeout(60 * 1000);
//no reply to - we use direct-reply-to
return rabbitTemplate;
}

@Bean
public Queue myQueue() {
return new Queue("query-example-6");
}
}

package com.rabbitmq.example6;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
Logger logger = Logger.getLogger(RabbitMqListener.class);

@RabbitListener(queues = "query-example-6")
public String worker1(String message) throws InterruptedException {
logger.info("Received on worker : " + message);
Thread.sleep(3000);
return "Received on worker : " + message;
}
}

package com.rabbitmq.example6;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
Logger logger = Logger.getLogger(SampleController.class);

@Autowired
RabbitTemplate template;

@RequestMapping("/")
@ResponseBody
String home() {
return "Empty mapping";
}

@RequestMapping("/process/{message}")
@ResponseBody
String error(@PathVariable("message") String message) {
logger.info(String.format("Emit '%s'",message));
String response = (String) template.convertSendAndReceive("query-example-6",message);
logger.info(String.format("Received on producer '%s'",response));
return String.valueOf("returned from worker : " + response);
}
}


Висновок
У себе я використовував RabbitMQ в проекті в хмарному хостингу heroku. Використовувати RabbitMQ в heroku досить просто — достатньо підключити одного з провайдерів RabbitMQ в консолі адміністрування і тоді в змінних оточення з'явиться адреса для доступу до кролику. Ця адреса потрібно використовувати при створенні connectionFactory.

@Bean
public ConnectionFactory connectionFactory()
{
//отримуємо адресу AMQP у провайдера
String uri = System.getenv("CLOUDAMQP_URL");
if (uri == null) //значить ми запущені локально і потрібно підключатися до локальної rabbitmq
uri = "amqp://guest:guest@localhost";
URI, url = null;
try
{
url = new URI(uri);
} catch (URISyntaxException e)
{
e.printStackTrace(); //тут помилка вкрай малоймовірна
}

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(url.getHost());
connectionFactory.setUsername(url.getUserInfo().split(":")[0]);
connectionFactory.setPassword(url.getUserInfo().split(":")[1]);
if (StringUtils.isNotBlank(url.getPath()))
connectionFactory.setVirtualHost(url.getPath().replace("/", ""));
connectionFactory.setConnectionTimeout(3000);
connectionFactory.setRequestedHeartBeat(30);
return connectionFactory;
}

В іншому код мало відрізняється від наведеного в прикладі 4(Routing).

Використані джерела
Сторінка проекту Spring AMQP
Сторінка проекту Spring Boot
Сторінка з прикладами RabbitMQ

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.