当前位置: 移动技术网 > IT编程>开发语言>Java > SpringBoot webSocket实现发送广播、点对点消息和Android接收

SpringBoot webSocket实现发送广播、点对点消息和Android接收

2019年07月22日  | 移动技术网IT编程  | 我要评论

1、springboot websocket

springboot 使用的websocket 协议,不是标准的websocket协议,使用的是名称叫做stomp的协议。

1.1 stomp协议说明

stomp,streaming text orientated message protocol,是流文本定向消息协议,是一种为mom(message oriented middleware,面向消息的中间件)设计的简单文本协议。

它提供了一个可互操作的连接格式,允许stomp客户端与任意stomp消息代理(broker)进行交互,类似于openwire(一种二进制协议)。

由于其设计简单,很容易开发客户端,因此在多种语言和多种平台上得到广泛应用。其中最流行的stomp消息代理是apache activemq。

1.2 搭建

本人使用的是inject idea 搭建的springboot websocket,并未采用熟悉的gradle,而是采用了maven方式搭建。

项目结构如下

 

pom.xml:

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
 xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelversion>4.0.0</modelversion>

 <groupid>com.drawthink</groupid>
 <artifactid>websocketdemo</artifactid>
 <version>0.0.1-snapshot</version>
 <packaging>jar</packaging>

 <name>websocketdemo</name>
 <description>websocketdemo project for spring boot</description>

 <parent>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-parent</artifactid>
  <version>1.3.6.release</version>
  <relativepath/> <!-- lookup parent from repository -->
 </parent>

 <properties>
  <project.build.sourceencoding>utf-8</project.build.sourceencoding>
  <project.reporting.outputencoding>utf-8</project.reporting.outputencoding>
  <java.version>1.8</java.version>
 </properties>

 <dependencies>
  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter-thymeleaf</artifactid>
  </dependency>
  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter-websocket</artifactid>
  </dependency>

  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter-test</artifactid>
   <scope>test</scope>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-maven-plugin</artifactid>
   </plugin>
  </plugins>
 </build>


</project>

application:

package com.drawthink;

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;

@springbootapplication
public class websocketdemoapplication {

 public static void main(string[] args) {
  springapplication.run(websocketdemoapplication.class, args);
 }
}

websocketconfig

package com.drawthink.websocket;

import org.springframework.context.annotation.configuration;
import org.springframework.messaging.simp.config.messagebrokerregistry;
import org.springframework.web.socket.config.annotation.abstractwebsocketmessagebrokerconfigurer;
import org.springframework.web.socket.config.annotation.enablewebsocketmessagebroker;
import org.springframework.web.socket.config.annotation.stompendpointregistry;

/**
 * created by lincoln on 16-10-25
 */
@configuration
@enablewebsocketmessagebroker
public class websocketconfig extends abstractwebsocketmessagebrokerconfigurer {
 @override
 public void registerstompendpoints(stompendpointregistry stompendpointregistry) {
  //允许使用socketjs方式访问,访问点为hello,允许跨域
  stompendpointregistry.addendpoint("/hello").setallowedorigins("*").withsockjs();
 }

 @override
 public void configuremessagebroker(messagebrokerregistry registry) {
  //订阅broker名称
  registry.enablesimplebroker("/topic","/user");
  //全局使用的订阅前缀(客户端订阅路径上会体现出来)
  registry.setapplicationdestinationprefixes("/app/");
  //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
  //registry.setuserdestinationprefix("/user/");
 }
}

websocketcontroller

package com.drawthink.websocket.controller;

import com.drawthink.message.clientmessage;
import com.drawthink.message.servermessage;
import com.drawthink.message.tousermessage;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.messaging.handler.annotation.messagemapping;
import org.springframework.messaging.handler.annotation.sendto;
import org.springframework.messaging.simp.simpmessagingtemplate;
import org.springframework.stereotype.controller;

/**
 * created by lincoln on 16-10-25
 */
@controller
public class websocketcontroller {

 @messagemapping("/welcome")
 //sendto 发送至 broker 下的指定订阅路径
 @sendto("/topic/getresponse")
 public servermessage say(clientmessage clientmessage){
  //方法用于广播测试
  system.out.println("clientmessage.getname() = " + clientmessage.getname());
  return new servermessage("welcome , "+clientmessage.getname()+" !");
 }

 //注入simpmessagingtemplate 用于点对点消息发送
 @autowired
 private simpmessagingtemplate messagingtemplate;

 @messagemapping("/cheat")
 // 发送的订阅路径为/user/{userid}/message
 // /user/路径是默认的一个,如果想要改变,必须在config 中setuserdestinationprefix
 public void cheatto(tousermessage tousermessage){
  //方法用于点对点测试
  system.out.println("tousermessage.getmessage() = " + tousermessage.getmessage());
  system.out.println("tousermessage.getuserid() = " + tousermessage.getuserid());          messagingtemplate.convertandsendtouser(tousermessage.getuserid(),"/message",tousermessage.getmessage());
 }
}

vo

package com.drawthink.message;

/**
 * created by lincoln on 16-10-25
 */
public class clientmessage {
 private string name;

 public string getname() {
  return name;
 }

 public void setname(string name) {
  this.name = name;
 }
}
package com.drawthink.message;

/**
 * created by lincoln on 16-10-25
 */
public class servermessage {
 private string responsemessage;

 public servermessage(string responsemessage) {
  this.responsemessage = responsemessage;
 }

 public string getresponsemessage() {
  return responsemessage;
 }

 public void setresponsemessage(string responsemessage) {
  this.responsemessage = responsemessage;
 }
}

package com.drawthink.message;

/**
 * created by lincoln on 16-10-25
 */
public class tousermessage {
 private string userid;
 private string message;

 public string getuserid() {
  return userid;
 }

 public void setuserid(string userid) {
  this.userid = userid;
 }

 public string getmessage() {
  return message;
 }

 public void setmessage(string message) {
  this.message = message;
 }
}

android 客户端

stomp协议在android系统中没有默认实现,必须自行去实现。不过好消息是,开源大神们已经完成了android上使用stomp协议的实现,所以我们只需要使用就好了。

地址:

搭建

build.gradle(app)

apply plugin: 'com.android.application'

android {
 compilesdkversion 24
 buildtoolsversion "24.0.3"
 defaultconfig {
  applicationid "com.drawthink.websocket"
  minsdkversion 16
  targetsdkversion 24
  versioncode 1
  versionname "1.0"
  testinstrumentationrunner "android.support.test.runner.androidjunitrunner"
 }
 buildtypes {
  release {
   minifyenabled false
   proguardfiles getdefaultproguardfile('proguard-android.txt'), 'proguard-rules.pro'
  }
 }
}

dependencies {
 compile filetree(include: ['*.jar'], dir: 'libs')
 androidtestcompile('com.android.support.test.espresso:espresso-core:2.2.2', {
  exclude group: 'com.android.support', module: 'support-annotations'
 })
 compile 'com.android.support:appcompat-v7:24.2.1'
 testcompile 'junit:junit:4.12'
 //依赖stomp协议的android实现
 compile 'com.github.naiksoftware:stompprotocolandroid:1.1.1'
 //stompprotocolandroid 依赖于websocket的标准实现
 compile 'org.java-websocket:java-websocket:1.3.0'
}

接收广播实例:

package com.drawthink.websocket;

import android.content.intent;
import android.os.bundle;
import android.support.v7.app.appcompatactivity;
import android.util.log;
import android.view.view;
import android.widget.button;
import android.widget.edittext;
import android.widget.textview;
import android.widget.toast;

import org.java_websocket.websocket;

import rx.subscriber;
import rx.functions.action1;
import ua.naiksoftware.stomp.lifecycleevent;
import ua.naiksoftware.stomp.stomp;
import ua.naiksoftware.stomp.client.stompclient;
import ua.naiksoftware.stomp.client.stompmessage;

import static android.content.contentvalues.tag;

public class mainactivity extends appcompatactivity {

 private textview servermessage;
 private button start;
 private button stop;
 private button send;
 private edittext edittext;
 private stompclient mstompclient;
 private button cheat;

 @override
 protected void oncreate(bundle savedinstancestate) {
  super.oncreate(savedinstancestate);
  setcontentview(r.layout.activity_main);
  bindview();
  start.setonclicklistener(new view.onclicklistener() {
   @override
   public void onclick(view v) {
   //创建client 实例
    createstompclient();
   //订阅消息
    registerstomptopic();
   }
  });

  send.setonclicklistener(new view.onclicklistener() {
   @override
   public void onclick(view v) {
    mstompclient.send("/app/welcome","{\"name\":\""+edittext.gettext()+"\"}")
      .subscribe(new subscriber<void>() {
     @override
     public void oncompleted() {
      toast("发送成功");
     }

     @override
     public void onerror(throwable e) {
      e.printstacktrace();
      toast("发送错误");
     }

     @override
     public void onnext(void avoid) {

     }
    });
   }
  });

  stop.setonclicklistener(new view.onclicklistener() {
   @override
   public void onclick(view v) {
    mstompclient.disconnect();
   }
  });

  cheat.setonclicklistener(new view.onclicklistener() {
   @override
   public void onclick(view v) {
    startactivity(new intent(mainactivity.this,cheatactivity.class));
    if(mstompclient != null) {
     mstompclient.disconnect();
    }
    finish();
   }
  });
 }

 private void showmessage(final stompmessage stompmessage) {
  runonuithread(new runnable() {
   @override
   public void run() {
    servermessage.settext("stomp command is --->"+stompmessage.getstompcommand() +" body is --->"+stompmessage.getpayload());
   }
  });
 }

 //创建client 实例
 private void createstompclient() {
  mstompclient = stomp.over(websocket.class, "ws://192.168.0.46:8080/hello/websocket");
  mstompclient.connect();
  toast.maketext(mainactivity.this,"开始连接 192.168.0.46:8080",toast.length_short).show();
  mstompclient.lifecycle().subscribe(new action1<lifecycleevent>() {
   @override
   public void call(lifecycleevent lifecycleevent) {
    switch (lifecycleevent.gettype()) {
     case opened:
      log.d(tag, "stomp connection opened");
      toast("连接已开启");
      break;

     case error:
      log.e(tag, "stomp error", lifecycleevent.getexception());
      toast("连接出错");
      break;
     case closed:
      log.d(tag, "stomp connection closed");
      toast("连接关闭");
      break;
    }
   }
  });
 }

 //订阅消息
 private void registerstomptopic() {
  mstompclient.topic("/topic/getresponse").subscribe(new action1<stompmessage>() {
   @override
   public void call(stompmessage stompmessage) {
    log.e(tag, "call: " +stompmessage.getpayload() );
    showmessage(stompmessage);
   }
  });

 }

 private void toast(final string message) {
  runonuithread(new runnable() {
   @override
   public void run() {
    toast.maketext(mainactivity.this,message,toast.length_short).show();
   }
  });
 }

 private void bindview() {
  servermessage = (textview) findviewbyid(r.id.servermessage);
  start = (button) findviewbyid(r.id.start);
  stop = (button) findviewbyid(r.id.stop);
  send = (button) findviewbyid(r.id.send);
  edittext = (edittext) findviewbyid(r.id.clientmessage);
  cheat = (button) findviewbyid(r.id.cheat);
 }
}

点对点

package com.drawthink.websocket;

import android.os.bundle;
import android.support.v7.app.appcompatactivity;
import android.util.log;
import android.view.view;
import android.widget.button;
import android.widget.edittext;
import android.widget.linearlayout;
import android.widget.textview;
import android.widget.toast;

import org.java_websocket.websocket;

import rx.subscriber;
import rx.functions.action1;
import ua.naiksoftware.stomp.lifecycleevent;
import ua.naiksoftware.stomp.stomp;
import ua.naiksoftware.stomp.client.stompclient;
import ua.naiksoftware.stomp.client.stompmessage;

import static android.content.contentvalues.tag;

public class cheatactivity extends appcompatactivity {

 private edittext cheat;
 private button send;
 private linearlayout message;
 private stompclient mstompclient;

 @override
 protected void oncreate(bundle savedinstancestate) {
  super.oncreate(savedinstancestate);
  setcontentview(r.layout.activity_cheat);
  bindview();
  createstompclient();
  registerstomptopic();
  send.setonclicklistener(new view.onclicklistener() {
   @override
   public void onclick(view v) {
   // 向/app/cheat发送json数据
    mstompclient.send("/app/cheat","{\"userid\":\"lincoln\",\"message\":\""+cheat.gettext()+"\"}")
      .subscribe(new subscriber<void>() {
       @override
       public void oncompleted() {
        toast("发送成功");
       }

       @override
       public void onerror(throwable e) {
        e.printstacktrace();
        toast("发送错误");
       }

       @override
       public void onnext(void avoid) {

       }
      });
   }
  });
 }

 private void bindview() {
  cheat = (edittext) findviewbyid(r.id.cheat);
  send = (button) findviewbyid(r.id.send);
  message = (linearlayout) findviewbyid(r.id.message);
 }

 private void createstompclient() {
  mstompclient = stomp.over(websocket.class, "ws://192.168.0.46:8080/hello/websocket");
  mstompclient.connect();
  toast.maketext(cheatactivity.this,"开始连接 192.168.0.46:8080",toast.length_short).show();
  mstompclient.lifecycle().subscribe(new action1<lifecycleevent>() {
   @override
   public void call(lifecycleevent lifecycleevent) {
    switch (lifecycleevent.gettype()) {
     case opened:
      log.d(tag, "stomp connection opened");
      toast("连接已开启");
      break;

     case error:
      log.e(tag, "stomp error", lifecycleevent.getexception());
      toast("连接出错");
      break;
     case closed:
      log.d(tag, "stomp connection closed");
      toast("连接关闭");
      break;
    }
   }
  });
 }

 // 接收/user/xiaoli/message路径发布的消息
 private void registerstomptopic() {
  mstompclient.topic("/user/xiaoli/message").subscribe(new action1<stompmessage>() {
   @override
   public void call(stompmessage stompmessage) {
    log.e(tag, "call: " +stompmessage.getpayload() );
    showmessage(stompmessage);
   }
  });
 }

 private void showmessage(final stompmessage stompmessage) {
  runonuithread(new runnable() {
   @override
   public void run() {
    textview text = new textview(cheatactivity.this);
    text.setlayoutparams(new linearlayout.layoutparams(linearlayout.layoutparams.match_parent, linearlayout.layoutparams.wrap_content));
    text.settext(system.currenttimemillis() +" body is --->"+stompmessage.getpayload());
    message.addview(text);
   }
  });
 }


 private void toast(final string message) {
  runonuithread(new runnable() {
   @override
   public void run() {
    toast.maketext(cheatactivity.this,message,toast.length_short).show();
   }
  });
 }
}

代码比较乱,说明一下。

1、stomp 使用的时候,关键是发布订阅的关系,使用过消息队列,例如rabbitmq的应该很容易理解。

服务器端 websocketconfig.java文件控制的就是订阅发布的路径关系。

2、websocket的路径说明,本例中连接的是ws://192.168.0.46:8080/hello/websocket路径,/hello是在websocketconfig的stompendpointregistry.addendpoint(“/hello”).setallowedorigins(““).withsockjs();*确定的, 如果有多个endpoint,这个地方的路径也会随之变化。

3、发布路径

发布信息的路径是由websocketconfig中的 setapplicationdestinationprefixes(“/app/”); 和 controller 中@messagemapping(“/welcome”) 组合确定的。

例如发广播消息,路径为/app/welcome

例如发点对点消息,路径为/app/cheat

4、消息订阅路径

订阅broker源自websocketconfig中的registry.enablesimplebroker(“/topic”,”/user”);此处开放了两个broker,具体的订阅服务路径给基于controller中的 @sendto(“/topic/getresponse”)或simpmessagingtemplate中给定。(注:此处,服务器和客户端须约定订阅路径)

5、关于心跳

订阅发布模型的心跳很简单,客户端向一个指定的心跳路径发送心跳,服务器处理,服务器使用指定的订阅路径向客户端发心跳,即可。因为没有socket,只需要记录是否联通的状态即可,重连客户端做一下就好了。

本人菜鸟,肯定有些地方没有搞清楚,如果有误,请大神斧正。

代码下载地址:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网