Winse Blog

Every stop along the way is scenery, all the bustle strives for the best, all the busyness is for tomorrow. What is there to fear?

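Processing data with Flume + Kafka + Elasticsearch

The Kafka and Elasticsearch versions that flume-1.6 depends on differ from the versions my application uses. Some jar dependencies therefore have to be replaced, and the flume-elasticsearch-sink source needs a few modifications to work with elasticsearch-2.2.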

  • flume-1.6.0
  • kafka_2.11-0.9.0.1 (can communicate with 0.8.2 clients, so flume-kafka-channel-1.6.0 is left unchanged)
  • elasticsearch-2.2.0

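Because of the version differences, the following jars need to be replaced in, or added to, flume/lib:

Use mvn dependency:copy-dependencies to export the required dependency jars.

A batch of jackson jars, hppc-0.7.1.jar, t-digest-3.0.jar, jsr166e-1.1.0.jar, guava-18.0.jar, a batch of lucene jars, and elasticsearch-2.2.0.jar.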

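Remote debugging configuration:

Because of some project-specific rules, the source has to be written in-house. Remote debugging with breakpoints is used to track down bugs.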

[hadoop@ccc2 flume]$ vi conf/flume-env.sh
export JAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8092"

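Hands-on 1

KafkaChannel: chosen because other features also need to consume this data.

First write a configuration that gets Flume's built-in components running end to end. Here netcat is used as the input: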

[hadoop@ccc2 flumebin]$ cat dta.flume 
dta.sources=s1
dta.channels=c1
dta.sinks=k1

dta.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
dta.channels.c1.capacity=10000
dta.channels.c1.transactionCapacity=1000
dta.channels.c1.brokerList=ccc5:9093
dta.channels.c1.topic=flume_cmdid_1234
dta.channels.c1.groupId=flume_dta
dta.channels.c1.zookeeperConnect=ccc3:2181/kafka_0_9
dta.channels.c1.parseAsFlumeEvent=false

dta.sources.s1.channels=c1
dta.sources.s1.type=netcat
dta.sources.s1.bind=0.0.0.0
dta.sources.s1.port=6666
dta.sources.s1.max-line-length=88888888

dta.sinks.k1.channel=c1
dta.sinks.k1.type=elasticsearch
dta.sinks.k1.hostNames=ccc2:9300
dta.sinks.k1.indexName=foo_index
dta.sinks.k1.indexType=idcisp
dta.sinks.k1.clusterName=eee-ccc
dta.sinks.k1.batchSize=500
dta.sinks.k1.ttl=5d
dta.sinks.k1.serializer=com.esw.zhfx.collector.InfoSecurityLogIndexRequestBuilderFactory
dta.sinks.k1.serializer.idcispUrlBase64=true

[hadoop@ccc2 flumebin]$ bin/flume-ng agent --classpath flume-dta-source-2.1.jar  -n dta -c conf -f dta.flume

# Open a new terminal window
[hadoop@ccc2 ~]$ nc localhost 6666

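The Kafka topic and the ES index do not have to be created by hand. To get better control over how the ES index is created, you can add an index template that matches the index name.

InfoSecurityLogIndexRequestBuilderFactory implements ElasticSearchIndexRequestBuilderFactory and converts each raw record into a JSON object for ES.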

  private Counter allRecordMetric = MetricManager.getInstance().counter("all_infosecurity");
  private Counter errorRecordMetric = MetricManager.getInstance().counter("error_infosecurity");
  
  public IndexRequestBuilder createIndexRequest(Client client, String indexPrefix, String indexType, Event event)
      throws IOException {
    allRecordMetric.inc();

    String record = new String(event.getBody(), outputCharset);

    context.put(ElasticSearchSinkConstants.INDEX_NAME, indexPrefix);
    indexNameBuilder.configure(context);
    IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexNameBuilder.getIndexName(event), indexType);

    try {
      Gson gson = new Gson();
      IdcIspLog log = parseRecord(record);
      BytesArray data = new BytesArray(gson.toJson(log));

      indexRequestBuilder.setSource(data);
      indexRequestBuilder.setRouting(log.commandld);
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      errorRecordMetric.inc();

      indexRequestBuilder.setSource(record.getBytes(outputCharset));
      // Keep the malformed record instead of dropping it
      indexRequestBuilder.setRouting("error");
    }

    return indexRequestBuilder;
  }
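
The parseRecord helper and the IdcIspLog class are not shown in this post. As a rough sketch of the parse-then-fall-back pattern used above, the following assumes a hypothetical pipe-delimited record with three fields. The field names, the delimiter, and the class name RecordRoutingSketch are all illustrative assumptions, not the actual record format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the parse step; the real record layout is not shown in the post. */
class RecordRoutingSketch {
  // Assumed field order of a pipe-delimited record (illustrative only).
  static final String[] FIELDS = {"commandId", "houseId", "url"};

  /** Splits a record into named fields, or throws if the field count is wrong. */
  static Map<String, String> parseRecord(String record) {
    // A limit of -1 keeps trailing empty fields so the count check stays strict.
    String[] parts = record.split("\\|", -1);
    if (parts.length != FIELDS.length) {
      throw new IllegalArgumentException(
          "expected " + FIELDS.length + " fields, got " + parts.length);
    }
    Map<String, String> fields = new LinkedHashMap<>();
    for (int i = 0; i < FIELDS.length; i++) {
      fields.put(FIELDS[i], parts[i]);
    }
    return fields;
  }

  /** Returns the routing key: the command id for valid records, "error" otherwise. */
  static String routingFor(String record) {
    try {
      return parseRecord(record).get("commandId");
    } catch (IllegalArgumentException e) {
      // Same idea as the catch branch above: malformed records share one fixed routing key.
      return "error";
    }
  }
}
```

With this layout, routingFor("1234|h1|http://a") yields the command id, while a record with the wrong number of fields is routed under "error", so bad input stays searchable in ES rather than being lost.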

Hands-on 2

Testing the custom Source:

dta.sources=s1
dta.channels=c1
dta.sinks=k1

dta.channels.c1.type=memory
dta.channels.c1.capacity=1000000
dta.channels.c1.transactionCapacity=1000000
dta.channels.c1.byteCapacity=7000000000

dta.sources.s1.channels=c1
dta.sources.s1.type=com.esw.zhfx.collector.CollectSource
dta.sources.s1.spoolDir=/home/hadoop/flume/data/
dta.sources.s1.trackerDir=/tmp/dtaspool

dta.sinks.k1.channel=c1
dta.sinks.k1.type=logger

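CollectSource implements PollableSource and extends AbstractSource. For reference, see the Flume Developer Guide at http://flume.apache.org/FlumeDeveloperGuide.html#source and the org.apache.flume.source.SequenceGeneratorSource class.

The main logic of the process method is as follows: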

  public Status process() throws EventDeliveryException {
    Status status = Status.READY;

    try {
      List<Event> events = readEvent(batchSize);
      if (!events.isEmpty()) {
        sourceCounter.addToEventReceivedCount(events.size());
        sourceCounter.incrementAppendBatchReceivedCount();

        getChannelProcessor().processEventBatch(events);
        // Record how far into the file has been processed
        commit();

        sourceCounter.addToEventAcceptedCount(events.size());
        sourceCounter.incrementAppendBatchAcceptedCount();
      }
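    } catch (ChannelException | IOException e) {
      status = Status.BACKOFF;
      // Throwables.propagate always throws, so the method exits here
      // and the BACKOFF status above is never actually returned.
      Throwables.propagate(e);
    }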

    return status;
  }
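
The readEvent and commit methods are not shown in this post. The sketch below is one way such a pair could work, assuming a single data file and a tracker file that stores a byte offset. The class OffsetTrackingReader and its method names are hypothetical, and this is not the actual CollectSource implementation.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical offset tracker illustrating read-then-commit; not the author's CollectSource. */
class OffsetTrackingReader {
  private final Path dataFile;
  private final Path trackerFile;
  private long committed; // offset confirmed by the last commit()
  private long pending;   // offset reached by the most recent readBatch()

  OffsetTrackingReader(Path dataFile, Path trackerFile) throws IOException {
    this.dataFile = dataFile;
    this.trackerFile = trackerFile;
    // Resume from the persisted offset if a tracker file already exists.
    this.committed = Files.exists(trackerFile)
        ? Long.parseLong(
            new String(Files.readAllBytes(trackerFile), StandardCharsets.UTF_8).trim())
        : 0L;
    this.pending = committed;
  }

  /** Reads up to batchSize lines, always starting at the committed offset. */
  List<String> readBatch(int batchSize) throws IOException {
    List<String> lines = new ArrayList<>();
    try (RandomAccessFile in = new RandomAccessFile(dataFile.toFile(), "r")) {
      in.seek(committed);
      String line;
      // RandomAccessFile.readLine maps each byte to one char, so this sketch
      // is only suitable for ASCII data.
      while (lines.size() < batchSize && (line = in.readLine()) != null) {
        lines.add(line);
      }
      pending = in.getFilePointer();
    }
    return lines;
  }

  /** Persists the offset; meant to be called only after the channel accepted the batch. */
  void commit() throws IOException {
    Files.write(trackerFile, Long.toString(pending).getBytes(StandardCharsets.UTF_8));
    committed = pending;
  }
}
```

Because readBatch always starts from the committed offset, a batch that the channel rejects is simply read again on the next call. That gives at-least-once delivery, which matches the order in process above: hand the batch to the channel first, commit the position afterwards.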

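Example: Flume+Kafka+ES

Combine the two examples above: take the configuration from the first one and replace its Source with the custom Source.

Appendix: basic Kafka operations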

[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-server-start.sh config/server1.properties 

[hadoop@ccc5 kafka_2.11-0.9.0.1]$ cat config/server1.properties 
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1
num.partitions=1
zookeeper.connect=ccc3,ccc4,ccc5/kafka_0_9

[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-topics.sh --create --zookeeper ccc3:2181/kafka_0_9 --replication 1 --partitions 1 --topic flume
Created topic "flume".

[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-topics.sh --list --zookeeper ccc3:2181/kafka_0_9
flume

[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-console-producer.sh --broker-list ccc5:9093 --topic flume

[hadoop@ccc5 kafka_2.11-0.9.0.1]$ bin/kafka-console-consumer.sh --zookeeper ccc3:2181/kafka_0_9 --topic flume --from-beginning

## Add kafka-manager:

# Enable JMX when starting Kafka
export JMX_PORT=19999
nohup bin/kafka-server-start.sh config/server.properties &

# https://github.com/yahoo/kafka-manager/tree/1.3.1.8
nohup bin/kafka-manager -Dhttp.port=9090 &

Appendix: Flume operations

https://flume.apache.org/FlumeUserGuide.html#fan-out-flow

#conf/flume-env.sh
export FLUME_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8092"
bin/flume-ng agent --classpath "flume-dta-libs/*" -Dflume.root.logger=DEBUG,console  -n dta -c conf -f accesslog.flume

# with ganglia
[ud@ccc-ud1 apache-flume-1.6.0-bin]$ bin/flume-ng agent --classpath "/home/ud/collector/common-lib/*"  -Dflume.root.logger=Debug,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=239.2.11.71:8649 -n dta -c conf -f accesslog.flume 

# windows
bin\flume-ng.cmd agent -n agent -c conf -f helloworld.flume -property "flume.root.logger=INFO,console"

–END
