当前位置: 移动技术网 > IT编程>开发语言>Java > java使用hadoop实现关联商品统计

java使用hadoop实现关联商品统计

2019年07月22日  | 移动技术网IT编程  | 我要评论

最近几天一直在看hadoop相关的书籍,目前稍微有点感觉,自己就仿照着wordcount程序自己编写了一个统计关联商品。

需求描述:

根据超市的销售清单,计算商品之间的关联程度(即统计同时买a商品和b商品的次数)。

数据格式:

超市销售清单简化为如下格式:一行表示一个清单,每个商品采用 "," 分割,如下图所示:

需求分析:

采用hadoop中的mapreduce对该需求进行计算。

map函数主要拆分出关联的商品,输出结果为 key为商品a,value为商品b,对于第一条三条结果拆分结果如下图所示:

这里为了统计出和a、b两件商品想关联的商品,所以商品a、b之间的关系输出两条结果即 a-b、b-a。

reduce函数分别对和商品a相关的商品进行分组统计,即分别求value中的各个商品出现的次数,输出结果为key为商品a|商品b,value为该组合出现的次数。针对上面提到的5条记录,对map输出中key值为r的做下分析:

通过map函数的处理,得到如下图所示的记录:

reduce中对map输出的value值进行分组计数,得到的结果如下图所示

将商品a b作为key,组合个数作为value输出,输出结果如下图所示:

对于需求的实现过程的分析到目前就结束了,下面就看下具体的代码实现

代码实现:

关于代码就不做详细的介绍,具体参照代码之中的注释吧。

package com; 
 
import java.io.ioexception; 
import java.util.hashmap; 
import java.util.map.entry; 
 
import org.apache.hadoop.conf.configuration; 
import org.apache.hadoop.conf.configured; 
import org.apache.hadoop.fs.path; 
import org.apache.hadoop.io.intwritable; 
import org.apache.hadoop.io.longwritable; 
import org.apache.hadoop.io.text; 
import org.apache.hadoop.mapreduce.job; 
import org.apache.hadoop.mapreduce.mapper; 
import org.apache.hadoop.mapreduce.reducer; 
import org.apache.hadoop.mapreduce.lib.input.fileinputformat; 
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; 
import org.apache.hadoop.mapreduce.lib.output.textoutputformat; 
import org.apache.hadoop.util.tool; 
import org.apache.hadoop.util.toolrunner; 
 
public class test extends configured implements tool{ 
 
  /** 
   * map类,实现数据的预处理 
   * 输出结果key为商品a value为关联商品b 
   * @author lulei 
   */ 
  public static class mapt extends mapper<longwritable, text, text, text> { 
    public void map(longwritable key, text value, context context) throws ioexception, interruptedexception{ 
      string line = value.tostring(); 
      if (!(line == null || "".equals(line))) { 
        //分割商品 
        string []vs = line.split(","); 
        //两两组合,构成一条记录 
        for (int i = 0; i < (vs.length - 1); i++) { 
          if ("".equals(vs[i])) {//排除空记录 
            continue; 
          } 
          for (int j = i+1; j < vs.length; j++) { 
            if ("".equals(vs[j])) { 
              continue; 
            } 
            //输出结果 
            context.write(new text(vs[i]), new text(vs[j])); 
            context.write(new text(vs[j]), new text(vs[i])); 
          } 
        } 
      }  
    } 
  } 
   
  /** 
   * reduce类,实现数据的计数 
   * 输出结果key 为商品a|b value为该关联次数 
   * @author lulei 
   */ 
  public static class reducet extends reducer<text, text, text, intwritable> { 
    private int count; 
     
    /** 
     * 初始化 
     */ 
    public void setup(context context) { 
      //从参数中获取最小记录个数 
      string countstr = context.getconfiguration().get("count"); 
      try { 
        this.count = integer.parseint(countstr); 
      } catch (exception e) { 
        this.count = 0; 
      } 
    } 
    public void reduce(text key, iterable<text> values, context context) throws ioexception, interruptedexception{ 
      string keystr = key.tostring(); 
      hashmap<string, integer> hashmap = new hashmap<string, integer>(); 
      //利用hash统计b商品的次数 
      for (text value : values) { 
        string valuestr = value.tostring(); 
        if (hashmap.containskey(valuestr)) { 
          hashmap.put(valuestr, hashmap.get(valuestr) + 1); 
        } else { 
          hashmap.put(valuestr, 1); 
        } 
      } 
      //将结果输出 
      for (entry<string, integer> entry : hashmap.entryset()) { 
        if (entry.getvalue() >= this.count) {//只输出次数不小于最小值的 
          context.write(new text(keystr + "|" + entry.getkey()), new intwritable(entry.getvalue())); 
        } 
      } 
    } 
  } 
   
  @override 
  public int run(string[] arg0) throws exception { 
    // todo auto-generated method stub 
    configuration conf = getconf(); 
    conf.set("count", arg0[2]); 
     
    job job = new job(conf); 
    job.setjobname("jobtest"); 
     
    job.setoutputformatclass(textoutputformat.class); 
    job.setoutputkeyclass(text.class); 
    job.setoutputvalueclass(text.class); 
     
    job.setmapperclass(mapt.class); 
    job.setreducerclass(reducet.class); 
     
    fileinputformat.addinputpath(job, new path(arg0[0])); 
    fileoutputformat.setoutputpath(job, new path(arg0[1])); 
     
    job.waitforcompletion(true); 
     
    return job.issuccessful() ? 0 : 1; 
     
  } 
   
  /** 
   * @param args 
   */ 
  public static void main(string[] args) { 
    // todo auto-generated method stub 
    if (args.length != 3) { 
      system.exit(-1); 
    } 
    try { 
      int res = toolrunner.run(new configuration(), new test(), args); 
      system.exit(res); 
    } catch (exception e) { 
      // todo auto-generated catch block 
      e.printstacktrace(); 
    } 
  } 
 
} 

上传运行:

将程序打包成jar文件,上传到机群之中。将测试数据也上传到hdfs分布式文件系统中。

命令运行截图如下图所示:

运行结束后查看相应的hdfs文件系统,如下图所示:

到此一个完整的mapreduce程序就完成了,关于hadoop的学习,自己还将继续~感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网