Mysql 流增量写入 Hdfs(二) --Storm + hdfs 的流式处理

一. 概述

上一篇我们介绍了如何将数据从 mysql 抛到 kafka,这次我们就专注于利用 storm 将数据写入到 hdfs 的过程,由于 storm 写入 hdfs 的可定制东西有些多,我们先不从 kafka 读取,而先自己定义一个 spout 数据充当数据源,下章再进行整合。这里默认你是拥有一定的 storm 知识的基础,起码知道 spout 和 bolt 是什么。

写入 hdfs 可以有以下的定制策略:

  1. 自定义写入文件的名字
  2. 定义写入内容格式
  3. 满足给定条件后更改写入的文件
  4. 更改写入文件时触发的 action

本篇会先说明如何用 storm 写入 hdfs,写入过程一些 api 的描述,以及最后给定一个例子:

storm 每接收到 10 个 tuple 后就会改变 hdfs 写入文件,新文件的名字就是第几次改变。

ps:storm 版本:1.1.1 。hadoop 版本:2.7.4 。

接下来我们首先看看 storm 如何写入 hdfs 。

二. storm 写入 hdfs

storm 官方有提供了相应的 api 让我们可以使用。可以通过创建 hdfsbolt 以及定义相应的规则,即可写入 hdfs 。

首先通过 maven 配置依赖以及插件。





        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->



这里要提一下,如果要打包部署到集群上的话,打包的插件需要使用 maven-shade-plugin 这个插件,然后使用 maven lifecycle 中的 package 打包。而不是用 maven-assembly-plugin 插件进行打包。

因为使用 maven-assembly-plugin 的时候,会将所有依赖的包unpack,然后在pack,这样就会出现,同样的文件被覆盖的情况。发布到集群上的时候就会报 no filesystem for scheme: hdfs 的错 。

然后是使用 hdfsbolt 写入 hdfs。这里来看看官方文档中的例子吧。

// 使用 "|" 来替代 ",",来进行字符分割
recordformat format = new delimitedrecordformat()

// 每输入 1k 后将内容同步到 hdfs 中
syncpolicy syncpolicy = new countsyncpolicy(1000);

// 当文件大小达到 5mb ,转换写入文件,即写入到一个新的文件中
filerotationpolicy rotationpolicy = new filesizerotationpolicy(5.0f, units.mb);

filenameformat filenameformat = new defaultfilenameformat()

hdfsbolt bolt = new hdfsbolt()

//生成该 bolt
topologybuilder.setbolt("hdfsbolt", bolt, 5).globalgrouping("randomstrspout");

到这里就结束了。可以将 hdfsbolt 当作一个 storm 中特殊一些的 bolt 即可。这个 bolt 的作用即使根据接收信息写入 hdfs。

而在新建 hdfsbolt 中,storm 为我们提供了相当强的灵活性,我们可以定义一些策略,比如当达成某个条件的时候转换写入文件,新写入文件的名字,写入时候的分隔符等等。

如果选择使用的话,storm 有提供部分接口供我们使用,但如果我们觉得不够丰富也可以自定义相应的类。下面我们看看如何控制这些策略吧。



public interface recordformat extends serializable {
    byte[] format(tuple tuple);

storm 提供了 delimitedrecordformat ,使用方法在上面已经有了。这个类默认的分割符是逗号",",而你可以通过 withfielddelimiter 方法改变分隔符。
如果你的初始分隔符不是逗号的话,那么也可以重写写一个类实现 recordformat 接口即可。



public interface filenameformat extends serializable {
    void prepare(map conf, topologycontext topologycontext);
    string getname(long rotation, long timestamp);
    string getpath();

storm 所提供的默认的是 org.apache.storm.hdfs.format.defaultfilenameformat 。默认人使用的转换文件名有点长,格式是这样的:






同步策略允许你将 buffered data 缓冲到 hdfs 文件中(从而client可以读取数据),通过实现org.apache.storm.hdfs.sync.syncpolicy 接口:

public interface syncpolicy extends serializable {
    boolean mark(tuple tuple, long offset);
    void reset();



public interface filerotationpolicy extends serializable {
    boolean mark(tuple tuple, long offset);
    void reset();

storm 有提供三个实现该接口的类:

  • 最简单的就是不进行转换的org.apache.storm.hdfs.bolt.rotation.norotationpolicy ,就是什么也不干。

  • 通过文件大小触发转换的 org.apache.storm.hdfs.bolt.rotation.filesizerotationpolicy。

  • 通过时间条件来触发转换的 org.apache.storm.hdfs.bolt.rotation.timedrotationpolicy。



这个主要是提供一个或多个 hook ,可加可不加。主要是在触发写入文件转换的时候会启动。

public interface rotationaction extends serializable {
    void execute(filesystem filesystem, path filepath) throws ioexception;



首先来看看 hdfsbolt 的内容:

        recordformat format = new delimitedrecordformat().withfielddelimiter(" ");
        // sync the filesystem after every 1k tuples
        syncpolicy syncpolicy = new countsyncpolicy(1000);
//        filerotationpolicy rotationpolicy = new filesizerotationpolicy(1.0f, filesizerotationpolicy.units.kb);
        /** rotate file with date,every month create a new file
         * format:yyyymm.txt
        filerotationpolicy rotationpolicy = new countstrrotationpolicy();
        filenameformat filenameformat = new timesfilenameformat().withpath("/test/");
        rotationaction action = new newfileaction();
        hdfsbolt bolt = new hdfsbolt()



import org.apache.storm.hdfs.bolt.rotation.filerotationpolicy;
import org.apache.storm.tuple.tuple;

import java.text.simpledateformat;
import java.util.date;

 * 计数以改变hdfs写入文件的位置,当写入10次的时候,则更改写入文件,更改名字取决于 “timesfilenameformat”
 * 这个类是线程安全

public class countstrrotationpolicy implements filerotationpolicy {

    private simpledateformat df = new simpledateformat("yyyymm");

    private string date =  null;

    private int count = 0;

    public countstrrotationpolicy(){
        this.date =  df.format(new date());
//        this.date = df.format(new date());

     * called for every tuple the hdfsbolt executes.
     * @param tuple  the tuple executed.
     * @param offset current offset of file being written
     * @return true if a file rotation should be performed
    public boolean mark(tuple tuple, long offset) {
        count ++;
        if(count == 10) {
            system.out.print("num :" +count + "   ");
            count = 0;
            return true;

        else {
            return false;

     * called after the hdfsbolt rotates a file.
    public void reset() {


    public filerotationpolicy copy() {
        return new countstrrotationpolicy();



import org.apache.storm.hdfs.bolt.format.filenameformat;
import org.apache.storm.task.topologycontext;

import java.util.map;

 * 决定重新写入文件时候的名字
 * 这里会返回是第几次转换写入文件,将这个第几次做为文件名
public class timesfilenameformat implements filenameformat {
    private string path = "/storm";
    private string extension = ".txt";
    private long times = new long(0);

    public timesfilenameformat withpath(string path){
        this.path = path;
        return this;

    public void prepare(map conf, topologycontext topologycontext) {

    public string getname(long rotation, long timestamp) {
        times ++ ;
        return times.tostring() + this.extension;

    public string getpath(){
        return this.path;


import org.apache.hadoop.fs.filecontext;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.storm.hdfs.common.rotation.rotationaction;
import org.slf4j.logger;
import org.slf4j.loggerfactory;

import java.io.ioexception;
import java.net.uri;
    当转换写入文件时候调用的 hook ,这里仅写入日志。
public class newfileaction implements rotationaction {
    private static final logger log = loggerfactory.getlogger(newfileaction.class);

    public void execute(filesystem filesystem, path filepath) throws ioexception {
        log.info("hdfs change the written file!!");


ok,这样就大功告成了。通过上面的代码,每接收到 10 个 tuple 后就会转换写入文件,新文件的名字就是第几次转换。

完整代码包括一个随机生成字符串的 spout ,可以到我的 github 上查看。



