Hadoop 2.7.2 MapReduce Job 提交源码及切片源码分析

  1. 首先从 Job 类的 waitForCompletion() 方法进入
boolean result = job.waitforcompletion(true);
   * submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws ioexception thrown if the communication with the 
   *         <code>jobtracker</code> is lost
  public boolean waitforcompletion(boolean verbose
                                   ) throws ioexception, interruptedexception,
                                            classnotfoundexception {
    // 首先判断state,当state为define时可以提交,进入 submit() 方法
    if (state == jobstate.define) {
    if (verbose) {
    } else {
      // get the completion poll interval from the client.
      int completionpollintervalmillis = 
      while (!iscomplete()) {
        try {
        } catch (interruptedexception ie) {
    return issuccessful();
  1. 进入submit()方法
   * submit the job to the cluster and return immediately.
   * @throws ioexception
  public void submit() 
         throws ioexception, interruptedexception, classnotfoundexception {
    // 确认jobstate状态为可提交状态,否则不能提交
    // 设置使用最新的api
    // 进入connect()方法,mapreduce作业提交时连接集群是通过job类的connect()方法实现的,
    // 它实际上是构造集群cluster实例cluster
    // connect()方法执行完之后,定义提交者submitter
    final jobsubmitter submitter = 
        getjobsubmitter(cluster.getfilesystem(), cluster.getclient());
    status = ugi.doas(new privilegedexceptionaction<jobstatus>() {
      public jobstatus run() throws ioexception, interruptedexception, 
      classnotfoundexception {
        // 这里的核心方法是submitjobinternal(),顾名思义,提交job的内部方法,实现了提交job的所有业务逻辑
          // 进入submitjobinternal
        return submitter.submitjobinternal(job.this, cluster);
    // 提交之后state状态改变
    state = jobstate.running;
    log.info("the url to track the job: " + gettrackingurl());
  1. 进入connect()方法
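  • MapReduce 作业提交时连接集群是通过 Job 类的 connect() 方法实现的,它实际上是构造集群 Cluster 的实例 cluster
  • Cluster 是连接 MapReduce 集群的一种工具,提供了获取 MapReduce 集群信息的方法
  • 在 Cluster 内部,有一个与集群进行通信的客户端通信协议 ClientProtocol 的实例 client,它由 ClientProtocolProvider 实例的 create() 方法构造(create() 并不是静态方法:Cluster 会遍历 frameworkLoader 中加载到的各个 provider,依次调用 provider.create(conf),选用返回非 null 结果的那个 provider)
  • Hadoop 2.x 中提供了两种模式的 ClientProtocol:YARN 模式的 YARNRunner 和 local 模式的 LocalJobRunner,分别由 YarnClientProtocolProvider 和 LocalClientProtocolProvider 创建,具体使用哪一种由配置项 mapreduce.framework.name 决定;Cluster 实际上是由它们负责与集群进行通信的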
  private synchronized void connect()
          throws ioexception, interruptedexception, classnotfoundexception {
    if (cluster == null) {// cluster提供了远程获取mapreduce的方法
      cluster = 
        ugi.doas(new privilegedexceptionaction<cluster>() {
                   public cluster run()
                          throws ioexception, interruptedexception, 
                                 classnotfoundexception {
                     // 只需关注这个cluster()构造器,构造集群cluster实例
                     return new cluster(getconfiguration());
  1. 进入 Cluster 的构造器
// 首先调用一个参数的构造器,间接调用两个参数的构造器
public cluster(configuration conf) throws ioexception {
    this(null, conf);

  public cluster(inetsocketaddress jobtrackaddr, configuration conf) 
      throws ioexception {
    this.conf = conf;
    this.ugi = usergroupinformation.getcurrentuser();
    // 最重要的initialize方法
    initialize(jobtrackaddr, conf);
// cluster中要关注的两个成员变量是客户端通讯协议提供者clientprotocolprovider和客户端通讯协议clientprotocol实例client
  private void initialize(inetsocketaddress jobtrackaddr, configuration conf)
      throws ioexception {

    synchronized (frameworkloader) {
      for (clientprotocolprovider provider : frameworkloader) {
        log.debug("trying clientprotocolprovider : "
            + provider.getclass().getname());
        clientprotocol clientprotocol = null; 
        try {
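          // 配置项 mapreduce.framework.name 为 local(默认值)时,由 LocalClientProtocolProvider 构建 LocalJobRunner,MR 任务在本地运行
          // 配置项 mapreduce.framework.name 为 yarn 时,由 YarnClientProtocolProvider 构建 YARNRunner,MR 任务在 YARN 集群上运行
          // 与配置不匹配的 provider 的 create() 返回 null,对应下方的 else 分支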
          if (jobtrackaddr == null) {
            // 客户端通讯协议client是调用clientprotocolprovider的create()方法实现
            clientprotocol = provider.create(conf);
          } else {
            clientprotocol = provider.create(jobtrackaddr, conf);

          if (clientprotocol != null) {
            clientprotocolprovider = provider;
            client = clientprotocol;
            log.debug("picked " + provider.getclass().getname()
                + " as the clientprotocolprovider");
          else {
            log.debug("cannot pick " + provider.getclass().getname()
                + " as the clientprotocolprovider - returned null protocol");
        catch (exception e) {
          log.info("failed to use " + provider.getclass().getname()
              + " due to error: ", e);

    if (null == clientprotocolprovider || null == client) {
      throw new ioexception(
          "cannot initialize cluster. please check your configuration for "
              + mrconfig.framework_name
              + " and the correspond server addresses.");
  1. 进入 JobSubmitter 的 submitJobInternal() 方法,它是 Job 的内部提交方法,负责将 Job 提交到集群
jobstatus submitjobinternal(job job, cluster cluster) 
  throws classnotfoundexception, interruptedexception, ioexception {

    //validate the jobs output specs 
    // 检查结果的输出路径是否已经存在,如果存在会报异常

    // conf里边是集群的xml配置文件信息
    configuration conf = job.getconfiguration();
    // 添加mr框架到分布式缓存中

    // 获取提交执行时相关资源的临时存放路径
    // YARN 模式下,参数 yarn.app.mapreduce.am.staging-dir 未配置时默认是 /tmp/hadoop-yarn/staging/提交作业用户名/.staging(本地模式下的路径由 LocalJobRunner 另行决定)
    path jobstagingarea = jobsubmissionfiles.getstagingdir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    inetaddress ip = inetaddress.getlocalhost();
    if (ip != null) {//记录提交作业的主机ip、主机名,并且设置配置信息conf
      submithostaddress = ip.gethostaddress();
      submithostname = ip.gethostname();
    // 获取jobid
    jobid jobid = submitclient.getnewjobid();
    // 设置jobid
    // 提交作业的路径path(path parent, string child),会将两个参数拼接为一个路径
    path submitjobdir = new path(jobstagingarea, jobid.tostring());
    // job的状态
    jobstatus status = null;
    try {
      conf.set(mrjobconfig.mapreduce_job_dir, submitjobdir.tostring());
      log.debug("configuring job " + jobid + " with " + submitjobdir 
          + " as the submit dir");
      // get delegation token for the dir
          new path[] { submitjobdir }, conf);
      populatetokencache(conf, job.getcredentials());

      // generate a secret to authenticate shuffle transfers
      if (tokencache.getshufflesecretkey(job.getcredentials()) == null) {
        keygenerator keygen;
        try {
          keygen = keygenerator.getinstance(shuffle_keygen_algorithm);
        } catch (nosuchalgorithmexception e) {
          throw new ioexception("error generating shuffle secret key", e);
        secretkey shufflekey = keygen.generatekey();
      if (cryptoutils.isencryptedspillenabled(conf)) {
        conf.setint(mrjobconfig.mr_am_max_attempts, 1);
        log.warn("max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");

      // 拷贝jar包到集群
      // 此方法中调用如下方法:rUploader.uploadFiles(job, jobSubmitDir);
      // uploadFiles 方法将 jar 包(以及通过 -files/-libjars/-archives 指定的资源)上传到作业提交目录 submitJobDir 中
      copyandconfigurefiles(job, submitjobdir);

      path submitjobfile = jobsubmissionfiles.getjobconfpath(submitjobdir);
      // create the splits for the job
      log.debug("creating splits at " + jtfs.makequalified(submitjobdir));
      // 计算切片,生成切片规划文件
      int maps = writesplits(job, submitjobdir);
      conf.setint(mrjobconfig.num_maps, maps);
      log.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      string queue = conf.get(mrjobconfig.queue_name,
      accesscontrollist acl = submitclient.getqueueadmins(queue);
          queueacl.administer_jobs.getaclname()), acl.getaclstring());

      // removing jobtoken referrals before copying the jobconf to hdfs
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.

      if (conf.getboolean(
          mrjobconfig.default_job_token_tracking_ids_enabled)) {
        // add hdfs tracking ids
        arraylist<string> trackingids = new arraylist<string>();
        for (token<? extends tokenidentifier> t :
            job.getcredentials().getalltokens()) {
            trackingids.toarray(new string[trackingids.size()]));

      // set reservation info if it exists
      reservationid reservationid = job.getreservationid();
      if (reservationid != null) {
        conf.set(mrjobconfig.reservation_id, reservationid.tostring());

      // write job file to submit dir
      writeconf(conf, submitjobfile);
      // now, actually submit the job (using the submit name)
      // 开始正式提交job
      printtokens(jobid, job.getcredentials());
      status = submitclient.submitjob(
          jobid, submitjobdir.tostring(), job.getcredentials());
      if (status != null) {
        return status;
      } else {
        throw new ioexception("could not launch job");
    } finally {
      if (status == null) {
        log.info("cleaning up the staging area " + submitjobdir);
        if (jtfs != null && submitjobdir != null)
          jtfs.delete(submitjobdir, true);

  1. 进入 writeSplits(job, submitJobDir),计算切片,生成切片规划文件
  • 使用新 API 时,内部会调用 writeNewSplits(job, jobSubmitDir) 方法
  • writeNewSplits(job, jobSubmitDir) 内部通过反射创建了一个 InputFormat 类型的实例 input
  • InputFormat 主要作用
    • 验证 Job 的输入规范
    • 对输入的文件进行切分,形成多个 InputSplit(逻辑切片,并不会在物理上切分文件),每一个 InputSplit 对应着一个 map 任务(MapTask)
    • 提供 RecordReader,将切片中的数据按照规则解析为 key/value 键值对
  • input 调用 getSplits() 方法:List<InputSplit> splits = input.getSplits(job);
  1. 进入 FileInputFormat 类下的 getSplits(job) 方法
   * generate the list of files and make them into filesplits.
   * @param job the job context
   * @throws ioexception
  public list<inputsplit> getsplits(jobcontext job) throws ioexception {
    stopwatch sw = new stopwatch().start();
    // getFormatMinSplitSize() 在 FileInputFormat 中固定返回 1;getMinSplitSize(job) 返回配置项 mapreduce.input.fileinputformat.split.minsize 的值,未配置时为 1(并不是 job 的大小)
    long minsize = math.max(getformatminsplitsize(), getminsplitsize(job));
    // getMaxSplitSize(job) 返回配置项 mapreduce.input.fileinputformat.split.maxsize 的值,未配置时为 long 类型的最大值 Long.MAX_VALUE
    long maxsize = getmaxsplitsize(job);

    // generate splits 生成切片
    list<inputsplit> splits = new arraylist<inputsplit>();
    list<filestatus> files = liststatus(job);
    // 遍历job下的所有文件
    for (filestatus file: files) {
      // 获取文件路径
      path path = file.getpath();
      // 获取文件大小
      long length = file.getlen();
      if (length != 0) {
        blocklocation[] blklocations;
        if (file instanceof locatedfilestatus) {
          blklocations = ((locatedfilestatus) file).getblocklocations();
        } else {
          filesystem fs = path.getfilesystem(job.getconfiguration());
          blklocations = fs.getfileblocklocations(file, 0, length);
        // 判断是否可分割
        if (issplitable(job, path)) {
          // 获取块大小
          // 本地文件系统块大小默认为 32MB;HDFS 上块大小由 dfs.blocksize 决定,Hadoop 2.7.2 默认为 128MB,较早的版本(如 Hadoop 1.x)为 64MB
          long blocksize = file.getblocksize();
          // 计算切片的逻辑大小,默认等于块大小
          // 返回值为:return math.max(minsize, math.min(maxsize, blocksize));
          // 默认配置下 minSize=1, maxSize=long 类型最大值, blockSize 为文件的块大小(不是切片大小),因此切片大小默认等于块大小
          long splitsize = computesplitsize(blocksize, minsize, maxsize);

          long bytesremaining = length;
          // 每次切片前都要判断剩余未切分的字节数是否大于切片大小的 SPLIT_SLOP(常量,值为 1.1)倍,
          // 若不大于,就不再继续切分,剩余部分整体作为最后一个切片
          while (((double) bytesremaining)/splitsize > split_slop) {
            int blkindex = getblockindex(blklocations, length-bytesremaining);
            splits.add(makesplit(path, length-bytesremaining, splitsize,
            bytesremaining -= splitsize;

          if (bytesremaining != 0) {
            int blkindex = getblockindex(blklocations, length-bytesremaining);
            splits.add(makesplit(path, length-bytesremaining, bytesremaining,
        } else { // not splitable
          splits.add(makesplit(path, 0, length, blklocations[0].gethosts(),
      } else { 
        //create empty hosts array for zero length files
        splits.add(makesplit(path, 0, length, new string[0]));
    // save the number of input files for metrics/loadgen
    job.getconfiguration().setlong(num_input_files, files.size());
    if (log.isdebugenabled()) {
      log.debug("total # of splits generated by getsplits: " + splits.size()
          + ", timetaken: " + sw.now(timeunit.milliseconds));
    return splits;


