当前位置: 代码迷 >> 综合 >> 基于Hadoo的日志收集框架---Chukwa的源码分析(数据处理)
  详细解决方案

基于Hadoo的日志收集框架---Chukwa的源码分析(数据处理)

热度:6   发布时间:2023-12-22 16:40:44.0

1.工具类、接口简介

(1)

   // 用于对数据进行分类org.apache.hadoop.chukwa.extraction.demux.DemuxManager// mapreduce程序的map处理接口org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor// MapProcessor的工厂类, 用HashMap存储了数据类型与其对应的MapProcessororg.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory// map处理接口的实现类, 实现了数据块(ChunkImpl)处理、逐行获取内容、创建ChukwaRecord对象,// 但是将写入OutputCollector的实现放在了抽象方法parse()的实现类中org.apache.hadoop.chukwa.extraction.demux.processor.mapper.AbstractProcessor// AbstractProcessor中parse()方法的实现类org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor// mapreduce程序的reduce处理接口org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor// reduce处理接口的实现类, 实现了将key、value写入OutputCollector的过程org.apache.hadoop.chukwa.extraction.demux.processor.reducer.IdentityReducer

 

(2)

   // 用于对数据进行归档org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveManager

 

(3)

   // 用于将分类后的数据转储备份org.apache.hadoop.chukwa.extraction.demux.PostProcessorManager

 

(4)

   // 用于将备份数据按小时进行合并org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling

 

(5)

   // 用于将备份数据按日期进行合并org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling

 

2.处理流程

(1)数据分类
    org.apache.hadoop.chukwa.extraction.demux.DemuxManager

/*** 创建守护进程"DemuxManager",将进程号写入相应的pid文件,以便于运行stop命令时可根据此pid文件杀死进程* 调用start方法启动循环处理过程*/
public static void main(String[] args)/*** 1. 初始化处理过程中用到的目录*    chukwaRootDir 	/chukwa/*	  demuxRootDir		/chukwa/demuxProcessing/*	  demuxErrorDir		/chukwa/demuxProcessing/inError/*	  demuxInputDir		/chukwa/demuxProcessing/mrInput/*	  demuxOutputDir	/chukwa/demuxProcessing/mrOutput/*	  dataSinkDir		/chukwa/logs/*	  postProcessDir	/chukwa/postProcess/*	  archiveRootDir	/chukwa/dataSinkArchives/* 2. 从系统配置文件中获取nagios的配置信息* 3. 循环处理*	  统计异常次数,如果超过五次则退出*	  检测demuxOutputDir是否存在,如果存在则将其删除*	  检测demuxInputDir是否存在,如果存在则设置可以处理并将处理次数加1,如果已经尝试过3次则将则将demuxInputDir转移到demuxErrorDir* 	  如果以上两个目录都不存在,则:*	  	 reprocess归0*		 在dataSinkDir目录下检测是否有.done文件,如果存在则将其移动到demuxInputDir目录下(最多移动500个),移动完成后将demuxReady设为true*    如果demuxReady已经为true,则开始处理数据(processData)*    发送demux信息到nagios 	*/
public void start()/*** 1. 运行Demux,设置reduce数为demuxReducerCount,设置输入输出目录分别为demuxInputDir、demuxOutputDir* 2. 检测demuxOutputDir是否存在,存在则将demuxOutputDir转移到postProcessDir* 3. 将demuxInputDir转移到archiveDir(archiveRootDir)*/
protected boolean processData(String dataSinkDir, String demuxInputDir, String demuxOutputDir, String postProcessDir, String archiveDir)/*** 在main方法中使用ToolRunner启动Demux, 调用内部类MapClass、ReduceClass进行mapreduce处理* 在运行时设置输出的格式化类为ChukwaRecordOutputFormat*/
public static void main(String[] args) 

 

 

    org.apache.hadoop.chukwa.extraction.demux.Demux
    在Demux中实现了对数据分类的Mapper类和Reducer类

    org.apache.hadoop.chukwa.extraction.demux.Demux$MapClass

/*** 根据ChunkImpl的datatype在$CHUKWA_HOME/conf/chukwa-demux-conf.xml中查找是否配置了mapProcessorClass* 如果没有则默认使用DefaultProcessor处理* 将获取的mapProcessorClass添加到mapProcessor工厂类的hashmap中* 在AbstractProcessor中实现了process方法*	  获取数据块的相关信息*	  从数据块中逐行读取内容*	  将处理行内容的parse方法实现放在了DefaultProcessor中, 在此方法中调用父类的buildGenericRecord方法完成*	  ChukwaArchiveKey 	-> 	ChukwaRecordKey*	  ChunkImpl			-> 	ChukwaRecord*	  的转换*	  写入OutputCollector*/
public void map(ChukwaArchiveKey key, ChunkImpl chunk,OutputCollector<ChukwaRecordKey, ChukwaRecord> output,Reporter reporter)

 

    org.apache.hadoop.chukwa.extraction.demux.Demux$ReduceClass

/*** 根据ChukwaRecordKey的reduceType来确定reduceProcessorClass* org.apache.hadoop.chukwa.extraction.demux.processor.reducer. + reduceType* 如果没有找到类则使用IdentityReducer* 在IdentityReducer的process方法中只是将key、value写入OutputCollector*/
public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,OutputCollector<ChukwaRecordKey, ChukwaRecord> output,Reporter reporter)

 

    org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordOutputFormat
    继承自MultipleSequenceFileOutputFormat,用于格式化输出结果

/*** 使输出目录为clusterName/dataType/dataType+Util.generateTimeOutput(record.getTime())*/
protected String generateFileNameForKeyValue(ChukwaRecordKey key,ChukwaRecord record, String name) 

 

    org.apache.hadoop.chukwa.extraction.demux.processor.Util

/*** 根据时间戳构造文件名* 从timestamp中获取workingDay、workingHour、minutes* 如果运行日期不等于当前日期,则输出文件为*	  [dataType]_[yyyyMMdd].D.evt* 否则如果运行时间(小时)不等于当前时间(小时),则输出文件为*	  [dataType]_[yyyyMMdd]_[HH].H.evt* 否则输出结果文件为*	  分钟的个位数小于5,*		 [dataType]_[yyyyMMdd]_[HH]_[minutes/10].0.R.evt*    分钟的个位数大于等于5*		 [dataType]_[yyyyMMdd]_[HH]_[minutes/10].5.R.evt*/
public static String generateTimeOutput(long timestamp)

 

(2)数据归档
    org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveManager

/*** 创建守护进程"DemuxManager",将进程号写入相应的pid文件,以便于运行stop命令时可根据此pid文件杀死进程* 调用start方法启动循环处理过程*/
public static void main(String[] args)/*** 1. 初始化处理过程中用到的目录*    chukwaRootDir 				/chukwa/*	  archiveRootDir				/chukwa/dataSinkArchives/*	  archivesRootProcessingDir		/chukwa/archivesProcessing/*	  archivesMRInputDir			/chukwa/archivesProcessing/mrInput/*	  archivesMROutputDir			/chukwa/archivesProcessing/mrOutput/*	  finalArchiveOutput			/chukwa/finalArchives/* 2. 开始循环处理*		errorCount记录错误次数,当超过四次时退出*		判断archivesMRInputDir目录是否存在(是否已经有archive job的输入目录存在),存在则使用runArchive方法对当天数据进行归档*		获取archiveRootDir目录下的文件,*		如果没有文件则休眠一分钟*		如果只有一个日期目录,则判断是否是当天且当前时间是否与上次运行间隔了两个小时,如果还不到两个小时则休眠半个小时*		如果有多个日期目录,则使用processDay方法处理每个日期目录下的文件*/
public void start()/*** 处理指定日期的数据*     now				当前时间		*	   currentDay		当前日期*	   workingDay		要处理的日期目录*	   oneHourAgo		一个小时之前的时间* 如果要处理的日期目录下没有文件且日期在当天日期之前,则将目录删除后返回* 使用fileCount记录文件总数, 遍历日期目录* 	   如果文件修改时间不到一个小时*		   记录日期目录下的文件数*		   将文件转移到archivesMRInputDir目录*		   如果文件数已经到达最大文件数(500),则调用runArchive方法进行归档,并将fileCount重置为0*/
public void processDay(FileStatus fsDay, String archivesMRInputDir,String archivesMROutputDir, String finalArchiveOutput)/*** 1. 构建job运行参数,从配置文件中加载配置项"archive.grouper"(归档分组类型)* 2. 检测mrOutput目录是否存在,存在则删除* 3. 使用ToolRunner运行ChukwaArchiveBuilder* 4. 使用mrInput中的日期在finalArchiveOutput中创建对应的目录* 5. 将mrOutput目录转储到上一步创建的目录下的"archive_+当前时间"* 6. 删除mrInput目录*/
public void runArchive(String archivesMRInputDir,String archivesMROutputDir, String finalArchiveOutput)

 

    org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder

    org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder$UniqueKeyReduce

/*** 统计每个key有多少个value*/ 
public void reduce(ChukwaArchiveKey key, Iterator<ChunkImpl> vals,OutputCollector<ChukwaArchiveKey, ChunkImpl> out, Reporter r)/*** 在main方法中使用ToolRunner启动ChukwaArchiveBuilder,* 在run方法中调用IdentityMapper、内部类UniqueKeyReduce进行mapreduce处理* 根据不同的参数设置不同的partitionerClass、outputFormat、jobName 	*/
public static void main(String[] args)

 

(3)数据转储
    org.apache.hadoop.chukwa.extraction.demux.PostProcessorManager

/*** 创建守护进程"PostProcessorManager",将进程号写入相应的pid文件,以便于运行stop命令时可根据此pid文件杀死进程* 调用start方法启动循环处理过程*/
public static void main(String[] args)/*** 1. 初始化处理过程中用到的目录*    chukwaRootDir 					/chukwa/*	  postProcessDir					/chukwa/postProcess/*	  chukwaRootReposDir				/chukwa/repos/*	  chukwaPostProcessInErrorDir		/chukwa/postProcessInError/* 2. 开始循环处理*		errorCount记录了错误次数,如果已经超过四次则退出*		遍历demuxOutputDir,*		如果此目录下没有文件则休眠10秒*		如果有文件则将文件添加到directories(ArrayList<String>)并进行排序*		遍历directories*			对每一个文件使用processDemuxPigOutput方法进行处理,完成后调用*			movetoMainRepository方法移动文件,然后将文件删除,如果出现异常,*			则将文件转移到错误目录*/
public void start()/*** 从$CHUKWA_HOME/conf/chukwa-demux-conf.xml中加载配置项*	chukwa.post.demux.data.loader,得到两个实现类*	org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool,*	org.apache.hadoop.chukwa.dataloader.FSMDataLoader*	遍历配置的实现类*		分别获取实例,构建查找对象*		(/chukwa/postProcess/cluster/dataType/*.evt)*		将查找到的文件交由dataloader处理*/
public boolean processDemuxPigOutput(String directory)/*** 这里将移动到repository的功能整合到了org.apache.hadoop.chukwa.extraction.demux.MoveToRepository中*/
public boolean movetoMainRepository(String sourceDirectory,String repoRootDirectory)

 

    org.apache.hadoop.chukwa.extraction.demux.MoveToRepository

/*** 遍历要转储的目录*	  获取cluster名称,过滤掉"_logs"、"_temporary"*	  使用processClusterDirectory方法处理每个cluster下的文件*	  删除已经完成转储的cluster目录*/
public static void main(String[] args)/*** 遍历cluster目录*	  获取dataType名称*	  使用processDatasourceDirectory方法处理每个dataType下的文件*/
static void processClusterDirectory(Path srcDir, String destDir)/*** 遍历某个dataType下的evt文件*	  如果是以".D.evt"结尾的文件*		  获取日期*		  调用writeRecordFile方法写入文件,其中*		  输出目录为/chukwa/repos/<cluster>/<dataType>/<yyyyMMdd>/*	      输出文件为<dataType>_<yyyyMMdd>*	  如果是以".H.evt"结尾的文件*		  获取日期、小时*		  调用writeRecordFile方法写入文件,其中*		  输出目录为/chukwa/repos/<cluster>/<dataType>/<yyyyMMdd>/<HH>/*		  输出文件为<dataType>_<yyyyMMdd>_<HH>*		  调用addDirectory4Rolling方法对日期、小时做标记*	  如果是以".R.evt"结尾的文件*		  获取日期、小时、分钟*		  调用writeRecordFile方法写入文件,其中*		  输出目录为/chukwa/repos/<cluster>/<dataType>/<yyyyMMdd>/<HH>/<mm>/*		  输出文件为<dataType>_<yyyyMMdd>_<HH>_<mm>*		  调用addDirectory4Rolling方法对日期、小时做标记*/
static void processDatasourceDirectory(String cluster, Path srcDir, String destDir)/*** 将文件转储并改名*/
static void writeRecordFile(String destDir, Path recordFile, String fileName)/*** 在/chukwa/rolling/目录下生成相应数据类型的目录以便于进行数据合并* 如果不在当前日期,在/chukwa/rolling/下生成*	  daily/<yyyyMMdd>/<cluster>/<dataType>* 否则,在/chukwa/rolling/下生成*	  hourly/<yyyyMMdd>/<cluster>/<dataType>	*/
static void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,String cluster, String dataSource)

 

(4)数据合并
    org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling

/*** 1. 创建守护进程"PostProcessorManager",将进程号写入相应的pid文件,以便于运行stop命令时可根据此pid文件杀死进程* 2. 初始化处理过程中用到的目录* 	  rollingFolder 			/chukwa/rolling/*	  chukwaMainRepository    	/chukwa/repos/* 	  tempDir                 	/chukwa/temp/hourlyRolling/	* 3. 从执行参数中获取rollInSequence(是否顺序执行合并线程),deleteRawdata(是否删除原始数据)这两个参数设置的值* 4. 获取当前日期和小时* 5. 遍历 /chukwa/rolling/hourly/ 目录*   	 获取此目录下的每个日期,对其遍历*       	获取此目录下的小时,如果此目录日期小于当前日期,或两者相同而且此小时小于当前小时(即只对当前小时之前的数据进行合并)*          调用buildHourlyFiles*/
public static void main(String[] args)/*** 获取处理目录 /chukwa/rolling/hourly/workingDay/workingHour* 遍历此目录下所有的cluster*    遍历每个cluster目录下的所有数据类型(dataSource)*       创建目录 /chukwa/repos/cluster/dataSource/workingDay/workingHour/rotateDone*       构建合并参数数组 mergeArgs*       mergeArgs[0] = /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]* /*.evt*       mergeArgs[1] = /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis*       mergeArgs[2] = /chukwa/repos/cluster/dataSource/workingDay/workingHour*       mergeArgs[3] = dataSource_HourlyDone_workingDay_workingHour*       mergeArgs[4] = /chukwa/rolling/hourly/workingDay/workingHour/cluster/dataSource*       新建RecordMerger对象merge*       如果rollInSequence为true,则启动merge线程*       否则将merge添加到列表并启动线程,等待此线程运行完成*       删除dataSource目录*    删除cluster目录* 删除hour目录*/
public static void buildHourlyFiles(String chukwaMainRepository,String tempDir, String rollingFolder, int workingDay,int workingHour)/*** 使用IdentityMapper作为mapperClass, 使用IdentityReducer作为reducerClass, 只是简单的合并数据*/
public int run(String[] args) 

 

    org.apache.hadoop.chukwa.extraction.demux.RecordMerger

/*** 调用ToolRunner.run运行tool, 即调用HourlyChukwaRecordRolling中的run方法,将* /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]* /*.evt* 合并到* /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis* 如果此任务处理成功*    调用 writeRecordFile()*    如果deleteRawData参数为true*       删除输入文件*       /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]* /*.evt*       遍历 /chukwa/repos/cluster/dataSource/workingDay/workingHour*          删除小时或分钟的目录*          /chukwa/repos/cluster/dataSource/workingDay/workingHour[0-5]* /*    删除合并标记文件*    dataSource_HourlyDone_workingDay_workingHour*    删除临时目录*    /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis*/
public void run()/*** 将mapreduce运行结果文件改名转储, 即将HourlyChukwaRecordRolling中的run方法合并后的文件转储* input 		/chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis/part-00000* outputDir  	/chukwa/repos/cluster/dataSource/workingDay/workingHour* filename 	dataSource_HourlyDone_workingDay_workingHour* destFile 	/chukwa/repos/cluster/dataSource/workingDay/workingHour/dataSource_HourlyDone_workingDay_workingHour.1.evt*/
void writeRecordFile(String input, String outputDir, String fileName)