当前位置: 代码迷 >> 综合 >> hadoop3.2.0,kafka_2.12-2.4.0 flume采集方案demo
  详细解决方案

hadoop3.2.0,kafka_2.12-2.4.0 flume采集方案demo

热度:32   发布时间:2023-11-22 15:59:24.0

效果

根据消息中的事件时间来分目录,按照年月日分!
在这里插入图片描述

前言

公司有个数据采集需求,因为历史原因,我们公司使用的hadoop和kafka版本比较高,所以我采用了1.9.0的flume进行开发,flume内存调优这里就不讲了,有兴趣的自己去搜下。第一次使用肯定需要调优。

环境准备

java 1.8 flume 1.9.0 kafka_2.12-2.4.0 hadoop3.2.0

flume部署在hadoop所在集群上!

flume的lib中需要将hadoop的一些依赖放进去,在hadoop3.2.0中慢慢去找

hadoop-3.2.0/share/hadoop/common/lib/hadoop-auth-3.2.0.jar
hadoop-3.2.0/share/hadoop/common/hadoop-common-3.2.0.jar
hadoop-3.2.0/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar
hadoop-3.2.0/share/hadoop/common/lib/commons-configuration2-2.1.1.jar

笔者放了4个包就行了。看了别的人还放了commons-io-2.5.jar(hadoop-3.2.0/share/hadoop/hdfs/lib中)

方案

将kafka中的数据解析,将事件时间解析出来,然后将时间转换成13位的时间戳!别问我为什么是13位,将时间戳解析以后put到header中,用于hdfssink分目录用!这个sink会解析event头部,然后根据你写的时间转义来分path路径。前提是你path路径是用时间转义写的!

实施

因为是测试数据所以直接贴源码!

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.imprexion</groupId><artifactId>flume-collectifdata</artifactId><version>1.0-SNAPSHOT</version><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><!--阿里巴巴开源json解析框架--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

flume配置文件

# 命名每个组件 a1代表agent的名称 
#a1.sources代表a1中配置的source,多个使用空格间隔
#a1.sinks代表a1中配置的sink,多个使用空格间隔
#a1.channels代表a1中配置的channel,多个使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = yxyxyx001:9092
a1.sources.r1.kafka.topics = kettle04
#控制kafka消费者从主题的最早的位置消费,此参数只会在一个从未提交过offset的组中生效
#a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.kafka.consumer.auto.offset.reset=latest
#a1.sources.r1.kafka.consumer.group.id = flume#拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.interceptors.TestInterceptor$Builder#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000000# 配置sink
a1.sinks.k1.type = hdfs
#a1.sinks.k1.hdfs.path=hdfs://yxhdcluster/origin_data/ifcustomer/behavior-log/%Y-%m-%d
a1.sinks.k1.hdfs.path=/origin_data/ifcustomer/behavior-log/%Y-%m-%d
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = iflog-
a1.sinks.k1.hdfs.batchSize=1000
#配置文件滚动
a1.sinks.k1.hdfs.rollInterval = 1
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
#使用文件格式存储数据
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.hdfs-sink.hdfs.callTimeout = 300000#使用压缩格式存储数据
#a1.sinks.k1.hdfs.fileType=CompressedStream 
#指定文件使用LZO压缩
#a1.sinks.k1.hdfs.codeC=lzop
# 绑定和连接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器

package com.interceptors;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.util.DateUtil;
import com.util.ETLUtil;
import com.util.JsonParseUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** @ClassName:TestInterceptor* @author: zhengkw* @description: 根据测试数据写的demo* @date: 20/10/27下午 1:49* @version:1.0* @since: jdk 1.8*/
public class TestInterceptor implements Interceptor {
    private List<Event> results = new ArrayList<>();@Overridepublic void initialize() {
    }@Overridepublic Event intercept(Event event) {
    //获取bodybyte[] body = event.getBody();Map<String, String> headers = event.getHeaders();//转换body为string类型String bodyStr = new String(body, StandardCharsets.UTF_8);//验证数据是否合法if (ETLUtil.validStartLog(bodyStr)) {
    JSONObject jsonObj = JsonParseUtil.getJsonObj(bodyStr);JSONArray data = jsonObj.getJSONArray("data");String ts = (String) data.getJSONObject(0).get("create_time");String ts1 = ts.replaceAll("/", "-");String date = ts1.substring(0, ts1.indexOf("."));String formatTime = DateUtil.date2TimeStamp(date, "yyyy-MM-dd HH:mm:ss");headers.put("timestamp", formatTime);return event;}return null;}@Overridepublic List<Event> intercept(List<Event> events) {
    //先清空resultsresults.clear();for (Event event : events) {
    Event e = intercept(event);//判断拦截的数据是否合法if (e != null) {
    //将合法的数据放入到集合中results.add(e);}}return results;}@Overridepublic void close() {
    }public static class Builder implements Interceptor.Builder {
    // 返回一个拦截器对象@Overridepublic Interceptor build() {
    return new TestInterceptor();}//读取agent配置文件中的参数@Overridepublic void configure(Context context) {
    }}
}

时间戳转换工具类

package com.util;import java.text.SimpleDateFormat;/*** @ClassName:DateUtil* @author: zhengkw* @description: 日期工具类* @date: 20/10/27下午 7:02* @version:1.0* @since: jdk 1.8*/
public class DateUtil {
    public static String date2TimeStamp(String date_str, String format) {
    try {
    SimpleDateFormat sdf = new SimpleDateFormat(format);//返回的是13位!return String.valueOf(sdf.parse(date_str).getTime());} catch (Exception e) {
    e.printStackTrace();}return "";}
}

总结

这里除了核心的代码展示,什么验证代码就不展示了!在这个过程中遇到了几个坑的地方。一个是网络


2020-10-27 17:18:21,630 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)] HDFS IO error
java.io.IOException: Callable timed out after 30000 ms on file: hdfs://yxhdcluster/origin_data/ifcustomer/behavior-log/2020-10-27/iflog-.1603790269896.tmpat org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:741)at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:247)at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutExceptionat java.util.concurrent.FutureTask.get(FutureTask.java:205)at org.apache.flume.sink.hdfs.BucketWriter.callWithTimeout(BucketWriter.java:734)

解决办法

就是在自定义的conf文件中加入一行
a1.sinks.hdfs-sink.hdfs.callTimeout = 300000

还有就是跑的过程中发现自己flume内存没调优。

#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000000 #数据量大就调大

经验公式:byteCapacity>capacity=transactionCapacity

  相关解决方案