当前位置: 移动技术网 > IT编程>开发语言>Java > 荐 我的架构梦:(二十)基于Netty手写RPC框架

荐 我的架构梦:(二十)基于Netty手写RPC框架

2020年07月15日  | 移动技术网IT编程  | 我要评论

一、前言

RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主:

  • 是基于HTTPrestful形式的广义远程调用,以spring couldfeignrestTemplate为代表,采用的协议是HTTP7层调用协议,并且协议的参数和响应序列化基本以JSON格式和XML格式为主。
  • 是基于TCP的狭义的RPC远程调用,以阿里的Dubbo为代表,主要通过netty来实现4层网络协议,NIO来异步传输,序列化也可以是JSON或者hessian2以及java自带的序列化等,可以配置。

接下来我们主要以第二种的RPC远程调用来自己实现

二、需求与步骤

模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty

1、创建一个公共的接口项目以及创建接口及方法,用于消费者和提供者之间的约定。
2、创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
3、创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据。

三、代码实现

1、maven聚合工程

rpc-netty父pom依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.riemann</groupId>
    <artifactId>rpc-netty</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <modules>
        <module>rpc-common</module>
        <module>rpc-provider</module>
        <module>rpc-consumer</module>
    </modules>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.16.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

rpc-common pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rpc-netty</artifactId>
        <groupId>com.riemann</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-common</artifactId>
    
</project>

rpc-provider pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rpc-netty</artifactId>
        <groupId>com.riemann</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-provider</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.riemann</groupId>
            <artifactId>rpc-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

rpc-consumer pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rpc-netty</artifactId>
        <groupId>com.riemann</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-consumer</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.riemann</groupId>
            <artifactId>rpc-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

2、rpc-common

提供者及消费者工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者直接通过接口来进行TCP通信及一定的协议定制获取提供者的实现返回值。

public interface IUserService {

    String sayHello(String msg);

}

根据RpcRequest实体作为通信协议

@Data
public class RpcRequest {

    /**
     * 请求对象的ID
     */
    private String requestId;

    /**
     * 类名
     */
    private String className;

    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */
    private Class<?>[] parameterTypes;

    /**
     * 入参
     */
    private Object[] parameters;

}

采用JSON的方式,定义JSONSerializer的实现类。

public class JSONSerializer implements Serializer {

    public byte[] serialize(Object object) throws IOException {
        return JSON.toJSONBytes(object);
    }

    public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException {
        return JSON.parseObject(bytes, clazz);
    }

}

3、rpc-provider

首先是接口的实现,这一点和普通接口实现是一样的

@Service
public class UserServiceImpl implements IUserService {

    // 将来客户端要远程调用的方法
    public String sayHello(String msg) {
        System.out.println("hello " + msg);
        return "success";
    }

    // 创建一个方法启动服务器
    public static void startServer(String ip, int port) throws InterruptedException {
        // 1.创建两个线程池对象
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        // 2.创建服务端的启动引导对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // 3.配置启动引导对象
        serverBootstrap.group(bossGroup, workGroup)
                // 设置通道为NIO
                .channel(NioServerSocketChannel.class)
                // 创建监听channel
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 获取管道对象
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        // 给管道对象pipLine 设置编码
                        // pipeline.addLast(new StringEncoder());
                        // pipeline.addLast(new StringDecoder());

                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer()));

                        // 把我们自定义的一个ChannelHandler添加到通道中
                        pipeline.addLast(new UserServiceHandler());
                    }
                });

        // 4.绑定端口
        serverBootstrap.bind(ip, port).sync();

        System.out.println("rpc-provider已启动...");
    }

}

在实现中加入了netty的服务器启动程序,上面的代码中添加了String类型的编解码 handler,添加了一个自定义 handler

自定义 handler 逻辑如下:

/**
 * 自定义的业务处理器
 */
@Component
public class UserServiceHandler extends ChannelInboundHandlerAdapter {

    // 当客户端读取数据时,该方法会被调用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到客户端信息:" + JSON.toJSON(msg).toString());
        // 将读到的msg对象,强转成RpcRequest对象
        RpcRequest rpcRequest = (RpcRequest) msg;
        // 加载class文件
        Class<?> clazz = Class.forName(rpcRequest.getClassName());
        // 通过class获取服务器spring容器中service类的实例化bean对象
        Object serviceBean = SpringContextUtil.getBean(clazz);
        Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
        method.invoke(serviceBean, rpcRequest.getParameters());

        //服务器写入数据,将结果返回给客户端
        ctx.writeAndFlush("success");

        // 注意:客户端将来发送请求的时候会传递一个参数:UserService#sayHello#riemann

        // 1.判断当前的请求是否符合规则
        /*if (msg.toString().startsWith("UserService")) {
            // 2.如何符合规则,调用实现类获取到一个result
            UserServiceImpl userService = new UserServiceImpl();
            String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            // 3.将调用实现类的方法获得的结果写到客户端
            ctx.writeAndFlush(result);
        }*/
    }

}

还需要一个启动类:

@SpringBootApplication
public class ServerBoot {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ServerBoot.class, args);
        // 启动服务器
        UserServiceImpl.startServer("127.0.0.1", 8999);
    }

}

4、rpc-consumer

消费者有一个需要注意的地方,就是调用需要透明,也就是说,框架使用者不用关心底层的网络实现。这里我们可以使用 JDK动态代理来实现这个目的。

思路:客户端调用代理方法,返回一个实现了 HelloService 接口的代理对象,调用代理对象的方法,返回结果。

我们需要在代理中做手脚,当调用代理方法的时候,我们需要初始化 Netty 客户端,还需要向服务端请求数据,并返回数据。

首先创建代理相关的类:

/**
 * 消费者
 */
public class RPCConsumer {

    // 1.创建一个线程池对象  -- 它要处理我们自定义事件
    private static ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // 2.声明一个自定义事件处理器  UserClientHandler
    private static UserClientHandler userClientHandler;

    // 3.编写方法,初始化客户端 (创建连接池、bootStrap、设置bootStrap、连接服务器)
    public static void initClient() throws InterruptedException {
        // 1) 初始化 userClientHandler
        userClientHandler = new UserClientHandler();
        // 2) 创建连接池对象
        EventLoopGroup group = new NioEventLoopGroup();
        // 3) 创建客户端的引导对象
        Bootstrap bootstrap = new Bootstrap();
        // 4) 配置启动引导对象
        bootstrap.group(group)
                // 设置通道为NIO
                .channel(NioSocketChannel.class)
                // 设置请求协议为TCP
                .option(ChannelOption.TCP_NODELAY, true)
                // 监听channel 并初始化
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 获取 ChannelPipeline
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 设置编码
                        // pipeline.addLast(new StringEncoder());
                        // pipeline.addLast(new StringDecoder());

                        pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
                        pipeline.addLast(new StringDecoder());
                        // 添加自定义事件处理器
                        pipeline.addLast(userClientHandler);
                    }
                });
        // 5) 连接服务端
        bootstrap.connect("127.0.0.1", 8999).sync();
        System.out.println("rpc-consumer已启动...");
    }

    // 4.编写一个方法,使用JDK的动态代理创建对象
    /**
     *
     * @param serviceClass   接口类型,根据哪个接口生成子类代理对象
     * @param providerParam  "UserService#sayHello#riemann"
     * @return
     */
    public static Object createProxy(Class<?> serviceClass, final String providerParam) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{serviceClass}, new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // 1.初始化客户端client
                        if (userClientHandler == null) {
                            initClient();
                        }

                        String requestId = UUID.randomUUID().toString();
                        String className = method.getDeclaringClass().getName();
                        String methodName = method.getName();
                        Class<?>[] parameterTypes = method.getParameterTypes();

                        RpcRequest rpcRequest = new RpcRequest();
                        rpcRequest.setRequestId(requestId);
                        rpcRequest.setClassName(className);
                        rpcRequest.setMethodName(methodName);
                        rpcRequest.setParameterTypes(parameterTypes);
                        rpcRequest.setParameters(args);

                        // 2.给 userClientHandler 设置param参数
                        // userClientHandler.setParam(providerParam + args[0]);
                        userClientHandler.setParam(rpcRequest);

                        // 3.使用线程池,开启一个线程处理call()写操作,并返回结果。
                        Object result = executorService.submit(userClientHandler).get();

                        // 4.return 结果
                        return result;
                    }
                });
    }

}

该类有 2 个方法,创建代理和初始化客户端。

创建代理逻辑:使用 JDK 的动态代理技术,代理对象中的 invoke 方法实现如下: 如果 client 没有初始化,则初始化 client,这个 client 既是 handler ,也是一个 Callback。将参数设置进 client ,使用线程池调用 clientcall 方法并阻塞等待数据返回。

初始化客户端逻辑: 创建一个 Netty 的客户端,并连接提供者,并设置一个自定义 handler,和一些 String 类型的序列化方式。

UserClientHandler 的实现:

/**
 * 自定义事件处理器
 */
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    // 1.定义成员变量
    private ChannelHandlerContext context; // 事件处理器上下文对象(存储handler写信息,写操作)
    private String result; // 记录服务器返回的数据
    private Object param; // 记录将要返回给服务器的数据

    // 2.实现 channelActive 客户端和服务器连接时,该方法就自动执行。
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("服务器已连接...");
        // 初始化 ChannelHandlerContext
        this.context = ctx;
    }


    // 3.实现 channelRead 当我们读到服务器数据,该方法自动执行。
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 将读到的服务器的数据msg,设置为成员变量的值
        result = msg.toString();
        notify();
    }

    // 4.将客户端的数据写到服务器
    public synchronized Object call() throws Exception {
        // context 给服务器写数据
        context.writeAndFlush(param);
        wait();
        return result;
    }

    // 5.设置参数的方法
    public void setParam(Object param) {
        this.param = param;
    }
}

该类缓存了 ChannelHandlerContext,用于下次使用,有两个属性:返回结果和请求参数。

当成功连接后,缓存 ChannelHandlerContext,当调用 call 方法的时候,将请求参数发送到服务端,等待。当服务端收到并返回数据后,调用 channelRead 方法,将返回值赋值个 result,并唤醒等待在 call 方法上的线程。此时,代理对象返回数据。

再看看消费者调用方式,一般的TCPRPC只需要这样调用即可,无需关心具体的协议和通信方式:

public class ConsumeBoot {

    // 参数定义
    private static final String PROVIDE_NAME = "UserService#sayHello#";

    public static void main(String[] args) throws InterruptedException {
        // 1.创建代理对象
        IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDE_NAME);

        // 2.循环给服务器写数据
        while (true) {
            String result = service.sayHello("riemann");
            System.out.println(result);
            Thread.sleep(2000);
        }
    }

}

四、结果测试

调用者首先创建了一个代理对象,然后每隔一秒钟调用代理的 sayHello 方法,并打印服务端返回的结果。

服务提供者:
在这里插入图片描述
服务消费者:
在这里插入图片描述

可以看到,消费者无需通过jar包的形式引入具体的实现项目,而是通过远程TCP通信的形式,以一定的协议和代理通过接口直接调用了方法,实现远程service间的调用,是分布式服务的基础。

五、代码仓库

https://github.com/riemannChow/perseverance/tree/master/handwriting-framework/rpc-netty

本文地址:https://blog.csdn.net/riemann_/article/details/107350656

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网