使用CDH上的Flume

Posted by Kaka Blog on July 30, 2019

Flume简介

Flume是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统。Flume支持在日志系统中定制各类数据发送方用于收集数据,同时Flume提供对数据的简单处理,并将数据处理结果写入各种数据接收方的能力。

2010年11月Cloudera开源了Flume的第一个可用版本0.9.2,这个系列版本被统称为Flume-OG,重构后的版本统称为Flume-NG。

工作原理

Flume (水道) 以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由SourceSinkChannel三大组件构成,如下图:

img

Flume的数据流由事件 (Event) 贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些EventAgent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。以下是Flume的一些核心概念:

组件 功能
Agent 使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
Client 生产数据,运行在一个独立的线程。
Source 从Client收集数据,传递给Channel。
Sink 从Channel收集数据,运行在一个独立线程。
Channel 连接 sources 和 sinks ,这个有点像一个队列。
Events 可以是日志记录、 avro 对象等。

Flume常用的Type

Source:

名称 含义
avro avro协议的数据源
exec unix命令
spooldir 监控一个文件夹
TAILDIR 既可以监控文件也可以监控文件夹
netcat 监听某个端口
kafka 监控kafka数据

Sink:

名称 含义
kafka 写到kafka中
HDFS 将数据写到HDFS中
logger 输出到控制台
avro avro协议

Channel:

名称 含义
memory 存在内存中
kafka 将数据存到kafka中
file 存在本地磁盘文件中

在cdh中添加flume服务

1、安装flume服务

img

img

img

2、启动flume服务

在CDH上启动flume服务。

测试

默认配置文件配置了以netcat(网络打印输出)作为source,以内存memery作为channel,以logger作为sink输出到日志文件中的一个简单样例配置。

默认配置如下:

# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

# For each source, channel, and sink, set
# standard properties.

tier1.sources.source1.type     = netcat
tier1.sources.source1.bind     = 127.0.0.1
tier1.sources.source1.port     = 9999
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type   = memory
tier1.sinks.sink1.type         = logger
tier1.sinks.sink1.channel      = channel1

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.

tier1.channels.channel1.capacity = 100

Agent配置:

  • agent的名字是tier1
  • source是source1
  • channel是channel1
  • sink是sink1

Source配置:

  • source的类型是netcat(来自网络的屏幕输出)
  • 监听的网络地址是127.0.0.1本地
  • 监听端口是 9999
  • source输出给channel1

Channel配置:

  • 使用memory作为channel1
  • 最后一行是规定channel1每次的缓存能力是100

Sink配置:

  • channel1输出给sink1
  • sink1的类型是logger(日志)

1、使用telnet连接到localhost本主机

telnet localhost 9999

输入测试内容,例如:

aaaa
2222

2、查看经过flume采集到日志中的情况

日志位置:/var/log/flume-ng

tail -100 flume-cmf-flume-AGENT-398.cdh.slave1.log

img

参考