阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

RabbitMQ集群跨网段消息迁移

188次阅读
没有评论

共计 10100 个字符,预计需要花费 26 分钟才能阅读完成。

需求背景

将阿里云同一个 VPC 下的 RabbitMQ 集群的消息从一个网段集群迁移到另一个网段集群。消息中间件的消息是即时消费,为何还有历史消息,因为是历史遗留问题。故要迁移

整个网络拓扑图如下

注意:

若对于跨 VPC 网络

1. 确保各主机网络互通

2. 配置好各主机名

两边安全组出方向开发:15672、25672、5672、4369 端口

否在在加入集群会出现问题

RabbitMQ 集群跨网段消息迁移

资源清单

主机名

IP地址

角色

备注

node171

172.20.0.171

老的 MQ 集群_1

 

node172

172.20.0.172

老的 MQ 集群_2

 

node173

192.168.0.173

MQ集群_1

 

node174

192.168.0.174

新的 MQ 集群_2

 

基础软件及环境信息

操作系统:CentOS Linux release 7.3.1611

Erlang:Erlang/OTP 20 [erts-9.3.3.3]

RabbitMQ:rabbitmq_server-3.7.8

集群的部署

node171、node172 组成集群 A

node173、node174 组成集群 B

这里的环境部署略

创建测试账户

在【node171 上进行操作】

rabbitmqctl add_user root root123

rabbitmqctl add_vhost kcvhost

rabbitmqctl set_permissions -p kcvhost root  “.*” “.*” “.*”

rabbitmqctl add_user admin admin123

rabbitmqctl set_permissions -p kcvhost admin  “.*” “.*” “.*”

rabbitmqctl set_user_tags admin administrator

rabbitmq-plugins enable rabbitmq_management

rabbitmqctl stop_app

rabbitmqctl start_app

生成测试数据

消息生产者代码:

package com.zjkj.rabbitmq.demo;
 
import Java.io.IOException;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
 
/**
 * 消息的生产者
 * @author zjkj
 *
 */
public class Rabbitmq_Producer {

private static final String EXCHANGE_NAME = “exchange_test_3”;
private static final String ROUTING_KEY = “routingkey_demo”;
private static final String QUEUE_NAME = “queue_test_3”;
private static final String IP_ADDRESS = “172.20.0.171”;
private static final int PORT = 5672; //RabbitMQ 服务默认端口号为 5672

public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(“root”);
factory.setPassword(“root123”);
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
 // 创建一个 type=”direct”、持久化、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, “direct”,true, false, null);
// 创建一个持久化、非排他的、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false,null);
// 将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送一条持久化的消息:hello world!
for(int i=1;i<=100000;i++){
String msg = “ 交换器_1 与队列 1 绑定:Message_”+i;
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}
// 关闭资源
channel.close();
connection.close();
 
}
 
}

消费者代码

package com.zjkj.rabbitmq.demo;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * 消息的消费者
 * @author zjkj
 *
 */
public class Rabbitmq_Consumer {
 
private static final String QUEUE_NAME = “queue_test_3”;
private static final String IP_ADDRESS = “192.168.6.171”;
private static final  int PORT = 5672;

public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{
Address[] addresses = new Address[]{
new Address(IP_ADDRESS,PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(“root”);
factory.setPassword(“root123”);
// 这里的连接方式与生产者的 demo 略有不同,注意区别
Connection connection = factory.newConnection(addresses); // 创建连接
final Channel channel = connection.createChannel(); // 创建信道
channel.basicQos(64);// 设置客户端最多接收未被 ack 的消息的个数

/**
 * 这里采用了继承 DefaultConsumer 的方式来实现消费,有过 RabbitMQ 使用经验的开发者
 * 可能喜欢使用 QueueingConsumer 的方式来实现消费
 * 因为使用 QueueingConsumer 会有一些隐患。
 * 同时在 RabbitMQ Java 客户端 4.0.0 版本开始将 QueueingConsumer 标记为 @Deprecated 了
 */
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)
  throws IOException{
System.out.println(“recv message : ” + new String(body));
try{
TimeUnit.SECONDS.sleep(1);

}catch(InterruptedException e){
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME,true, consumer);
// 等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
 
}
 
}

查看集群中诸如用户数、交换器数量、队列数量等

[root@node171 rabbitmq]# rabbitmqctl list_users

Listing users …

admin  [monitoring]

guest  [administrator]

root    []

[root@node171 rabbitmq]# rabbitmqctl list_exchanges

Listing exchanges for vhost / …

amq.topic      topic

amq.headers    headers

exchange_test_3 direct

amq.direct      direct

exchange_test_2 direct

amq.rabbitmq.trace      topic

amq.match      headers

        direct

exchange_test_1 direct

amq.fanout      fanout

[root@node171 rabbitmq]# rabbitmqctl list_queues

Timeout: 60.0 seconds …

Listing queues for vhost / …

queue_test_3    100000

queue_test_2    200

queue_test_1    10000

迁移方案

迁移步骤

1. 停止所有生产者和消费者的应用程序

2. 将集群 B 中的机器依次一台一台加入集群 A 中,并确认所有队列镜像完成

3. 剔除集群 A 中的一台一台机器

4. 将应用指向集群 B

方案 1【不可行】

将集群 B 中的一台机器加入集群 A 中,然后再集群 B 中的另一他机器已加入集群,然后剔除集群 A 中的一台机器,然后再剔除集群 A 中的另一台机器

此方案对于 RabbitMQ 的普通集群也即是 Cluster 模式是无效的

1. 停止 A 集群中的所有连接

2. 将集群 B 中的一台节点加入到 A 集群中

将集群 A 中的.erlang.cookie 的值拷贝到集群 B 中的 node173 上

[root@node171 rabbitmq]# cat .erlang.cookie

ORMTFBMHOXOGFKRLQSPU[root@node171 rabbitmq]#

[root@node173 plugins]# cp /var/lib/rabbitmq/.erlang.cookie  /var/lib/rabbitmq/erlang.cookie.bak

[root@node173 plugins]# chmod 700 /var/lib/rabbitmq/.erlang.cookie

 [root@node173 plugins]# vi /var/lib/rabbitmq/.erlang.cookie

ORMTFBMHOXOGFKRLQSPU
 

[root@node173 plugins]# chmod 400 /var/lib/rabbitmq/.erlang.cookie

[root@node173 plugins]# ls -lrth /var/lib/rabbitmq/.erlang.cookie

-r——– 1 rabbitmq rabbitmq 21 Oct 24 18:51 /var/lib/rabbitmq/.erlang.cookie

3. 将集群 B 中的 node173 加入到集群 A 中

[root@node173 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start  rabbitmq-server.service

[root@node173 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq_173@node173 …

[root@node173 rabbitmq]# rabbitmqctl reset

Resetting node mq_173@node173 …

[root@node173 rabbitmq]# rabbitmqctl join_cluster mq171@node171

Clustering node mq_173@node173 with mq171@node171

[root@node173 rabbitmq]# rabbitmqctl start_app

Starting node mq_173@node173 …

 completed with 3 plugins.

4. 同样的方法将集群 B 中的 node174 加入到集群 A 中

[root@node174 rabbitmq]# rabbitmqctl cluster_status

Cluster status of node mq_174@node174 …

[{nodes,[{disc,[mq_174@node174]}]},

 {running_nodes,[mq_174@node174]},

 {cluster_name,<<“mq_174@node174”>>},

 {partitions,[]},

 {alarms,[{mq_174@node174,[]}]}]

 [root@node174 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq_174@node174 …

[root@node174 rabbitmq]# rabbitmqctl reset

Resetting node mq_174@node174 …

[root@node174 rabbitmq]# rabbitmqctl join_cluster mq171@node171

Clustering node mq_174@node174 with mq171@node171

[root@node174 rabbitmq]# rabbitmqctl start_app

Starting node mq_174@node174 …

 completed with 0 plugins.

  5. 将集群 A 中的 node171 剔除集群

[root@node171 rabbitmq]# rabbitmqctl stop

Stopping and halting node mq171@node171 …

这时访问 node172 的 Web 集群管理

RabbitMQ 集群跨网段消息迁移

同样在 node173 上的 Web 管理界面查看

RabbitMQ 集群跨网段消息迁移

至此对于普通的集群模式,这种方案是不行的。

方案 2【可行】

若 RabbitMQ 采用镜像队列,将集群 A 中的消息数据迁移到集群 B 中,

集群 A 中的 node171、node172 采用镜像队列

构建集群 A 的镜像队列环境

1. 首先集群 A 中的 node172 加入集群中

【在 node172 上操作】

[root@node172 ~]# rabbitmqctl stop_app

Stopping rabbit application on node mq172@node172 …

[root@node172 ~]# rabbitmqctl reset

Resetting node mq172@node172 …

 [root@node172 ~]# rabbitmqctl join_cluster mq171@node171

Clustering node mq172@node172 with mq171@node171

ra[root@node172 ~]# rabbitmqctl start_app

Starting node mq172@node172 …

2. 设置镜像策略

【在 node171 上操作】

[root@node171 ~]# rabbitmqctl set_policy ha-all -p kcvhost “^” ‘{“ha-mode”:”all”,”ha-sync-mode”:”automatic”}’

Setting policy “ha-all” for pattern “^” to “{“ha-mode”:”all”,”ha-sync-mode”:”automatic”}” with priority “0” for vhost “kcvhost” …

 

[root@node171 ~]# rabbitmqctl set_policy rabbit_mirror “^” ‘{“ha-mode”:”all”}’

Setting policy “rabbit_mirror” for pattern “^” to “{“ha-mode”:”all”}” with priority “0” for vhost “/” …

开始集群 A 中的镜像队列迁移

1. 停止所有消息的生产者和消费者相关应用服务

2.停止集群 A 中的所有机器,并备份原始数据

【node171、node172】都要操作

Node172 执行如下:

[root@node172 ~]# service rabbitmq-server stop

Redirecting to /bin/systemctl stop  rabbitmq-server.service

[root@node172 ~]# cd /var/lib/rabbitmq/

[root@node172 rabbitmq]# ls

mnesia

[root@node172 rabbitmq]# cp -rf mnesia mnesia.20181025.bak

[root@node172 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start  rabbitmq-server.service

node171 执行如下:

[root@node171 ~]# service rabbitmq-server stop

Redirecting to /bin/systemctl stop  rabbitmq-server.service

[root@node171 ~]# cd /var/lib/rabbitmq/

[root@node171 rabbitmq]# cp -rf mnesia mnesia.20181025.bak

[root@node171 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start  rabbitmq-server.service

2. 首先将集群 B 的 node173 机器加入到集群 A 中

[root@node173 network-scripts]# service rabbitmq-server stop

Redirecting to /bin/systemctl stop rabbitmq-server.service

[root@mq04 rabbitmq]# cp -rf /var/lib/rabbitmq /var/lib/rabbitmq.bak

[root@mq04 rabbitmq]# cd /var/lib/rabbitmq

[root@mq04 rabbitmq]# rm -rf .erlang.cookie  mnesia/

[root@mq01 rabbitmq]# scp .erlang.cookie  root@mq04:/var/lib/rabbitmq

The authenticity of host ‘mq04 (192.168.0.232)’ can’t be established.

ECDSA key fingerprint is SHA256:zgAicKOpvRLLCyhdUbpNvyanKYrPt/Pp9g+Sdq9mAoo.

ECDSA key fingerprint is MD5:15:7d:1e:c2:86:d5:4a:40:63:df:f5:4e:65:c4:24:62.

Are you sure you want to continue connecting (yes/no)? yes

Warning: Permanently added ‘mq04’ (ECDSA) to the list of known hosts.

root@mq04’s password:

Permission denied, please try again.

root@mq04’s password:

.erlang.cookie                                                                  100%  20    19.6KB/s  00:00

[root@mq04 rabbitmq]# chmod 400 .erlang.cookie

[root@mq04 rabbitmq]# chown -R rabbitmq:rabbitmq .erlang.cookie

[root@mq04 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start rabbitmq-server.service

[root@mq04 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq04@mq04 …

[root@mq04 rabbitmq]# rabbitmqctl reset

Resetting node mq04@mq04 …

对于阿里云 ECS 一定要在安全组先临时开启 15672、25672、5672、4369 端口

[root@node173 network-scripts]# rabbitmqctl join_cluster mq171@node171

Clustering node mq_173@node173 with mq171@node171

[root@node173 network-scripts]# rabbitmqctl start_app

Starting node mq_173@node173 …

 completed with 3 plugins.

3. 然后再将 B 集群中的 node174 机器加入到集群 A 中

 使用上面同样的方法,将 node174 加入到集群中去

4. 剔除集群 A 中的 node171、node172 机器

Node172 上执行

[root@mq02 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq02@mq02 …

[root@mq02 rabbitmq]# service rabbitmq-server stop

Redirecting to /bin/systemctl stop rabbitmq-server.service

[root@mq02 rabbitmq]# cp -rf /var/lib/rabbitmq /var/lib/rabbitmq.bak

[root@mq02 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start rabbitmq-server.service

[root@mq02 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq02@mq02 …

[root@mq02 rabbitmq]# rabbitmqctl reset

Resetting node mq02@mq02 …

同样的 node171 上执行同样的命令

对于采用镜像队列集群,此方案可行

正文完
星哥说事-微信公众号
post-qrcode
 
星锅
版权声明:本站原创文章,由 星锅 2022-01-21发表,共计10100字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中