阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

MQTT服务器的搭建与测试pub/sub通信过程

129次阅读
没有评论

共计 5855 个字符,预计需要花费 15 分钟才能阅读完成。

MQTT 是一个即时通讯协议,采用轻量级发布和订阅消息传输机制。专门设计用于低带宽或者高昂的网络费用的通信过程中。以及提供三种不同质量的消息服务:

  • 1.”至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
  • 2.”至少一次”,确保消息到达,但消息重复可能会发生。
  • 3.”只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

对于实现了 MQTT 协议的消息代理软件有众多。Mosquitto,npm 社区的 mosca,Apache 社区的 ActivityMQ 等等

分别尝试了这三种的搭建过程,最 Mosquitto 容易上手。

安装 Mosquitto

# 下载源代码包
wget http://mosquitto.org/files/source/mosquitto-1.4.5.tar.gz
# 解压
tar zxfv mosquitto-1.4.5.tar.gz
# 进入目录
cd mosquitto-1.4.5
# 编译
make
# 安装
sudo make install

运行 Mosquitto

Mosquitto 的配置文件存放在 /etc/mosquitto/mosquitto.conf

配置文件具体的配置内容为:

# =================================================================
# General configuration
# =================================================================

# 客户端心跳的间隔时间
#retry_interval 20

# 系统状态的刷新时间
#sys_interval 10

# 系统资源的回收时间,0 表示尽快处理
#store_clean_interval 10

# 服务进程的 PID
#pid_file /var/run/mosquitto.pid

# 服务进程的系统用户
#user mosquitto

# 客户端心跳消息的最大并发数
#max_inflight_messages 10

# 客户端心跳消息缓存队列
#max_queued_messages 100

# 用于设置客户端长连接的过期时间,默认永不过期
#persistent_client_expiration

# =================================================================
# Default listener
# =================================================================

# 服务绑定的 IP 地址
#bind_address

# 服务绑定的端口号
#port 1883

# 允许的最大连接数,- 1 表示没有限制
#max_connections -1

# cafile:CA 证书文件
# capath:CA 证书目录
# certfile:PEM 证书文件
# keyfile:PEM 密钥文件
#cafile
#capath
#certfile
#keyfile

# 必须提供证书以保证数据安全性
#require_certificate false

# 若 require_certificate 值为 true,use_identity_as_username 也必须为 true
#use_identity_as_username false

# 启用 PSK(Pre-shared-key)支持
#psk_hint

# SSL/TSL 加密算法,可以使用“openssl ciphers”命令获取
# as the output of that command.
#ciphers

# =================================================================
# Persistence
# =================================================================

# 消息自动保存的间隔时间
#autosave_interval 1800

# 消息自动保存功能的开关
#autosave_on_changes false

# 持久化功能的开关
persistence true

# 持久化 DB 文件
#persistence_file mosquitto.db

# 持久化 DB 文件目录
#persistence_location /var/lib/mosquitto/

# =================================================================
# Logging
# =================================================================

# 4 种日志模式:stdout、stderr、syslog、topic
# none 则表示不记日志,此配置可以提升些许性能
log_dest none

# 选择日志的级别(可设置多项)
#log_type error
#log_type warning
#log_type notice
#log_type information

# 是否记录客户端连接信息
#connection_messages true

# 是否记录日志时间
#log_timestamp true

# =================================================================
# Security
# =================================================================

# 客户端 ID 的前缀限制,可用于保证安全性
#clientid_prefixes

# 允许匿名用户
#allow_anonymous true

# 用户 / 密码文件,默认格式:username:password
#password_file

# PSK 格式密码文件,默认格式:identity:key
#psk_file

# pattern write sensor/%u/data
# ACL 权限配置,常用语法如下:
# 用户限制:user <username>
# 话题限制:topic [read|write] <topic>
# 正则限制:pattern write sensor/%u/data
#acl_file

# =================================================================
# Bridges
# =================================================================

# 允许服务之间使用“桥接”模式(可用于分布式部署)
#connection <name>
#address <host>[:<port>]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]

# 设置桥接的客户端 ID
#clientid

# 桥接断开时,是否清除远程服务器中的消息
#cleansession false

# 是否发布桥接的状态信息
#notifications true

# 设置桥接模式下,消息将会发布到的话题地址
# $SYS/broker/connection/<clientid>/state
#notification_topic

# 设置桥接的 keepalive 数值
#keepalive_interval 60

# 桥接模式,目前有三种:automatic、lazy、once
#start_type automatic

# 桥接模式 automatic 的超时时间
#restart_timeout 30

# 桥接模式 lazy 的超时时间
#idle_timeout 60

# 桥接客户端的用户名
#username

# 桥接客户端的密码
#password

# bridge_cafile:桥接客户端的 CA 证书文件
# bridge_capath:桥接客户端的 CA 证书目录
# bridge_certfile:桥接客户端的 PEM 证书文件
# bridge_keyfile:桥接客户端的 PEM 密钥文件
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile

# 自己的配置可以放到以下目录中
include_dir /etc/mosquitto/conf.d

启动 Mosquitto 服务:

mosquitto -c /etc/mosquitto/mosquitto.conf -d (MQTT 协议使用 1883 端口,查看该端口验��是否启动成功)

java 使用 MQTT 的订阅发布

使用 https://github.com/fusesource/mqtt-client 来实现 java 客户端的调用。

Maven 依赖为:

        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.12</version>
        </dependency>

订阅 (Sub) 端 HelloWorld:

订阅一个名为 foo 的主题,消息级别为 AT_LEAST_ONCE

             MQTT mqtt = new MQTT();
                try {mqtt.setHost("tcp://192.168.2.112:1883");
                    System.out.println("start");
                    BlockingConnection connection = mqtt.blockingConnection();
                    connection.connect();
                    System.out.println(connection == null);
                    connection.subscribe(new Topic[] {new Topic("foo", QoS.AT_LEAST_ONCE) });
                    while (true) {Message message = connection.receive();
                        System.out.println("MQTTFutureClient.Receive Message" + "Topic Title :" + message.getTopic()
                                + "context :" + String.valueOf(message.getPayloadBuffer()));
                    }
                } catch (Exception e) { }
                System.out.println("end");

发布 (pub) 端:

给一个名为 foo 的主题推送消息,消息级别为 AT_LEAST_ONCE

                MQTT mqtt = new MQTT();
                try {mqtt.setHost("tcp://192.168.2.112:1883");
                    System.out.println("start");
                    BlockingConnection connection = mqtt.blockingConnection();
                    connection.connect();
                    System.out.println(connection == null);
                    for (int i = 0; i <100000; i++) {
                        connection.publish("foo", "HelloWQEQWEQ".getBytes(), QoS.AT_LEAST_ONCE, false);
                    }
                } catch (Exception e) { }
                System.out.println("end");

通过运行便可以看到消息成功 pub 到 sub 端。

至于连接类型,该客户端提供了三种

  • BlockingConnection 阻塞式
  • CallbackConnection 回调函数式
  • FutureConnection 异步式

java 客户端还能制定更多的通讯细节

// 连接前清空会话信息 , 若设为 false,MQTT 服务器将持久化客户端会话的主体订阅和 ACK 位置,默认为 true
            mqtt.setCleanSession(CLEAN_START);
            // 设置心跳时间
            // , 定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免 TCP/IP 超时的长时间等待
            mqtt.setKeepAlive(KEEP_ALIVE);
            // 设置客户端 id, 用于设置客户端会话的 ID。在 setCleanSession(false); 被调用时,MQTT 服务器利用该 ID 获得相应的会话。
            // 此 ID 应少于 23 个字符,默认根据本机地址、端口和时间自动生成
            mqtt.setClientId(CLIENT_ID);
 // 设置“遗嘱”消息的内容,默认是长度为零的消息 mqtt.setWillMessage("willMessage");
             * // 设置“遗嘱”消息的 QoS,默认为 QoS.ATMOSTONCE
             * mqtt.setWillQos(QoS.AT_LEAST_ONCE);
             * // 若想要在发布“遗嘱”消息时拥有 retain 选项,则为 true mqtt.setWillRetain(true);
             * // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
             * mqtt.setWillTopic("willTopic");
             */

            // == 失败重连接设置说明
            // 设置重新连接的次数
            // , 客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。- 1 意为无重试上限,默认为 -1
            mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
            // 设置重连的间隔时间 , 首次重连接间隔毫秒数,默认为 10ms
            mqtt.setReconnectDelay(RECONNECTION_DELAY);
            // 客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。- 1 意为无重试上限,默认为 -1
            // mqtt.setConnectAttemptsMax(10L);

MQTT 提供的 pub/sub 确实比 redis 的 pub/sub 机制强大些。

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-10/136359.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计5855字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中