当前位置: 移动技术网 > IT编程>开发语言>Java > 基于netty实现rpc框架-spring boot客户端

基于netty实现rpc框架-spring boot客户端

2020年04月23日  | 移动技术网IT编程  | 我要评论

上篇讲了rpc服务端的实现。原理就是解析netty通道数据拿到类、方法及入参等信息,然后通过java反射机制调用本地接口返回结果。没有用到很复杂的技术。

这篇我们讲客户端的实现。说白了客户端的任务很简单:一是建立socket长连接;二是封装并发送服务端需要的数据包;三是处理返回结果。

demo地址

rpc实现

同样定义注解扫描service接口。

/**
 * Enables scanning of RPC service interfaces.
 * Through {@code @import} it brings in {@code nettyclientscannerregistrar} and
 * {@code nettyclientapplicationcontextaware}.
 */
@retention(retentionpolicy.runtime)
@target({elementtype.type})
@documented
@import({nettyclientscannerregistrar.class, nettyclientapplicationcontextaware.class})
public @interface nettyclientscan {

    /** Package paths that contain the service interfaces to scan. */
    string[] basepackages();

    /** Factory bean type used to create the interface instances; defaults to {@code nettyfactorybean}. */
    class<? extends nettyfactorybean> factorybean() default nettyfactorybean.class;
}

  

该注解用于spring boot启动类上,参数basepackages指定接口所在的包路径。

/**
 * Spring Boot entry point of the RPC client demo. The {@code nettyclientscan}
 * annotation names the package whose service interfaces are scanned.
 */
@springbootapplication
@nettyclientscan(basepackages = {
        "com.braska.grave.netty.api.service"
})
public class gravenettyclientapplication {

    /** Starts the Spring application with the given command-line arguments. */
    public static void main(string[] args) {
        springapplication.run(gravenettyclientapplication.class, args);
    }

}

  

nettyclientscannerregistrar类注册bean。

/**
 * Registers bean definitions for the packages named by {@code nettyclientscan},
 * delegating the actual scan to {@code nettyclientinterfacescanner}.
 */
public class nettyclientscannerregistrar implements importbeandefinitionregistrar, resourceloaderaware {

    /**
     * Reads the {@code nettyclientscan} attributes of the importing class and scans
     * every non-blank base package.
     *
     * @param importingclassmetadata annotation metadata of the class that carries the import
     * @param registry               registry handed to the scanner
     */
    @override
    public void registerbeandefinitions(annotationmetadata importingclassmetadata, beandefinitionregistry registry) {
        nettyclientinterfacescanner interfacescanner = new nettyclientinterfacescanner(registry);

        string annotationname = nettyclientscan.class.getname();
        annotationattributes attributes =
                annotationattributes.frommap(importingclassmetadata.getannotationattributes(annotationname));

        // Only replace the scanner's factory when a non-default type was configured.
        class<? extends nettyfactorybean> configuredfactory = attributes.getclass("factorybean");
        boolean usesdefaultfactory = nettyfactorybean.class.equals(configuredfactory);
        if (!usesdefaultfactory) {
            interfacescanner.setnettyfactorybean(beanutils.instantiateclass(configuredfactory));
        }

        // Blank package entries are dropped before scanning.
        string[] declaredpackages = attributes.getstringarray("basepackages");
        list<string> packagestoscan = new arraylist<string>();
        for (int i = 0; i < declaredpackages.length; i++) {
            string candidate = declaredpackages[i];
            if (!stringutils.hastext(candidate)) {
                continue;
            }
            packagestoscan.add(candidate);
        }

        interfacescanner.doscan(stringutils.tostringarray(packagestoscan));
    }
}

  

nettyclientinterfacescanner类扫描basepackages路径下的接口,并把扫描到的bean定义改为由nettyfactorybean创建(实际的jdk动态代理在nettyfactorybean中完成)。

/**
 * Classpath scanner that rewrites every bean definition it finds so that the
 * bean is produced by {@code nettyfactorybean} instead of the scanned type itself.
 *
 * NOTE(review): Spring's default candidate check does not accept plain interfaces;
 * confirm that the full project also overrides the candidate/filter hooks,
 * otherwise the scan yields no definitions.
 */
public class nettyclientinterfacescanner extends classpathbeandefinitionscanner {
    private nettyfactorybean nettyfactorybean = new nettyfactorybean();

    /**
     * Creates a scanner that registers its definitions into the given registry.
     * {@code nettyclientscannerregistrar} constructs the scanner through this constructor.
     *
     * @param registry the registry that receives the scanned bean definitions
     */
    public nettyclientinterfacescanner(beandefinitionregistry registry) {
        super(registry);
    }

    /**
     * Replaces the factory bean whose class is assigned to the scanned definitions.
     * Called by {@code nettyclientscannerregistrar} when a non-default factory type is configured.
     *
     * @param nettyfactorybean the factory to use; {@code null} restores the default factory
     */
    public void setnettyfactorybean(nettyfactorybean nettyfactorybean) {
        this.nettyfactorybean = nettyfactorybean != null ? nettyfactorybean : new nettyfactorybean();
    }

    /**
     * Scans the packages with the parent implementation and post-processes the result.
     *
     * @param basepackages packages to scan
     * @return the bean definitions found by the parent scan (possibly empty)
     */
    @override
    public set<beandefinitionholder> doscan(string... basepackages) {
        set<beandefinitionholder> beandefinitions = super.doscan(basepackages);

        if (!beandefinitions.isempty()) {
            processbeandefinitions(beandefinitions);
        }

        return beandefinitions;
    }

    /** Points each scanned definition at the factory bean class. */
    private void processbeandefinitions(
            set<beandefinitionholder> beandefinitions) {

        genericbeandefinition definition;

        for (beandefinitionholder holder : beandefinitions) {

            definition = (genericbeandefinition) holder.getbeandefinition();
            // Hand the scanned type's class name to the factory as a constructor argument;
            // nettyfactorybean has a constructor that takes the interface class.
            // This must happen before setbeanclass below overwrites the bean class name.
            definition.getconstructorargumentvalues().addgenericargumentvalue(definition.getbeanclassname());
            // nettyfactorybean is the factory that creates the bean instance, not the bean itself.
            definition.setbeanclass(this.nettyfactorybean.getclass());

            definition.setautowiremode(abstractbeandefinition.autowire_by_type);
        }
    }
}

  

nettyfactorybean 

/**
 * Factory bean that produces a JDK dynamic proxy for one service interface.
 * Every method call on the proxy is routed to the shared {@code nettyinterfaceinvoker}.
 *
 * @param <t> the service interface type
 */
public class nettyfactorybean<t> implements factorybean<t> {
    private class<t> nettyinterface;

    /** Creates a factory without an interface; such an instance cannot create proxies. */
    public nettyfactorybean() {}

    /**
     * @param nettyinterface the service interface to proxy
     */
    public nettyfactorybean(class<t> nettyinterface) {
        this.nettyinterface = nettyinterface;
    }

    /**
     * Creates the proxy instance for the configured interface.
     *
     * @return a JDK dynamic proxy implementing the interface
     * @throws illegalstateexception if no interface was supplied
     */
    @override
    @suppresswarnings("unchecked") // the proxy implements nettyinterface, so the cast to t is safe
    public t getobject() throws exception {
        if (nettyinterface == null) {
            throw new illegalstateexception("nettyinterface must be set before a proxy can be created");
        }
        // The original listing passed an undefined name as the invocation handler;
        // the handler declared in this project is the nettyinterfaceinvoker singleton.
        return (t) proxy.newproxyinstance(
                nettyinterface.getclassloader(),
                new class<?>[]{nettyinterface},
                nettyinterfaceinvoker.getinstance());
    }

    /** @return the proxied interface, or {@code null} when none was supplied */
    @override
    public class<?> getobjecttype() {
        return this.nettyinterface;
    }

    /** @return always {@code true} */
    @override
    public boolean issingleton() {
        return true;
    }
}

  

关键来了,nettyinterfaceinvoker类负责数据包封装及发送。

/**
 * Invocation handler behind the service proxies: it packs each method call into a
 * {@code request}, sends it through the configured {@code requestsender} and converts
 * the returned JSON into the method's return type.
 */
public class nettyinterfaceinvoker implements invocationhandler {

    private requestsender sender;

    // Singleton holder implemented with a static nested class.
    private static class singleton {
        private static final nettyinterfaceinvoker invoker = new nettyinterfaceinvoker();

        private static nettyinterfaceinvoker setsender(requestsender sender) {
            invoker.sender = sender;
            return invoker;
        }
    }

    /** @return the shared invoker instance */
    public static nettyinterfaceinvoker getinstance() {
        return singleton.invoker;
    }

    /**
     * Sets the sender used by the shared invoker.
     *
     * @param sender the sender that transmits requests
     * @return the shared invoker instance
     */
    public static nettyinterfaceinvoker setsender(requestsender sender) {
        return singleton.setsender(sender);
    }

    /**
     * Sends the call to the server and returns the converted result.
     *
     * @param proxy  the proxy the method was invoked on (unused)
     * @param method the invoked interface method
     * @param args   the call arguments
     * @return the response data converted to the method's return type, or {@code null}
     *         when the response carries no data
     * @throws illegalstateexception if no sender has been set
     * @throws exception             if the response code is 1; the message is the response error
     */
    @override
    public object invoke(object proxy, method method, object[] args) throws throwable {
        if (sender == null) {
            // Fail with a clear message instead of a bare NullPointerException.
            throw new illegalstateexception("requestsender has not been set on nettyinterfaceinvoker");
        }

        // Build the packet: class name, method name, parameters and a unique id.
        request request = new request();
        request.setclassname(method.getdeclaringclass().getname());
        request.setmethodname(method.getname());
        request.setparameters(args);
        request.setparametertypes(method.getparametertypes());
        request.setid(uuid.randomuuid().tostring());

        // Send the packet and parse the reply.
        object result = sender.send(request);
        response response = json.parseobject(result.tostring(), response.class);
        if (response.getcode() == 1) {
            throw new exception(response.geterror());
        }

        object data = response.getdata();
        if (data == null) {
            // Nothing to convert; previously the non-primitive branches failed on data.tostring().
            return null;
        }

        class<?> returntype = method.getreturntype();
        if (returntype.isprimitive() || string.class.isassignablefrom(returntype)) {
            return data;
        } else if (collection.class.isassignablefrom(returntype)) {
            return jsonarray.parsearray(data.tostring(), object.class);
        } else if (map.class.isassignablefrom(returntype)) {
            return json.parseobject(data.tostring(), map.class);
        } else {
            return jsonobject.parseobject(data.tostring(), returntype);
        }
    }
}

  

接着我们来看看requestsender怎么处理数据的。

/** Abstraction the client uses to open connections and to send RPC requests. */
public interface requestsender {
    /** Starts a connection to the given address and returns its channel. */
    channel connect(socketaddress address) throws interruptedexception;

    /** Sends the request and returns the result; nettyinterfaceinvoker parses its tostring() as a JSON response. */
    object send(request request) throws interruptedexception;
}

  

requestsender本身只是一个接口。他的实现类有:

/**
 * Netty client wiring: builds the bootstrap, initialises each channel's pipeline
 * and implements {@code requestsender} on top of {@code nettychannelmanager}
 * and {@code nettyclienthandler}.
 */
public class nettyclientapplicationcontextaware extends channelinitializer<socketchannel>
        implements requestsender, applicationcontextaware, initializingbean {
    private static final logger logger = logger.getlogger(nettyclientapplicationcontextaware.class.getname());

    private string remoteaddress;
    private bootstrap bootstrap;
    private eventloopgroup group;
    private nettychannelmanager manager;
    private nettyclienthandler handler;

    /**
     * Reads the {@code remoteaddress} property and creates the bootstrap, event loop
     * group, channel manager and pipeline handler.
     */
    @override
    public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
        this.remoteaddress = applicationcontext.getenvironment().getproperty("remoteaddress");
        this.bootstrap = new bootstrap();
        this.group = new nioeventloopgroup(1);
        // This object is itself the channel initializer registered on the bootstrap.
        this.bootstrap.group(group).
                channel(niosocketchannel.class).
                option(channeloption.tcp_nodelay, true).
                option(channeloption.so_keepalive, true).
                handler(this);
        this.manager = new nettychannelmanager(this);
        this.handler = new nettyclienthandler(manager, remoteaddress);
    }

    /** Entry point of the socket connection: asks the manager to refresh with the remote address. */
    @override
    public void afterpropertiesset() throws exception {
        // Socket connection entry point.
        this.manager.refresh(lists.newarraylist(remoteaddress));
    }

    /**
     * Sends the request over a channel taken from the manager and returns the
     * response as a JSON string. When no active channel is available, a response
     * with code 1 and an error message is returned instead.
     */
    @override
    public object send(request request) throws interruptedexception {
        channel channel = manager.take();
        if (channel != null && channel.isactive()) {
            synchronousqueue<object> queue = this.handler.sendrequest(request, channel);
            // NOTE(review): take() has no timeout, so the caller waits until the handler
            // delivers a response for this request.
            object result = queue.take();
            return jsonarray.tojsonstring(result);
        } else {
            response res = new response();
            res.setcode(1);
            res.seterror("未正确连接到服务器.请检查相关配置信息!");
            return jsonarray.tojsonstring(res);
        }
    }

    /** Builds the pipeline of a new channel: idle detection, JSON codec, then the shared handler. */
    @override
    protected void initchannel(socketchannel channel) throws exception {
        channelpipeline pipeline = channel.pipeline();
        // presumably an all-idle timeout of 30 seconds; nettyclienthandler reacts to all_idle events — TODO confirm
        pipeline.addlast(new idlestatehandler(0, 0, 30));
        pipeline.addlast(new jsonencoder());
        pipeline.addlast(new jsondecoder());
        // Pipeline handler.
        pipeline.addlast(this.handler);
    }

    /**
     * Starts a connection attempt and returns the channel of the pending future
     * without waiting for the attempt to complete.
     */
    @override
    public channel connect(socketaddress address) throws interruptedexception {
        channelfuture future = bootstrap.connect(address);
        // Long-lived connection; the listener is there to retry after a failed attempt.
        future.addlistener(new connectionlistener(this.manager, this.remoteaddress));
        channel channel = future.channel();//future.sync().channel();
        return channel;
    }

    /**
     * Shuts down the event loop group.
     * NOTE(review): nothing in the visible code calls this method; confirm it is wired to shutdown.
     */
    public void destroy() {
        this.group.shutdowngracefully();
    }
}

  

nettyclienthandler类处理管道事件。与服务端不同,这个管道处理器是继承channelinboundhandleradapter类。

/**
 * Shared pipeline handler of the client: hands server responses to the waiting
 * callers, schedules a reconnect when a channel goes inactive and sends a
 * heartbeat request when the channel has been idle.
 */
@channelhandler.sharable
public class nettyclienthandler extends channelinboundhandleradapter {
    // Logger is named after this class (the original listing used the server handler's name).
    private static final logger logger = logger.getlogger(nettyclienthandler.class.getname());

    // Pending requests: request id -> queue on which the caller waits for the response.
    private final concurrenthashmap<string, synchronousqueue<object>> queuemap = new concurrenthashmap<>();
    private final nettychannelmanager manager;
    private final string remoteaddress;

    /**
     * @param manager       channel manager used to drop dead channels and to reconnect
     * @param remoteaddress server address used when reconnecting
     */
    public nettyclienthandler(nettychannelmanager manager, string remoteaddress) {
        this.manager = manager;
        this.remoteaddress = remoteaddress;
    }

    /** Removes the dead channel from the manager and schedules a reconnect one second later. */
    @override
    public void channelinactive(channelhandlercontext ctx) {
        inetsocketaddress address = (inetsocketaddress) ctx.channel().remoteaddress();
        logger.info("与netty服务器断开连接." + address);
        ctx.channel().close();
        manager.remove(ctx.channel());
        // Reconnect after the connection dropped.
        final eventloop eventloop = ctx.channel().eventloop();
        eventloop.schedule(() -> {
            manager.refresh(lists.newarraylist(remoteaddress));
        }, 1l, timeunit.seconds);
    }

    /**
     * Parses a server message into a {@code response} and hands it to the caller
     * waiting on the matching request id. Messages without an id, or with an id
     * nobody is waiting for, are ignored.
     */
    @override
    public void channelread(channelhandlercontext ctx, object msg) throws exception {
        response response = json.parseobject(msg.tostring(), response.class);
        string requestid = response == null ? null : response.getrequestid();
        if (requestid == null) {
            // concurrenthashmap rejects null keys; heartbeat requests, for example, are sent without an id.
            return;
        }
        // Remove and fetch in one step so a response is delivered at most once.
        synchronousqueue<object> queue = queuemap.remove(requestid);
        if (queue == null) {
            logger.info("no pending request for response id " + requestid);
            return;
        }
        queue.put(response);
    }

    /**
     * Registers a queue for the request id, writes the request to the channel and
     * returns the queue on which the response will be delivered.
     */
    public synchronousqueue<object> sendrequest(request request, channel channel) {
        // A blocking queue hands the response over to the calling thread.
        synchronousqueue<object> queue = new synchronousqueue<>();
        queuemap.put(request.getid(), queue);
        channel.writeandflush(request);
        return queue;
    }

    /** Sends a heartbeat request on an all-idle event; other non-idle events go to the parent. */
    @override
    public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {
        if (evt instanceof idlestateevent) {
            idlestateevent event = (idlestateevent) evt;
            if (event.state() == idlestate.all_idle) {
                // Log only when a heartbeat is actually sent.
                logger.info("发送心跳消息...");
                request request = new request();
                request.setmethodname("heartbeat");
                ctx.channel().writeandflush(request);
            }
        } else {
            super.usereventtriggered(ctx, evt);
        }
    }
}

  

这样,rpc的客户端就写好了,其中主要涉及到的关键内容就是netty实例及管道处理器、jdk动态代理、还有一个阻塞队列。

结合上篇rpc服务端。一个完整的rpc框架就搭建完了。

当然,有些地方处理的还是比较粗糙。后续有修改以git代码为准。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网