阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Pig简单的代码实例:报表统计行业中的点击和曝光量

144次阅读
没有评论

共计 6924 个字符,预计需要花费 18 分钟才能阅读完成。

注意:pig 中用 run 或者 exec 运行脚本。除了 cd 和 ls,其他命令不用。在本代码中用 rm 和 mv 命令做例子,容易出错。

另外,pig 只有在 store 或 dump 时候才会真正加载数据,否则,只是加载代码,不具体操作数据。所以在 rm 操作时必须注意该文件是否已经生成。如果 rm 的文件为生成,可以第三文件,进行 mv 改名操作

SET job.name ‘test_age_reporth_istorical’;– 定义任务名字,在 http://172.XX.XX.XX:50030/jobtracker.jsp 中查看任务状态,失败成功。

SET job.priority HIGH;– 优先级

– 注册 jar 包,用于读取 sequence file 和输出分析结果文件
REGISTER piggybank.jar;
DEFINE SequenceFileLoader org.apache.pig.piggybank.storage.SequenceFileLoader(); – 读取二进制文件,函数名定义

%default Cleaned_Log /user/C/data/XXX/cleaned/$date/*/part* –$date 是外部传入参数

%default AD_Data /user/XXX/data/xxx/metadata/ad/part*
%default Campaign_Data /user/xxx/data/xxx/metadata/campaign/part*
%default Social_Data /user/xxx/data/report/socialdata/part*

– 所有的输出文件路径:
%default Industry_Path $file_path/report/historical/age/$year/industry
%default Industry_SUM $file_path/report/historical/age/$year/industry_sum
%default Industry_TMP $file_path/report/historical/age/$year/industry_tmp

%default Industry_Brand_Path $file_path/report/historical/age/$year/industry_brand
%default Industry_Brand_SUM $file_path/report/historical/age/$year/industry_brand_sum
%default Industry_Brand_TMP $file_path/report/historical/age/$year/industry_brand_tmp

%default ALL_Path $file_path/report/historical/age/$year/all
%default ALL_SUM $file_path/report/historical/age/$year/all_sum
%default ALL_TMP $file_path/report/historical/age/$year/all_tmp

%default output_path /user/xxx/tmp/result

 

origin_cleaned_data = LOAD ‘$Cleaned_Log’ USING PigStorage(‘,’) – 读取日志文件
AS (ad_network_id:chararray,
    xxx_ad_id:chararray,
    guid:chararray,
    id:chararray,
    create_time:chararray,
    action_time:chararray,
    log_type:chararray,
    ad_id:chararray,
    positioning_method:chararray,
    location_accuracy:chararray,
    lat:chararray,
    lon:chararray,
    cell_id:chararray,
    lac:chararray,
    mcc:chararray,
    mnc:chararray,
    ip:chararray,
    connection_type:chararray,
    Android_id:chararray,
    android_advertising_id:chararray,
    openudid:chararray,
    mac_address:chararray,
    uid:chararray,
    density:chararray,
    screen_height:chararray,
    screen_width:chararray,
    user_agent:chararray,
    app_id:chararray,
    app_category_id:chararray,
    device_model_id:chararray,
    carrier_id:chararray,
    os_id:chararray,
    device_type:chararray,
    os_version:chararray,
    country_region_id:chararray,
    province_region_id:chararray,
    city_region_id:chararray,
    ip_lat:chararray,
    ip_lon:chararray,
    quadkey:chararray);

–loading metadata/ad(adId,campaignId)
metadata_ad = LOAD ‘$AD_Data’ USING PigStorage(‘,’) AS (adId:chararray, campaignId:chararray);

–loading metadata/campaign数据(campaignId, industryId, brandId)
metadata_campaign = LOAD ‘$Campaign_Data’ USING PigStorage(‘,’) AS (campaignId:chararray, industryId:chararray, brandId:chararray);

–ad and campaign for inner join
joinAdCampaignByCampaignId = JOIN metadata_ad BY campaignId,metadata_campaign BY campaignId;–(adId,campaignId,campaignId,industryId,brandId)
–filtering out redundant column of joinAdCampaignByCampaignId
joined_ad_campaign_data = FOREACH joinAdCampaignByCampaignId GENERATE $0 AS adId,$3 AS industryId,$4 AS brandId; –(adId,industryId,brandId)

–extract column for analyzing
origin_historical_age = FOREACH origin_cleaned_data GENERATE xxx_ad_id,guid,log_type;–(xxx_ad_id,guid,log_type)
–distinct
distinct_origin_historical_age = DISTINCT origin_historical_age;–(xxx_ad_id,guid,log_type)

–loading metadata_region(guid_social, sex, age, income, edu, hobby)
metadata_social = LOAD ‘$Social_Data’ USING PigStorage(‘,’) AS (guid_social:chararray, sex:chararray, age:chararray, income:chararray, edu:chararray, hobby:chararray);
–extract needed column in metadata_social
social_age = FOREACH metadata_social GENERATE guid_social,age;

–join socialData(metadata_social) and logData(distinct_origin_historical_age):
joinedByGUID = JOIN social_age BY guid_social, distinct_origin_historical_age BY guid;
–(guid_social, age; xxx_ad_id,guid,log_type)

 

–generating analyzing age data
joined_orgin_age_data = FOREACH joinedByGUID GENERATE xxx_ad_id,guid,log_type,age;
joinedByAdId = JOIN joined_ad_campaign_data BY adId, joined_orgin_age_data BY xxx_ad_id; –(adId,industryId,brandId,xxx_ad_id,guid,log_type,age)
–filtering
all_current_data = FOREACH joinedByAdId GENERATE guid,log_type,industryId,brandId,age; –(guid,log_type,industryId,brandId,age)

–for industry analyzing
industry_current_data = FOREACH all_current_data GENERATE industryId,guid,age,log_type;  –(industryId,guid,age,log_type)

–load all in the path “industry”
industry_existed_Data = LOAD ‘$Industry_Path’ USING PigStorage(‘,’) AS (industryId:chararray,guid:chararray,age:chararray,log_type:chararray);

–merge with history data
union_Industry = UNION industry_existed_Data, industry_current_data;
distict_union_industry = DISTINCT union_Industry;
group_industry = GROUP distict_union_industry BY ($2,$0,$3);
count_guid_for_industry = FOREACH group_industry GENERATE FLATTEN(group),COUNT($1.$1);

rm $Industry_SUM;
STORE count_guid_for_industry INTO ‘$Industry_SUM’ USING PigStorage(‘,’);

–storing union industry data(current and history)
STORE distict_union_industry INTO ‘$Industry_TMP’ USING PigStorage(‘,’);
rm $Industry_Path
mv $Industry_TMP $Industry_Path

–counting guid for industry and brand
industry_brand_current = FOREACH all_current_data GENERATE age,industryId,brandId,log_type,guid;
–(age,industryId,brandId,log_type,guid)

–load history data of industry_brand
industry_brand_history = LOAD ‘$Industry_Brand_Path’ USING PigStorage(‘,’) AS(age:chararray, industryId:chararray, brandId:chararray, log_type:chararray, guid:chararray);

–union all data of industry_brand
union_industry_brand = UNION industry_brand_current,industry_brand_history;
unique_industry_brand = DISTINCT union_industry_brand;
–(age,industryId,brandId,log_type,guid)

–counting users’ number for industry and brand
group_industry_brand = GROUP unique_industry_brand BY ($0,$1,$2,$3);
count_guid_for_industry_brand = FOREACH group_industry_brand GENERATE FLATTEN(group),COUNT($1.$4);

rm $Industry_Brand_SUM;
STORE count_guid_for_industry_brand INTO ‘$Industry_Brand_SUM’ USING PigStorage(‘,’);

STORE unique_industry_brand INTO ‘$Industry_Brand_TMP’ USING PigStorage(‘,’);
rm $Industry_Brand_Path;
mv $Industry_Brand_TMP $Industry_Brand_Path

–counting user number for age and logtype
current_data = FOREACH all_current_data GENERATE age,log_type,guid;–(age,log_type,guid)

–load history data of age and logtype
history_data = LOAD ‘$ALL_Path’ USING PigStorage(‘,’) AS(age:chararray,log_type:chararray,guid:chararray);

–union current and history data
union_all_data = UNION history_data, current_data;
unique_all_data = DISTINCT union_all_data;

–count users’ number
group_all_data = GROUP unique_all_data BY ($0,$1);
count_guid_for_age_logtype = FOREACH group_all_data GENERATE FLATTEN(group),COUNT($1.$2);

rm $ALL_SUM;
STORE count_guid_for_age_logtype INTO ‘$ALL_SUM’ USING PigStorage(‘,’);

STORE unique_all_data INTO ‘$ALL_TMP’ USING PigStorage(‘,’);
rm $ALL_Path
mv $ALL_TMP $ALL_Path

Pig 的安装与测试 http://www.linuxidc.com/Linux/2014-07/104039.htm

Pig 安装与配置教程 http://www.linuxidc.com/Linux/2013-04/82785.htm

Pig 安装部署及 MapReduce 模式下测试 http://www.linuxidc.com/Linux/2013-04/82786.htm

Pig 安装及本地模式测试, 体验 http://www.linuxidc.com/Linux/2013-04/82783.htm

Pig 的安装配置与基本使用 http://www.linuxidc.com/Linux/2013-02/79928.htm

Hadoop Pig 进阶语法 http://www.linuxidc.com/Linux/2013-02/79462.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计6924字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中