当前位置: 移动技术网 > IT编程>开发语言>Java > spring与disruptor集成的简单示例

spring与disruptor集成的简单示例

2019年07月19日  | 移动技术网IT编程  | 我要评论

肾病食品,一箱烟有多少条,任我游gps

disruptor 不过多介绍了,这里先描述一下当前的业务场景:有两个应用 a 和 b,应用 a 向应用 b 传递数据。数据传送比较快,如果用 http 直接 push 数据然后入库,效率不高,还有可能给应用 a 带来比较大的压力。使用 mq 又太重量级,所以选择了 disruptor;也可以使用 reactor。

basequeuehelper.java

/**
 * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。
 *
 * 调用init()时才真正启动线程开始处理 系统退出自动清理资源.
 *
 * @author xielongwang
 * @create 2018-01-18 下午3:49
 * @email xielong.wang@nvr-china.com
 * @description
 */
public abstract class basequeuehelper<d, e extends valuewrapper<d>, h extends workhandler<e>> {

  /**
   * 记录所有的队列,系统退出时统一清理资源
   */
  private static list<basequeuehelper> queuehelperlist = new arraylist<basequeuehelper>();
  /**
   * disruptor 对象
   */
  private disruptor<e> disruptor;
  /**
   * ringbuffer
   */
  private ringbuffer<e> ringbuffer;
  /**
   * initqueue
   */
  private list<d> initqueue = new arraylist<d>();

  /**
   * 队列大小
   *
   * @return 队列长度,必须是2的幂
   */
  protected abstract int getqueuesize();

  /**
   * 事件工厂
   *
   * @return eventfactory
   */
  protected abstract eventfactory<e> eventfactory();

  /**
   * 事件消费者
   *
   * @return workhandler[]
   */
  protected abstract workhandler[] gethandler();

  /**
   * 初始化
   */
  public void init() {
    threadfactory namedthreadfactory = new threadfactorybuilder().setnameformat("disruptorthreadpool").build();
    disruptor = new disruptor<e>(eventfactory(), getqueuesize(), namedthreadfactory, producertype.single, getstrategy());
    disruptor.setdefaultexceptionhandler(new myhandlerexception());
    disruptor.handleeventswithworkerpool(gethandler());
    ringbuffer = disruptor.start();

    //初始化数据发布
    for (d data : initqueue) {
      ringbuffer.publishevent(new eventtranslatoronearg<e, d>() {
        @override
        public void translateto(e event, long sequence, d data) {
          event.setvalue(data);
        }
      }, data);
    }

    //加入资源清理钩子
    synchronized (queuehelperlist) {
      if (queuehelperlist.isempty()) {
        runtime.getruntime().addshutdownhook(new thread() {
          @override
          public void run() {
            for (basequeuehelper basequeuehelper : queuehelperlist) {
              basequeuehelper.shutdown();
            }
          }
        });
      }
      queuehelperlist.add(this);
    }
  }

  /**
   * 如果要改变线程执行优先级,override此策略. yieldingwaitstrategy会提高响应并在闲时占用70%以上cpu,
   * 慎用sleepingwaitstrategy会降低响应更减少cpu占用,用于日志等场景.
   *
   * @return waitstrategy
   */
  protected abstract waitstrategy getstrategy();

  /**
   * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
   */
  public synchronized void publishevent(d data) {
    if (ringbuffer == null) {
      initqueue.add(data);
      return;
    }
    ringbuffer.publishevent(new eventtranslatoronearg<e, d>() {
      @override
      public void translateto(e event, long sequence, d data) {
        event.setvalue(data);
      }
    }, data);
  }

  /**
   * 关闭队列
   */
  public void shutdown() {
    disruptor.shutdown();
  }
}

eventfactory.java

/**
 * Disruptor event factory that supplies empty {@code seriesdataevent} instances.
 *
 * @author xielongwang
 * @create 2018-01-18 6:24 PM
 * @email xielong.wang@nvr-china.com
 * @description
 */
public class eventfactory implements com.lmax.disruptor.eventfactory<seriesdataevent> {

  /**
   * Creates a fresh event with no value set.
   *
   * @return a new seriesdataevent
   */
  @override
  public seriesdataevent newinstance() {
    seriesdataevent blank = new seriesdataevent();
    return blank;
  }
}

myhandlerexception.java

/**
 * Disruptor exception handler that reports failures through the logger.
 *
 * <p>Each method passes the throwable as the trailing logger argument so that the full stack
 * trace ends up in the log alongside the formatted message.
 */
public class myhandlerexception implements exceptionhandler {

  private static final logger logger = loggerfactory.getlogger(myhandlerexception.class);

  /**
   * Called when a handler throws while processing an event.
   *
   * @param ex       the failure
   * @param sequence ring buffer sequence of the event being processed
   * @param event    the event being processed
   * @see com.lmax.disruptor.exceptionhandler#handleeventexception(java.lang.throwable, long, java.lang.object)
   */
  @override
  public void handleeventexception(throwable ex, long sequence, object event) {
    // The event is handed to the logger as-is (no explicit tostring()) so a null event
    // cannot raise a second exception from inside the exception handler.
    logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event, ex.getmessage(), ex);
  }

  /**
   * Called when an exception occurs during start-up.
   *
   * @param ex the failure
   * @see com.lmax.disruptor.exceptionhandler#handleonstartexception(java.lang.throwable)
   */
  @override
  public void handleonstartexception(throwable ex) {
    logger.error("start disruptor error ==[{}]!", ex.getmessage(), ex);
  }

  /**
   * Called when an exception occurs during shutdown.
   *
   * @param ex the failure
   * @see com.lmax.disruptor.exceptionhandler#handleonshutdownexception(java.lang.throwable)
   */
  @override
  public void handleonshutdownexception(throwable ex) {
    logger.error("shutdown disruptor error ==[{}]!", ex.getmessage(), ex);
  }
}

seriesdata.java (代表应用a发送给应用b的消息)

/**
 * Message payload: wraps the device-info string carried by one message.
 */
public class seriesdata {
  private string deviceinfostr;

  /**
   * Creates a payload with no device info set.
   */
  public seriesdata() {
    this(null);
  }

  /**
   * Creates a payload holding the given device info.
   *
   * @param deviceinfostr the device-info string
   */
  public seriesdata(string deviceinfostr) {
    this.deviceinfostr = deviceinfostr;
  }

  /**
   * @return the device-info string, possibly {@code null}
   */
  public string getdeviceinfostr() {
    return this.deviceinfostr;
  }

  /**
   * @param deviceinfostr the device-info string to store
   */
  public void setdeviceinfostr(string deviceinfostr) {
    this.deviceinfostr = deviceinfostr;
  }

  /**
   * @return a debug representation that includes the device-info string
   */
  @override
  public string tostring() {
    return "seriesdata{deviceinfostr='" + deviceinfostr + "'}";
  }
}

seriesdataevent.java

/**
 * Event type whose value is a {@code seriesdata} payload. It adds nothing to
 * {@code valuewrapper}; it only fixes the wrapped type.
 */
public class seriesdataevent extends valuewrapper<seriesdata> {
}

seriesdataeventhandler.java

/**
 * Worker-pool consumer that passes the device-info string of each event to
 * {@code deviceinfoservice}.
 */
public class seriesdataeventhandler implements workhandler<seriesdataevent> {
  private logger logger = loggerfactory.getlogger(seriesdataeventhandler.class);
  @autowired
  private deviceinfoservice deviceinfoservice;

  /**
   * Processes one event. Events without a payload, or with an empty device-info string, are
   * logged and skipped instead of being passed on.
   *
   * @param event the event taken from the ring buffer
   */
  @override
  public void onevent(seriesdataevent event) {
    seriesdata payload = event.getvalue();
    if (payload == null || stringutils.isempty(payload.getdeviceinfostr())) {
      logger.warn("receiver series data is empty!");
      // Nothing to process; continuing would dereference a null payload.
      return;
    }
    // Business processing.
    deviceinfoservice.processdata(payload.getdeviceinfostr());
  }
}

seriesdataeventqueuehelper.java

/**
 * Spring-managed queue helper for {@code seriesdata} messages. The disruptor is started once
 * Spring has injected the handlers (see {@link #afterpropertiesset()}).
 */
@component
public class seriesdataeventqueuehelper extends basequeuehelper<seriesdata, seriesdataevent, seriesdataeventhandler> implements initializingbean {
  private static final int queue_size = 1024;
  /**
   * The seriesdataeventhandler instances injected by Spring; they form the worker pool.
   */
  @autowired
  private list<seriesdataeventhandler> seriesdataeventhandler;

  /**
   * @return the ring buffer size
   */
  @override
  protected int getqueuesize() {
    return queue_size;
  }

  /**
   * @return a factory producing empty seriesdataevent instances
   */
  @override
  protected com.lmax.disruptor.eventfactory<seriesdataevent> eventfactory() {
    return new eventfactory();
  }

  /**
   * @return the injected handlers as an array
   */
  @override
  protected workhandler[] gethandler() {
    return seriesdataeventhandler.toarray(new seriesdataeventhandler[0]);
  }

  /**
   * @return a blocking wait strategy
   */
  @override
  protected waitstrategy getstrategy() {
    return new blockingwaitstrategy();
  }

  /**
   * Starts the queue after dependency injection has completed.
   */
  @override
  public void afterpropertiesset() throws exception {
    this.init();
  }
}

valuewrapper.java

/**
 * Mutable holder for a single value of type {@code t}.
 *
 * @param <t> type of the wrapped value
 */
public abstract class valuewrapper<t> {
  private t value;

  /**
   * Creates a wrapper with no value set.
   */
  public valuewrapper() {
    this(null);
  }

  /**
   * Creates a wrapper around the given value.
   *
   * @param value the value to hold
   */
  public valuewrapper(t value) {
    this.value = value;
  }

  /**
   * @return the wrapped value, possibly {@code null}
   */
  public t getvalue() {
    return this.value;
  }

  /**
   * @param value the value to hold from now on
   */
  public void setvalue(t value) {
    this.value = value;
  }
}

disruptorconfig.java

/**
 * Spring configuration that scans {@code com.portal.disruptor} and registers several consumer
 * instances, one bean per handler.
 */
@configuration
@componentscan(value = {"com.portal.disruptor"})
public class disruptorconfig {

  /**
   * @return consumer bean smsparameventhandler1
   */
  @bean
  public seriesdataeventhandler smsparameventhandler1() {
    return newhandler();
  }

  /**
   * @return consumer bean smsparameventhandler2
   */
  @bean
  public seriesdataeventhandler smsparameventhandler2() {
    return newhandler();
  }

  /**
   * @return consumer bean smsparameventhandler3
   */
  @bean
  public seriesdataeventhandler smsparameventhandler3() {
    return newhandler();
  }

  /**
   * @return consumer bean smsparameventhandler4
   */
  @bean
  public seriesdataeventhandler smsparameventhandler4() {
    return newhandler();
  }

  /**
   * @return consumer bean smsparameventhandler5
   */
  @bean
  public seriesdataeventhandler smsparameventhandler5() {
    return newhandler();
  }

  /**
   * Builds one consumer; every call yields a distinct instance.
   */
  private static seriesdataeventhandler newhandler() {
    return new seriesdataeventhandler();
  }
}

测试

  // Injected seriesdataeventqueuehelper, used as the message producer.
  @autowired
  private seriesdataeventqueuehelper seriesdataeventqueuehelper;

  /**
   * Accepts a POST to /data and publishes the request body to the queue.
   *
   * @param devicedata raw request body
   * @return a 400/"failed" response for an empty body, otherwise 200/"success"
   */
  @requestmapping(value = "/data", method = requestmethod.post, produces = mediatype.application_json_value)
  public dataresponsevo<string> receiverdevicedata(@requestbody string devicedata) {
    final long receivedat = system.currenttimemillis();

    // Reject empty payloads before anything is published.
    if (stringutils.isempty(devicedata)) {
      logger.info("receiver data is empty !");
      return new dataresponsevo<string>(400, "failed");
    }

    seriesdata message = new seriesdata(devicedata);
    seriesdataeventqueuehelper.publishevent(message);

    // Log the payload and how long the hand-off took.
    final long elapsedmillis = system.currenttimemillis() - receivedat;
    logger.info("receiver data ==[{}] millisecond ==[{}]", devicedata, elapsedmillis);
    return new dataresponsevo<string>(200, "success");
  }

应用 a 通过 /data 接口把数据发送到应用 b,应用 b 再通过 seriesdataeventqueuehelper 把消息发布到 disruptor 队列,由消费者去消费,整个过程不会堵塞应用 a。该方案可接受消息丢失;也可以通过扩展 seriesdataeventqueuehelper 来实现对 disruptor 队列的监控。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网