阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Openfire的启动过程与session管理

150次阅读
没有评论

共计 10837 个字符,预计需要花费 28 分钟才能阅读完成。

说明
 
本文源码基于 Openfire4.0.2。
 
Openfire 的启动
 
    Openfire 的启动过程非常的简单,通过一个入口初始化 lib 目录下的 openfire.jar 包,并启动一个 XMPPServer 实例。
 
下面就是 ServerStarter.start 方法的代码片断:
 
Class containerClass = loader.loadClass("org.jivesoftware.openfire.XMPPServer");
containerClass.newInstance();
这样一个 openfire 实例就已经启动了。
 
XMPPServer 类
 
这个 XmppServer 类是单实例的对象,这样在服务器调用时可以获取一个实例。既然是个对象就会有构造的过程,XMPPServer 在构造过程中会对服务进行初始化,这个过程包括:
  • 初始化配置参数
  • 检查是否需要安装
  • 初始化 Module
  • 启动统计模块
  • 启动 plugin
 
基本就是这么简单,还是非常简洁明了。这里也可以大概知道在 openfire 里主要是 module 和 plugin 两类模块,一般情况下内部的模块都用 module,对于一些功能的扩展或者第三方的开发扩展使用 Plugin。官方其实也会自己写一个插件来扩展功能,说明插件还是比较灵活的。
 
提一提 Module 的加载过程
 
下面代码是 module 的加载过程
if (!setupMode) {verifyDataSource();
    // First load all the modules so that modules may access other modules while
    // being initialized
    loadModules();
    // Initize all the modules
    initModules();
    // Start all the modules
    startModules();}
 
可以看到,分了三个步骤:
加载模块:是对模块类的实例化过程,就是创建对象
初始化模块:就是调用 module.initialize(this);,其实就是调用模块的初始化方法
启动模块:module.start();,同理就是调用启动模块
 
这是因为 openfire 规范了 module 的接口抽象 Module,所有的模块都要按照这个规范实现,看代码:
 
public interface Module {/**
     * Returns the name of the module for display in administration interfaces.
     *
     * @return The name of the module.
     */
    String getName();
 
    /**
     * Initialize the module with the container.
     * Modules may be initialized and never started, so modules
     * should be prepared for a call to destroy() to follow initialize().
     *
     * @param server the server hosting this module.
     */
    void initialize(XMPPServer server);
 
    /**
     * Start the module (must return quickly). Any long running
     * operations should spawn a thread and allow the method to return
     * immediately.
     */
    void start();
 
    /**
     * Stop the module. The module should attempt to free up threads
     * and prepare for either another call to initialize (reconfigure the module)
     * or for destruction.
     */
    void stop();
 
    /**
     * Module should free all resources and prepare for deallocation.
     */
    void destroy();}
 
这也标示了 Module 的生命周期,Openfire 会管理这些 Module 的生命周期,以此来保证各个模块的启动与释放。
 
Connection 管理模块
 
整个启动过程有点奇怪,并没有看到 Openfire 是如何监听端口的,如果不监听如何获利客户端连接呢?因为 Openfire 只通过 Module 来管理的,那么对应的网络管理应该就在 Module 中。于是在 XMPPServer.loadModules 方法中看到下面的代码:
// Load this module always last since we don't want to start listening for clients
// before the rest of the modules have been started
loadModule(ConnectionManagerImpl.class.getName());
 
ConnectionManagerImpl 就是连接的管理模块,这里有个注释,就是在其他模块启动后之后再启动监听模块。
 
在 ConnectionManagerImpl 中管理了主要的连接,都是以 ConnectionListener 的来管理,这个类用于包装连接。我的理解就是一个连接抽象吧,这样对于代码来说写起来比较统一。看下面代码中 Manager 管理着哪些:
 
private final ConnectionListener clientListener;
    private final ConnectionListener clientSslListener;
    private final ConnectionListener boshListener;
    private final ConnectionListener boshSslListener;
    private final ConnectionListener serverListener;
    private final ConnectionListener componentListener;
    private final ConnectionListener componentSslListener;
    private final ConnectionListener connectionManagerListener; // Also known as 'multiplexer'
    private final ConnectionListener connectionManagerSslListener; // Also known as 'multiplexer'
    private final ConnectionListener webAdminListener;
    private final ConnectionListener webAdminSslListener;
这里面除了 server 只有一个外,其他的都是两个,其中一个是 SSL 的。它们主要是什么链接?
  • client:表示客户端连接
  • bosh:就是 HTTP 绑定的连接
  • server:服务器到服务器的 socket 连接
  • component:组件到服务器的连接
  • connectionManager:是指通过 connectionManager 连接器过来的连接
  • webAdmin:是指 web 控制台的连接
 
这里面 bosh 和 webAdmin 使用的是 http 协议,所以连接并不是长连接,其他的都是 socket。
 
openfire 里使用了 Mina 来实现 socket 网络处理。只不过看代码中对于 S2S 类型的连接使用的不是 mina,如下代码:
 
if (getType() == ConnectionType.SOCKET_S2S )
{connectionAcceptor = new LegacyConnectionAcceptor(generateConnectionConfiguration() );
}
else
{connectionAcceptor = new MINAConnectionAcceptor(generateConnectionConfiguration() );
}
 
LegacyConnectionAcceptor 是个废弃的类,但不知道为什么 s2s 还要用这个呢?看了看实现,LegacyConnectionAcceptor 就是起了一个线程,在线程里建了一个 ServerSocket。可能以后还是会迁移这部分代码吧。
 
在 connectionAcceptor 中会根据类型创建一个 ConnectionHandler 用于实现具体的业务功能,而 ConnectionHandler 都是基于 org.apache.mina.core.service.IoHandlerAdapter 派生的类,而 IoHandlerAdapter 又是 IoHandler 的适配接口,所以实质上就是 IoHandler。下面是类继承关系:
Openfire 的启动过程与 session 管理
 
在这些 Handler 里完成的主要是每个连接打开、关闭和数据收发等操作的处理。而其中比较关键的一个步骤就是在 sessionOpened 中设置了 StanzeHandler,而每种 ConnectionHandler 都有自己的 StanzeHandler 实现。以 ClientConnectionHandler 为例子,其中 ClientConnectionHandler 复写了父类的 createStanzaHandler 方法,这里面
@Override
    StanzaHandler createStanzaHandler(NIOConnection connection) {return new ClientStanzaHandler(XMPPServer.getInstance().getPacketRouter(), connection);
    }
这里使用的是 clientStanzaHandler,表示是客户端的数据节处理者。而最终的 createStanzaHandler 调用是在父类 ConnectionHandler 的 sessionOpened 完成的,
 
@Override
public void sessionOpened(IoSession session) throws Exception {// Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
    final XMLLightweightParser parser = new XMLLightweightParser(StandardCharsets.UTF_8);
    session.setAttribute(XML_PARSER, parser);
    // Create a new NIOConnection for the new session
    final NIOConnection connection = createNIOConnection(session);
    session.setAttribute(CONNECTION, connection);
    session.setAttribute(HANDLER, createStanzaHandler(connection));
    // Set the max time a connection can be idle before closing it. This amount of seconds
    // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)
    // before disconnecting them (at 100% of the max idle time). This prevents Openfire from
    // removing connections without warning.
    final int idleTime = getMaxIdleTime() / 2;
    if (idleTime > 0) {session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime);
    }
}

这样每一个 session 在打开时都会设置 handler,而具体的 handler 由各个派生类创建返回。这里的 StanzHandler 就是 Openfire 里的数据包处理单元。和 connection 类型一样,包处理也是对应的几个类:

 Openfire 的启动过程与 session 管理
 
注:
 
Session 模块
 
对于 Openfire 来说一个比较重要的功能就是管理 session,因为要与客户端实时的进行数据通讯,所以必须保持着连接。在 Openfire 中对于 Session 的管理都集中在 SessionManager 模块。但在前面说到连接管理时已经知道了 IoSession 的创建过程,但并没有看到 openfire 是如何管理它的。接着 ConnectionHandler 和 StanzaHandler 就能知道其中有奥秘。
 
前面知道了 ConnectionHandler 是连接的处理者,这里会有连接的创建、关闭、数据收发的处理,回到 ConnectionHandler 这个抽象类中。对于创建时 (sessionOpend) 主要是创建了 StanzaHandler,这样就把数据包的处理委托给了 StzanzHandler(派生类)。但是这个时候并没有将 session 放入到 openfire 的 session 管理模块中,而是在客户端发送数据过来后才开始的。
 
先看看 ConnectionHandler 的 messageReceived 方法:
 
@Override
public void messageReceived(IoSession session, Object message) throws Exception {// Get the stanza handler for this session
    StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
    // Get the parser to use to process stanza. For optimization there is going
    // to be a parser for each running thread. Each Filter will be executed
    // by the Executor placed as the first Filter. So we can have a parser associated
    // to each Thread
    final XMPPPacketReader parser = PARSER_CACHE.get();
    // Update counter of read btyes
    updateReadBytesCounter(session);
    //System.out.println("RCVD:" + message);
    // Let the stanza handler process the received stanza
    try {handler.process((String) message, parser);
    } catch (Exception e) {Log.error("Closing connection due to error while processing message:" + message, e);
        final Connection connection = (Connection) session.getAttribute(CONNECTION);
        if (connection != null ) {connection.close();
        }
 
    }
}

在接收到数据包后获取到 StanzaHandler,然后调用了它的 process 方法,也就是让实际的包处理者去处理数据。这样就回到了 StanzeHanler,以 ClientStanzaHandler 为例子。只不过这个派生类中没有重写 process 方法,也就是说要看父类的实现:

public void process(String stanza, XMPPPacketReader reader) throws Exception {boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
    if (!sessionCreated || initialStream) {if (!initialStream) {
..........
        // Found an stream:stream tag...
        if (!sessionCreated) {sessionCreated = true;
            MXParser parser = reader.getXPPParser();
            parser.setInput(new StringReader(stanza));
            createSession(parser);
        }
..........
        return;
    }
..........
}

由于代码较多,我省略了一些代码。看到这应该明白了吧,对于当前的连接没有创建 Openfire 的 session 对象时,会进行创建过程 createSession,对于不同的 StanzeHandler 会有些不一样,这里 ClientStanzaHandler 的实现就是把创建好的 session 放到本地的 LocalClientSession 中:

@Override
boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection)
        throws XmlPullParserException {if ("jabber:client".equals(namespace)) {// The connected client is a regular client so create a ClientSession
        session = LocalClientSession.createSession(serverName, xpp, connection);
        return true;
    }
    return false;
}
到这一个 session 算是建立完成了。
 
集群下的 session
 
之前一篇关于《Openfire 集群源码分析》提到了 session 的一些内容。其中也提到了 session 是不会向每一台服务器进行同步复制的,这就有一个问题,如果 A 用户先是连接了服务器 1,但是接下来的操作又到服务器 2,这不就会造成 session 无法找到吗?同样的问题,如果想要获取到当前所有的 client session 怎么办?
 
1、如何在集群中发消息
对于消息最终还是通过 session 来发送的,前后代码太多,就直接看一下 sessionManager 中的 getSession 方法吧。
 
public ClientSession getSession(JID from) {// Return null if the JID is null or belongs to a foreign server. If the server is
    // shutting down then serverName will be null so answer null too in this case.
    if (from == null || serverName == null || !serverName.equals(from.getDomain())) {return null;
    }
 
    // Initially Check preAuthenticated Sessions
    if (from.getResource() != null) {ClientSession session = localSessionManager.getPreAuthenticatedSessions().get(from.getResource());
        if (session != null) {return session;
        }
    }
 
    if (from.getResource() == null || from.getNode() == null) {return null;
    }
 
    return routingTable.getClientRoute(from);
}


先是获取本地的 session,如果能找到直接返回,找不到则跳到 routingTable 里获取客户端的路由信息。

@Override
public ClientSession getClientRoute(JID jid) {// Check if this session is hosted by this cluster node
    ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString());
    if (session == null) {// The session is not in this JVM so assume remote
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {// Check if the session is hosted by other cluster node
            ClientRoute route = usersCache.get(jid.toString());
            if (route == null) {route = anonymouSUSErsCache.get(jid.toString());
            }
            if (route != null) {session = locator.getClientSession(route.getNodeID().toByteArray(), jid);
            }
        }
    }
    return session;
}

这里更直接的可以看到,查找本地路由不 null 则会通过 RemoteSessionLocator 来完成。当然这里最大的奥秘其实是 usersCache 和 anonymousUsersCache 这两个 cache。之前写的集群源码分析中提过,最终 openfire 集群后会对缓存进行同步,这样每台服务器上都会有缓存的副本。所以 usersCache 是拥有所有��户信息的,有了 user 的信息就有了 jid 的信息,这样不管是哪台服务器都可以对数据包处理并发送给客户端。

 
这里的 RemoteSessionLocator 是由于适配不同的集群组件所抽象的接口,使得加入不同集群组件提供了透明处理。
 
2、如何获利所有的在线用户
 
对于获取所有在线用户这个功能思路也挺简单,一样是找本地所有的缓存。看 getSessions 的代码:
public Collection<ClientSession> getSessions() {return routingTable.getClientsRoutes(false);
    }

其实就是访问路由表,因为路由表里有所有的 cache,和获取单个的 session 不一样,需要对所有的路由都遍历返回。

@Override
public Collection<ClientSession> getClientsRoutes(boolean onlyLocal) {// Add sessions hosted by this cluster node
    Collection<ClientSession> sessions = new ArrayList<ClientSession>(localRoutingTable.getClientRoutes());
    if (!onlyLocal) {// Add sessions not hosted by this JVM
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {// Add sessions of non-anonymous users hosted by other cluster nodes
            for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) {ClientRoute route = entry.getValue();
                if (!server.getNodeID().equals(route.getNodeID())) {sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey())));
                }
            }
            // Add sessions of anonymous users hosted by other cluster nodes
            for (Map.Entry<String, ClientRoute> entry : anonymousUsersCache.entrySet()) {ClientRoute route = entry.getValue();
                if (!server.getNodeID().equals(route.getNodeID())) {sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey())));
                }
            }
        }
    }
    return sessions;
} 
 
总结
对于查看 Openfire 的源代码学习了一些东西,特别是对于服务化系统的开发思路。而且在集群化上也有了一些认识,知道了多机部署后系统应该要解决哪些问题。
继续学习吧。
 

CentOS 下 Openfire 详细安装过程 http://www.linuxidc.com/Linux/2012-09/69539.htm

CentOS 5.4 下基于 Jabber/XMPP 协议的 Openfire 服务器配置笔记 http://www.linuxidc.com/Linux/2012-02/55497.htm

Ubuntu 12.04 安装 Openfire http://www.linuxidc.com/Linux/2012-07/64945.htm

Openfire 在使用 MySQL 数据库后的中文乱码问题解决 http://www.linuxidc.com/Linux/2014-03/97989.htm

通过 Nginx 实现 Openfire 集群的负载均衡  http://www.linuxidc.com/Linux/2015-09/122943.htm

Openfire 的详细介绍:请点这里
Openfire 的下载地址:请点这里

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-08/134021.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计10837字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中