当前位置: 代码迷 >> Ruby/Rails >> ruby接收pig流式处置文件内容
  详细解决方案

ruby接收pig流式处置文件内容

热度:117   发布时间:2016-04-29 02:20:54.0
ruby接收pig流式处理文件内容
大数据操作中涉及到数据清洗步奏还是用脚本处理比较方便,下边介绍一下pig加载hdfs文件后调用ruby脚本处理数据,再返回数据流至pig中处理的一个简单案例。
注意:ruby的流式处理用到wukong这个gem包,相关下载:
https://github.com/mrflip/wukong

pig中加载分布式文件调用ruby流式处理:
log = load '$INFILE' using PigStorage('\t');define tracking_parser `/usr/local/bin/ruby parse_click.rb --map` SHIP('parse_click.rb', 'click_tracking.rb');strmo = stream log through tracking_parser;store strmo into '$OUTFILE' using PigStorage('\t');


require 'wukong'require 'json'require './click_tracking.rb'module ParseClick  class Mapper < Wukong::Streamer::RecordStreamer    def before_stream      @bad_count = 0    end    def after_stream      raise RuntimeError, "Exceeded bad records : [email protected]_count}" if @bad_count > 10    end    def process *records      yield ClickTracking.new(JSON.parse(records[2])).to_a    rescue => e      @bad_count += 1      warn "Bad record #{e}: #{records[2]}"    end  endendWukong.run ParseClick::Mapper, nil


require 'date'require './models.rb'class ClickTracking    output :ip  output :c_date  #output your other atrributes  def c_date    click_date.strftime("%Y%m%d").to_i  end    def ip    browser_ip.to_i  endend


其中
strmo = stream log through tracking_parser;调用定义的外部程序tracking_parser处理log对象。
Wukong.run ParseClick::Mapper, nil执行完后,将ruby执行结果回调pig接收。
store strmo into '$OUTFILE' using PigStorage('\t');做结果存储持久化。