当前位置: 移动技术网 > IT编程>开发语言>Java > Tigase 发送消息的流程源码分析

Tigase 发送消息的流程源码分析

2018年11月01日  | 移动技术网IT编程  | 我要评论
xmpp 的<message/>节是使用基本的”push”方法来从一个地方到另一个地方得到消息。因为消息通常是不告知的,它们是一种”fire-and-forget”(发射后自寻目的)的机制来从一个地方到另一个地方快速获取信息
消息节有五种不同的类型,通过 type 属性来进行区分:例如 chat 类型为 chat 的消息在两个实体间的实时对话中交换,例如两个朋友之间的即时通讯聊天。除了 type 属性外,消息节还包括一个 to 和 from 地址,并且也可以包含一个用于跟踪目的的 id  属性(我们在使用更为广泛的 iq  节中详细的讨论 ids)。to  地址是预期接收人的
jabberid,from 地址是发送者的jabberid。from 地址不由发送客户端提供,而是由发送者的服务器添加邮戳,以避免地址欺骗。
在tigase中,有两个重要的组成,一个组件,二是插件,可以去官方网去看下他的架构介绍 https://docs.tigase.net/tigase-server/7.1.4/development_guide/html/#writeplugincode
例如最著名的组件的一个例子是muc或pubsub。在tigase中,几乎所有东西实际上都是一个组件:会话管理器、s2s连接管理器、消息路由器等等,组件是根据服务器配置加载的,新的组件可以在运行时加载和激活。您可以轻松地替换组件实现,唯一要做的更改是配置条目中的类名。

tigase 中定义一个最简单的消息组件,需要实现messagereceiver或继承 extends abstractmessagereceiver 类, messagereceiver 的抽象类: abstractmessagereceiver 子类 :
一、clientconnectionmanager
二、sessionmanager
三、 messagerouter
public void setproperties(map<string, object> props){
    for (string name : msgrcv_names) {
        mr = conf.getmsgrcvinstance(name);
        if (mr instanceof messagereceiver) {
            ((messagereceiver) mr).setparent(this);
            ((messagereceiver) mr).start();
        }
    }
}

1、当客户端发送的message消息到tigase服务端,每个一socket连接都会被包装成ioservice对象,ioservice包含一系列操作socket的方法(接收发送数据等),processsocketdata()接收网络数据,由tigase.net处理解析成xml对象,并将packet放到接收队列receivedpackets中再调用servicelistener.packetsready(this)。由于connectionmanager实现ioservicelistener接口,实现上调用的的是connectionmanager中的packetsready()来开始处理数据

此时的packet :packetfrom=null,packetto=null。
 
clientconnectionmanager.processsocketdata方法中设置packet的一些属性:
此时: packetfrom=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, packetto=sess-man@llooper
clientconnectionmanager.processsocketdata(xmppioservice<object>serv)
    jid id = serv.getconnectionid(); //c2s@llooper/192.168.0.33_5222_192.168.0.33_38624
    p.setpacketfrom(id); //packetfrom 设置为onnectionid
    p.setpacketto(serv.getdatareceiver()); //packetto 设置为sess-man --> sessionmanager 
    addoutpacket(p);//将会委托给父 messagerouter 路由
    
}
//packet 被设置上一些源信息,和目的地信息,接下来,这个数据包将会委托给父 messagerouter 帮忙路由到 sessionmanager组件中进行处理
packet = (tigase.server.message) from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, data=<message xmlns="jabber:client" id="44grm-176" type="chat" to="llooper@llooper"><thread>swjzv5</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, size=170, xmlns=jabber:client, priority=normal, permission=none, type=chat
 
packet = from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, data=<message to="admin@llooper" type="chat" id="2jepe-253" xmlns="jabber:client"><thread>7vkmrq</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, size=168, xmlns=jabber:client, priority=normal, permission=none, type=chat
 
2、messagerouter.processpacket(packet packet)部分代码如下:
 
//我们不会处理没有目标地址的数据包,只是丢弃它们并写一个日志消息
if (packet.getto() == null) {
    log.log(level.warning, "packet with to attribute set to null: {0}", packet);
    return;
}   


//它不是一个服务发现包,我们必须找到一个处理组件
//下面的代码块是“快速”找到一个组件if

//这个包to 组件id,格式在以下一项:
// 1。组件名+“@”+默认域名
// 2。组件名+“@”+任何虚拟主机名
// 3。组件名+ "."+默认域名
// 4。组件名+ "."+任何虚拟主机名

servercomponent comp = getlocalcomponent(packet.getto()); //sessionmanager
comp.processpacket(packet, results);

 3、sessionmanager.processpacket(final packet packet)处理,有要代码如下。 例如a->b,这样做的目的是为了首先确定用户a有权限发送packet,然后是确定用户b有权限接收数据。如果用户b不在线,那么离线消息处理器会把packet保存到数据库当中。

//xmppresourceconnection session——用户会话保存所有用户会话数据,并提供对用户数据存储库的访问。它只允许在会话的生命周期内将信息存储在永久存储或内存中。如果在分组处理时没有联机用户会话,则此参数可以为空。
xmppresourceconnection conn = getxmppresourceconnection(packet);
//现在要走sessionmanager的处理函数,主要是走插件流程,插件在tigase中也是一个重要的组成,入口就是在这里,sm plugin
processpacket(packet, conn);

   插入下sm plugin 流程说明 :

这个设计有一个惊人的结果。如果你看下面的图片,显示了两个用户之间的通信,你可以看到数据包被复制了两次才送到最终目的地: 

会话管理器(sessionmanager)必须对数据包进行两次处理。第一次以用户a的名义将其作为传出包进行处理,第二次以用户b的名义将其作为传入包进行处理。
这是为了确保用户a有权限发送一个包,所有的processor都应用到packet上,也为了确保用户b有权限接收packet,所有的processor都应用到packet了。例如,如果用户b是脱机的,那么有一个脱机消息processor应该将包发送到数据库,而不是用户b。
 
protected xmppresourceconnection getxmppresourceconnection(packet p) {
        xmppresourceconnection conn = null;
        
        //首先根据这个包的发起者,来查找他的连接资源类,找不到则找接收者的资源类
        jid    from = p.getpacketfrom();
        if (from != null) {
            conn = connectionsbyfrom.get(from);
            if (conn != null) {
                return conn;
            }
        }

        //这个接收者它可能是这个服务器上某个用户的消息,让我们为这个用户查找已建立的会话
        jid to = p.getstanzato();

        if (to != null) {
            if (log.isloggable(level.finest)) {
                log.finest("searching for resource connection for: " + to);
            }
            conn = getresourceconnection(to);
        } else {

            // hm, not sure what should i do now....
            // maybe i should treat it as message to admin....
            log.log(level.info,
                    "message without to attribute set, don''t know what to do wih this: {0}", p);
        }    // end of else

        return conn;
    }
    
    
protected void processpacket(packet packet, xmppresourceconnection conn) {

    ...
    packet.setpacketto(getcomponentid()); //sess-man@llooper
    ...

    if (!stop) {
        //授权匹配的processor处理packet
        walk(packet, conn);
        try {
            if ((conn != null) && conn.getconnectionid().equals(packet.getpacketfrom())) {
                handlelocalpacket(packet, conn);
            }
        } catch (noconnectionidexception ex) {
            ...
        }
    }
    
    ...
}

 

packetto被设置为组件id(sess-man@llooper),其值原先也是这个。
其中walk(packet, conn)方法,匹配处理器(授权)。对于message,此处匹配到的processor是amp和message-carbons,message-carbons没有怎么处理,主要是amp在处理,packet被塞amp的队列中等待处理。

private void walk(final packet packet, final xmppresourceconnection connection) {

        for (xmppprocessorifc proc_t : processors.values()) {
            xmppprocessorifc processor = proc_t;
            //根据element和xmlns,授权匹配成功的processor
            authorization    result    = processor.canhandle(packet, connection);

            if (result == authorization.authorized) {
                ....
            
                processingthreads pt = workerthreads.get(processor.id());

                if (pt == null) {
                    pt = workerthreads.get(defpluginsthreadspool);
                }
                //packet 放到(additem)授权了的processor的队列
                if (pt.additem(processor, packet, connection)) {
                    packet.processedby(processor.id());
                } else {

                    ...
                }
            } else {
                ...
            }
        }   
    }
workerthread.run() 从队列中取出packet,由sessionmanager.process(queueitem item)给amp处理。
sessionmanager.pocess(queueitem item) 如下:
@override
public void process(queueitem item) {
    
    xmppprocessorifc processor = item.getprocessor();

    try {
        //由授权的 processor 处理 packet
        processor.process(item.getpacket(), item.getconn(), nauserrepository,local_results, plugin_config.get(processor.id()));
        if (item.getconn() != null) {
            setpermissions(item.getconn(), local_results);
        }
        addoutpackets(item.getpacket(), item.getconn(), local_results);
    } catch (packeterrortypeexception e) {
        ...
    } catch (xmppexception e) {
        ...
    }
}


//其中processor.process()------> messageamp.process(),如下:

@override
public void process(packet packet, xmppresourceconnection session,
        nonauthuserrepository repo, queue results, map settings) throws xmppexception {
    if (packet.getelemname() == "presence") {
        ...
        
    } else {
        element amp = packet.getelement().getchild("amp", xmlns);

        if ((amp == null) || (amp.getattributestaticstr("status") != null)) {
            messageprocessor.process(packet, session, repo, results, settings);
        } else {
            ...
    }
}

// 其中messageprocessor.process() --------> message.process(),如下


@override
public void process(packet packet, xmppresourceconnection session,
        nonauthuserrepository repo, queue results, map settings) throws xmppexception {

    ...
    try {
        ...
        // 在比较jids之前,记住要去除资源部分
        id = (packet.getstanzafrom() != null)
                ? packet.getstanzafrom().getbarejid()
                : null;

        // 检查这是否是来自客户端的数据包
        if (session.isuserid(id)) {
            // 这是来自这个客户端的数据包,最简单的操作是转发到它的目的地:
            // simple clone the xml element and....
            // ... putting it to results queue is enough
            results.offer(packet.copyelementonly());

            return;
        }

        
    } catch (notauthorizedexception e) {
        ...
    }    // end of try-catch
}

 

检查stanzaffrom与session匹配通过后,将packet.copyelementonly()放到results中,作后续投递,原来的packet 就丢弃了。
此时投递的packet :packetfrom=null,packetto=null。
packet在sessionmanager.addoutpacket(packet packet)中判断packetfrom是否为空,为空则将其设置为componentid(此处为sess-man@llooper),然后调用父类(abstractmessagereceiver.java) 的addoutpacket(packet)方法塞到out_queue 队列中。
此时packet::packetfrom=sess-man@llooper,packetto=null。
 

4、上层组件messagerouter处理,把packet塞到in_queues. 又回到了messagerouter.processpacket(packet packet)处理:

 
不同的是 packetto为空,packet.getto()的返回值是stanzato。
getlocalcomponent(packet.getto());方法根据stanzato与compid、comp name、component都匹配不到。
此时packet会给组件sessionmanager处理,packet will be processed by: sess-man@llooper,由abstractmessagereceiver的非阻塞性方法addpacketnb(packet packet)加入到in_queues。
 
 5、第二次来到sessionmanager.processpacket(final packet packet)处理。不同的是在getxmppresourceconnection(packet)方法中,
conn = connectionsbyfrom.get(from)返回值是null,所以是根据stanzato取获取接收方的session,返回接收方连接的connection。
protected xmppresourceconnection getxmppresourceconnection(packet p) {
    xmppresourceconnection conn = null;
    jid                    from = p.getpacketfrom();

    if (from != null) {
        conn = connectionsbyfrom.get(from);
        if (conn != null) {
            return conn;
        }
    }

    // it might be a message _to_ some user on this server
    // so let's look for established session for this user...
    jid to = p.getstanzato();

    if (to != null) {
        ...
        conn = getresourceconnection(to);
    } else {

        ...
    }    // end of else

    return conn;
}

 

 6、如同步骤3,此时packet作为一个以用户b的名义将其作为传入包进行处理。

然后packetto被设置为组件id(sess-man@llooper)

此时packet: packetfrom = sess-man@llooper,packetto =sess-man@llooper。

之后packet又经walk(packet, conn)方法,匹配处理器(授权),扔给amp处理。

 如同前面: 直到message.process(),如下:
@override
public void process(packet packet, xmppresourceconnection session,
        nonauthuserrepository repo, queue<packet> results, map<string, object> settings) throws xmppexception {

    // for performance reasons it is better to do the check
    // before calling logging method.
    if (log.isloggable(level.finest)) {
        log.log(level.finest, "processing packet: {0}, for session: {1}", new object[] {
                packet,
                session });
    }

    // you may want to skip processing completely if the user is offline.
    if (session == null) {
        processofflineuser( packet, results );
        return;
    }    // end of if (session == null)
    try {

        // remember to cut the resource part off before comparing jids
        barejid id = (packet.getstanzato() != null)
                ? packet.getstanzato().getbarejid()
                : null;

        // checking if this is a packet to the owner of the session
        if (session.isuserid(id)) {
            if (log.isloggable(level.finest)) {
                log.log(level.finest, "message 'to' this user, packet: {0}, for session: {1}",
                        new object[] { packet,
                        session });
            }

            if (packet.getstanzafrom() != null && session.isuserid(packet.getstanzafrom().getbarejid())) {
                jid connectionid = session.getconnectionid();
                if (connectionid.equals(packet.getpacketfrom())) {
                    results.offer(packet.copyelementonly());
                    // this would cause message packet to be stored in offline storage and will not
                    // send recipient-unavailable error but it will behave the same as a message to
                    // unavailable resources from other sessions or servers
                    return;
                }
            }

            // yes this is message to 'this' client
            list<xmppresourceconnection> conns = new arraylist<xmppresourceconnection>(5);

            // this is where and how we set the address of the component
            // which should rceive the result packet for the final delivery
            // to the end-user. in most cases this is a c2s or bosh component
            // which keep the user connection.
            string resource = packet.getstanzato().getresource();

            if (resource == null) {

                // if the message is sent to barejid then the message is delivered to
                // all resources
                conns.addall(getconnectionsformessagedelivery(session));
            } else {

                // otherwise only to the given resource or sent back as error.
                xmppresourceconnection con = session.getparentsession().getresourceforresource(
                        resource);

                if (con != null) {
                    conns.add(con);
                }
            }

            // messagecarbons: message cloned to all resources? why? it should be copied only
            // to resources with non negative priority!!

            if (conns.size() > 0) {
                for (xmppresourceconnection con : conns) {
                    packet result = packet.copyelementonly();

                    result.setpacketto(con.getconnectionid());

                    // in most cases this might be skept, however if there is a
                    // problem during packet delivery an error might be sent back
                    result.setpacketfrom(packet.getto());

                    // don't forget to add the packet to the results queue or it
                    // will be lost.
                    results.offer(result);
                    if (log.isloggable(level.finest)) {
                        log.log(level.finest, "delivering message, packet: {0}, to session: {1}",
                                new object[] { packet,
                                con });
                    }
                }
            } else {
                // if there are no user connections we should process packet
                // the same as with missing session (i.e. should be stored if
                // has type 'chat'
                processofflineuser( packet, results );
            }

            return;
        }    // end of else

        // remember to cut the resource part off before comparing jids
        id = (packet.getstanzafrom() != null)
                ? packet.getstanzafrom().getbarejid()
                : null;

        // checking if this is maybe packet from the client
        if (session.isuserid(id)) {

            // this is a packet from this client, the simplest action is
            // to forward it to is't destination:
            // simple clone the xml element and....
            // ... putting it to results queue is enough
            results.offer(packet.copyelementonly());

            return;
        }

        // can we really reach this place here?
        // yes, some packets don't even have from or to address.
        // the best example is iq packet which is usually a request to
        // the server for some data. such packets may not have any addresses
        // and they usually require more complex processing
        // this is how you check whether this is a packet from the user
        // who is owner of the session:
        jid jid = packet.getfrom();

        // this test is in most cases equal to checking getelemfrom()
        if (session.getconnectionid().equals(jid)) {

            // do some packet specific processing here, but we are dealing
            // with messages here which normally need just forwarding
            element el_result = packet.getelement().clone();

            // if we are here it means from address was missing from the
            // packet, it is a place to set it here:
            el_result.setattribute("from", session.getjid().tostring());

            packet result = packet.packetinstance(el_result, session.getjid(), packet
                    .getstanzato());

            // ... putting it to results queue is enough
            results.offer(result);
        }
    } catch (notauthorizedexception e) {
        log.log(level.fine, "notauthorizedexception for packet: " + packet + " for session: " + session, e);
        results.offer(authorization.not_authorized.getresponsemessage(packet,
                "you must authorize session first.", true));
    }    // end of try-catch
}

检查stanzato与session匹配通过后,根据session拿到接收方所有的连接(可能多端登陆),然后packet result = packet.copyelementonly()生成新的packet(原packet丢弃了),并将packetto设置为接收方连接的connectionid(例如:c2s@llooper/192.168.0.33_5222_192.168.0.33_38624),通过addoutpacket()方法塞到out_queue队列。
此时packet:packetfrom = sess-man@llooper,packetto =c2s@llooper/192.168.0.33_5222_192.168.0.33_38624。

7、 如同前面步骤2,不同的是根据packetto匹配到组件 c2s@llooper

8、 组件 c2s@llooper 从queue中取出packet,分发到目的地

public void processpacket(final packet packet) {
    ...
    if (packet.iscommand() && (packet.getcommand() != command.other)) {
        ...
    } else {
        // 把packet 发送给客户端
        if (!writepackettosocket(packet)) {

            ...
            
        }
    }    // end of else
}

 

后续有时间会不断更新,欢迎加入qq群 310790965 更多的交流

 
 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网