flume典型应用场景

发布时间:2020-06-12 10:40:48 来源:51CTO 阅读:1779 作者:原生zzy 栏目:大数据

1.flume不同Source、Sink的配置文件编写 (1)Source---spool

 监听是一个目录,这个目录不能有子目录,监控的是这个目录下的文件。采集完成,这个目录下的文件会加上后缀(.COMPLETED)
配置文件

#Name the components on this agent #这里的a1指的是agent的名字,可以自定义,但注意:同一个节点下的agent的名字不能相同 #定义的是sources、sinks、channels的别名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #指定source的类型和相关的参数 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/hadoop/flumedata #设定channel a1.channels.c1.type = memory #设定sink a1.sinks.k1.type = logger #Bind the source and sink to the channel #设置sources的通道 a1.sources.r1.channels = c1 #设置sink的通道 a1.sinks.k1.channel = c1 (2)Source---netcat

 一个NetCat Source用来监听一个指定端口,并将接收到的数据的每一行转换为一个事件。
数据源: netcat(监控tcp协议)
Channel:内存
数据目的地:控制台

配置文件

#指定代理 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #指定sources a1.sources.r1.channels = c1 #指定source的类型 a1.sources.r1.type = netcat #指定需要监控的主机 a1.sources.r1.bind = 192.168.191.130 #指定需要监控的端口 a1.sources.r1.port = 3212 #指定channel a1.channels.c1.type = memory #sinks 写出数据 logger a1.sinks.k1.channel=c1 a1.sinks.k1.type=logger (3)Source---avro

 监听AVRO端口来接受来自外部AVRO客户端的事件流。利用Avro Source可以实现多级流动、扇出流、扇入流等效果。另外也可以接受通过flume提供的Avro客户端发送的日志信息。
数据源: avro
Channel:内存
数据目的地:控制台
配置文件

#指定代理 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #指定sources a1.sources.r1 channels. = c1 #指定source的类型 a1.sources.r1.type = avro #指定需要监控的主机名 a1.sources.r1.bind = hadoop03 #指定需要监控的端口 a1.sources.r1.port = 3212 #指定channel a1.channels.c1.type = memory #指定sink a1.sinks.k1.channel = c1 a1.sinks.k1.type = logger (4)采集日志文件到hdfs

source ====exec (一个Linux命令: tail -f)
channel====memory
sink====hdfs
注意:如果集群是高可用的集群,需要将core-site.xml 和hdfs-site.xml 放入flume的conf中。
配置文件

a1.sources = r1 a1.channels = c1 a1.sinks = k1 #指定sources a1.sources.r1.channels = c1 #指定source的类型 a1.sources.r1.type = exec #指定exec的command a1.sources.r1.command = tail -F /home/hadoop/flumedata/zy.log #指定channel a1.channels.c1.type = memory #指定sink 写入hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.type = hdfs #指定hdfs上生成的文件的路径年-月-日,时_分 a1.sinks.k1.hdfs.path = /flume/%y-%m-%d/%H_%M #开启滚动 a1.sinks.k1.hdfs.round = true #设定滚动的时间(设定目录的滚动) a1.sinks.k1.hdfs.roundValue = 24 #时间的单位 a1.sinks.k1.hdfs.roundUnit = hour #设定文件的滚动 #当前文件滚动的时间间隔(单位是:秒) a1.sinks.k1.hdfs.rollInterval = 10 #设定文件滚动的大小(文件多大,滚动一次) a1.sinks.k1.hdfs.rollSize = 1024 #设定文件滚动的条数(多少条滚动一次) a1.sinks.k1.hdfs.rollCount = 10 #指定时间来源(true表示指定使用本地时间) a1.sinks.k1.hdfs.useLocalTimeStamp = true #设定存储在hdfs上的文件类型,(DataStream,文本) a1.sinks.k1.hdfs.fileType = DataStream #加文件前缀 a1.sinks.k1.hdfs.filePrefix = zzy #加文件后缀 a1.sinks.k1.hdfs.fileSuffix = .log 2.flume典型的使用场景 (1)多代理流

flume典型应用场景


 从第一台机器的flume agent传送到第二台机器的flume agent。
例:
规划
hadoop02:tail-avro.properties
   使用 exec “tail -F /home/hadoop/testlog/welog.log”获取采集数据
   使用 avro sink 数据都下一个 agent
hadoop03:avro-hdfs.properties
   使用 avro 接收采集数据
   使用 hdfs sink 数据到目的地
配置文件

#tail-avro.properties a1.sources = r1 a1.sinks = k1 a1.channels = c1 #Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/hadoop/testlog/date.log a1.sources.r1.channels = c1 #Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = hadoop02 a1.sinks.k1.port = 4141 a1.sinks.k1.batch-size = 2 #Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 #avro-hdfs.properties a1.sources = r1 a1.sinks = k1 a1.channels = c1 #Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141 #Describe k1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path =hdfs://myha01/testlog/flume-event/%y-%m-%d/%H-%M a1.sinks.k1.hdfs.filePrefix = date_ a1.sinks.k1.hdfs.maxOpenFiles = 5000 a1.sinks.k1.hdfs.batchSize= 100 a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat =Text a1.sinks.k1.hdfs.rollSize = 102400 a1.sinks.k1.hdfs.rollCount = 1000000 a1.sinks.k1.hdfs.rollInterval = 60 a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.useLocalTimeStamp = true #Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 (2)多路复用采集

郑重声明:本文版权归原作者所有,转载文章仅为传播更多信息之目的,如作者信息标记有误,请第一时间联系我们修改或删除,多谢。