当前位置: 移动技术网 > IT编程>数据库>其他数据库 > 基于Storm的WordCount

基于Storm的WordCount

2019年12月18日  | 移动技术网IT编程  | 我要评论

storm wordcount 工作过程

storm 版本:
1、spout 从外部数据源中读取数据,随机发送一个元组对象出去;
2、splitbolt 接收 spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;
3、wordcountbolt 接收 splitbolt 中输出的单词数组,对里面单词的频率进行累加,将累加后的结果输出。

java 版本:
1、读取文件中的数据,一行一行的读取;
2、将读到的数据进行切割;
3、对切割后的数组中的单词进行计算。

hadoop 版本:
1、按行读取文件中的数据;
2、在 mapper()函数中对每一行的数据进行切割,并输出切割后的数据数组;
3、接收 mapper()中输出的数据数组,在 reducer()函数中对数组中的单词进行计算,将计算后的统计结果输出。

源代码

storm的配置、eclipse里maven的配置以及创建项目部分省略。

mainclass

package com.test.stormwordcount;
import backtype.storm.config; 
import backtype.storm.localcluster; 
import backtype.storm.stormsubmitter; 
import backtype.storm.generated.alreadyaliveexception; 
import backtype.storm.generated.invalidtopologyexception; 
import backtype.storm.topology.topologybuilder; 
import backtype.storm.tuple.fields; 

public class mainclass { 

    public static void main(string[] args) throws alreadyaliveexception, invalidtopologyexception {         
        //创建一个 topologybuilder         
        topologybuilder tb = new topologybuilder();         
        tb.setspout("spoutbolt", new spoutbolt(), 2);         tb.setbolt("splitbolt", new splitbolt(), 2).shufflegrouping("spoutbolt");         
        tb.setbolt("countbolt", new countbolt(), 4).fieldsgrouping("splitbolt", new fields("word"));         
        //创建配置         
        config conf = new config();         
        //设置 worker 数量         
        conf.setnumworkers(2);         
        //提交任务         
        //集群提交         
        //stormsubmitter.submittopology("mywordcount", conf, tb.createtopology());         
        //本地提交         
        localcluster localcluster = new localcluster();         
        localcluster.submittopology("mywordcount", conf, tb.createtopology()); 
    }  
} 

splitbolt 部分

package com.test.stormwordcount;
import java.util.map; 
import backtype.storm.task.outputcollector; 
import backtype.storm.task.topologycontext; 
import backtype.storm.topology.outputfieldsdeclarer; 
import backtype.storm.topology.base.baserichbolt; 
import backtype.storm.tuple.fields; 
import backtype.storm.tuple.tuple; 
import backtype.storm.tuple.values; 

public class splitbolt extends baserichbolt{      
    outputcollector collector; 

    /**      * 初始化      */     
    public void prepare(map stormconf, topologycontext context, outputcollector collector) {         
        this.collector = collector;     
        } 

    /**      * 执行方法      */     
    public void execute(tuple input) {         
        string line = input.getstring(0);         
        string[] split = line.split(" ");         
        for (string word : split) {             
            collector.emit(new values(word));         
            }     
        } 

    /**      * 输出      */     
    public void declareoutputfields(outputfieldsdeclarer declarer) {         
        declarer.declare(new fields("word"));     
        } 
} 

countbolt 部分

package com.test.stormwordcount;
import java.util.hashmap; 
import java.util.map; 
import backtype.storm.task.outputcollector; 
import backtype.storm.task.topologycontext; 
import backtype.storm.topology.outputfieldsdeclarer; 
import backtype.storm.topology.base.baserichbolt; 
import backtype.storm.tuple.tuple; 

public class countbolt extends baserichbolt{ 

    outputcollector collector;
    map<string, integer> map = new hashmap<string, integer>(); 

    /**      * 初始化      */     
    public void prepare(map stormconf, topologycontext context, outputcollector collector) {         
        this.collector = collector;     
        } 


    /**      * 执行方法      */     
public void execute(tuple input) {         
    string word = input.getstring(0);         
    if(map.containskey(word)){             
    integer c = map.get(word);             
        map.put(word, c+1);         
        }else{             
        map.put(word, 1);         
        }         
    //测试输出         
    system.out.println("结果:"+map);     
    } 

    /**      * 输出      */     
public void declareoutputfields(outputfieldsdeclarer declarer) {     
    
} 
} 

spoutbolt 部分

package com.test.stormwordcount;
import java.util.map; 
import backtype.storm.spout.spoutoutputcollector; 
import backtype.storm.task.topologycontext; 
import backtype.storm.topology.outputfieldsdeclarer; 
import backtype.storm.topology.base.baserichspout; 
import backtype.storm.tuple.fields; 
import backtype.storm.tuple.values; 

public class spoutbolt extends baserichspout{ 

    spoutoutputcollector collector;
    /**      * 初始化方法      */     
    public void open(map map, topologycontext context, spoutoutputcollector collector) {         
        this.collector = collector;     
        } 

    /**      * 重复调用方法      */     
    public void nexttuple() {         
        collector.emit(new values("hello world this is a test"));     
        } 

    /**      * 输出      */     
    public void declareoutputfields(outputfieldsdeclarer declarer) {         
        declarer.declare(new fields("test"));     
        } 
} 

pom.xml 文件内容

<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0</modelversion>

<groupid>com.test</groupid>
<artifactid>stormwordcount</artifactid>
<version>0.9.6</version>
<packaging>jar</packaging>

<name>stormwordcount</name>
<url>http://maven.apache.org</url>

<properties>
    <project.build.sourceencoding>utf-8</project.build.sourceencoding>
</properties>
<dependencies>
    <dependency>
        <groupid>junit</groupid>
        <artifactid>junit</artifactid>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupid>org.apache.storm</groupid>
        <artifactid>storm-core</artifactid>
        <version>0.9.6</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactid>maven-assembly-plugin</artifactid>
            <configuration>
                <descriptorrefs>
                    <descriptorref>jar-with-dependencies</descriptorref>
                </descriptorrefs>
                <archive>
                    <manifest>
                        <mainclass>com.test.stormwordcount.mainclass</mainclass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupid>org.apache.maven.plugins</groupid>
            <artifactid>maven-compiler-plugin</artifactid>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
    </plugins>
</build>

遇到的问题

基于storm的wordcount需要eclipse安装了maven插件,之前的大数据实践安装的eclipse版本为eclipse ide for eclipse committers4.5.2,这个版本不自带maven插件,后续安装失败了几次(网上很多的教程都已经失效),这里分享一下我成功安装的方法:
使用链接下载,help->install new software

点击add,name输入随意,在location输入下载eclipse的maven插件,下载地址可以这样获取
点击连接: 进入网站后点击download,拉到最下面可以看到很多eclipse maven插件的版本和发布时间,选在适合eclipse的版本复制链接即可。建议取消选中contack all update sites during install to find required software(耗时太久)。

但是安装成功后还是无法配置(这里原因不太清楚,没找到解决办法),就直接上官网换成自己maven插件的javaee ide了...

后续的maven的配置这些都比较顺利,第一次创建maven-archetype-quickstat项目报错,试了网上很多办法都还没成功,然后打开 windows->preferencs->maven->installation发现之前配置了的maven的安装路径没了...重新配置了下就可以创建项目了。

最后运行成功的结果:

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网