当前位置: 移动技术网 > IT编程>数据库>其他数据库 > Hadoop学习之路(7)MapReduce自定义排序

Hadoop学习之路(7)MapReduce自定义排序

2019年12月28日  | 移动技术网IT编程  | 我要评论

本文测试文本:

tom 20 8000
nancy 22 8000
ketty 22 9000
stone 19 10000
green 19 11000
white 39 29000
socrates 30 40000

   mapreduce中,根据key进行分区、排序、分组
mapreduce会按照基本类型对应的key进行排序,如int类型的intwritable,long类型的longwritable,text类型,默认升序排序
   为什么要自定义排序规则?现有需求,需要自定义key类型,并自定义key的排序规则,如按照人的salary降序排序,若相同,则再按age升序排序
以text类型为例:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
text类实现了writablecomparable接口,并且有write()readfields()compare()方法
readfields()方法:用来反序列化操作
write()方法:用来序列化操作
所以要想自定义类型用来排序需要有以上的方法
自定义类代码

import org.apache.hadoop.io.writablecomparable;
import java.io.datainput;
import java.io.dataoutput;
import java.io.ioexception;
public class person implements writablecomparable<person> {
    private string name;
    private int age;
    private int salary;
    public person() {
    }
    public person(string name, int age, int salary) {
        //super();
        this.name = name;
        this.age = age;
        this.salary = salary;
    }
    public string getname() {
        return name;
    }
    public void setname(string name) {
        this.name = name;
    }
    public int getage() {
        return age;
    }
    public void setage(int age) {
        this.age = age;
    }
    public int getsalary() {
        return salary;
    }
    public void setsalary(int salary) {
        this.salary = salary;
    }
    @override
    public string tostring() {
        return this.salary + "  " + this.age + "    " + this.name;
    }
    //先比较salary,高的排序在前;若相同,age小的在前
    public int compareto(person o) {
        int compareresult1= this.salary - o.salary;
        if(compareresult1 != 0) {
            return -compareresult1;
        } else {
            return this.age - o.age;
        }
    }
    //序列化,将newkey转化成使用流传送的二进制
    public void write(dataoutput dataoutput) throws ioexception {
        dataoutput.writeutf(name);
        dataoutput.writeint(age);
        dataoutput.writeint(salary);
    }
    //使用in读字段的顺序,要与write方法中写的顺序保持一致
    public void readfields(datainput datainput) throws ioexception {
        //read string
        this.name = datainput.readutf();
        this.age = datainput.readint();
        this.salary = datainput.readint();
    }

}

mapreuduce程序:

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.nullwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.mapper;
import org.apache.hadoop.mapreduce.reducer;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
import java.io.ioexception;
import java.net.uri;
public class  secondarysort {
	public static void main(string[] args) throws exception {
		system.setproperty("hadoop_user_name","hadoop2.7");
		configuration configuration = new configuration();
        //设置本地运行的mapreduce程序 jar包
        configuration.set("mapreduce.job.jar","c:\\users\\tanglei1\\ideaprojects\\hadooptang\\target\\com.kaikeba.hadoop-1.0-snapshot.jar");
		job job = job.getinstance(configuration, secondarysort.class.getsimplename());
		filesystem filesystem = filesystem.get(uri.create(args[1]), configuration);
		if (filesystem.exists(new path(args[1]))) {
			filesystem.delete(new path(args[1]), true);
		}
		fileinputformat.setinputpaths(job, new path(args[0]));
		job.setmapperclass(mymap.class);
		job.setmapoutputkeyclass(person.class);
		job.setmapoutputvalueclass(nullwritable.class);
		//设置reduce的个数
		job.setnumreducetasks(1);
		job.setreducerclass(myreduce.class);
		job.setoutputkeyclass(person.class);
		job.setoutputvalueclass(nullwritable.class);
		fileoutputformat.setoutputpath(job, new path(args[1]));
		job.waitforcompletion(true);
	}
	public static class mymap extends
			mapper<longwritable, text, person, nullwritable> {
		//longwritable:输入参数键类型,text:输入参数值类型
		//persion:输出参数键类型,nullwritable:输出参数值类型
		@override
		//map的输出值是键值对<k,v>,nullwritable说关心v的值
		protected void map(longwritable key, text value,
				context context)
				throws ioexception, interruptedexception {
			//longwritable key:输入参数键值对的键,text value:输入参数键值对的值
			//获得一行数据,输入参数的键(距首行的位置),hadoop读取数据的时候逐行读取文本
			//fields:代表着文本一行的的数据
			string[] fields = value.tostring().split(" ");
			// 本列中文本一行数据:nancy 22 8000
			string name = fields[0];
			//字符串转换成int
			int age = integer.parseint(fields[1]);
			int salary = integer.parseint(fields[2]);
			//在自定义类中进行比较
			person person = new person(name, age, salary);
			context.write(person, nullwritable.get());
		}
	}
	public static class myreduce extends
			reducer<person, nullwritable, person, nullwritable> {
		@override
		protected void reduce(person key, iterable<nullwritable> values, context context) throws ioexception, interruptedexception {
			context.write(key, nullwritable.get());
		}
	}
}

运行结果:

40000  30    socrates
29000  39    white
11000  19    green
10000  19    stone
9000  22    ketty
8000  20    tom
8000  22    nancy

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网