当前位置: 移动技术网 > 网络运营>推广>网站建设 > 深入剖析美团基于Flume的网站日志收集系统

深入剖析美团基于Flume的网站日志收集系统

2018年04月18日  | 移动技术网网络运营  | 我要评论

中国056型护卫舰,温县感情交友网,前打竿

美团的日志收集系统负责美团的所有业务日志的收集,并分别给hadoop平台提供离线数据和storm平台提供实时数据流。美团的日志收集系统基于flume设计和搭建而成。

《基于flume的美团日志收集系统》将分两部分给读者呈现美团日志收集系统的架构设计和实战经验。

第一部分架构和设计,将主要着眼于日志收集系统整体的架构设计,以及为什么要做这样的设计。

第二部分改进和优化,将主要着眼于实际部署和使用过程中遇到的问题,对flume做的功能修改和优化等。

1 日志收集系统简介
日志收集是大数据的基石。

许多公司的业务平台每天都会产生大量的日志数据。收集业务日志数据,供离线和在线的分析系统使用,正是日志收集系统的要做的事情。高可用性,高可靠性和可扩展性是日志收集系统所具有的基本特征。

目前常用的开源日志收集系统有flume, scribe等。flume是cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,目前已经是apache的一个子项目。scribe是facebook开源的日志收集系统,它为日志的分布式收集,统一处理提供一个可扩展的,高容错的简单方案。

2 常用的开源日志收集系统对比
下面将对常见的开源日志收集系统flume和scribe的各方面进行对比。对比中flume将主要采用apache下的flume-ng为参考对象。同时,美团将常用的日志收集系统分为三层(agent层,collector层和store层)来进行对比。
2016331104510136.png (731×347)

3 美团日志收集系统架构
美团的日志收集系统负责美团的所有业务日志的收集,并分别给hadoop平台提供离线数据和storm平台提供实时数据流。美团的日志收集系统基于flume设计和搭建而成。目前每天收集和处理约t级别的日志数据。

下图是美团的日志收集系统的整体框架图。
2016331104549313.png (683×414)

a. 整个系统分为三层:agent层,collector层和store层。其中agent层每个机器部署一个进程,负责对单机的日志收集工作;collector层部署在中心服务器上,负责接收agent层发送的日志,并且将日志根据路由规则写到相应的store层中;store层负责提供永久或者临时的日志存储服务,或者将日志流导向其它服务器。

b. agent到collector使用loadbalance策略,将所有的日志均衡地发到所有的collector上,达到负载均衡的目标,同时并处理单个collector失效的问题。

c. collector层的目标主要有三个:sinkhdfs, sinkkafka和sinkbypass。分别提供离线的数据到hdfs,和提供实时的日志流到kafka和bypass。其中sinkhdfs又根据日志量的大小分为sinkhdfs_b,sinkhdfs_m和sinkhdfs_s三个sink,以提高写入到hdfs的性能,具体见后面介绍。

d. 对于store来说,hdfs负责永久地存储所有日志;kafka存储最新的7天日志,并给storm系统提供实时日志流;bypass负责给其它服务器和应用提供实时日志流。

下图是美团的日志收集系统的模块分解图,详解agent, collector和bypass中的source, channel和sink的关系。
2016331104611810.png (747×612)

a. 模块命名规则:所有的source以src开头,所有的channel以ch开头,所有的sink以sink开头;

b. channel统一使用美团开发的dualchannel,具体原因后面详述;对于过滤掉的日志使用nullchannel,具体原因后面详述;

c. 模块之间内部通信统一使用avro接口;

4 架构设计考虑
下面将从可用性,可靠性,可扩展性和兼容性等方面,对上述的架构做细致的解析。

4.1 可用性(availablity)
对日志收集系统来说,可用性(availablity)指固定周期内系统无故障运行总时间。要想提高系统的可用性,就需要消除系统的单点,提高系统的冗余度。下面来看看美团的日志收集系统在可用性方面的考虑。

4.1.1 agent死掉
agent死掉分为两种情况:机器死机或者agent进程死掉。

对于机器死机的情况来说,由于产生日志的进程也同样会死掉,所以不会再产生新的日志,不存在不提供服务的情况。

对于agent进程死掉的情况来说,确实会降低系统的可用性。对此,美团有下面三种方式来提高系统的可用性。首先,所有的agent在supervise的方式下启动,如果进程死掉会被系统立即重启,以提供服务。其次,对所有的agent进行存活监控,发现agent死掉立即报警。最后,对于非常重要的日志,建议应用直接将日志写磁盘,agent使用spooldir的方式获得最新的日志。

4.1.2 collector死掉
由于中心服务器提供的是对等的且无差别的服务,且agent访问collector做了loadbalance和重试机制。所以当某个collector无法提供服务时,agent的重试策略会将数据发送到其它可用的collector上面。所以整个服务不受影响。

4.1.3 hdfs正常停机
美团在collector的hdfssink中提供了开关选项,可以控制collector停止写hdfs,并且将所有的events缓存到filechannel的功能。

4.1.4 hdfs异常停机或不可访问
假如hdfs异常停机或不可访问,此时collector无法写hdfs。由于美团使用dualchannel,collector可以将所收到的events缓存到filechannel,保存在磁盘上,继续提供服务。当hdfs恢复服务以后,再将filechannel中缓存的events再发送到hdfs上。这种机制类似于scribe,可以提供较好的容错性。

4.1.5 collector变慢或者agent/collector网络变慢
如果collector处理速度变慢(比如机器load过高)或者agent/collector之间的网络变慢,可能导致agent发送到collector的速度变慢。同样的,对于此种情况,美团在agent端使用dualchannel,agent可以将收到的events缓存到filechannel,保存在磁盘上,继续提供服务。当collector恢复服务以后,再将filechannel中缓存的events再发送给collector。

4.1.6 hdfs变慢
当hadoop上的任务较多且有大量的读写操作时,hdfs的读写数据往往变的很慢。由于每天,每周都有高峰使用期,所以这种情况非常普遍。

对于hdfs变慢的问题,美团同样使用dualchannel来解决。当hdfs写入较快时,所有的events只经过memchannel传递数据,减少磁盘io,获得较高性能。当hdfs写入较慢时,所有的events只经过filechannel传递数据,有一个较大的数据缓存空间。

4.2 可靠性(reliability)
对日志收集系统来说,可靠性(reliability)是指flume在数据流的传输过程中,保证events的可靠传递。

对flume来说,所有的events都被保存在agent的channel中,然后被发送到数据流中的下一个agent或者最终的存储服务中。那么一个agent的channel中的events什么时候被删除呢?当且仅当它们被保存到下一个agent的channel中或者被保存到最终的存储服务中。这就是flume提供数据流中点到点的可靠性保证的最基本的单跳消息传递语义。

那么flume是如何做到上述最基本的消息传递语义呢?

首先,agent间的事务交换。flume使用事务的办法来保证event的可靠传递。source和sink分别被封装在事务中,这些事务由保存event的存储提供或者由channel提供。这就保证了event在数据流的点对点传输中是可靠的。在多级数据流中,如下图,上一级的sink和下一级的source都被包含在事务中,保证数据可靠地从一个channel到另一个channel转移。
2016331104633594.png (484×221)

其次,数据流中 channel的持久性。flume中memorychannel是可能丢失数据的(当agent死掉时),而filechannel是持久性的,提供类似mysql的日志机制,保证数据不丢失。

4.3 可扩展性(scalability)
对日志收集系统来说,可扩展性(scalability)是指系统能够线性扩展。当日志量增大时,系统能够以简单的增加机器来达到线性扩容的目的。

对于基于flume的日志收集系统来说,需要在设计的每一层,都可以做到线性扩展地提供服务。下面将对每一层的可扩展性做相应的说明。

4.3.1 agent层
对于agent这一层来说,每个机器部署一个agent,可以水平扩展,不受限制。一个方面,agent收集日志的能力受限于机器的性能,正常情况下一个agent可以为单机提供足够服务。另一方面,如果机器比较多,可能受限于后端collector提供的服务,但agent到collector是有load balance机制,使得collector可以线性扩展提高能力。

4.3.2 collector层
对于collector这一层,agent到collector是有load balance机制,并且collector提供无差别服务,所以可以线性扩展。其性能主要受限于store层提供的能力。

4.3.3 store层
对于store这一层来说,hdfs和kafka都是分布式系统,可以做到线性扩展。bypass属于临时的应用,只对应于某一类日志,性能不是瓶颈。

4.4 channel的选择
flume1.4.0中,其官方提供常用的memorychannel和filechannel供大家选择。其优劣如下:

memorychannel: 所有的events被保存在内存中。优点是高吞吐。缺点是容量有限并且agent死掉时会丢失内存中的数据。
filechannel: 所有的events被保存在文件中。优点是容量较大且死掉时数据可恢复。缺点是速度较慢。
上述两种channel,优缺点相反,分别有自己适合的场景。然而,对于大部分应用来说,美团希望channel可以同提供高吞吐和大缓存。基于此,美团开发了dualchannel。

dualchannel:基于 memorychannel和 filechannel开发。当堆积在channel中的events数小于阈值时,所有的events被保存在memorychannel中,sink从memorychannel中读取数据; 当堆积在channel中的events数大于阈值时, 所有的events被自动存放在filechannel中,sink从filechannel中读取数据。这样当系统正常运行时,美团可以使用memorychannel的高吞吐特性;当系统有异常时,美团可以利用filechannel的大缓存的特性。
4.5 和scribe兼容
在设计之初,美团就要求每类日志都有一个category相对应,并且flume的agent提供avrosource和scribesource两种服务。这将保持和之前的scribe相对应,减少业务的更改成本。

4.6 权限控制
在目前的日志收集系统中,美团只使用最简单的权限控制。只有设定的category才可以进入到存储系统。所以目前的权限控制就是category过滤。

如果权限控制放在agent端,优势是可以较好地控制垃圾数据在系统中流转。但劣势是配置修改麻烦,每增加一个日志就需要重启或者重载agent的配置。

如果权限控制放在collector端,优势是方便进行配置的修改和加载。劣势是部分没有注册的数据可能在agent/collector之间传输。

考虑到agent/collector之间的日志传输并非系统瓶颈,且目前日志收集属内部系统,安全问题属于次要问题,所以选择采用collector端控制。

4.7 提供实时流
美团的部分业务,如实时推荐,反爬虫服务等服务,需要处理实时的数据流。因此美团希望flume能够导出一份实时流给kafka/storm系统。

一个非常重要的要求是实时数据流不应该受到其它sink的速度影响,保证实时数据流的速度。这一点,美团是通过collector中设置不同的channel进行隔离,并且dualchannel的大容量保证了日志的处理不受sink的影响。

5 系统监控
对于一个大型复杂系统来说,监控是必不可少的部分。设计合理的监控,可以对异常情况及时发现,只要有一部手机,就可以知道系统是否正常运作。对于美团的日志收集系统,美团建立了多维度的监控,防止未知的异常发生。

5.1 发送速度,拥堵情况,写hdfs速度
通过发送给zabbix的数据,美团可以绘制出发送数量、拥堵情况和写hdfs速度的图表,对于超预期的拥堵,美团会报警出来查找原因。

下面是flume collector hdfssink写数据到hdfs的速度截图:
2016331104655799.png (367×343)

下面是flume collector的filechannel中拥堵的events数据量截图:
2016331104712389.png (369×343)

5.2 flume写hfds状态的监控
flume写入hdfs会先生成tmp文件,对于特别重要的日志,美团会每15分钟左右检查一下各个collector是否都产生了tmp文件,对于没有正常产生tmp文件的collector和日志美团需要检查是否有异常。这样可以及时发现flume和日志的异常.

5.3 日志大小异常监控
对于重要的日志,美团会每个小时都监控日志大小周同比是否有较大波动,并给予提醒,这个报警有效的发现了异常的日志,且多次发现了应用方日志发送的异常,及时给予了对方反馈,帮助他们及早修复自身系统的异常。

通过上述的讲解,美团可以看到,基于flume的美团日志收集系统已经是具备高可用性,高可靠性,可扩展等特性的分布式服务。

改进和优化
下面,美团将会讲述在实际部署和使用过程中遇到的问题,对flume的功能改进和对系统做的优化。

1 flume的问题总结
在flume的使用过程中,遇到的主要问题如下:

a. channel“水土不服”:使用固定大小的memorychannel在日志高峰时常报队列大小不够的异常;使用filechannel又导致io繁忙的问题;

b. hdfssink的性能问题:使用hdfssink向hdfs写日志,在高峰时间速度较慢;

c. 系统的管理问题:配置升级,模块重启等;

2 flume的功能改进和优化点
从上面的问题中可以看到,有一些需求是原生flume无法满足的,因此,基于开源的flume美团增加了许多功能,修改了一些bug,并且进行一些调优。下面将对一些主要的方面做一些说明。

2.1 增加zabbix monitor服务
一方面,flume本身提供了http, ganglia的监控服务,而美团目前主要使用zabbix做监控。因此,美团为flume添加了zabbix监控模块,和sa的监控服务无缝融合。

另一方面,净化flume的metrics。只将美团需要的metrics发送给zabbix,避免 zabbix server造成压力。目前美团最为关心的是flume能否及时把应用端发送过来的日志写到hdfs上, 对应关注的metrics为:

source : 接收的event数和处理的event数
channel : channel中拥堵的event数
sink : 已经处理的event数


2.2 为hdfssink增加自动创建index功能
首先,美团的hdfssink写到hadoop的文件采用lzo压缩存储。 hdfssink可以读取hadoop配置文件中提供的编码类列表,然后通过配置的方式获取使用何种压缩编码,美团目前使用lzo压缩数据。采用lzo压缩而非bz2压缩,是基于以下测试数据:
2016331104735861.png (719×108)

其次,美团的hdfssink增加了创建lzo文件后自动创建index功能。hadoop提供了对lzo创建索引,使得压缩文件是可切分的,这样hadoop job可以并行处理数据文件。hdfssink本身lzo压缩,但写完lzo文件并不会建索引,美团在close文件之后添加了建索引功能。

java code复制内容到剪贴板
  1.   /**  
  2.    * rename bucketpath file from .tmp to permanent location.  
  3.    */  
  4.   private void renamebucket() throws ioexception, interruptedexception {   
  5.       if(bucketpath.equals(targetpath)) {   
  6.               return;   
  7.         }   
  8.   
  9.         final path srcpath = new path(bucketpath);   
  10.         final path dstpath = new path(targetpath);   
  11.   
  12.         callwithtimeout(new callrunner<object>() {   
  13.               @override  
  14.               public object call() throws exception {   
  15.                 if(filesystem.exists(srcpath)) { // could block   
  16.                       log.info("renaming " + srcpath + " to " + dstpath);   
  17.                      filesystem.rename(srcpath, dstpath); // could block   
  18.   
  19.                       //index the dstpath lzo file   
  20.                       if (codec != null && ".lzo".equals(codec.getdefaultextension()) ) {   
  21.                               lzoindexer lzoindexer = new lzoindexer(new configuration());   
  22.                               lzoindexer.index(dstpath);   
  23.                       }   
  24.                 }   
  25.                 return null;   
  26.               }   
  27.     });   
  28. }  


2.3 增加hdfssink的开关
美团在hdfssink和dualchannel中增加开关,当开关打开的情况下,hdfssink不再往hdfs上写数据,并且数据只写向dualchannel中的filechannel。以此策略来防止hdfs的正常停机维护。

2.4 增加dualchannel
flume本身提供了memorychannel和filechannel。memorychannel处理速度快,但缓存大小有限,且没有持久化;filechannel则刚好相反。美团希望利用两者的优势,在sink处理速度够快,channel没有缓存过多日志的时候,就使用memorychannel,当sink处理速度跟不上,又需要channel能够缓存下应用端发送过来的日志时,就使用filechannel,由此美团开发了dualchannel,能够智能的在两个channel之间切换。

其具体的逻辑如下:

java code复制内容到剪贴板
  1. /***  
  2.  * puttomemchannel indicate put event to memchannel or filechannel  
  3.  * takefrommemchannel indicate take event from memchannel or filechannel  
  4.  * */  
  5. private atomicboolean puttomemchannel = new atomicboolean(true);   
  6. private atomicboolean takefrommemchannel = new atomicboolean(true);   
  7.   
  8. void doput(event event) {   
  9.         if (switchon && puttomemchannel.get()) {   
  10.               //往memchannel中写数据   
  11.               memtransaction.put(event);   
  12.   
  13.               if ( memchannel.isfull() || filechannel.getqueuesize() > 100) {   
  14.                 puttomemchannel.set(false);   
  15.               }   
  16.         } else {   
  17.               //往filechannel中写数据   
  18.               filetransaction.put(event);   
  19.         }   
  20.   }   
  21.   
  22. event dotake() {   
  23.     event event = null;   
  24.     if ( takefrommemchannel.get() ) {   
  25.         //从memchannel中取数据   
  26.         event = memtransaction.take();   
  27.         if (event == null) {   
  28.             takefrommemchannel.set(false);   
  29.         }    
  30.     } else {   
  31.         //从filechannel中取数据   
  32.         event = filetransaction.take();   
  33.         if (event == null) {   
  34.             takefrommemchannel.set(true);   
  35.   
  36.             puttomemchannel.set(true);   
  37.         }    
  38.     }   
  39.     return event;   
  40. }  


2.5 增加nullchannel
flume提供了nullsink,可以把不需要的日志通过nullsink直接丢弃,不进行存储。然而,source需要先将events存放到channel中,nullsink再将events取出扔掉。为了提升性能,美团把这一步移到了channel里面做,所以开发了nullchannel。

2.6 增加kafkasink
为支持向storm提供实时数据流,美团增加了kafkasink用来向kafka写实时数据流。其基本的逻辑如下:

java code复制内容到剪贴板
  1. public class kafkasink extends abstractsink implements configurable {   
  2.         private string zkconnect;   
  3.         private integer zktimeout;   
  4.         private integer batchsize;   
  5.         private integer queuesize;   
  6.         private string serializerclass;   
  7.         private string producertype;   
  8.         private string topicprefix;   
  9.   
  10.         private producer<string, string> producer;   
  11.   
  12.         public void configure(context context) {   
  13.             //读取配置,并检查配置   
  14.         }   
  15.   
  16.         @override  
  17.         public synchronized void start() {   
  18.             //初始化producer   
  19.         }   
  20.   
  21.         @override  
  22.         public synchronized void stop() {   
  23.             //关闭producer   
  24.         }   
  25.   
  26.         @override  
  27.         public status process() throws eventdeliveryexception {   
  28.   
  29.             status status = status.ready;   
  30.   
  31.             channel channel = getchannel();   
  32.             transaction tx = channel.gettransaction();   
  33.             try {   
  34.                     tx.begin();   
  35.   
  36.                     //将日志按category分队列存放   
  37.                     map<string, list<string>> topic2eventlist = new hashmap<string, list<string>>();   
  38.   
  39.                     //从channel中取batchsize大小的日志,从header中获取category,生成topic,并存放于上述的map中;   
  40.   
  41.                     //将map中的数据通过producer发送给kafka    
  42.   
  43.                    tx.commit();   
  44.             } catch (exception e) {   
  45.                     tx.rollback();   
  46.                     throw new eventdeliveryexception(e);   
  47.             } finally {   
  48.                 tx.close();   
  49.             }   
  50.             return status;   
  51.         }   
  52. }  


2.7 修复和scribe的兼容问题
scribed在通过scribesource发送数据包给flume时,大于4096字节的包,会先发送一个dummy包检查服务器的反应,而flume的scribesource对于logentry.size()=0的包返回try_later,此时scribed就认为出错,断开连接。这样循环反复尝试,无法真正发送数据。现在在scribesource的thrift接口中,对size为0的情况返回ok,保证后续正常发送数据。

3. flume系统调优经验总结
3.1 基础参数调优经验
hdfssink中默认的serializer会每写一行在行尾添加一个换行符,美团日志本身带有换行符,这样会导致每条日志后面多一个空行,修改配置不要自动添加换行符;
lc.sinks.sink_hdfs.serializer.appendnewline = false
调大memorychannel的capacity,尽量利用memorychannel快速的处理能力;
调大hdfssink的batchsize,增加吞吐量,减少hdfs的flush次数;
适当调大hdfssink的calltimeout,避免不必要的超时错误;


3.2 hdfssink获取filename的优化
hdfssink的path参数指明了日志被写到hdfs的位置,该参数中可以引用格式化的参数,将日志写到一个动态的目录中。这方便了日志的管理。例如美团可以将日志写到category分类的目录,并且按天和按小时存放:

lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%y%m%d/hour=%h
hdfss ink中处理每条event时,都要根据配置获取此event应该写入的hdfs path和filename,默认的获取方法是通过正则表达式替换配置中的变量,获取真实的path和filename。因为此过程是每条event都要做的操作,耗时很长。通过美团的测试,20万条日志,这个操作要耗时6-8s左右。

由于美团目前的path和filename有固定的模式,可以通过字符串拼接获得。而后者比正则匹配快几十倍。拼接定符串的方式,20万条日志的操作只需要几百毫秒。

3.3 hdfssink的b/m/s优化
在美团初始的设计中,所有的日志都通过一个channel和一个hdfssink写到hdfs上。美团来看一看这样做有什么问题。

首先,美团来看一下hdfssink在发送数据的逻辑:

java code复制内容到剪贴板
  1. //从channel中取batchsize大小的events   
  2. for (txneventcount = 0; txneventcount < batchsize; txneventcount++) {   
  3.     //对每条日志根据category append到相应的bucketwriter上;   
  4.     bucketwriter.append(event);   
  5. }   
  6.   
  7. for (bucketwriter bucketwriter : writers) {   
  8.     //然后对每一个bucketwriter调用相应的flush方法将数据flush到hdfs上   
  9.     bucketwriter.flush();   
  10. }  

假设美团的系统中有100个category,batchsize大小设置为20万。则每20万条数据,就需要对100个文件进行append或者flush操作。

其次,对于美团的日志来说,基本符合80/20原则。即20%的category产生了系统80%的日志量。这样对大部分日志来说,每20万条可能只包含几条日志,也需要往hdfs上flush一次。

上述的情况会导致hdfssink写hdfs的效率极差。下图是单channel的情况下每小时的发送量和写hdfs的时间趋势图。
2016331104856487.png (672×371)

鉴于这种实际应用场景,美团把日志进行了大小归类,分为big, middle和small三类,这样可以有效的避免小日志跟着大日志一起频繁的flush,提升效果明显。下图是分队列后big队列的每小时的发送量和写hdfs的时间趋势图。
2016331104934374.png (672×365)

4 未来发展
目前,flume日志收集系统提供了一个高可用,高可靠,可扩展的分布式服务,已经有效地支持了美团的日志数据收集工作。

后续,美团将在如下方面继续研究:

日志管理系统:图形化的展示和控制日志收集系统;
跟进社区发展:跟进flume 1.5的进展,同时回馈社区;

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网