
Calling DataX from Java and returning the job execution details

September 7, 2019 | 移动技术网 IT编程

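DataX

DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.

DataX in detail

See DataX-Introduction.

Introduction

Our business required DataX to load data from text files into a database. Until now we always launched jobs from Python through datax.py. To manage DataX jobs better, A Wen asked us to modify DataX so that it is invoked from Java as an integrated library and returns detailed information about the job run.

Tracing the DataX source

We downloaded the source from GitHub and started from there. The DataX startup class is Engine in the datax-core module, and its entry point is the static method entry.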

public static void entry(final String[] args) throws Throwable {
    Options options = new Options();
    options.addOption("job", true, "job config.");
    options.addOption("jobid", true, "job unique id.");
    options.addOption("mode", true, "job runtime mode.");

    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);

    String jobPath = cl.getOptionValue("job");

    // If the user does not specify a jobid explicitly, datax.py passes -1 as the default
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");

    Configuration configuration = ConfigParser.parse(jobPath);

    long jobId;
    if (!"-1".equalsIgnoreCase(jobIdString)) {
        jobId = Long.parseLong(jobIdString);
    } else {
        // only for dsc & ds & datax 3 update
        String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
        String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
        String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskgroup/";
        List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                dsJobUrlPatternString, dsTaskGroupUrlPatternString);
        jobId = parseJobIdFromUrl(patternStringList, jobPath);
    }

    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    if (!isStandAloneMode && jobId == -1) {
        // Outside standalone mode the jobId must not be -1.
        // Message: "a valid jobid must be provided in the url when not in standalone mode."
        throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 url 中提供有效的 jobid.");
    }
    configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

    // Print the VM info
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) {
        LOG.info(vmInfo.toString());
    }

    LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

    LOG.debug(configuration.toJSON());

    ConfigurationValidate.doValidate(configuration);
    Engine engine = new Engine();
    engine.start(configuration);
}
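When the job id is "-1", entry falls back to extracting an id from the job path with a list of URL patterns. The body of parseJobIdFromUrl is not shown in this article, so the following is only a minimal sketch of how such a helper could behave, assuming it returns the first captured number and -1 when nothing matches. The class name and the example URL are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobIdFromUrlSketch {

    // Hypothetical stand-in for parseJobIdFromUrl: tries each pattern in order
    // and returns the first captured job id, or -1 if no pattern matches.
    static long parseJobIdFromUrl(List<String> patternStringList, String url) {
        for (String patternString : patternStringList) {
            Matcher matcher = Pattern.compile(patternString).matcher(url);
            if (matcher.find()) {
                return Long.parseLong(matcher.group(1));
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        List<String> patterns = Arrays.asList(
                "/instance/(\\d{1,})/config.xml",
                "/inner/job/(\\d{1,})/config");

        // A URL-style job path carries the id in the path.
        System.out.println(parseJobIdFromUrl(patterns,
                "http://example.invalid/instance/42/config.xml")); // 42

        // A local file path matches no pattern, so the id stays -1.
        System.out.println(parseJobIdFromUrl(patterns, "D:\\datax\\job\\job2.json")); // -1
    }
}
```

Under this reading, a local job file with "-jobid -1" ends up with jobId == -1, which entry only accepts in standalone mode. That is consistent with the test later in this article passing "-mode standalone".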

At the end, entry starts the job by calling engine.start(configuration). Stepping into that call, we find that it eventually invokes the start() method of JobContainer.

@Override
public void start() {
    LOG.info("datax jobcontainer starts job.");

    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if (isDryRun) {
            LOG.info("jobcontainer starts to do precheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobcontainer starts to do prehandle ...");
            this.preHandle();

            LOG.debug("jobcontainer starts to do init ...");
            this.init();
            LOG.info("jobcontainer starts to do prepare ...");
            this.prepare();
            LOG.info("jobcontainer starts to do split ...");
            this.totalStage = this.split();
            LOG.info("jobcontainer starts to do schedule ...");
            this.schedule();
            LOG.debug("jobcontainer starts to do post ...");
            this.post();

            LOG.debug("jobcontainer starts to do posthandle ...");
            this.postHandle();
            LOG.info("datax jobid [{}] completed successfully.", this.jobId);

            this.invokeHooks();
        }
    } catch (Throwable e) {
        LOG.error("exception when job run", e);

        hasException = true;

        if (e instanceof OutOfMemoryError) {
            this.destroy();
            System.gc();
        }

        if (super.getContainerCommunicator() == null) {
            // The container collector is initialized in schedule(), so if an exception
            // occurs before schedule() it has to be initialized here instead.

            AbstractContainerCommunicator tempContainerCollector;
            // standalone
            tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);

            super.setContainerCommunicator(tempContainerCollector);
        }

        Communication communication = super.getContainerCommunicator().collect();
        // The state before reporting does not need to be set manually
        // communication.setState(State.FAILED);
        communication.setThrowable(e);
        communication.setTimestamp(this.endTimeStamp);

        Communication tempComm = new Communication();
        tempComm.setTimestamp(this.startTransferTimeStamp);

        Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
        super.getContainerCommunicator().report(reportCommunication);

        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
        if (!isDryRun) {

            this.destroy();
            this.endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                // Finally, print the average CPU usage and the GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }

                LOG.info(PerfTrace.getInstance().summarizeNoException());
                this.logStatistics();
            }
        }
    }
}
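The control flow of start() matters for the change that follows: statistics are produced in the finally block, and only when no exception occurred. The sketch below models that flow with plain Java and no DataX classes. The step names are simplified, and unlike the real method it swallows the exception instead of rethrowing it, purely so that both paths can be printed side by side.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    // Records which steps ran; a stand-in for the real container's side effects.
    static List<String> run(boolean failInSchedule) {
        List<String> steps = new ArrayList<>();
        boolean hasException = false;
        try {
            steps.add("init");
            steps.add("prepare");
            steps.add("split");
            if (failInSchedule) {
                throw new IllegalStateException("schedule failed");
            }
            steps.add("schedule");
            steps.add("post");
        } catch (RuntimeException e) {
            // The real start() reports the failure and rethrows; this sketch only records it.
            hasException = true;
            steps.add("report-failure");
        } finally {
            // destroy always runs; statistics only on the success path.
            steps.add("destroy");
            if (!hasException) {
                steps.add("logStatistics");
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [init, prepare, split, schedule, post, destroy, logStatistics]
        System.out.println(run(true));  // [init, prepare, split, report-failure, destroy]
    }
}
```

One consequence for a version of start() that returns a value: on the failure path no statistics are collected, so the caller receives an exception rather than a result object.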

The job information we need is produced in this.logStatistics():

private void logStatistics() {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }

    if (super.getContainerCommunicator() == null) {
        return;
    }

    Communication communication = super.getContainerCommunicator().collect();
    communication.setTimestamp(this.endTimeStamp);

    Communication tempComm = new Communication();
    tempComm.setTimestamp(this.startTransferTimeStamp);

    Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);

    // Byte rate
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
            / transferCosts;

    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
            / transferCosts;

    reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
    reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);

    super.getContainerCommunicator().report(reportCommunication);

    // Labels, in order: job start time, job end time, total elapsed time, average throughput,
    // record write speed, total records read, total read/write failures.
    LOG.info(String.format(
            "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n",
            "任务启动时刻",
            dateFormat.format(startTimeStamp),

            "任务结束时刻",
            dateFormat.format(endTimeStamp),

            "任务总计耗时",
            String.valueOf(totalCosts) + "s",
            "任务平均流量",
            StrUtil.stringify(byteSpeedPerSecond)
                    + "/s",
            "记录写入速度",
            String.valueOf(recordSpeedPerSecond)
                    + "rec/s", "读出记录总数",
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
            "读写失败总数",
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    ));

    // The same values as a single pipe-delimited line
    LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +
            dateFormat.format(endTimeStamp) + "|" +
            String.valueOf(totalCosts) + "|" +
            StrUtil.stringify(byteSpeedPerSecond) + "|" +
            String.valueOf(recordSpeedPerSecond) + "|" +
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|" +
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    );

    if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
        // Labels, in order: transformer succeeded, failed, and filtered record totals.
        LOG.info(String.format(
                "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
                "transformer成功记录总数",
                communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),

                "transformer失败记录总数",
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),

                "transformer过滤记录总数",
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
        ));
    }
}
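The speed figures in logStatistics come from integer arithmetic: the transfer time is truncated to whole seconds, and a zero result is bumped to one second so the division cannot fail. A standalone sketch of just that calculation, with an invented class and method name, might look like this:

```java
public class SpeedSketch {

    // Mirrors the arithmetic in logStatistics: whole seconds, with a floor of 1.
    static long perSecond(long total, long startMillis, long endMillis) {
        long seconds = (endMillis - startMillis) / 1000;
        if (seconds == 0L) {
            seconds = 1L; // avoid division by zero for sub-second transfers
        }
        return total / seconds;
    }

    public static void main(String[] args) {
        // 2.5 s truncates to 2 s, so 10000 bytes gives 5000 bytes per second.
        System.out.println(perSecond(10_000L, 0L, 2_500L)); // 5000
        // A 400 ms transfer is treated as 1 s.
        System.out.println(perSecond(10_000L, 0L, 400L));   // 10000
    }
}
```

Because of the truncation, the reported rates for very short jobs are coarse. That is worth keeping in mind when the returned values are used for monitoring.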

Making the changes

Add a result entity, DataxResult (getters and setters omitted):

public class DataxResult {
    // Job start time
    private long startTimeStamp;
    // Job end time
    private long endTimeStamp;
    // Total elapsed time of the job
    private long totalCosts;
    // Average throughput of the job
    private long byteSpeedPerSecond;
    // Record write speed
    private long recordSpeedPerSecond;
    // Total records read
    private long totalReadRecords;
    // Total read/write failures
    private long totalErrorRecords;
    // Transformer: total succeeded records
    private long transformerSucceedRecords;
    // Transformer: total failed records
    private long transformerFailedRecords;
    // Transformer: total filtered records
    private long transformerFilterRecords;
    // Bytes read successfully
    private long readSucceedBytes;
    // Transfer end time
    private long endTransferTimeStamp;
    // Transfer start time
    private long startTransferTimeStamp;
    // Total transfer time
    private long transferCosts;

    // getters and setters omitted
}

Rewrite the logStatistics method so that it returns this entity.

private DataxResult logStatistics(DataxResult resultMsg) {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }
    if (super.getContainerCommunicator() == null) {
        return resultMsg;
    }
    Communication communication = super.getContainerCommunicator().collect();
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
            / transferCosts;
    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
            / transferCosts;

    // getResultMsg is a helper on DataxResult that is not shown in this article.
    // The arguments here are assumed to follow the field declaration order of DataxResult.
    return resultMsg.getResultMsg(startTimeStamp,
            endTimeStamp,
            totalCosts,
            byteSpeedPerSecond,
            recordSpeedPerSecond,
            CommunicationTool.getTotalReadRecords(communication),
            CommunicationTool.getTotalErrorRecords(communication),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES),
            this.endTransferTimeStamp,
            this.startTransferTimeStamp,
            transferCosts
    );
}
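The getResultMsg helper called above is not listed in this article. As a rough guess at its shape, it could simply copy its arguments into the fields and return the same object. The sketch below trims the entity to three fields to stay short; the class name and the reduced parameter list are illustrative only.

```java
public class ResultHolderSketch {
    private long totalCosts;
    private long totalReadRecords;
    private long totalErrorRecords;

    // Hypothetical, trimmed-down counterpart of getResultMsg:
    // stores the values and returns this instance so the call can be returned directly.
    public ResultHolderSketch getResultMsg(long totalCosts,
                                           long totalReadRecords,
                                           long totalErrorRecords) {
        this.totalCosts = totalCosts;
        this.totalReadRecords = totalReadRecords;
        this.totalErrorRecords = totalErrorRecords;
        return this;
    }

    public long getTotalCosts() { return totalCosts; }
    public long getTotalReadRecords() { return totalReadRecords; }
    public long getTotalErrorRecords() { return totalErrorRecords; }

    public static void main(String[] args) {
        ResultHolderSketch result = new ResultHolderSketch().getResultMsg(3L, 100L, 2L);
        System.out.println(result.getTotalReadRecords());  // 100
        System.out.println(result.getTotalErrorRecords()); // 2
    }
}
```

With fourteen positional long parameters in the full version, it is easy to pass two values in the wrong order without a compiler error. Individual setters, or a builder, would make each assignment explicit.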

The start() method of JobContainer also needs to be rewritten.

@Override
public DataxResult start(DataxResult dataxResult) {
    ...
    // Collect the statistics into the entity instead of only logging them
    DataxResult result = logStatistics(dataxResult);
    ...
    return result;
}

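Then add a mock test method, mockEntry, to the Engine class. The mockStart method shown here is the counterpart of start that returns the DataxResult.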

public DataxResult mockStart(Configuration allConf) {

    ...
    DataxResult dataxResult = new DataxResult();
    return container.start(dataxResult);
}

Testing

In com.alibaba.datax.core.util.container.CoreConstant, change DATAX_HOME to a local path.

That DATAX_HOME path contains the following directories:

[image: DATAX_HOME directory listing]

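import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.container.CoreConstant;
// DataxResult also needs an import unless it lives in the same package as this class.

public class Test {

    public static void main(String[] args) {
        // Same arguments that datax.py would pass: job file, runtime mode, and job id
        String[] dataxArgs = {"-job", CoreConstant.DATAX_HOME + "\\job\\job2.json", "-mode", "standalone", "-jobid", "-1"};
        try {
            DataxResult dataxResult = Engine.mockEntry(dataxArgs);
        } catch (Throwable e) {
            e.printStackTrace();
        }

    }
}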

The result of the run:

3

Done!
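The test above builds the job path with Windows-style separators. If the same code has to run on other platforms, one option is to assemble the path with java.nio.file.Paths. The sketch below assumes the job file sits in a "job" subdirectory of the DataX home, as in the test. The class name, method name, and the "datax" home value are invented for illustration.

```java
import java.nio.file.Paths;

public class JobArgsSketch {

    // Builds the argument array for a standalone run of a job file under <dataxHome>/job.
    static String[] buildArgs(String dataxHome, String jobFile) {
        // Paths.get joins the segments with the separator of the current platform.
        String jobPath = Paths.get(dataxHome, "job", jobFile).toString();
        return new String[] {"-job", jobPath, "-mode", "standalone", "-jobid", "-1"};
    }

    public static void main(String[] args) {
        // The printed separator depends on the operating system.
        System.out.println(String.join(" ", buildArgs("datax", "job2.json")));
    }
}
```

The resulting array can be passed to the entry method in place of the hand-written one.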
