当前位置: 移动技术网 > IT编程>数据库>其他数据库 > Mysql 流增量写入 Hdfs(二) --Storm + hdfs 的流式处理

Mysql 流增量写入 Hdfs(二) --Storm + hdfs 的流式处理

2018年12月13日  | 移动技术网IT编程  | 我要评论

一. 概述

上一篇我们介绍了如何将数据从 mysql 抛到 kafka,这次我们就专注于利用 storm 将数据写入到 hdfs 的过程,由于 storm 写入 hdfs 的可定制东西有些多,我们先不从 kafka 读取,而先自己定义一个 spout 数据充当数据源,下章再进行整合。这里默认你是拥有一定的 storm 知识的基础,起码知道 spout 和 bolt 是什么。

写入 hdfs 可以有以下的定制策略:

  1. 自定义写入文件的名字
  2. 定义写入内容格式
  3. 满足给定条件后更改写入的文件
  4. 更改写入文件时触发的 action

本篇会先说明如何用 storm 写入 hdfs,写入过程一些 api 的描述,以及最后给定一个例子:

storm 每接收到 10 个 tuple 后就会改变 hdfs 写入文件,新文件的名字就是第几次改变。

ps:storm 版本:1.1.1 。hadoop 版本:2.7.4 。

接下来我们首先看看 storm 如何写入 hdfs 。

二. storm 写入 hdfs

storm 官方有提供了相应的 api 让我们可以使用。可以通过创建 hdfsbolt 以及定义相应的规则,即可写入 hdfs 。

首先通过 maven 配置依赖以及插件。

    <properties>
        <storm.version>1.1.1</storm.version>
    </properties>

    <dependencies>

        <dependency>
            <groupid>org.apache.storm</groupid>
            <artifactid>storm-core</artifactid>
            <version>${storm.version}</version>
            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupid>org.slf4j</groupid>
                    <artifactid>log4j-over-slf4j</artifactid>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupid>commons-collections</groupid>
            <artifactid>commons-collections</artifactid>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupid>com.google.guava</groupid>
            <artifactid>guava</artifactid>
            <version>15.0</version>
        </dependency>

        <!--hadoop模块-->
        <dependency>
            <groupid>org.apache.hadoop</groupid>
            <artifactid>hadoop-client</artifactid>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupid>org.slf4j</groupid>
                    <artifactid>slf4j-log4j12</artifactid>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupid>org.apache.hadoop</groupid>
            <artifactid>hadoop-hdfs</artifactid>
            <version>2.7.4</version>
            <exclusions>
                <exclusion>
                    <groupid>org.slf4j</groupid>
                    <artifactid>slf4j-log4j12</artifactid>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->
        <dependency>
            <groupid>org.apache.storm</groupid>
            <artifactid>storm-hdfs</artifactid>
            <version>1.1.1</version>
            <!--<scope>test</scope>-->
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupid>org.apache.maven.plugins</groupid>
                <artifactid>maven-compiler-plugin</artifactid>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupid>org.codehaus.mojo</groupid>
                <artifactid>exec-maven-plugin</artifactid>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeprojectdependencies>true</includeprojectdependencies>
                    <includeplugindependencies>false</includeplugindependencies>
                    <classpathscope>compile</classpathscope>
                    <mainclass>com.learningstorm.kafka.kafkatopology</mainclass>
                </configuration>
            </plugin>
   
            <plugin>
                <groupid>org.apache.maven.plugins</groupid>
                <artifactid>maven-shade-plugin</artifactid>
                <version>1.7</version>
                <configuration>
                    <createdependencyreducedpom>true</createdependencyreducedpom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.servicesresourcetransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.manifestresourcetransformer">
                                    <mainclass></mainclass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

这里要提一下,如果要打包部署到集群上的话,打包的插件需要使用 maven-shade-plugin 这个插件,然后使用 maven lifecycle 中的 package 打包。而不是用 maven-assembly-plugin 插件进行打包。

因为使用 maven-assembly-plugin 的时候,会将所有依赖的包unpack,然后在pack,这样就会出现,同样的文件被覆盖的情况。发布到集群上的时候就会报 no filesystem for scheme: hdfs 的错 。

然后是使用 hdfsbolt 写入 hdfs。这里来看看官方文档中的例子吧。

// 使用 "|" 来替代 ",",来进行字符分割
recordformat format = new delimitedrecordformat()
        .withfielddelimiter("|");

// 每输入 1k 后将内容同步到 hdfs 中
syncpolicy syncpolicy = new countsyncpolicy(1000);

// 当文件大小达到 5mb ,转换写入文件,即写入到一个新的文件中
filerotationpolicy rotationpolicy = new filesizerotationpolicy(5.0f, units.mb);

//当转换写入文件时,生成新文件的名字并使用
filenameformat filenameformat = new defaultfilenameformat()
        .withpath("/foo/");

hdfsbolt bolt = new hdfsbolt()
        .withfsurl("hdfs://localhost:9000")
        .withfilenameformat(filenameformat)
        .withrecordformat(format)
        .withrotationpolicy(rotationpolicy)
        .withsyncpolicy(syncpolicy);

//生成该 bolt
topologybuilder.setbolt("hdfsbolt", bolt, 5).globalgrouping("randomstrspout");
        

到这里就结束了。可以将 hdfsbolt 当作一个 storm 中特殊一些的 bolt 即可。这个 bolt 的作用即使根据接收信息写入 hdfs。

而在新建 hdfsbolt 中,storm 为我们提供了相当强的灵活性,我们可以定义一些策略,比如当达成某个条件的时候转换写入文件,新写入文件的名字,写入时候的分隔符等等。

如果选择使用的话,storm 有提供部分接口供我们使用,但如果我们觉得不够丰富也可以自定义相应的类。下面我们看看如何控制这些策略吧。

recordformat

这是一个接口,允许你自由定义接收到内容的格式。

public interface recordformat extends serializable {
    byte[] format(tuple tuple);
}

storm 提供了 delimitedrecordformat ,使用方法在上面已经有了。这个类默认的分割符是逗号",",而你可以通过 withfielddelimiter 方法改变分隔符。
如果你的初始分隔符不是逗号的话,那么也可以重写写一个类实现 recordformat 接口即可。

filenameformat

同样是一个接口。

public interface filenameformat extends serializable {
    void prepare(map conf, topologycontext topologycontext);
    string getname(long rotation, long timestamp);
    string getpath();
}

storm 所提供的默认的是 org.apache.storm.hdfs.format.defaultfilenameformat 。默认人使用的转换文件名有点长,格式是这样的:

{prefix}{componentid}-{taskid}-{rotationnum}-{timestamp}{extension}

例如:

mybolt-5-7-1390579837830.txt

默认情况下,前缀是空的,扩展标识是".txt"。

syncpolicy

同步策略允许你将 buffered data 缓冲到 hdfs 文件中(从而client可以读取数据),通过实现org.apache.storm.hdfs.sync.syncpolicy 接口:

public interface syncpolicy extends serializable {
    boolean mark(tuple tuple, long offset);
    void reset();
}

filerotationpolicy

这个接口允许你控制什么情况下转换写入文件。

public interface filerotationpolicy extends serializable {
    boolean mark(tuple tuple, long offset);
    void reset();
}

storm 有提供三个实现该接口的类:

  • 最简单的就是不进行转换的org.apache.storm.hdfs.bolt.rotation.norotationpolicy ,就是什么也不干。

  • 通过文件大小触发转换的 org.apache.storm.hdfs.bolt.rotation.filesizerotationpolicy。

  • 通过时间条件来触发转换的 org.apache.storm.hdfs.bolt.rotation.timedrotationpolicy。

如果有更加复杂的需求也可以自己定义。

rotationaction

这个主要是提供一个或多个 hook ,可加可不加。主要是在触发写入文件转换的时候会启动。

public interface rotationaction extends serializable {
    void execute(filesystem filesystem, path filepath) throws ioexception;
}

三.实现一个例子

了解了上面的情况后,我们会实现一个例子,根据写入记录的多少来控制写入转换(改变写入的文件),并且转换后文件的名字表示当前是第几次转换。

首先来看看 hdfsbolt 的内容:

        recordformat format = new delimitedrecordformat().withfielddelimiter(" ");
        // sync the filesystem after every 1k tuples
        syncpolicy syncpolicy = new countsyncpolicy(1000);
//        filerotationpolicy rotationpolicy = new filesizerotationpolicy(1.0f, filesizerotationpolicy.units.kb);
        /** rotate file with date,every month create a new file
         * format:yyyymm.txt
         */
        filerotationpolicy rotationpolicy = new countstrrotationpolicy();
        filenameformat filenameformat = new timesfilenameformat().withpath("/test/");
        rotationaction action = new newfileaction();
        hdfsbolt bolt = new hdfsbolt()
                .withfsurl("hdfs://127.0.0.1:9000")
                .withfilenameformat(filenameformat)
                .withrecordformat(format)
                .withrotationpolicy(rotationpolicy)
                .withsyncpolicy(syncpolicy)
                .addrotationaction(action);

然后分别来看各个策略的类。

filerotationpolicy

import org.apache.storm.hdfs.bolt.rotation.filerotationpolicy;
import org.apache.storm.tuple.tuple;

import java.text.simpledateformat;
import java.util.date;

/**
 * 计数以改变hdfs写入文件的位置,当写入10次的时候,则更改写入文件,更改名字取决于 “timesfilenameformat”
 * 这个类是线程安全
 */

public class countstrrotationpolicy implements filerotationpolicy {


    private simpledateformat df = new simpledateformat("yyyymm");

    private string date =  null;

    private int count = 0;

    public countstrrotationpolicy(){
        this.date =  df.format(new date());
//        this.date = df.format(new date());
    }


    /**
     * called for every tuple the hdfsbolt executes.
     *
     * @param tuple  the tuple executed.
     * @param offset current offset of file being written
     * @return true if a file rotation should be performed
     */
    @override
    public boolean mark(tuple tuple, long offset) {
        count ++;
        if(count == 10) {
            system.out.print("num :" +count + "   ");
            count = 0;
            return true;

        }
        else {
            return false;
        }
    }

    /**
     * called after the hdfsbolt rotates a file.
     */
    @override
    public void reset() {

    }

    @override
    public filerotationpolicy copy() {
        return new countstrrotationpolicy();
    }


}

filenameformat

import org.apache.storm.hdfs.bolt.format.filenameformat;
import org.apache.storm.task.topologycontext;

import java.util.map;

/**
 * 决定重新写入文件时候的名字
 * 这里会返回是第几次转换写入文件,将这个第几次做为文件名
 */
public class timesfilenameformat implements filenameformat {
    //默认路径
    private string path = "/storm";
    //默认后缀
    private string extension = ".txt";
    private long times = new long(0);

    public timesfilenameformat withpath(string path){
        this.path = path;
        return this;
    }

    @override
    public void prepare(map conf, topologycontext topologycontext) {
    }


    @override
    public string getname(long rotation, long timestamp) {
        times ++ ;
        //返回文件名,文件名为更换写入文件次数
        return times.tostring() + this.extension;
    }

    public string getpath(){
        return this.path;
    }
}

rotationaction

import org.apache.hadoop.fs.filecontext;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.storm.hdfs.common.rotation.rotationaction;
import org.slf4j.logger;
import org.slf4j.loggerfactory;

import java.io.ioexception;
import java.net.uri;
/**
    当转换写入文件时候调用的 hook ,这里仅写入日志。
 */
public class newfileaction implements rotationaction {
    private static final logger log = loggerfactory.getlogger(newfileaction.class);



    @override
    public void execute(filesystem filesystem, path filepath) throws ioexception {
        log.info("hdfs change the written file!!");

        return;
    }
}

ok,这样就大功告成了。通过上面的代码,每接收到 10 个 tuple 后就会转换写入文件,新文件的名字就是第几次转换。

完整代码包括一个随机生成字符串的 spout ,可以到我的 github 上查看。

stormhdfsdemo:https://github.com/shezhiming/stormhdfsdemo


更多干货,欢迎关注公众号,哈尔的数据城堡

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网