当前位置: 移动技术网 > IT编程>开发语言>JavaScript > vert实践五——Json?Protocol Buffer?FlatBuffers?

vert实践五——Json?Protocol Buffer?FlatBuffers?

2020年07月31日  | 移动技术网IT编程  | 我要评论

本节我们来比较一下Json、FlatBuffers、Protocol Buffer在vertx中使用时的占用资源情况比较。

方式阐述

启动一个消费端进程和一个生产端进程,其中生产端发布一个web服务,方便使用ab测试工具进行测试(其中消费端和生产端分布在两台机器)。

ab调用生产端web -> 生产端通过bus传输消息 -> 消费端通过bus回复消息

消费端机器规格 4核 2.6G HZ 16G
生产端机器规格 6核 3.2G HZ 16G

Json序列化

生产端

   JsonObject jsonObject = new JsonObject();
            for (int i=0; i<500; i ++){
                jsonObject.put("name_" + i,"wan_ke");
            }
            //通过eventbus发送请求
            eventBus.request("com.xiaoniu.bus", jsonObject, msg -> {
                        if (msg.succeeded()) {
                            if (msg.result() != null){
                                String data = ((JsonObject) msg.result().body()).encodePrettily();
                            }
                        } else {
                            System.err.println(msg.cause().getMessage());
                            msg.cause().printStackTrace();
                        }
                    }
            );

消费端

eventBus.consumer("com.xiaoniu.bus", msg -> {
                    System.out.println("收到消息");
                    if (msg != null && msg.body() instanceof JsonObject){
                String receive_data = ((JsonObject) msg.body()).encodePrettily();
            }
            JsonObject jsonObject = new JsonObject();
            for (int i=0;i<500;i++){
                jsonObject.put("name_"+ i,"wan_ke_receive");
            }

           msg.reply(jsonObject);
        });

生产端和消费端均启动后,进行三次,每次间隔2分钟。

第一次执行ab -n10000 -c10 http://192.168.2.112:7777/index(并发度10,1万次请求)

第二次执行ab -n20000 -c20 http://192.168.2.112:7777/index(并发度10,2万次请求)

第三次执行ab -n30000 -c20 http://192.168.2.112:7777/index(并发度20,3万次请求)

资源 cpu 内存
第一次 生产端(初值:12% 峰值:71% 末值:12%)消费端(初值:2% 峰值:16% 末值:4%) 生产端(初值:50M 峰值:294M 末值:131M )消费端(初值:50M 峰值:150M 末值:150M)
第二次 生产端(峰值:62% 末值:12%)消费端(峰值:10% 末值:8%) 生产端(峰值:188M 末值:160M)消费端(峰值:120M 末值:120M)
第三次 生产端(峰值:60% 末值:16%)消费端(峰值:10% 末值:3%) 生产端(峰值:380M 末值:100M )消费端(峰值:230M 末值:110M)

Proto序列化

生产端

 ProtoCommonMsg protoCommonMsg = new ProtoCommonMsg();
      for (int i =0; i<500 ; i ++){
        protoCommonMsg.put("name_" + i,"wan_ke");
      }
      //通过eventbus发送请求
      eventBus.request("com.xiaoniu.bus", protoCommonMsg,options, msg -> {
        if (msg.succeeded()) {
          if (msg.result() != null){
            ProtoCommonMsg proto = (ProtoCommonMsg) msg.result().body();
          }
        } else {
          System.err.println(msg.cause().getMessage());
          msg.cause().printStackTrace();
        }
      }

消费端

   eventBus.consumer("com.xiaoniu.bus", msg -> {
      System.out.println("收到消息");
      if (msg != null && msg.body() instanceof ProtoCommonMsg){
          ProtoCommonMsg receive_data = (ProtoCommonMsg) msg.body();
      }
      ProtoCommonMsg protoCommonMsg = new ProtoCommonMsg();
      for (int i =0; i<500 ; i ++){
        protoCommonMsg.put("name_" + i,"wan_ke_receive");
      }

   msg.reply(protoCommonMsg,options);
    });
资源 cpu 内存
第一次 生产端(初值:12% 峰值:45% 末值:14%)消费端(初值:2% 峰值:9% 末值:3%) 生产端(初值:50M 峰值:165M 末值:37M )消费端(初值:50M 峰值:46M 末值:58M)
第二次 生产端(峰值:53% 末值:11%)消费端(峰值:13% 末值:3%) 生产端(峰值:192M 末值:150M)消费端(峰值:70M 末值:48M)
第三次 生产端(峰值:65% 末值:14%)消费端(峰值:30% 末值:3%) 生产端(峰值:320M 末值:240M )消费端(峰值:70M 末值:50M)

FlatBuffers序列化

生产端

     FlatBuffersCommonMsg flatBuffersCommonMsg = new FlatBuffersCommonMsg();
            for (int i =0; i<500 ; i ++){
                flatBuffersCommonMsg.put("name_" + i,"wan_ke");
            }

            //通过eventbus发送请求
            eventBus.request("com.xiaoniu.bus", flatBuffersCommonMsg,options, msg -> {
                        if (msg.succeeded()) {
                            if (msg.result() != null){
                                FlatBuffersCommonMsg proto = (FlatBuffersCommonMsg) msg.result().body();
                            }
                        } else {
                            System.err.println(msg.cause().getMessage());
                            msg.cause().printStackTrace();
                        }
                    }
            );

消费端

   eventBus.consumer("com.xiaoniu.bus", msg -> {
            System.out.println("收到消息");
            if (msg != null && msg.body() instanceof FlatBuffersCommonMsg){
                FlatBuffersCommonMsg receive_data = (FlatBuffersCommonMsg) msg.body();
            }
            FlatBuffersCommonMsg protoCommonMsg = new FlatBuffersCommonMsg();
            for (int i =0; i<500 ; i ++){
                protoCommonMsg.put("name_" + i,"wan_ke_receive");
            }

            msg.reply(protoCommonMsg,options);
        });
资源 cpu 内存
第一次 生产端(初值:14% 峰值:71% 末值:17%)消费端(初值:3% 峰值:10% 末值:10%) 生产端(初值:50M 峰值:123M 末值:50M )消费端(初值:50M 峰值:70M 末值:70M)
第二次 生产端(峰值:60% 末值:17%)消费端(峰值:8% 末值:2%) 生产端(峰值:300M 末值:300M)消费端(峰值:70M 末值:60M)
第三次 生产端(峰值:66% 末值:17%)消费端(峰值:16% 末值:3%) 生产端(峰值:332M 末值:200M )消费端(峰值:60M 末值:58M)

总结

总体来看Proto和FlatBuffers在消费端的内存消耗表现比较优秀一点

Event Bus如何使用proto编码

自定义编码器

package org.example.MessageCode;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import org.example.proto.ProtoCommonMsg;
import org.example.util.ProtostuffUtils;

/**
 * 自定义对象编解码器,两个类型可用于消息转换,即发送对象转换为接受需要的对象
 */
public class ProtoMessageCodec implements MessageCodec<ProtoCommonMsg, ProtoCommonMsg> {
    /**
     * 将消息实体封装到Buffer用于传输
     * 实现方式:使用对象流从对象中获取Byte数组然后追加到Buffer
     */
    @Override
    public void encodeToWire(Buffer buffer, ProtoCommonMsg protoCommonMsg) {
        buffer.appendBytes(ProtostuffUtils.serialize(protoCommonMsg));


    }
    //从Buffer中获取消息对象
    @Override
    public ProtoCommonMsg decodeFromWire(int pos, Buffer buffer) {
       return ProtostuffUtils.deserialize(buffer.getBytes(pos, buffer.length()),ProtoCommonMsg.class);
    }

    @Override
    public ProtoCommonMsg transform(ProtoCommonMsg protoMessage) {
        return null;
    }
    //消息转换

    @Override
    public String name() { return "myProtoCodec"; }
    //识别是否是用户自定义编解码器,通常为-1
    @Override
    public byte systemCodecID() { return -1; }

}

集群EventBus注册且只能注册一次编码解码器

  ProtoMessageCodec protoMessageCodec = new ProtoMessageCodec();
    EventBus eventBus = vertx.eventBus();
    eventBus.registerCodec(protoMessageCodec);

消费者回复需要带上编码解码器名称,告诉生产者这次使用哪个编码解码器

//新建带上编码解码器名称的options
DeliveryOptions options = new DeliveryOptions().setCodecName(protoMessageCodec.name());
 eventBus.consumer("com.xiaoniu.bus", msg -> {
      System.out.println("收到消息");
      if (msg != null && msg.body() instanceof ProtoCommonMsg){
          ProtoCommonMsg receive_data = (ProtoCommonMsg) msg.body();
      }
      ProtoCommonMsg protoCommonMsg = new ProtoCommonMsg();
      for (int i =0; i<500 ; i ++){
        protoCommonMsg.put("name_" + i,"wan_ke_receive");
      }
 //回复消息时带上options
   msg.reply(protoCommonMsg,options);
    });

生产者发送消息时带上编码解码器名称

 DeliveryOptions options = new DeliveryOptions().setCodecName(protoMessageCodec.name());
   eventBus.request("com.xiaoniu.bus", protoCommonMsg,options, msg -> {
        if (msg.succeeded()) {
          if (msg.result() != null){
            ProtoCommonMsg proto = (ProtoCommonMsg) msg.result().body();
          }
        } else {
          System.err.println(msg.cause().getMessage());
          msg.cause().printStackTrace();
        }
      }
      );

使用时,新建ProtoCommonMsg对象,插入键值对即可

package org.example.proto;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: Administrator
 * @Description:
 * @Date: 2020/7/22 8:32
 * @Version: 1.0
 */
public class ProtoCommonMsg   {

  private Map data = new HashMap();

  public void put(Object key,Object value){
      data.put(key,value);
  }
  public Object get(Object key){
      return  data.get(key);
  }
  public <T> T get(Object key, Class<T> clazz){
        return  (T) data.get(key);
  }
  public String toString(){
      return data.toString();
  }
}

Protocol Buffer编码解码工具,以供自定义编码解码器使用

package org.example.util;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class ProtostuffUtils {
    /**
     * 避免每次序列化都重新申请Buffer空间
     */
    private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    /**
     * 缓存Schema
     */
    private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();

    /**
     * 序列化方法,把指定对象序列化成字节数组
     *
     * @param obj
     * @param <T>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> clazz = (Class<T>) obj.getClass();
        Schema<T> schema = getSchema(clazz);
        byte[] data;
        try {
            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } finally {
            buffer.clear();
        }

        return data;
    }

    /**
     * 反序列化方法,将字节数组反序列化成指定Class类型
     *
     * @param data
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        Schema<T> schema = getSchema(clazz);
        T obj = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(data, obj, schema);
        return obj;
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> clazz) {
        Schema<T> schema = (Schema<T>) schemaCache.get(clazz);
        if (Objects.isNull(schema)) {
            //这个schema通过RuntimeSchema进行懒创建并缓存
            //所以可以一直调用RuntimeSchema.getSchema(),这个方法是线程安全的
            schema = RuntimeSchema.getSchema(clazz);
            if (Objects.nonNull(schema)) {
                schemaCache.put(clazz, schema);
            }
        }

        return schema;
    }
}

本文地址:https://blog.csdn.net/xk4848123/article/details/107686232

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网