public void setproperties(map<string, object> props){ for (string name : msgrcv_names) { mr = conf.getmsgrcvinstance(name); if (mr instanceof messagereceiver) { ((messagereceiver) mr).setparent(this); ((messagereceiver) mr).start(); } } }
1、当客户端发送的message消息到tigase服务端,每个一socket连接都会被包装成ioservice对象,ioservice包含一系列操作socket的方法(接收发送数据等),processsocketdata()接收网络数据,由tigase.net处理解析成xml对象,并将packet放到接收队列receivedpackets中再调用servicelistener.packetsready(this)。由于connectionmanager实现ioservicelistener接口,实现上调用的的是connectionmanager中的packetsready()来开始处理数据
clientconnectionmanager.processsocketdata(xmppioservice<object>serv) jid id = serv.getconnectionid(); //c2s@llooper/192.168.0.33_5222_192.168.0.33_38624 p.setpacketfrom(id); //packetfrom 设置为onnectionid p.setpacketto(serv.getdatareceiver()); //packetto 设置为sess-man --> sessionmanager addoutpacket(p);//将会委托给父 messagerouter 路由 }
//我们不会处理没有目标地址的数据包,只是丢弃它们并写一个日志消息 if (packet.getto() == null) { log.log(level.warning, "packet with to attribute set to null: {0}", packet); return; } //它不是一个服务发现包,我们必须找到一个处理组件 //下面的代码块是“快速”找到一个组件if //这个包to 组件id,格式在以下一项: // 1。组件名+“@”+默认域名 // 2。组件名+“@”+任何虚拟主机名 // 3。组件名+ "."+默认域名 // 4。组件名+ "."+任何虚拟主机名 servercomponent comp = getlocalcomponent(packet.getto()); //sessionmanager comp.processpacket(packet, results);
3、sessionmanager.processpacket(final packet packet)处理,有要代码如下。 例如a->b,这样做的目的是为了首先确定用户a有权限发送packet,然后是确定用户b有权限接收数据。如果用户b不在线,那么离线消息处理器会把packet保存到数据库当中。
//xmppresourceconnection session——用户会话保存所有用户会话数据,并提供对用户数据存储库的访问。它只允许在会话的生命周期内将信息存储在永久存储或内存中。如果在分组处理时没有联机用户会话,则此参数可以为空。 xmppresourceconnection conn = getxmppresourceconnection(packet); //现在要走sessionmanager的处理函数,主要是走插件流程,插件在tigase中也是一个重要的组成,入口就是在这里,sm plugin processpacket(packet, conn);
插入下sm plugin 流程说明 :
protected xmppresourceconnection getxmppresourceconnection(packet p) { xmppresourceconnection conn = null; //首先根据这个包的发起者,来查找他的连接资源类,找不到则找接收者的资源类 jid from = p.getpacketfrom(); if (from != null) { conn = connectionsbyfrom.get(from); if (conn != null) { return conn; } } //这个接收者它可能是这个服务器上某个用户的消息,让我们为这个用户查找已建立的会话 jid to = p.getstanzato(); if (to != null) { if (log.isloggable(level.finest)) { log.finest("searching for resource connection for: " + to); } conn = getresourceconnection(to); } else { // hm, not sure what should i do now.... // maybe i should treat it as message to admin.... log.log(level.info, "message without to attribute set, don''t know what to do wih this: {0}", p); } // end of else return conn; } protected void processpacket(packet packet, xmppresourceconnection conn) { ... packet.setpacketto(getcomponentid()); //sess-man@llooper ... if (!stop) { //授权匹配的processor处理packet walk(packet, conn); try { if ((conn != null) && conn.getconnectionid().equals(packet.getpacketfrom())) { handlelocalpacket(packet, conn); } } catch (noconnectionidexception ex) { ... } } ... }
packetto被设置为组件id(sess-man@llooper),其值原先也是这个。
其中walk(packet, conn)方法,匹配处理器(授权)。对于message,此处匹配到的processor是amp和message-carbons,message-carbons没有怎么处理,主要是amp在处理,packet被塞amp的队列中等待处理。
private void walk(final packet packet, final xmppresourceconnection connection) { for (xmppprocessorifc proc_t : processors.values()) { xmppprocessorifc processor = proc_t; //根据element和xmlns,授权匹配成功的processor authorization result = processor.canhandle(packet, connection); if (result == authorization.authorized) { .... processingthreads pt = workerthreads.get(processor.id()); if (pt == null) { pt = workerthreads.get(defpluginsthreadspool); } //packet 放到(additem)授权了的processor的队列 if (pt.additem(processor, packet, connection)) { packet.processedby(processor.id()); } else { ... } } else { ... } } }
@override public void process(queueitem item) { xmppprocessorifc processor = item.getprocessor(); try { //由授权的 processor 处理 packet processor.process(item.getpacket(), item.getconn(), nauserrepository,local_results, plugin_config.get(processor.id())); if (item.getconn() != null) { setpermissions(item.getconn(), local_results); } addoutpackets(item.getpacket(), item.getconn(), local_results); } catch (packeterrortypeexception e) { ... } catch (xmppexception e) { ... } } //其中processor.process()------> messageamp.process(),如下: @override public void process(packet packet, xmppresourceconnection session, nonauthuserrepository repo, queue results, map settings) throws xmppexception { if (packet.getelemname() == "presence") { ... } else { element amp = packet.getelement().getchild("amp", xmlns); if ((amp == null) || (amp.getattributestaticstr("status") != null)) { messageprocessor.process(packet, session, repo, results, settings); } else { ... } } // 其中messageprocessor.process() --------> message.process(),如下 @override public void process(packet packet, xmppresourceconnection session, nonauthuserrepository repo, queue results, map settings) throws xmppexception { ... try { ... // 在比较jids之前,记住要去除资源部分 id = (packet.getstanzafrom() != null) ? packet.getstanzafrom().getbarejid() : null; // 检查这是否是来自客户端的数据包 if (session.isuserid(id)) { // 这是来自这个客户端的数据包,最简单的操作是转发到它的目的地: // simple clone the xml element and.... // ... putting it to results queue is enough results.offer(packet.copyelementonly()); return; } } catch (notauthorizedexception e) { ... } // end of try-catch }
4、上层组件messagerouter处理,把packet塞到in_queues. 又回到了messagerouter.processpacket(packet packet)处理:
protected xmppresourceconnection getxmppresourceconnection(packet p) { xmppresourceconnection conn = null; jid from = p.getpacketfrom(); if (from != null) { conn = connectionsbyfrom.get(from); if (conn != null) { return conn; } } // it might be a message _to_ some user on this server // so let's look for established session for this user... jid to = p.getstanzato(); if (to != null) { ... conn = getresourceconnection(to); } else { ... } // end of else return conn; }
6、如同步骤3,此时packet作为一个以用户b的名义将其作为传入包进行处理。
然后packetto被设置为组件id(sess-man@llooper)
此时packet: packetfrom = sess-man@llooper,packetto =sess-man@llooper。
之后packet又经walk(packet, conn)方法,匹配处理器(授权),扔给amp处理。
@override public void process(packet packet, xmppresourceconnection session, nonauthuserrepository repo, queue<packet> results, map<string, object> settings) throws xmppexception { // for performance reasons it is better to do the check // before calling logging method. if (log.isloggable(level.finest)) { log.log(level.finest, "processing packet: {0}, for session: {1}", new object[] { packet, session }); } // you may want to skip processing completely if the user is offline. if (session == null) { processofflineuser( packet, results ); return; } // end of if (session == null) try { // remember to cut the resource part off before comparing jids barejid id = (packet.getstanzato() != null) ? packet.getstanzato().getbarejid() : null; // checking if this is a packet to the owner of the session if (session.isuserid(id)) { if (log.isloggable(level.finest)) { log.log(level.finest, "message 'to' this user, packet: {0}, for session: {1}", new object[] { packet, session }); } if (packet.getstanzafrom() != null && session.isuserid(packet.getstanzafrom().getbarejid())) { jid connectionid = session.getconnectionid(); if (connectionid.equals(packet.getpacketfrom())) { results.offer(packet.copyelementonly()); // this would cause message packet to be stored in offline storage and will not // send recipient-unavailable error but it will behave the same as a message to // unavailable resources from other sessions or servers return; } } // yes this is message to 'this' client list<xmppresourceconnection> conns = new arraylist<xmppresourceconnection>(5); // this is where and how we set the address of the component // which should rceive the result packet for the final delivery // to the end-user. in most cases this is a c2s or bosh component // which keep the user connection. string resource = packet.getstanzato().getresource(); if (resource == null) { // if the message is sent to barejid then the message is delivered to // all resources conns.addall(getconnectionsformessagedelivery(session)); } else { // otherwise only to the given resource or sent back as error. xmppresourceconnection con = session.getparentsession().getresourceforresource( resource); if (con != null) { conns.add(con); } } // messagecarbons: message cloned to all resources? why? it should be copied only // to resources with non negative priority!! if (conns.size() > 0) { for (xmppresourceconnection con : conns) { packet result = packet.copyelementonly(); result.setpacketto(con.getconnectionid()); // in most cases this might be skept, however if there is a // problem during packet delivery an error might be sent back result.setpacketfrom(packet.getto()); // don't forget to add the packet to the results queue or it // will be lost. results.offer(result); if (log.isloggable(level.finest)) { log.log(level.finest, "delivering message, packet: {0}, to session: {1}", new object[] { packet, con }); } } } else { // if there are no user connections we should process packet // the same as with missing session (i.e. should be stored if // has type 'chat' processofflineuser( packet, results ); } return; } // end of else // remember to cut the resource part off before comparing jids id = (packet.getstanzafrom() != null) ? packet.getstanzafrom().getbarejid() : null; // checking if this is maybe packet from the client if (session.isuserid(id)) { // this is a packet from this client, the simplest action is // to forward it to is't destination: // simple clone the xml element and.... // ... putting it to results queue is enough results.offer(packet.copyelementonly()); return; } // can we really reach this place here? // yes, some packets don't even have from or to address. // the best example is iq packet which is usually a request to // the server for some data. such packets may not have any addresses // and they usually require more complex processing // this is how you check whether this is a packet from the user // who is owner of the session: jid jid = packet.getfrom(); // this test is in most cases equal to checking getelemfrom() if (session.getconnectionid().equals(jid)) { // do some packet specific processing here, but we are dealing // with messages here which normally need just forwarding element el_result = packet.getelement().clone(); // if we are here it means from address was missing from the // packet, it is a place to set it here: el_result.setattribute("from", session.getjid().tostring()); packet result = packet.packetinstance(el_result, session.getjid(), packet .getstanzato()); // ... putting it to results queue is enough results.offer(result); } } catch (notauthorizedexception e) { log.log(level.fine, "notauthorizedexception for packet: " + packet + " for session: " + session, e); results.offer(authorization.not_authorized.getresponsemessage(packet, "you must authorize session first.", true)); } // end of try-catch }
检查stanzato与session匹配通过后,根据session拿到接收方所有的连接(可能多端登陆),然后packet result = packet.copyelementonly()生成新的packet(原packet丢弃了),并将packetto设置为接收方连接的connectionid(例如:c2s@llooper/192.168.0.33_5222_192.168.0.33_38624),通过addoutpacket()方法塞到out_queue队列。
此时packet:packetfrom = sess-man@llooper,packetto =c2s@llooper/192.168.0.33_5222_192.168.0.33_38624。
8、 组件 c2s@llooper 从queue中取出packet,分发到目的地
public void processpacket(final packet packet) { ... if (packet.iscommand() && (packet.getcommand() != command.other)) { ... } else { // 把packet 发送给客户端 if (!writepackettosocket(packet)) { ... } } // end of else }
后续有时间会不断更新,欢迎加入qq群 310790965 更多的交流
如对本文有疑问, 点击进行留言回复!!
Flink程序JDK8 运行一段时间后NullException解决
解决: java.lang.NoClassDefFoundError: org/apache/http/client/HttpClient
SpringBoot中定制异常页面(404页面配置提高用户体验)
DataGrip和IDEA无法连接上Mysql问题解决方法详解
Java基础语法(多态、类、接口、Date类、基本类型、系统类)
网友评论