当前位置: 移动技术网 > IT编程>开发语言>Java > java 中自定义OutputFormat的实例详解

java 中自定义OutputFormat的实例详解

2019年07月19日  | 移动技术网IT编程  | 我要评论

java 中 自定义outputformat的实例详解

实例代码:

package com.ccse.hadoop.outputformat; 
 
import java.io.ioexception; 
import java.net.uri; 
import java.net.urisyntaxexception; 
import java.util.stringtokenizer; 
 
import org.apache.hadoop.conf.configuration; 
import org.apache.hadoop.fs.fsdataoutputstream; 
import org.apache.hadoop.fs.filesystem; 
import org.apache.hadoop.fs.path; 
import org.apache.hadoop.io.longwritable; 
import org.apache.hadoop.io.text; 
import org.apache.hadoop.mapreduce.job; 
import org.apache.hadoop.mapreduce.jobcontext; 
import org.apache.hadoop.mapreduce.mapper; 
import org.apache.hadoop.mapreduce.outputcommitter; 
import org.apache.hadoop.mapreduce.outputformat; 
import org.apache.hadoop.mapreduce.recordwriter; 
import org.apache.hadoop.mapreduce.reducer; 
import org.apache.hadoop.mapreduce.taskattemptcontext; 
import org.apache.hadoop.mapreduce.lib.input.fileinputformat; 
import org.apache.hadoop.mapreduce.lib.output.fileoutputcommitter; 
 
 
public class myselfoutputformatapp { 
   
  // HDFS location handed to fileinputformat as the job's input path.
  public final static string input_path = "hdfs://chaoren1:9000/mapinput"; 
  // HDFS directory that main() deletes before the job runs; also used as the
  // file-system URI and as the directory passed to the output committer.
  public final static string output_path = "hdfs://chaoren1:9000/mapoutput"; 
  // Appended to output_path to form the single file the record writer writes to.
  public final static string output_filename = "/abc"; 
   
  /**
   * Configures and runs the word-count job whose reducer output is written
   * through {@code myselfoutputformat}.
   *
   * <p>Whatever is currently stored under {@code output_path} is deleted
   * recursively before the job is submitted.
   *
   * @param args command-line arguments; not used
   * @throws ioexception if a file-system call fails or the job reports
   *     that it did not complete successfully
   * @throws urisyntaxexception if {@code output_path} is not a valid URI
   * @throws classnotfoundexception propagated from running the job
   * @throws interruptedexception propagated from waiting for the job
   */
  public static void main(string[] args) throws ioexception, urisyntaxexception,
    classnotfoundexception, interruptedexception {
    configuration conf = new configuration();

    // Clear the output directory first; the second argument makes the delete recursive.
    filesystem outputfs = filesystem.get(new uri(output_path), conf);
    outputfs.delete(new path(output_path), true);

    final string jobname = myselfoutputformatapp.class.getsimplename();
    job job = new job(conf, jobname);
    job.setjarbyclass(myselfoutputformatapp.class);

    // Map side: text lines in, (word, 1) pairs out.
    fileinputformat.setinputpaths(job, new path(input_path));
    job.setmapperclass(mymapper.class);
    job.setmapoutputkeyclass(text.class);
    job.setmapoutputvalueclass(longwritable.class);

    // Reduce side: (word, total) pairs written through the custom output format.
    job.setreducerclass(myreducer.class);
    job.setoutputkeyclass(text.class);
    job.setoutputvalueclass(longwritable.class);
    job.setoutputformatclass(myselfoutputformat.class);

    // waitforcompletion(true) returns false when the job fails. Surface that
    // to the caller instead of returning as if everything had succeeded.
    if (!job.waitforcompletion(true)) {
      throw new ioexception("job " + jobname + " did not complete successfully");
    }
  }
   
  /**
   * Mapper that splits each input line into tokens using the default
   * {@code stringtokenizer} delimiters and emits {@code (token, 1)} for
   * every token found.
   */
  public static class mymapper extends mapper<longwritable, text, text, longwritable> {

    // Both writables are reused for every emitted pair.
    private text currenttoken = new text();
    private longwritable one = new longwritable(1);

    /**
     * Emits one {@code (token, 1)} pair per token of {@code value}.
     *
     * @param key input key supplied by the framework; not used here
     * @param value the text to tokenize; a null value produces no output
     * @param context sink for the emitted pairs
     */
    @override
    protected void map(longwritable key, text value,
        mapper<longwritable, text, text, longwritable>.context context)
        throws ioexception, interruptedexception {
      if (value == null) {
        return;
      }
      for (stringtokenizer tokens = new stringtokenizer(value.tostring());
          tokens.hasmoretokens(); ) {
        currenttoken.set(tokens.nexttoken());
        context.write(currenttoken, one);
      }
    }

  }
   
  /**
   * Reducer that adds up all counts received for a key and emits a single
   * {@code (key, total)} pair.
   */
  public static class myreducer extends reducer<text, longwritable, text, longwritable> {

    /**
     * Sums {@code values} and writes the total under {@code key}.
     *
     * @param key the key whose counts are being combined
     * @param values the individual counts for {@code key}
     * @param context sink for the resulting pair
     */
    @override
    protected void reduce(text key, iterable<longwritable> values,
        reducer<text, longwritable, text, longwritable>.context context)
        throws ioexception, interruptedexception {
      long total = 0;
      for (longwritable count : values) {
        total = total + count.get();
      }
      longwritable result = new longwritable();
      result.set(total);
      context.write(key, result);
    }
  }
 
  /**
   * Output format that sends every record to one fixed file,
   * {@code output_path + output_filename}, through a
   * {@code myselfrecordwriter}.
   */
  public static class myselfoutputformat extends outputformat<text, longwritable> {

    /**
     * Creates the output file and returns a writer bound to its stream.
     *
     * <p>NOTE(review): the target path is a constant and the file is created
     * with overwrite disabled, so a second call that targets the same
     * existing file is expected to fail. Confirm that the job only ever
     * requests one writer.
     *
     * @param context supplies the configuration used to reach the file system
     * @return a record writer that owns the newly created stream
     * @throws ioexception if {@code output_path} is not a valid URI or the
     *     file cannot be created
     */
    @override
    public recordwriter<text, longwritable> getrecordwriter(
        taskattemptcontext context) throws ioexception,
        interruptedexception {
      final uri outputuri;
      try {
        outputuri = new uri(myselfoutputformatapp.output_path);
      } catch (urisyntaxexception e) {
        // Fail here with the cause attached rather than handing a null
        // stream to the writer and failing later on the first write.
        throw new ioexception(
            "invalid output uri: " + myselfoutputformatapp.output_path, e);
      }
      filesystem fs = filesystem.get(outputuri, context.getconfiguration());
      // Location of the output file.
      final path path = new path(myselfoutputformatapp.output_path
                   + myselfoutputformatapp.output_filename);
      return new myselfrecordwriter(fs.create(path, false));
    }

    /**
     * Performs no validation of the output specification.
     */
    @override
    public void checkoutputspecs(jobcontext context) throws ioexception,
        interruptedexception {
    }

    /**
     * Returns a {@code fileoutputcommitter} constructed for {@code output_path}.
     */
    @override
    public outputcommitter getoutputcommitter(taskattemptcontext context)
        throws ioexception, interruptedexception {
      return new fileoutputcommitter(new path(myselfoutputformatapp.output_path), context);
    }

  }
   
  /**
   * Record writer that serializes each pair as one text line of the form
   * {@code key<TAB>value<LF>} on the stream it was constructed with.
   */
  public static class myselfrecordwriter extends recordwriter<text, longwritable> {

    private final fsdataoutputstream outputstream;

    /**
     * @param outputstream destination stream; closed by {@link #close}
     * @throws illegalargumentexception if {@code outputstream} is null
     */
    public myselfrecordwriter(fsdataoutputstream outputstream) {
      if (outputstream == null) {
        throw new illegalargumentexception("outputstream must not be null");
      }
      this.outputstream = outputstream;
    }

    /**
     * Writes one record as text.
     *
     * @param key the record key, written as its UTF-8 bytes
     * @param value the record value, written in decimal
     * @throws ioexception if the underlying stream rejects the write
     */
    @override
    public void write(text key, longwritable value) throws ioexception,
        interruptedexception {
      // Copy the key's encoded bytes directly: writebytes(string) keeps only
      // the low byte of each char and would corrupt non-ASCII keys. Only the
      // first getlength() bytes of the backing array are valid.
      this.outputstream.write(key.getbytes(), 0, key.getlength());
      this.outputstream.writebytes("\t");
      // Decimal text instead of writelong's 8 raw bytes, so the file stays
      // plain text. The digits are ASCII, so writebytes is safe here.
      this.outputstream.writebytes(string.valueof(value.get()));
      // Terminate the record so consecutive pairs do not run together.
      this.outputstream.writebytes("\n");
    }

    /**
     * Closes the underlying stream.
     */
    @override
    public void close(taskattemptcontext context) throws ioexception,
        interruptedexception {
      this.outputstream.close();
    }

  }
   
} 

 2.outputformat是用于处理各种输出目的地的。

2.1 outputformat需要写出去的键值对,是来自于reducer类,是通过recordwriter获得的。

2.2 recordwriter中的write(...)方法只有k和v两个参数,那么数据写到哪里去呢?这要通过单独传入的outputstream来处理:write方法就是把k和v写入到outputstream中。

2.3 recordwriter是由outputformat的getrecordwriter(...)方法创建并返回的。因此,我们自定义的outputformat必须继承outputformat类,而流对象也必须在getrecordwriter(...)方法中获得。

以上就是java 中自定义outputformat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网