当前位置: 移动技术网 > IT编程>开发语言>Java > Java利用Sping框架编写RPC远程过程调用服务的教程

Java利用Sping框架编写RPC远程过程调用服务的教程

2019年07月22日  | 移动技术网IT编程  | 我要评论

rpc,即 remote procedure call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。

rpc 可基于 http 或 tcp 协议,web service 就是基于 http 协议的 rpc,它具有良好的跨平台性,但其性能却不如基于 tcp 协议的 rpc。会两方面会直接影响 rpc 的性能,一是传输方式,二是序列化。

众所周知,tcp 是传输层协议,http 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,tcp 一定比 http 快。就序列化而言,java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:protobuf、kryo、hessian、jackson 等,它们可以取代 java 默认的序列化,从而提供更高效的性能。

为了支持高并发,传统的阻塞式 io 显然不太合适,因此我们需要异步的 io,即 nio。java 提供了 nio 的解决方案,java 7 也提供了更优秀的 nio.2 支持,用 java 实现 nio 并不是遥不可及的事情,只是需要我们熟悉 nio 的技术细节。

我们需要将服务部署在分布式环境下的不同节点上,通过服务注册的方式,让客户端来自动发现当前可用的服务,并调用这些服务。这需要一种服务注册表(service registry)的组件,让它来注册分布式环境下所有的服务地址(包括:主机名与端口号)。

应用、服务、服务注册表之间的关系见下图:

2016621181737352.png (801×292)

每台 server 上可发布多个 service,这些 service 共用一个 host 与 port,在分布式环境下会提供 server 共同对外提供 service。此外,为防止 service registry 出现单点故障,因此需要将其搭建为集群环境。

本文将为您揭晓开发轻量级分布式 rpc 框架的具体过程,该框架基于 tcp 协议,提供了 nio 特性,提供高效的序列化方式,同时也具备服务注册与发现的能力。

根据以上技术需求,我们可使用如下技术选型:

  • spring:它是最强大的依赖注入框架,也是业界的权威标准。
  • netty:它使 nio 编程更加容易,屏蔽了 java 底层的 nio 细节。
  • protostuff:它基于 protobuf 序列化框架,面向 pojo,无需编写 .proto 文件。
  • zookeeper:提供服务注册与发现功能,开发分布式系统的必备选择,同时它也具备天生的集群能力。

相关 maven 依赖请见最后附录。

第一步:编写服务接口

public interface helloservice {

  string hello(string name);
}

将该接口放在独立的客户端 jar 包中,以供应用使用。

第二步:编写服务接口的实现类

@rpcservice(helloservice.class) // 指定远程接口
public class helloserviceimpl implements helloservice {

  @override
  public string hello(string name) {
    return "hello! " + name;
  }
}

使用rpcservice注解定义在服务接口的实现类上,需要对该实现类指定远程接口,因为实现类可能会实现多个接口,一定要告诉框架哪个才是远程接口。

rpcservice代码如下:

@target({elementtype.type})
@retention(retentionpolicy.runtime)
@component // 表明可被 spring 扫描
public @interface rpcservice {

  class<?> value();
}

该注解具备 spring 的component注解的特性,可被 spring 扫描。

该实现类放在服务端 jar 包中,该 jar 包还提供了一些服务端的配置文件与启动服务的引导程序。

第三步:配置服务端

服务端 spring 配置文件名为spring.xml,内容如下:

<beans ...>
  <context:component-scan base-package="com.xxx.rpc.sample.server"/>

  <context:property-placeholder location="classpath:config.properties"/>

  <!-- 配置服务注册组件 -->
  <bean id="serviceregistry" class="com.xxx.rpc.registry.serviceregistry">
    <constructor-arg name="registryaddress" value="${registry.address}"/>
  </bean>

  <!-- 配置 rpc 服务器 -->
  <bean id="rpcserver" class="com.xxx.rpc.server.rpcserver">
    <constructor-arg name="serveraddress" value="${server.address}"/>
    <constructor-arg name="serviceregistry" ref="serviceregistry"/>
  </bean>
</beans>

具体的配置参数在config.properties文件中,内容如下:

# zookeeper 服务器
registry.address=127.0.0.1:2181

# rpc 服务器
server.address=127.0.0.1:8000

以上配置表明:连接本地的 zookeeper 服务器,并在 8000 端口上发布 rpc 服务。

第四步:启动服务器并发布服务

为了加载 spring 配置文件来发布服务,只需编写一个引导程序即可:

public class rpcbootstrap {

  public static void main(string[] args) {
    new classpathxmlapplicationcontext("spring.xml");
  }
}

运行rpcbootstrap类的main方法即可启动服务端,但还有两个重要的组件尚未实现,它们分别是:serviceregistry与rpcserver,下文会给出具体实现细节。

第五步:实现服务注册

使用 zookeeper 客户端可轻松实现服务注册功能,serviceregistry代码如下:

public class serviceregistry {

  private static final logger logger = loggerfactory.getlogger(serviceregistry.class);

  private countdownlatch latch = new countdownlatch(1);

  private string registryaddress;

  public serviceregistry(string registryaddress) {
    this.registryaddress = registryaddress;
  }

  public void register(string data) {
    if (data != null) {
      zookeeper zk = connectserver();
      if (zk != null) {
        createnode(zk, data);
      }
    }
  }

  private zookeeper connectserver() {
    zookeeper zk = null;
    try {
      zk = new zookeeper(registryaddress, constant.zk_session_timeout, new watcher() {
        @override
        public void process(watchedevent event) {
          if (event.getstate() == event.keeperstate.syncconnected) {
            latch.countdown();
          }
        }
      });
      latch.await();
    } catch (ioexception | interruptedexception e) {
      logger.error("", e);
    }
    return zk;
  }

  private void createnode(zookeeper zk, string data) {
    try {
      byte[] bytes = data.getbytes();
      string path = zk.create(constant.zk_data_path, bytes, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
      logger.debug("create zookeeper node ({} => {})", path, data);
    } catch (keeperexception | interruptedexception e) {
      logger.error("", e);
    }
  }
}

其中,通过constant配置了所有的常量:

public interface constant {

  int zk_session_timeout = 5000;

  string zk_registry_path = "/registry";
  string zk_data_path = zk_registry_path + "/data";
}

注意:首先需要使用 zookeeper 客户端命令行创建/registry永久节点,用于存放所有的服务临时节点。

第六步:实现 rpc 服务器

使用 netty 可实现一个支持 nio 的 rpc 服务器,需要使用serviceregistry注册服务地址,rpcserver代码如下:

public class rpcserver implements applicationcontextaware, initializingbean {

  private static final logger logger = loggerfactory.getlogger(rpcserver.class);

  private string serveraddress;
  private serviceregistry serviceregistry;

  private map<string, object> handlermap = new hashmap<>(); // 存放接口名与服务对象之间的映射关系

  public rpcserver(string serveraddress) {
    this.serveraddress = serveraddress;
  }

  public rpcserver(string serveraddress, serviceregistry serviceregistry) {
    this.serveraddress = serveraddress;
    this.serviceregistry = serviceregistry;
  }

  @override
  public void setapplicationcontext(applicationcontext ctx) throws beansexception {
    map<string, object> servicebeanmap = ctx.getbeanswithannotation(rpcservice.class); // 获取所有带有 rpcservice 注解的 spring bean
    if (maputils.isnotempty(servicebeanmap)) {
      for (object servicebean : servicebeanmap.values()) {
        string interfacename = servicebean.getclass().getannotation(rpcservice.class).value().getname();
        handlermap.put(interfacename, servicebean);
      }
    }
  }

  @override
  public void afterpropertiesset() throws exception {
    eventloopgroup bossgroup = new nioeventloopgroup();
    eventloopgroup workergroup = new nioeventloopgroup();
    try {
      serverbootstrap bootstrap = new serverbootstrap();
      bootstrap.group(bossgroup, workergroup).channel(nioserversocketchannel.class)
        .childhandler(new channelinitializer<socketchannel>() {
          @override
          public void initchannel(socketchannel channel) throws exception {
            channel.pipeline()
              .addlast(new rpcdecoder(rpcrequest.class)) // 将 rpc 请求进行解码(为了处理请求)
              .addlast(new rpcencoder(rpcresponse.class)) // 将 rpc 响应进行编码(为了返回响应)
              .addlast(new rpchandler(handlermap)); // 处理 rpc 请求
          }
        })
        .option(channeloption.so_backlog, 128)
        .childoption(channeloption.so_keepalive, true);

      string[] array = serveraddress.split(":");
      string host = array[0];
      int port = integer.parseint(array[1]);

      channelfuture future = bootstrap.bind(host, port).sync();
      logger.debug("server started on port {}", port);

      if (serviceregistry != null) {
        serviceregistry.register(serveraddress); // 注册服务地址
      }

      future.channel().closefuture().sync();
    } finally {
      workergroup.shutdowngracefully();
      bossgroup.shutdowngracefully();
    }
  }
}

以上代码中,有两个重要的 pojo 需要描述一下,它们分别是rpcrequest与rpcresponse。

使用rpcrequest封装 rpc 请求,代码如下:

public class rpcrequest {

  private string requestid;
  private string classname;
  private string methodname;
  private class<?>[] parametertypes;
  private object[] parameters;

  // getter/setter...
}

使用rpcresponse封装 rpc 响应,代码如下:

public class rpcresponse {

  private string requestid;
  private throwable error;
  private object result;

  // getter/setter...
}

使用rpcdecoder提供 rpc 解码,只需扩展 netty 的bytetomessagedecoder抽象类的decode方法即可,代码如下:

public class rpcdecoder extends bytetomessagedecoder {

  private class<?> genericclass;

  public rpcdecoder(class<?> genericclass) {
    this.genericclass = genericclass;
  }

  @override
  public void decode(channelhandlercontext ctx, bytebuf in, list<object> out) throws exception {
    if (in.readablebytes() < 4) {
      return;
    }
    in.markreaderindex();
    int datalength = in.readint();
    if (datalength < 0) {
      ctx.close();
    }
    if (in.readablebytes() < datalength) {
      in.resetreaderindex();
      return;
    }
    byte[] data = new byte[datalength];
    in.readbytes(data);

    object obj = serializationutil.deserialize(data, genericclass);
    out.add(obj);
  }
}

使用rpcencoder提供 rpc 编码,只需扩展 netty 的messagetobyteencoder抽象类的encode方法即可,代码如下:

public class rpcencoder extends messagetobyteencoder {

  private class<?> genericclass;

  public rpcencoder(class<?> genericclass) {
    this.genericclass = genericclass;
  }

  @override
  public void encode(channelhandlercontext ctx, object in, bytebuf out) throws exception {
    if (genericclass.isinstance(in)) {
      byte[] data = serializationutil.serialize(in);
      out.writeint(data.length);
      out.writebytes(data);
    }
  }
}

编写一个serializationutil工具类,使用protostuff实现序列化:

public class serializationutil {

  private static map<class<?>, schema<?>> cachedschema = new concurrenthashmap<>();

  private static objenesis objenesis = new objenesisstd(true);

  private serializationutil() {
  }

  @suppresswarnings("unchecked")
  private static <t> schema<t> getschema(class<t> cls) {
    schema<t> schema = (schema<t>) cachedschema.get(cls);
    if (schema == null) {
      schema = runtimeschema.createfrom(cls);
      if (schema != null) {
        cachedschema.put(cls, schema);
      }
    }
    return schema;
  }

  @suppresswarnings("unchecked")
  public static <t> byte[] serialize(t obj) {
    class<t> cls = (class<t>) obj.getclass();
    linkedbuffer buffer = linkedbuffer.allocate(linkedbuffer.default_buffer_size);
    try {
      schema<t> schema = getschema(cls);
      return protostuffioutil.tobytearray(obj, schema, buffer);
    } catch (exception e) {
      throw new illegalstateexception(e.getmessage(), e);
    } finally {
      buffer.clear();
    }
  }

  public static <t> t deserialize(byte[] data, class<t> cls) {
    try {
      t message = (t) objenesis.newinstance(cls);
      schema<t> schema = getschema(cls);
      protostuffioutil.mergefrom(data, message, schema);
      return message;
    } catch (exception e) {
      throw new illegalstateexception(e.getmessage(), e);
    }
  }
}

以上了使用 objenesis 来实例化对象,它是比 java 反射更加强大。

注意:如需要替换其它序列化框架,只需修改serializationutil即可。当然,更好的实现方式是提供配置项来决定使用哪种序列化方式。

使用rpchandler中处理 rpc 请求,只需扩展 netty 的simplechannelinboundhandler抽象类即可,代码如下:

public class rpchandler extends simplechannelinboundhandler<rpcrequest> {

  private static final logger logger = loggerfactory.getlogger(rpchandler.class);

  private final map<string, object> handlermap;

  public rpchandler(map<string, object> handlermap) {
    this.handlermap = handlermap;
  }

  @override
  public void channelread0(final channelhandlercontext ctx, rpcrequest request) throws exception {
    rpcresponse response = new rpcresponse();
    response.setrequestid(request.getrequestid());
    try {
      object result = handle(request);
      response.setresult(result);
    } catch (throwable t) {
      response.seterror(t);
    }
    ctx.writeandflush(response).addlistener(channelfuturelistener.close);
  }

  private object handle(rpcrequest request) throws throwable {
    string classname = request.getclassname();
    object servicebean = handlermap.get(classname);

    class<?> serviceclass = servicebean.getclass();
    string methodname = request.getmethodname();
    class<?>[] parametertypes = request.getparametertypes();
    object[] parameters = request.getparameters();

    /*method method = serviceclass.getmethod(methodname, parametertypes);
    method.setaccessible(true);
    return method.invoke(servicebean, parameters);*/

    fastclass servicefastclass = fastclass.create(serviceclass);
    fastmethod servicefastmethod = servicefastclass.getmethod(methodname, parametertypes);
    return servicefastmethod.invoke(servicebean, parameters);
  }

  @override
  public void exceptioncaught(channelhandlercontext ctx, throwable cause) {
    logger.error("server caught exception", cause);
    ctx.close();
  }
}

为了避免使用 java 反射带来的性能问题,我们可以使用 cglib 提供的反射 api,如上面用到的fastclass与fastmethod。

第七步:配置客户端

同样使用 spring 配置文件来配置 rpc 客户端,spring.xml代码如下:

<beans ...>
  <context:property-placeholder location="classpath:config.properties"/>

  <!-- 配置服务发现组件 -->
  <bean id="servicediscovery" class="com.xxx.rpc.registry.servicediscovery">
    <constructor-arg name="registryaddress" value="${registry.address}"/>
  </bean>

  <!-- 配置 rpc 代理 -->
  <bean id="rpcproxy" class="com.xxx.rpc.client.rpcproxy">
    <constructor-arg name="servicediscovery" ref="servicediscovery"/>
  </bean>
</beans>

其中config.properties提供了具体的配置:

# zookeeper 服务器
registry.address=127.0.0.1:2181

第八步:实现服务发现

同样使用 zookeeper 实现服务发现功能,见如下代码:

public class servicediscovery {

  private static final logger logger = loggerfactory.getlogger(servicediscovery.class);

  private countdownlatch latch = new countdownlatch(1);

  private volatile list<string> datalist = new arraylist<>();

  private string registryaddress;

  public servicediscovery(string registryaddress) {
    this.registryaddress = registryaddress;

    zookeeper zk = connectserver();
    if (zk != null) {
      watchnode(zk);
    }
  }

  public string discover() {
    string data = null;
    int size = datalist.size();
    if (size > 0) {
      if (size == 1) {
        data = datalist.get(0);
        logger.debug("using only data: {}", data);
      } else {
        data = datalist.get(threadlocalrandom.current().nextint(size));
        logger.debug("using random data: {}", data);
      }
    }
    return data;
  }

  private zookeeper connectserver() {
    zookeeper zk = null;
    try {
      zk = new zookeeper(registryaddress, constant.zk_session_timeout, new watcher() {
        @override
        public void process(watchedevent event) {
          if (event.getstate() == event.keeperstate.syncconnected) {
            latch.countdown();
          }
        }
      });
      latch.await();
    } catch (ioexception | interruptedexception e) {
      logger.error("", e);
    }
    return zk;
  }

  private void watchnode(final zookeeper zk) {
    try {
      list<string> nodelist = zk.getchildren(constant.zk_registry_path, new watcher() {
        @override
        public void process(watchedevent event) {
          if (event.gettype() == event.eventtype.nodechildrenchanged) {
            watchnode(zk);
          }
        }
      });
      list<string> datalist = new arraylist<>();
      for (string node : nodelist) {
        byte[] bytes = zk.getdata(constant.zk_registry_path + "/" + node, false, null);
        datalist.add(new string(bytes));
      }
      logger.debug("node data: {}", datalist);
      this.datalist = datalist;
    } catch (keeperexception | interruptedexception e) {
      logger.error("", e);
    }
  }
}

第九步:实现 rpc 代理

这里使用 java 提供的动态代理技术实现 rpc 代理(当然也可以使用 cglib 来实现),具体代码如下:

public class rpcproxy {

  private string serveraddress;
  private servicediscovery servicediscovery;

  public rpcproxy(string serveraddress) {
    this.serveraddress = serveraddress;
  }

  public rpcproxy(servicediscovery servicediscovery) {
    this.servicediscovery = servicediscovery;
  }

  @suppresswarnings("unchecked")
  public <t> t create(class<?> interfaceclass) {
    return (t) proxy.newproxyinstance(
      interfaceclass.getclassloader(),
      new class<?>[]{interfaceclass},
      new invocationhandler() {
        @override
        public object invoke(object proxy, method method, object[] args) throws throwable {
          rpcrequest request = new rpcrequest(); // 创建并初始化 rpc 请求
          request.setrequestid(uuid.randomuuid().tostring());
          request.setclassname(method.getdeclaringclass().getname());
          request.setmethodname(method.getname());
          request.setparametertypes(method.getparametertypes());
          request.setparameters(args);

          if (servicediscovery != null) {
            serveraddress = servicediscovery.discover(); // 发现服务
          }

          string[] array = serveraddress.split(":");
          string host = array[0];
          int port = integer.parseint(array[1]);

          rpcclient client = new rpcclient(host, port); // 初始化 rpc 客户端
          rpcresponse response = client.send(request); // 通过 rpc 客户端发送 rpc 请求并获取 rpc 响应

          if (response.iserror()) {
            throw response.geterror();
          } else {
            return response.getresult();
          }
        }
      }
    );
  }
}

使用rpcclient类实现 rpc 客户端,只需扩展 netty 提供的simplechannelinboundhandler抽象类即可,代码如下:

public class rpcclient extends simplechannelinboundhandler<rpcresponse> {

  private static final logger logger = loggerfactory.getlogger(rpcclient.class);

  private string host;
  private int port;

  private rpcresponse response;

  private final object obj = new object();

  public rpcclient(string host, int port) {
    this.host = host;
    this.port = port;
  }

  @override
  public void channelread0(channelhandlercontext ctx, rpcresponse response) throws exception {
    this.response = response;

    synchronized (obj) {
      obj.notifyall(); // 收到响应,唤醒线程
    }
  }

  @override
  public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
    logger.error("client caught exception", cause);
    ctx.close();
  }

  public rpcresponse send(rpcrequest request) throws exception {
    eventloopgroup group = new nioeventloopgroup();
    try {
      bootstrap bootstrap = new bootstrap();
      bootstrap.group(group).channel(niosocketchannel.class)
        .handler(new channelinitializer<socketchannel>() {
          @override
          public void initchannel(socketchannel channel) throws exception {
            channel.pipeline()
              .addlast(new rpcencoder(rpcrequest.class)) // 将 rpc 请求进行编码(为了发送请求)
              .addlast(new rpcdecoder(rpcresponse.class)) // 将 rpc 响应进行解码(为了处理响应)
              .addlast(rpcclient.this); // 使用 rpcclient 发送 rpc 请求
          }
        })
        .option(channeloption.so_keepalive, true);

      channelfuture future = bootstrap.connect(host, port).sync();
      future.channel().writeandflush(request).sync();

      synchronized (obj) {
        obj.wait(); // 未收到响应,使线程等待
      }

      if (response != null) {
        future.channel().closefuture().sync();
      }
      return response;
    } finally {
      group.shutdowngracefully();
    }
  }
}

第十步:发送 rpc 请求

使用 junit 结合 spring 编写一个单元测试,代码如下:

@runwith(springjunit4classrunner.class)
@contextconfiguration(locations = "classpath:spring.xml")
public class helloservicetest {

  @autowired
  private rpcproxy rpcproxy;

  @test
  public void hellotest() {
    helloservice helloservice = rpcproxy.create(helloservice.class);
    string result = helloservice.hello("world");
    assert.assertequals("hello! world", result);
  }
}

运行以上单元测试,如果不出意外的话,您应该会看到绿条。

总结

本文通过 spring + netty + protostuff + zookeeper 实现了一个轻量级 rpc 框架,使用 spring 提供依赖注入与参数配置,使用 netty 实现 nio 方式的数据传输,使用 protostuff 实现对象序列化,使用 zookeeper 实现服务注册与发现。使用该框架,可将服务部署到分布式环境中的任意节点上,客户端通过远程接口来调用服务端的具体实现,让服务端与客户端的开发完全分离,为实现大规模分布式应用提供了基础支持。

附录:maven 依赖

<!-- junit -->
<dependency>
  <groupid>junit</groupid>
  <artifactid>junit</artifactid>
  <version>4.11</version>
  <scope>test</scope>
</dependency>

<!-- slf4j -->
<dependency>
  <groupid>org.slf4j</groupid>
  <artifactid>slf4j-log4j12</artifactid>
  <version>1.7.7</version>
</dependency>

<!-- spring -->
<dependency>
  <groupid>org.springframework</groupid>
  <artifactid>spring-context</artifactid>
  <version>3.2.12.release</version>
</dependency>
<dependency>
  <groupid>org.springframework</groupid>
  <artifactid>spring-test</artifactid>
  <version>3.2.12.release</version>
  <scope>test</scope>
</dependency>

<!-- netty -->
<dependency>
  <groupid>io.netty</groupid>
  <artifactid>netty-all</artifactid>
  <version>4.0.24.final</version>
</dependency>

<!-- protostuff -->
<dependency>
  <groupid>com.dyuproject.protostuff</groupid>
  <artifactid>protostuff-core</artifactid>
  <version>1.0.8</version>
</dependency>
<dependency>
  <groupid>com.dyuproject.protostuff</groupid>
  <artifactid>protostuff-runtime</artifactid>
  <version>1.0.8</version>
</dependency>

<!-- zookeeper -->
<dependency>
  <groupid>org.apache.zookeeper</groupid>
  <artifactid>zookeeper</artifactid>
  <version>3.4.6</version>
</dependency>

<!-- apache commons collections -->
<dependency>
  <groupid>org.apache.commons</groupid>
  <artifactid>commons-collections4</artifactid>
  <version>4.0</version>
</dependency>

<!-- objenesis -->
<dependency>
  <groupid>org.objenesis</groupid>
  <artifactid>objenesis</artifactid>
  <version>2.1</version>
</dependency>

<!-- cglib -->
<dependency>
  <groupid>cglib</groupid>
  <artifactid>cglib</artifactid>
  <version>3.1</version>
</dependency>

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网