Flume: reading records from Kafka

    # Source: Kafka consumer
    a1.sources = source1
    a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.source1.channels = c1
    a1.sources.source1.batchSize = 5000
    a1.sources.source1.batchDurationMillis = 2000
    a1.sources.source1.zookeeperConnect = localhost:2181
    #a1.sources.source1.kafka.brokerList = localhost:9092
    a1.sources.source1.kafka.bootstrap.servers = localhost:9092
    a1.sources.source1.topic = flumetest
    a1.sources.source1.kafka.consumer.group.id = custom.g.id

    # Channel: in-memory buffer
    a1.channels = c1
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 10000
    a1.channels.c1.byteCapacityBufferPercentage = 20
    a1.channels.c1.byteCapacity = 800000

    # Sink: write events to rolling files in a local directory
    a1.sinks = k1
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.channel = c1
    a1.sinks.k1.sink.directory = /home/hadoop/testfile/flume

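Version matching is an issue here as well. After many attempts, the combination that worked for me was Flume 1.6 with kafka_2.11-2.2.0.tgz; other versions may run into request header problems. I also ran into problems with how the topic and ZooKeeper are specified.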
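
The topic and ZooKeeper trouble mentioned above may be related to the config mixing two naming styles for the Kafka source. The sketch below is only a rough aid: it scans a properties text and lists which style each key belongs to. The two key groupings (`OLD_STYLE` for Flume 1.6, `NEW_STYLE` for Flume 1.7 and later) reflect my reading of the Flume user guides and should be treated as an assumption; the helper names `parse_properties` and `kafka_source_styles` are made up for this example.

```python
# Kafka source property suffixes, grouped by naming style.
# ASSUMPTION: groupings are based on my reading of the Flume user guides;
# verify them against the guide for the release you actually run.
OLD_STYLE = {"zookeeperConnect", "topic", "groupId"}  # Flume 1.6
NEW_STYLE = {
    "kafka.bootstrap.servers",
    "kafka.topics",
    "kafka.consumer.group.id",
}  # Flume 1.7 and later


def parse_properties(text):
    """Parse simple `key = value` lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def kafka_source_styles(props, agent, source):
    """Return (old_keys, new_keys) found for the given agent source."""
    prefix = f"{agent}.sources.{source}."
    suffixes = {k[len(prefix):] for k in props if k.startswith(prefix)}
    return sorted(suffixes & OLD_STYLE), sorted(suffixes & NEW_STYLE)


# A few lines taken from the source config in this post.
SAMPLE = """
a1.sources.source1.zookeeperConnect = localhost:2181
a1.sources.source1.kafka.bootstrap.servers = localhost:9092
a1.sources.source1.topic = flumetest
a1.sources.source1.kafka.consumer.group.id = custom.g.id
"""

if __name__ == "__main__":
    old, new = kafka_source_styles(parse_properties(SAMPLE), "a1", "source1")
    print("1.6-style keys:", old)
    print("1.7+-style keys:", new)
    if old and new:
        print("Both naming styles are present; check which one your Flume release expects.")
```

If both lists come back non-empty, it is probably worth checking the user guide for the installed Flume version and keeping only the keys it documents.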
Command to start the agent:

    flume-ng agent -n a1 -c conf -f kafka.properties -Dflume.root.logger=INFO,console

Flume: collecting into Kafka

    agent.sources=r1
    agent.sinks=k1
    agent.channels=c1

    # Source: run a shell command and read its output.
    # Plain `tail` prints the last lines and exits; `tail -F` is commonly used to keep following the file.
    agent.sources.r1.type=exec
    agent.sources.r1.command=tail /root/tomcat/logs/catalina.out
    agent.sources.r1.restart=true
    agent.sources.r1.batchSize=1000
    agent.sources.r1.batchTimeout=3000
    agent.sources.r1.channels=c1

    # Channel: in-memory buffer
    agent.channels.c1.type=memory
    agent.channels.c1.capacity=102400
    agent.channels.c1.transactionCapacity=1000
    agent.channels.c1.byteCapacity=134217728
    agent.channels.c1.byteCapacityBufferPercentage=80

    # Sink: Kafka producer
    agent.sinks.k1.channel=c1
    agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.k1.kafka.topic=sparkstreaming
    agent.sinks.k1.kafka.zookeeperConnect=47.102.199.215:2181
    #agent.sinks.k1.kafka.bootstrap.servers=47.102.199.215:9092
    agent.sinks.k1.kafka.brokerList=47.102.199.215:9092
    agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    agent.sinks.k1.flumeBatchSize=1000
    agent.sinks.k1.useFlumeEventFormat=true
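
Both agent files in this post follow the same wiring: sources write to channels, and each sink reads from one channel. As a rough sanity check before starting an agent, something like the following sketch could flag a channel name that is used but never declared, or a `transactionCapacity` larger than `capacity`. The helper names `parse_properties` and `check_wiring` are hypothetical, and the fallback value of 100 for both capacities is my understanding of the memory channel defaults, not something stated in this post.

```python
def parse_properties(text):
    """Parse simple `key = value` lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems in the agent's source/channel/sink wiring."""
    def names(kind):
        # e.g. "agent.sources=r1 r2" -> ["r1", "r2"]
        return props.get(f"{agent}.{kind}", "").split()

    channels = set(names("channels"))
    problems = []

    # A source may list several channels (plural key `channels`).
    for src in names("sources"):
        for ch in props.get(f"{agent}.sources.{src}.channels", "").split():
            if ch not in channels:
                problems.append(f"source {src} uses undeclared channel {ch}")

    # A sink reads from exactly one channel (singular key `channel`).
    for sink in names("sinks"):
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel {ch}")

    # ASSUMPTION: 100 is used as the default for both values when unset.
    for ch in sorted(channels):
        cap = int(props.get(f"{agent}.channels.{ch}.capacity", 100))
        tx = int(props.get(f"{agent}.channels.{ch}.transactionCapacity", 100))
        if tx > cap:
            problems.append(f"channel {ch}: transactionCapacity {tx} > capacity {cap}")

    return problems


# Wiring-related lines taken from the second config in this post.
SAMPLE = """
agent.sources=r1
agent.sinks=k1
agent.channels=c1
agent.sources.r1.channels=c1
agent.channels.c1.capacity=102400
agent.channels.c1.transactionCapacity=1000
agent.sinks.k1.channel=c1
"""

if __name__ == "__main__":
    for problem in check_wiring(parse_properties(SAMPLE), "agent") or ["no wiring problems found"]:
        print(problem)
```

This only checks names and the two capacity values; it does not validate that any Kafka-specific property is spelled the way a particular Flume release expects.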