当前位置: 移动技术网 > IT编程>开发语言>Java > SpringBoot+Netty+WebSocket实现消息发送的示例代码

SpringBoot+Netty+WebSocket实现消息发送的示例代码

2020年09月21日  | 移动技术网IT编程  | 我要评论
一.导入netty依赖<dependency> <groupid>io.netty</groupid> <artifactid>netty-al

一.导入netty依赖

<dependency>
   <groupid>io.netty</groupid>
   <artifactid>netty-all</artifactid>
   <version>4.1.25.final</version>
  </dependency>

二.搭建websocket服务器

@component
public class websocketserver {

 /**
  * 主线程池
  */
 private eventloopgroup bossgroup;
 /**
  * 工作线程池
  */
 private eventloopgroup workergroup;
 /**
  * 服务器
  */
 private serverbootstrap server;
 /**
  * 回调
  */
 private channelfuture future;

 public void start() {
  future = server.bind(9001);
  system.out.println("netty server - 启动成功");
 }

 public websocketserver() {
  bossgroup = new nioeventloopgroup();
  workergroup = new nioeventloopgroup();

  server = new serverbootstrap();
  server.group(bossgroup, workergroup)
    .channel(nioserversocketchannel.class)
    .childhandler(new websocketinitializer());
 }
}

三.初始化websocket

public class websocketinitializer extends channelinitializer<socketchannel> {
 
 @override
 protected void initchannel(socketchannel ch) throws exception {
  channelpipeline pipeline = ch.pipeline();
  // ------------------
  // 用于支持http协议
  // ------------------
  // websocket基于http协议,需要有http的编解码器
  pipeline.addlast(new httpservercodec());
  // 对写大数据流的支持
  pipeline.addlast(new chunkedwritehandler());
  // 添加对http请求和响应的聚合器:只要使用netty进行http编程都需要使用
  //设置单次请求的文件的大小
  pipeline.addlast(new httpobjectaggregator(1024 * 64));
  //websocket 服务器处理的协议,用于指定给客户端连接访问的路由 :/ws
  pipeline.addlast(new websocketserverprotocolhandler("/ws"));
  // 添加netty空闲超时检查的支持
  // 1. 读空闲超时(超过一定的时间会发送对应的事件消息)
  // 2. 写空闲超时
  // 3. 读写空闲超时
  pipeline.addlast(new idlestatehandler(4, 8, 12));
  //添加心跳处理
  pipeline.addlast(new hearbeathandler());
  // 添加自定义的handler
  pipeline.addlast(new chathandler());

 }
}

四.创建netty监听器

@component
public class nettylistener implements applicationlistener<contextrefreshedevent> {

 @resource
 private websocketserver websocketserver;

 @override
 public void onapplicationevent(contextrefreshedevent event) {
  if(event.getapplicationcontext().getparent() == null) {
   try {
    websocketserver.start();
   } catch (exception e) {
    e.printstacktrace();
   }
  }
 }
}

五.建立消息通道

public class userchannelmap {
 /**
  * 用户保存用户id与通道的map对象
  */
// private static map<string, channel> userchannelmap;

 /* static {
  userchannelmap = new hashmap<string, channel>();
 }*/

 /**
  * 定义一个channel组,管理所有的channel
  * globaleventexecutor.instance 是全局的事件执行器,是一个单例
  */
 private static channelgroup channelgroup = new defaultchannelgroup(globaleventexecutor.instance);

 /**
  * 存放用户与chanel的对应信息,用于给指定用户发送消息
  */
 private static concurrenthashmap<string,channel> userchannelmap = new concurrenthashmap<>();

 private userchannelmap(){}
 /**
  * 添加用户id与channel的关联
  * @param usernum
  * @param channel
  */
 public static void put(string usernum, channel channel) {
  userchannelmap.put(usernum, channel);
 }

 /**
  * 根据用户id移除用户id与channel的关联
  * @param usernum
  */
 public static void remove(string usernum) {
  userchannelmap.remove(usernum);
 }

 /**
  * 根据通道id移除用户与channel的关联
  * @param channelid 通道的id
  */
 public static void removebychannelid(string channelid) {
  if(!stringutils.isnotblank(channelid)) {
   return;
  }
  for (string s : userchannelmap.keyset()) {
   channel channel = userchannelmap.get(s);
   if(channelid.equals(channel.id().aslongtext())) {
    system.out.println("客户端连接断开,取消用户" + s + "与通道" + channelid + "的关联");
    userchannelmap.remove(s);
    userservice userservice = springutil.getbean(userservice.class);
    userservice.logout(s);
    break;
   }
  }
 }

 /**
  * 打印所有的用户与通道的关联数据
  */
 public static void print() {
  for (string s : userchannelmap.keyset()) {
   system.out.println("用户id:" + s + " 通道:" + userchannelmap.get(s).id());
  }
 }

 /**
  * 根据好友id获取对应的通道
  * @param receivernum 接收人编号
  * @return netty通道
  */
 public static channel get(string receivernum) {
  return userchannelmap.get(receivernum);
 }

 /**
  * 获取channel组
  * @return
  */
 public static channelgroup getchannelgroup() {
  return channelgroup;
 }

 /**
  * 获取用户channel map
  * @return
  */
 public static concurrenthashmap<string,channel> getuserchannelmap(){
  return userchannelmap;
 }
}

六.自定义消息类型

public class message {
 /**
  * 消息类型
  */
 private integer type;
 /**
  * 聊天消息
  */
 private string message;
 /**
  * 扩展消息字段
  */
 private object ext;
 public integer gettype() {
  return type;
 }

 public void settype(integer type) {
  this.type = type;
 }

 public marketchatrecord getchatrecord() {
  return marketchatrecord;
 }
 public void setchatrecord(marketchatrecord chatrecord) {
  this.marketchatrecord = chatrecord;
 }

 public object getext() {
  return ext;
 }

 public void setext(object ext) {
  this.ext = ext;
 }

 @override
 public string tostring() {
  return "message{" +
    "type=" + type +
    ", marketchatrecord=" + marketchatrecord +
    ", ext=" + ext +
    '}';
 }

}

七.创建处理消息的handler

public class chathandler extends simplechannelinboundhandler<textwebsocketframe> {
 private static final logger log = loggerfactory.getlogger(websocketserver.class);


 /**
  * 用来保存所有的客户端连接
  */
 private static channelgroup clients = new defaultchannelgroup(globaleventexecutor.instance);

 /**
  *当channel中有新的事件消息会自动调用
  */
 @override
 protected void channelread0(channelhandlercontext ctx, textwebsocketframe msg) throws exception {
  // 当接收到数据后会自动调用
  // 获取客户端发送过来的文本消息
  gson gson = new gson();
  log.info("服务器收到消息:{}",msg.text());
  system.out.println("接收到消息数据为:" + msg.text());
  message message = gson.fromjson(msg.text(), message.class); 
//根据业务要求进行消息处理
  switch (message.gettype()) {
   // 处理客户端连接的消息
   case 0:
    // 建立用户与通道的关联
   // 处理客户端发送好友消息
    break;
   case 1:
   // 处理客户端的签收消息
    break;
   case 2:
    // 将消息记录设置为已读
    break;
   case 3:
    // 接收心跳消息
    break;
   default:
    break;
  }

 }

 // 当有新的客户端连接服务器之后,会自动调用这个方法
 @override
 public void handleradded(channelhandlercontext ctx) throws exception {
  log.info("handleradded 被调用"+ctx.channel().id().aslongtext());
  // 添加到channelgroup 通道组
  userchannelmap.getchannelgroup().add(ctx.channel());
//  clients.add(ctx.channel());
 }

 @override
 public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
  log.info("{异常:}"+cause.getmessage());
  // 删除通道
  userchannelmap.getchannelgroup().remove(ctx.channel());
  userchannelmap.removebychannelid(ctx.channel().id().aslongtext());
  ctx.channel().close();
 }

 @override
 public void handlerremoved(channelhandlercontext ctx) throws exception {
  log.info("handlerremoved 被调用"+ctx.channel().id().aslongtext());
  //删除通道
  userchannelmap.getchannelgroup().remove(ctx.channel());
  userchannelmap.removebychannelid(ctx.channel().id().aslongtext());
  userchannelmap.print();
 }

}

八.处理心跳

public class hearbeathandler extends channelinboundhandleradapter {

 @override
 public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {
  if(evt instanceof idlestateevent) {
   idlestateevent idlestateevent = (idlestateevent)evt;

   if(idlestateevent.state() == idlestate.reader_idle) {
    system.out.println("读空闲事件触发...");
   }
   else if(idlestateevent.state() == idlestate.writer_idle) {
    system.out.println("写空闲事件触发...");
   }
   else if(idlestateevent.state() == idlestate.all_idle) {
    system.out.println("---------------");
    system.out.println("读写空闲事件触发");
    system.out.println("关闭通道资源");
    ctx.channel().close();
   }
  }
 }
}

搭建完成后调用测试

1.页面访问http://localhost:9001/ws
 2.端口号9001和访问路径ws都是我们在上边配置的,然后传入我们自定义的消息message类型。
3.大概流程:消息发送 :用户1先连接通道,然后发送消息给用户2,用户2若是在线直接可以发送给用户,若没在线可以将消息暂存在redis或者通道里,用户2链接通道的话,两者可以直接通讯。
消息推送 :用户1连接通道,根据通道id查询要推送的人是否在线,或者推送给所有人,这里我只推送给指定的人。

到此这篇关于springboot+netty+websocket实现消息发送的示例代码的文章就介绍到这了,更多相关springboot netty websocket消息发送内容请搜索移动技术网以前的文章或继续浏览下面的相关文章希望大家以后多多支持移动技术网!

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网