当前位置: 移动技术网 > 科技>人工智能>云计算 > flume源码分析

flume源码分析

2018年04月13日  | 移动技术网科技  | 我要评论
Flume的程序入口是org.apache.flume.node.Application#main 进入后会先进行命令行参数的解析及核对,使用的组件是org.apache.commons.c

Flume的程序入口是org.apache.flume.node.Application#main

进入后会先进行命令行参数的解析及核对,使用的组件是org.apache.commons.cli。还是很好用的。

会从参数中获取isZkConfigured及reload两个参数,isZkConfigured是指是否使用zookeeper来存储flume任务的配置,reload是指当flume作业的配置改变了以后是否重新启动程序来加载最新的参数。默认我们不使用zookeeper,而是采用文件来存储:

然后根据reload的不同,启动方式有所不同,为false的时候就直接启动,为true的时候就监听文件的改变,一旦有改变就重新启动加载最新的配置。这里使用了google的一个组件EventBus?:

????????if (reload) {
          EventBus eventBus = new EventBus(agentName + "-event-bus");
          PollingPropertiesFileConfigurationProvider configurationProvider =
              new PollingPropertiesFileConfigurationProvider(
                  agentName, configurationFile, eventBus, 30);
          components.add(configurationProvider);
          application = new Application(components);
          eventBus.register(application);
        } else {
          PropertiesFileConfigurationProvider configurationProvider =
              new PropertiesFileConfigurationProvider(agentName, configurationFile);
          application = new Application();
          application.handleConfigurationEvent(configurationProvider.getConfiguration());
        }

假如是reload模式,

会构造一个configurationProvider ,注意这个configurationProvider 实现了LifecycleAware接口,那么什么是LifecycleAware?flume中把任意有生命周期(有空闲、启动、停止等状态)的组件都看作LifecycleAware,比如sink、source、channel等。

然后据此会构造一个application,并注册监听eventBus。

然后调用application.start(),

  public synchronized void start() {
    for (LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }

这里就是把每个组件交给监护人去监护。下面分析下这个监护人LifecycleSupervisor supervisor

需要注意的是这个监护人自身也是有生命周期的,也实现了LifecycleAware。

LifecycleSupervisor 有个静态内部类Supervisoree记录监护策略、期望状态。

LifecycleSupervisor 还有个静态内部类MonitorRunnable实现Runnable接口,用于根据组件的期望状态去调用组件的相应的方法。

所以org.apache.flume.lifecycle.LifecycleSupervisor#supervise方法主要就是做一些必要的检查,然后将监护信息封装进MonitorRunnable对象,然后启动一个线程去运行它。至于具体怎么运行逻辑都封装在MonitorRunnable里面。

这里为什么使用静态内部类?如果一段逻辑必须要封装独立出去,否则违反类的单一原则,但这个类又只被当前类使用,那么可以考虑内部类,如果这个内部类不访问原类的任何成员变量,那么可以考虑使用静态内部类。

使用内部类目的主要是封装更好,更好维护。

在MonitorRunnable使用这种方法保证组件在多线程环境下状态切换的安全进行。这样我们就不用在LifecycleAware类里面使用synchronized 修饰每个方法了。

synchronized (lifecycleAware) {
.....
case START:lifecycleAware.start
.....
case STOP:lifecycleAware.stop
.....
}
MonitorRunnable使用ScheduledThreadPoolExecutor定时调度运行,来确保每隔几秒钟就对组件来进行一次温暖的监护,确保其位于期望的状态。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网