The Kafka and Elasticsearch versions that flume-1.6 depends on do not match the versions used here, so some jar dependencies must be replaced and the flume-elasticsearch-sink source needs a few modifications to work with elasticsearch-2.2.
- flume-1.6.0
- kafka_2.11-0.9.0.1 (can talk to 0.8.2 clients, so flume-kafka-channel-1.6.0 is left unchanged)
- elasticsearch-2.2.0
Because of these version differences, the following jars must be replaced/added under flume/lib (use `mvn dependency:copy-dependencies` to export the required dependencies): the Jackson jars, hppc-0.7.1.jar, t-digest-3.0.jar, jsr166e-1.1.0.jar, guava-18.0.jar, the Lucene jars, and elasticsearch-2.2.0.jar.
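As one representative source change (a sketch of the kind of modification needed, not the complete diff): ES 2.x removed the `InetSocketTransportAddress(String, int)` constructor and moved client construction to `TransportClient.builder()`, so the sink's transport-client setup has to resolve the host itself. The factory class below is a hypothetical illustration; only the ES calls mirror the real 2.2 API:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

// Sketch of ES-2.x-style client setup; the factory class itself is hypothetical.
public class Es2TransportClientFactory {
    public static TransportClient create(String host, int port, String clusterName)
            throws UnknownHostException {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", clusterName) // e.g. eee-ccc from the sink config
                .build();
        // ES 2.x: InetSocketTransportAddress takes an InetAddress, not a String
        return TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName(host), port));
    }
}
```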
Remote debugging setup:
The source had to be written in-house because of some project-specific rules, so bugs were tracked down by attaching a remote debugger and setting breakpoints.

```
[hadoop@ccc2 flume]$ vi conf/flume-env.sh
export JAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8092"
```
Walkthrough 1
KafkaChannel: chosen because other features also need to consume this data.
First write a configuration that exercises Flume's built-in components, here using netcat as the input:
```
[hadoop@ccc2 flumebin]$ cat dta.flume
dta.sources=s1
dta.channels=c1
dta.sinks=k1
dta.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
dta.channels.c1.capacity=10000
dta.channels.c1.transactionCapacity=1000
dta.channels.c1.brokerList=ccc5:9093
dta.channels.c1.topic=flume_cmdid_1234
dta.channels.c1.groupId=flume_dta
dta.channels.c1.zookeeperConnect=ccc3:2181/kafka_0_9
dta.channels.c1.parseAsFlumeEvent=false
dta.sources.s1.channels=c1
dta.sources.s1.type=netcat
dta.sources.s1.bind=0.0.0.0
dta.sources.s1.port=6666
dta.sources.s1.max-line-length=88888888
dta.sinks.k1.channel=c1
dta.sinks.k1.type=elasticsearch
dta.sinks.k1.hostNames=ccc2:9300
dta.sinks.k1.indexName=foo_index
dta.sinks.k1.indexType=idcisp
dta.sinks.k1.clusterName=eee-ccc
dta.sinks.k1.batchSize=500
dta.sinks.k1.ttl=5d
dta.sinks.k1.serializer=com.esw.zhfx.collector.InfoSecurityLogIndexRequestBuilderFactory
dta.sinks.k1.serializer.idcispUrlBase64=true
[hadoop@ccc2 flumebin]$ bin/flume-ng agent --classpath flume-dta-source-2.1.jar -n dta -c conf -f dta.flume
# open a new terminal window
[hadoop@ccc2 ~]$ nc localhost 6666
```
The Kafka topic and the ES index do not have to be created by hand; that said, for finer control over ES index creation you can add an index template keyed to the index name.
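For example, a minimal sketch of registering such a template through the ES 2.2 Java client (the template name, pattern, and mapping body are assumptions; enabling `_ttl` in the mapping is what lets the sink's `ttl=5d` setting take effect in ES 2.x):

```java
import org.elasticsearch.client.Client;

// Minimal sketch, assuming "client" is an already-connected ES 2.2 Client.
// Template name, pattern, and mapping are illustrative assumptions.
public class IndexTemplateSetup {
    public static void putTemplate(Client client) {
        client.admin().indices().preparePutTemplate("foo_index_template")
                .setTemplate("foo_index*") // applies to every index matching the pattern
                .addMapping("idcisp",      // the indexType used by the sink above
                        "{\"idcisp\": {\"_ttl\": {\"enabled\": true}}}")
                .get();
    }
}
```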
InfoSecurityLogIndexRequestBuilderFactory implements ElasticSearchIndexRequestBuilderFactory and turns a raw record into an ES JSON document:
```java
private Counter allRecordMetric = MetricManager.getInstance().counter("all_infosecurity");
private Counter errorRecordMetric = MetricManager.getInstance().counter("error_infosecurity");

public IndexRequestBuilder createIndexRequest(Client client, String indexPrefix, String indexType, Event event)
        throws IOException {
    allRecordMetric.inc();
    String record = new String(event.getBody(), outputCharset);
    context.put(ElasticSearchSinkConstants.INDEX_NAME, indexPrefix);
    indexNameBuilder.configure(context);
    IndexRequestBuilder indexRequestBuilder =
            client.prepareIndex(indexNameBuilder.getIndexName(event), indexType);
    try {
        Gson gson = new Gson();
        IdcIspLog log = parseRecord(record);
        BytesArray data = new BytesArray(gson.toJson(log));
        indexRequestBuilder.setSource(data);
        // route by command id so records for one command land on the same shard
        indexRequestBuilder.setRouting(log.commandId);
    } catch (Exception e) {
        LOG.error(e.getMessage(), e);
        errorRecordMetric.inc();
        indexRequestBuilder.setSource(record.getBytes(outputCharset));
        // keep the bad record, but route it separately
        indexRequestBuilder.setRouting("error");
    }
    return indexRequestBuilder;
}
```
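Note the fallback: a record that fails to parse is still indexed raw, under the fixed routing key "error", so bad data is preserved and easy to locate for later inspection.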
Walkthrough 2
Test the custom Source:
```
dta.sources=s1
dta.channels=c1
dta.sinks=k1
dta.channels.c1.type=memory
dta.channels.c1.capacity=1000000
dta.channels.c1.transactionCapacity=1000000
dta.channels.c1.byteCapacity=7000000000
dta.sources.s1.channels=c1
dta.sources.s1.type=com.esw.zhfx.collector.CollectSource
dta.sources.s1.spoolDir=/home/hadoop/flume/data/
dta.sources.s1.trackerDir=/tmp/dtaspool
dta.sinks.k1.channel=c1
dta.sinks.k1.type=logger
```
CollectSource extends the AbstractSource class and implements PollableSource. For reference, see the Flume developer guide (http://flume.apache.org/FlumeDeveloperGuide.html#source) and the org.apache.flume.source.SequenceGeneratorSource class.
The main logic of the process method is as follows:
```java
public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    try {
        List<Event> events = readEvent(batchSize);
        if (!events.isEmpty()) {
            sourceCounter.addToEventReceivedCount(events.size());
            sourceCounter.incrementAppendBatchReceivedCount();
            getChannelProcessor().processEventBatch(events);
            // record how far into the file has been processed
            commit();
            sourceCounter.addToEventAcceptedCount(events.size());
            sourceCounter.incrementAppendBatchAcceptedCount();
        }
    } catch (ChannelException | IOException e) {
        status = Status.BACKOFF;
        Throwables.propagate(e);
    }
    return status;
}
```
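For context, here is a minimal sketch of the class around that method, using the standard Flume 1.6 source API. The configuration keys and fields are assumptions based on the agent config above; readEvent and commit are the custom parts and are omitted:

```java
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;

public class CollectSource extends AbstractSource implements Configurable, PollableSource {
    private SourceCounter sourceCounter;
    private String spoolDir;
    private String trackerDir;
    private int batchSize;

    @Override
    public void configure(Context context) {
        // keys match the agent configuration shown above; batchSize default is assumed
        spoolDir = context.getString("spoolDir");
        trackerDir = context.getString("trackerDir");
        batchSize = context.getInteger("batchSize", 100);
        sourceCounter = new SourceCounter(getName());
    }

    @Override
    public synchronized void start() {
        sourceCounter.start();
        super.start();
    }

    @Override
    public synchronized void stop() {
        sourceCounter.stop();
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        // main logic as shown above
        return Status.READY;
    }
}
```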
Example: Flume + Kafka + ES
To put the two walkthroughs together, just replace the Source in walkthrough 1 with the custom one.
Appendix: basic Kafka operations
```
[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-server-start.sh config/server1.properties
[hadoop@ccc5 kafka_2.11-0.9.0.1]$ cat config/server1.properties
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1
num.partitions=1
zookeeper.connect=ccc3,ccc4,ccc5/kafka_0_9
[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-topics.sh --create --zookeeper ccc3:2181/kafka_0_9 --replication-factor 1 --partitions 1 --topic flume
Created topic "flume".
[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-topics.sh --list --zookeeper ccc3:2181/kafka_0_9
flume
[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-console-producer.sh --broker-list ccc5:9093 --topic flume
[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-console-consumer.sh --zookeeper ccc3:2181/kafka_0_9 --topic flume --from-beginning
## adding kafka-manager:
# start Kafka with JMX enabled
export JMX_PORT=19999
nohup bin/kafka-server-start.sh config/server.properties &
# https://github.com/yahoo/kafka-manager/tree/1.3.1.8
nohup bin/kafka-manager -Dhttp.port=9090 &
```
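Because the channel is configured with parseAsFlumeEvent=false, other applications can also write plain strings straight into the channel's topic. A minimal sketch with the Kafka 0.9 Java client, with broker and topic taken from the configs above:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Minimal sketch: send raw (non-Flume-event) strings to the KafkaChannel topic.
// Assumes kafka-clients 0.9.x on the classpath.
public class RawProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ccc5:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("flume_cmdid_1234", "raw log line"));
        }
    }
}
```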
Appendix: Flume operations
```
# https://flume.apache.org/FlumeUserGuide.html#fan-out-flow
#conf/flume-env.sh
export FLUME_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8092"
bin/flume-ng agent --classpath "flume-dta-libs/*" -Dflume.root.logger=DEBUG,console -n dta -c conf -f accesslog.flume
# with ganglia
[ud@ccc-ud1 apache-flume-1.6.0-bin]$ bin/flume-ng agent --classpath "/home/ud/collector/common-lib/*" -Dflume.root.logger=DEBUG,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=239.2.11.71:8649 -n dta -c conf -f accesslog.flume
# windows
bin\flume-ng.cmd agent -n agent -c conf -f helloworld.flume -property "flume.root.logger=INFO,console"
```
–END