阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Openfire集群源码分析

444次阅读
没有评论

共计 6471 个字符,预计需要花费 17 分钟才能阅读完成。

如果用户量增加后为了解决吞吐量问题,需要引入集群,在 openfire 中提供了集群的支持,另外也实现了两个集群插件:hazelcast 和 clustering。为了了解情况集群的工作原理,我就沿着 openfire 的源代码进行了分析,也是一次学习的过程。

首先理解集群的一些简单概念
集群的目的是让多个实例像一个实例一样运行,这样就可以通过增长实例来增长计算能力。也就是所谓的分布式计算问题,这其中最为关注的一个特性就是——CAP 理论,也就是所谓的一致性、可用性、分区容错性。集群中最核心解决的问题就是 CAP。
CAP 综合理解就是我上面写的,多个实例像一个实例一样运行。
 
所以所谓集群就是把一些数据共享或者同步到不同的实例上,这样系统使用同样的算法,取的结果当然应该是相同啦。所以一些数据库的主从复制,缓存数据集群都是类似这种解决方法。只是代码实现质量和处理规模的问题。
 
有了这个基础我们再来看看 openfire 是怎么解决这个问题的。
 
openfire 的集群设计
 
1、哪些需要进行集群间的同步
 对于 openfire 而言,有这几方面的数据需要进行保证集群间的同步:数据库存的数据、缓存数据、session。貌似就这些吧?
数据库
因为对于 openfire 来说基本上是透明的,所以这块就交给数据库本身来实现。
缓存数据
缓存是存在内存里的,所以这部分是要同步的
session
session 在 openfire 并不需要所有实例同步,但是需要做用户路由缓存,否则发消息时找不到对应的会话。由此用户路由还是要同步的。
 
2、缓存的设计
  • 缓存接口
openfire 里对缓存的数据容器提供了一个包装接口,这个接口提供了缓存数据的基本方法,用于统一数据操作。
public interface Cache<K,V> extends java.util.Map<K,V>

如果不开启集群时缓存的默认缓存容器类是:public class DefaultCache<K, V>,实际上 DefaultCache 就是用一个 Hashmap 来存数据的。

  • 缓存工厂类
为了保证缓存是可以扩展的,提供了一个工厂类:
public class CacheFactory
CacheFactory 类中会管理所有的缓存容器,如下代码:
/**
     * Returns the named cache, creating it as necessary.
     *
     * @param name         the name of the cache to create.
     * @return the named cache, creating it as necessary.
     */
    @SuppressWarnings("unchecked")
    public static synchronized <T extends Cache> T createCache(String name) {T cache = (T) caches.get(name);
        if (cache != null) {return cache;
        }
        cache = (T) cacheFactoryStrategy.createCache(name);
 
        log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for" + name);
 
        return wrapCache(cache, name);
    }

上面代码中会通过缓存工厂策略对象来创建一个缓存容器,最后 warpCache 方法会将此容器放入到 caches 中。

  • 缓存工厂类的策略
在 CacheFactory 中默认是使用一个 DefaultLocalCacheStrategy 来完成缓存创建的。另外还提供了在集群条件下的缓存策略接入。也就是通过实例化不同的策略来切换缓存管理方案。比如后面要提到的 hazelcast 就是通过这个来替换了本地缓存策略的。从接口的设计上来看,openfire 的缓存策略也就是为了集群与非集群的实现。
 
3、集群的设计
在 openfire 中的集群主要包括:集群管理、数据同步管理、集群计算任务。
 
集群管理者
在 openfire 中主要是一个类来实现:ClusterManager,在 ClusterManager 中实现了集群实例的加入、退出管理,因为没有使用主从结构,所以 ClusterManager 实现了一个无中心管理,不知道我理解的对不对。因为只要当前实实例启用了集群,ClusterManager 就会主动的加载集群管理并与其他的集群进行同步。
 
  • startup
startup 是启动集群的方法,代码:
 
public static synchronized void startup() {if (isClusteringEnabled() && !isClusteringStarted()) {initEventDispatcher();
            CacheFactory.startClustering();}
    }
首先要判断是否开启了集群并且当前集群实例未运行时才去启动。
先是初始化了事件分发器,用于处理集群的同步事情。
 
然后就是调用 CacheFactory 的 startClustering 来运行集群。在 startClustering 方法中主要是这几个事情:
    • 会使用集群的缓存工厂策略来启动,同时使自己加入到集群中。
    • 开启一个线程用于同步缓存的状态
 
在前面 startup 中的 initEventDispatcher 方法, 在这里会注册一个分发线程监听到集群事件,收到事件后会执行 joinedCluster 或者 leftCluster 的操作,joinedCluster 就是加入到集群中的意思。
 
在 joinedCluster 时会将本地的缓存容器都转换为集群缓存。由此便完成了集群的初始化并加入到集群中了。
 
  • shutdown
shutdown 相对简单点就是退出集群,并且将缓存工厂恢复为本地缓存。
 
同步管理
上面主要是讲了如何管理集群,接着比较重要的就是如何在集群间同步数据呢?这部分主要是看具体的分布式计算系统的实现了,从 openfire 来说就是将数据放到集群缓存中,然后通过集群组件来完成的,比如使用 hazelcast。
 
因为使用缓存来解决,所以在 CacheFactory 中才会有这些么多关于集群的处理代码,特别是对于缓存策略的切换,以及集群任务处理都在 CacheFactory 作为接口方法向外公开。这样也把集群的实现透明了。
 
集群计算任务 
在这之前一直没有提到集群中的计算问题,因为既然有了集群是不是可以利用集群的优势进行一些并行计算呢?这部分我倒没有太过确定,只是看到相关的代码所以简单列一下。
 
在 CacheFactory 类中有几个方法:doClusterTask、doSynchronousClusterTask,这两个都是 overload 方法,参数有所不同而已。这几个方法就是用于执行一些计算任务的。就看一下 doClusterTask:
public static void doClusterTask(final ClusterTask<?> task) {cacheFactoryStrategy.doClusterTask(task);
    }

这里有个限定就是必须是 ClusterTask 派生的类才行,看看它的定义:

public interface ClusterTask<V> extends Runnable, Externalizable {V getResult();
 
}

主要是为了异步执行和序列化,异步是因为不能阻塞,而序列化当然就是为了能在集群中传送。

再看 CacheFactory 的 doClusterTask 方法可以发现,它只不过是代理了缓存策略工厂的 doClusterTask,具体的实现还是要看集群实现的。
 
看一看 hazelcast 的实现简单理解 openfire 集群
在 openfire 中有集群的插件实现,这里就以 hazelcast 为例子简单的做一下分析与学习。
 
  • 缓存策略工厂类(ClusteredCacheFactory)
 
ClusteredCacheFactory 实现了 CacheFactoryStrategy,代码如下:
public class ClusteredCacheFactory implements CacheFactoryStrategy {

首先是 startCluster 方法用于启动集群,主要完成几件事情:

    • 设置缓存序列化工具类,ClusterExternalizableUtil。这个是用于集群间数据复制时的序列化工具
    • 设置远程 session 定位器,RemoteSessionLocator,因为 session 不同步,所以它主要是用于多实例间的 session 读取
    • 设置远程包路由器 ClusterPacketRouter,这样就可以在集群中发送消息了
    • 加载 Hazelcast 的实例设置 NodeID,以及设置 ClusterListener
 
在前面说起集群启动时提到了缓存切换,那具体实现时是如何做的呢?
 
因为集群启动后就要是 CacheFactory.joinedCluster 方法来加入集群的。看一下加入的代码:
/**
     * Notification message indicating that this JVM has joined a cluster.
     */
    @SuppressWarnings("unchecked")
    public static synchronized void joinedCluster() {cacheFactoryStrategy = clusteredCacheFactoryStrategy;
        // Loop through local caches and switch them to clustered cache (copy content)
        for (Cache cache : getAllCaches()) {// skip local-only caches
            if (localOnly.contains(cache.getName())) continue;
            CacheWrapper cacheWrapper = ((CacheWrapper) cache);
            Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
            clusteredCache.putAll(cache);
            cacheWrapper.setWrappedCache(clusteredCache);
        }
        clusteringStarting = false;
        clusteringStarted = true;
        log.info("Clustering started; cache migration complete");
    }

这里可以看到会读取所有的缓存容器并一个个的使用 Wrapper 包装一下,然后用同样的缓存名称去 createCache 一个新的 Cache,这步使用的是切换后的集群缓存策略工厂,也就是说会使用 ClusteredCacheFactory 去创建新的缓存容器。最后再将 cache 写入到新的 clusteredCache 里,这样就完成了缓存的切换。

当然这里还是要看一下 ClusteredCacheFactory 的 createCache 实现:
public Cache createCache(String name) {// Check if cluster is being started up
        while (state == State.starting) {// Wait until cluster is fully started (or failed)
            try {Thread.sleep(250);
            }
            catch (InterruptedException e) {// Ignore
            }
        }
        if (state == State.stopped) {throw new IllegalStateException("Cannot create clustered cache when not in a cluster");
        }
        return new ClusteredCache(name, hazelcast.getMap(name));
    }
 

这里使用的是 ClusteredCache,而且最重要的是传入的第二个 map 参数换成了 hazelcast 的了,这样之后再访问这个缓存容器时已经不再是原先的本地 Cache 了,已经是 hazelcast 的 map 对象。hazelcast 会自动对 map 的数据进行同步管理,这也就完成了缓存同步的功能。

  • 集群计算

那就看 hazelcast 的实现吧,在 ClusteredCacheFactory 中 doClusterTask 举个例子吧:

public void doClusterTask(final ClusterTask task) {if (cluster == null) {return; }
        Set<Member> members = new HashSet<Member>();
        Member current = cluster.getLocalMember();
        for(Member member : cluster.getMembers()) {if (!member.getUuid().equals(current.getUuid())) {members.add(member);
            }
        }
        if (members.size() > 0) {// Asynchronously execute the task on the other cluster members
            logger.debug("Executing asynchronous MultiTask:" + task.getClass().getName());
            hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(new CallableTask<Object>(task), members);
        } else {logger.warn("No cluster members selected for cluster task" + task.getClass().getName());
        }
    }

过程就是,先获取到集群中的实例成员,当然要排除自己。然后 hazelcast 提供了 ExecutorService 来执行这个 task,方法就是 submiteToMembers。这样就提交了一个运算任务。只不过具体是如何分配计算并汇集结果倒真不太清楚。

总结

花了一天时间看了一下 openfire 的集群,顺手就写了一篇文章,确实也到了一些东西。和一些网友沟通中好像目前大家更愿意使用 redies 来完成缓存共享,以及通过代理来实现集群,而不愿意使用 openfire 的集群方案。这部分我没有遇到如何大的并发量需求确实不知道区别在哪里。以后有机会还是动手试试写一个 redies 的插件。

CentOS 下 Openfire 详细安装过程 http://www.linuxidc.com/Linux/2012-09/69539.htm

CentOS 5.4 下基于 Jabber/XMPP 协议的 Openfire 服务器配置笔记 http://www.linuxidc.com/Linux/2012-02/55497.htm

Ubuntu 12.04 安装 Openfire http://www.linuxidc.com/Linux/2012-07/64945.htm

Openfire 在使用 MySQL 数据库后的中文乱码问题解决 http://www.linuxidc.com/Linux/2014-03/97989.htm

通过 Nginx 实现 Openfire 集群的负载均衡  http://www.linuxidc.com/Linux/2015-09/122943.htm

Openfire 的详细介绍:请点这里
Openfire 的下载地址:请点这里

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-07/133531.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计6471字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19348
评论数
4
阅读量
7825008
文章搜索
热门文章
开发者必备神器:阿里云 Qoder CLI 全面解析与上手指南

开发者必备神器:阿里云 Qoder CLI 全面解析与上手指南

开发者必备神器:阿里云 Qoder CLI 全面解析与上手指南 大家好,我是星哥。之前介绍了腾讯云的 Code...
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
云服务器部署服务器面板1Panel:小白轻松构建Web服务与面板加固指南

云服务器部署服务器面板1Panel:小白轻松构建Web服务与面板加固指南

云服务器部署服务器面板 1Panel:小白轻松构建 Web 服务与面板加固指南 哈喽,我是星哥,经常有人问我不...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定!

星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定!

星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定! 前言 作为 NAS 玩家,你是否总被这些...
星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

星哥带你玩飞牛NAS-16:不再错过公众号更新,飞牛NAS搭建RSS

  星哥带你玩飞牛 NAS-16:不再错过公众号更新,飞牛 NAS 搭建 RSS 对于经常关注多个微...
如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的Nano Banana Pro?附赠邪修的用法

如何免费使用强大的 Nano Banana Pro?附赠邪修的用法 前言 大家好,我是星哥,今天来介绍谷歌的 ...
CSDN,你是老太太喝粥——无齿下流!

CSDN,你是老太太喝粥——无齿下流!

CSDN,你是老太太喝粥——无齿下流! 大家好,我是星哥,今天才思枯竭,不写技术文章了!来吐槽一下 CSDN。...
浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍

浏览器自动化工具!开源 AI 浏览器助手让你效率翻倍 前言 在 AI 自动化快速发展的当下,浏览器早已不再只是...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
安装并使用谷歌AI编程工具Antigravity(亲测有效)

安装并使用谷歌AI编程工具Antigravity(亲测有效)

  安装并使用谷歌 AI 编程工具 Antigravity(亲测有效) 引言 Antigravity...
自己手撸一个AI智能体—跟创业大佬对话

自己手撸一个AI智能体—跟创业大佬对话

自己手撸一个 AI 智能体 — 跟创业大佬对话 前言 智能体(Agent)已经成为创业者和技术人绕...
240 元左右!五盘位 NAS主机,7 代U硬解4K稳如狗,拓展性碾压同价位

240 元左右!五盘位 NAS主机,7 代U硬解4K稳如狗,拓展性碾压同价位

  240 元左右!五盘位 NAS 主机,7 代 U 硬解 4K 稳如狗,拓展性碾压同价位 在 NA...
仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

还在忍受动辄数百兆的“全家桶”监控软件?后台偷占资源、界面杂乱冗余,想查个 CPU 温度都要层层点选? 今天给...
星哥带你玩飞牛NAS-8:有了NAS你可以干什么?软件汇总篇

星哥带你玩飞牛NAS-8:有了NAS你可以干什么?软件汇总篇

星哥带你玩飞牛 NAS-8:有了 NAS 你可以干什么?软件汇总篇 前言 哈喽各位玩友!我是是星哥,不少朋友私...