樣例包含三部分代碼:周處理函數部分、業務數據處理部分,以及多線程跑批調度處理部分。
代碼按功能分類存放,有助於使結構更清晰;通過 from...import 的方式,可使代碼重複使用。
另外,多線程調用部分有效處理了程序的先後依賴以及多程序串行、並行跑批的問題,可為以後類似問題的處理提供借鑒。
1、周處理函數
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/WeekCalc.py
# -*- coding=utf-8 -*-
import warnings
import datetime
warnings.filterwarnings("ignore")
def getNowYearWeek():
    """Return the ISO year/week label of the moment seven days before now.

    Despite the name, the label describes the week *before* the current
    one, because seven days are subtracted from the current time first.

    Returns:
        str: "<iso_year>#<iso_week>" (for example "2016#11"); the week
        number is not zero-padded.
    """
    week_ago = datetime.datetime.now() - datetime.timedelta(days=7)
    iso_year, iso_week = week_ago.isocalendar()[:2]
    return "%s#%s" % (iso_year, iso_week)
def dateRange(beginDate, endDate):
    """List every day from beginDate to endDate, both ends included.

    Args:
        beginDate: first day as a "YYYY-MM-DD" string.
        endDate: last day as a "YYYY-MM-DD" string.

    Returns:
        list of "YYYY-MM-DD" strings in ascending order; empty when
        beginDate sorts after endDate.
    """
    one_day = datetime.timedelta(1)
    cursor = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    stamp = beginDate[:]
    collected = []
    # The stop test is a plain string comparison, which matches calendar
    # order for zero-padded "YYYY-MM-DD" values.
    while not stamp > endDate:
        collected.append(stamp)
        cursor = cursor + one_day
        stamp = cursor.strftime("%Y-%m-%d")
    return collected
def weekRang(beginDate, endDate):
    """List the distinct ISO weeks touched by the days beginDate..endDate.

    Args:
        beginDate: first day as a "YYYY-MM-DD" string.
        endDate: last day as a "YYYY-MM-DD" string.

    Returns:
        list of "<iso_year>#<iso_week>" strings sorted by (year, week);
        the week number is not zero-padded.
    """
    iso_weeks = set()
    for day in dateRange(beginDate, endDate):
        year, month, dom = int(day[0:4]), int(day[5:7]), int(day[8:10])
        # Keep only (iso_year, iso_week); the set removes the repeats
        # produced by the days of one week.
        iso_weeks.add(datetime.date(year, month, dom).isocalendar()[0:2])
    return [str(iso_year) + '#' + str(iso_week)
            for (iso_year, iso_week) in sorted(iso_weeks)]
def currWeekList(his_week):
    """List the week labels from his_week up to the week of seven days ago.

    Candidate weeks start at the week containing 2015-07-01 and end at the
    week containing (now - 7 days); only those not earlier than his_week
    are kept.

    Args:
        his_week: lower bound as "<year>#<week>", e.g. "2016#11".

    Returns:
        list of "<year>#<week>" strings in ascending order.
    """
    def _as_pair(label):
        # "2016#11" -> (2016, 11), so weeks compare as (year, week) tuples.
        return (int(label[0:4]), int(label[5:]))

    week_ago = datetime.datetime.now() - datetime.timedelta(days=7)
    last_day = str(week_ago)[0:10]
    return [label
            for label in weekRang('2015-07-01', last_day)
            if _as_pair(label) >= _as_pair(his_week)]
def hisRunWeekList(his_week):
    """Build the argument list that pairs his_week with every later week.

    Args:
        his_week: week label "<year>#<week>".

    Returns:
        list of ([curr_week, his_week], None) tuples, one for each week
        returned by currWeekList(his_week) that is not earlier than
        his_week.
    """
    def _as_pair(label):
        # "2016#11" -> (2016, 11)
        return (int(label[0:4]), int(label[5:]))

    return [([curr_week, his_week], None)
            for curr_week in currWeekList(his_week)
            if _as_pair(his_week) <= _as_pair(curr_week)]
def RuningWeekList():
    """Build the argument list that pairs last week with every earlier week.

    The "current" week is the one returned by getNowYearWeek(); it is
    paired with each week from 2015#27 onward that is not later than it.

    Returns:
        list of ([curr_week, his_week], None) tuples.
    """
    def _as_pair(label):
        # "2016#11" -> (2016, 11)
        return (int(label[0:4]), int(label[5:]))

    curr_week = getNowYearWeek()
    return [([curr_week, his_week], None)
            for his_week in currWeekList('2015#27')
            if _as_pair(his_week) <= _as_pair(curr_week)]
def getWeekFristday(weekflag):
    """Return the Monday that starts the ISO week named by weekflag.

    Args:
        weekflag: "<4-digit year>#<week>", e.g. "2016#11".

    Returns:
        datetime.date: first day (Monday) of that week.
    """
    iso_year = int(weekflag[0:4])
    iso_week = int(weekflag[5:7])
    # January 4th always falls inside ISO week 1, so stepping back to its
    # Monday gives the first day of week 1.
    jan_fourth = datetime.date(iso_year, 1, 4)
    week_one_monday = jan_fourth - datetime.timedelta(days=jan_fourth.isoweekday() - 1)
    return week_one_monday + datetime.timedelta(days=(iso_week - 1) * 7)
# Batch Test
# his_week_list = ['2015#46', '2015#45', '2016#2']
# batch_week_list = []
# for his_week in his_week_list:
# batch_week_list.extend(hisRunWeekList(his_week))
# print batch_week_list
# print getWeekFristday('2016#11')
# his_week = '2016#11'
# print currWeekList(his_week)
# print getNowYearWeek()
2、業務處理部分
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/Hive_remain_byWeek_proc.py
# -*- coding=utf-8 -*-
import time
import os
import re
from WeekCalc import *
warnings.filterwarnings("ignore")
def newuser_byweek_proc(batch_week):
    """Rebuild the bi_newuser_byweek Hive partition for one week.

    The partition pt_week=batch_week is dropped and re-added, then filled
    with the (appsource, appkey, identifier, uid) combinations seen in
    bi_all_access_log during that week that have no access-log row dated
    before the week's first day.

    Args:
        batch_week: week label "<year>#<week>", e.g. "2015#46".

    The exit status of the hive command is not checked.
    """
    week_start = getWeekFristday(batch_week)
    # The CASE expression turns pt_day into the same "<year>#<week>" label
    # the Python helpers produce. Besides January days that still belong
    # to week 52/53 of the previous year, late-December days that already
    # belong to week 1 of the next year must be moved to that next year;
    # otherwise they would be counted under week 1 of the old year.
    hive_command = """/usr/lib/hive-current/bin/hive -e " \
alter table bi_newuser_byweek drop if exists partition(pt_week='%s'); \
alter table bi_newuser_byweek add partition(pt_week='%s'); \
insert into table bi_newuser_byweek partition (pt_week='%s') \
select a1.appsource,a1.appkey,a1.identifier,a1.uid from ( \
select appsource,appkey,identifier,uid \
from bi_all_access_log \
where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) when weekofyear(pt_day)=1 and month(pt_day)=12 then concat(year(pt_day)+1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
group by appsource,appkey,identifier,uid) a1 \
left join \
(select appsource,appkey,identifier,uid \
from bi_all_access_log \
where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource and a1.uid=a2.uid \
where a2.identifier is null \
;" \
""" % (batch_week, batch_week, batch_week, batch_week, week_start)
    os.system(hive_command)
def user_remain_payamount_byweek(curr_week, his_week):
    """Reload bi_user_remain_payamount_byweek for one pair of weeks.

    Existing MySQL rows for (data_week=his_week, remain_week=curr_week)
    are deleted. A Hive query then joins the new users of his_week
    (bi_newuser_byweek) with the access log of curr_week and sums the
    state=0 payments of curr_week per (appkey, appsource). Each result row
    is inserted into MySQL with the current time as etl_time.

    Args:
        curr_week: week whose activity and payments are measured,
            "<year>#<week>".
        his_week: week whose new users form the cohort, "<year>#<week>".

    Exit statuses of the mysql/hive commands are not checked.
    """
    # Clear the previous result first so a re-run does not duplicate rows.
    # The account name is spelled "hadoop" like every other mysql call in
    # this file (the original used "Hadoop" here only).
    os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data; \
delete from bi_user_remain_payamount_byweek where data_week='%s' and remain_week='%s'; \
" """ % (his_week, curr_week))

    # The CASE expressions label pt_day as "<year>#<week>". The second WHEN
    # moves late-December days that already belong to week 1 into the next
    # year, mirroring the January correction in the first WHEN.
    hive_pipe = os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_newuser_byweek \
where pt_week = '%s' \
), \
curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_all_access_log \
where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) when weekofyear(pt_day)=1 and month(pt_day)=12 then concat(year(pt_day)+1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
group by appsource,appkey,identifier,RadixChange(uid,16,10)), \
curr_week_pay as (select uid,sum(amount) amount \
from data_chushou_pay_info \
where state=0 and \
case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) when weekofyear(pt_day)=1 and month(pt_day)=12 then concat(year(pt_day)+1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
group by uid) \
select b1.appkey,b1.appsource,sum(b2.amount) pay_amount from \
(select a1.appkey,a1.appsource,a1.uid \
from his_new_user a1 \
inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
group by a1.appkey,a1.appsource,a1.uid) b1 \
left join curr_week_pay b2 on b1.uid=b2.uid \
group by b1.appkey,b1.appsource \
;" \
""" % (his_week, curr_week, curr_week))
    try:
        hive_lines = hive_pipe.readlines()
    finally:
        # Always release the pipe, even if reading fails.
        hive_pipe.close()

    for raw_line in hive_lines:
        fields = raw_line.replace('\n', '').split('\t')
        if len(fields) < 3:
            # Blank or non-tabular output cannot be a result row; skip it
            # instead of failing with IndexError half-way through the load.
            continue
        appkey, appsource, pay_amount = fields[0], fields[1], fields[2]
        # NOTE(review): pay_amount comes from a left join, so Hive may emit
        # the text NULL for cohorts without payments - confirm the MySQL
        # column accepts that value.
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
        os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data; \
insert into bi_user_remain_payamount_byweek(data_week,appsource,appkey,remain_week,pay_amount,etl_time) \
select '%s','%s','%s','%s','%s','%s'; \
" """ % (his_week, appsource, appkey, curr_week, pay_amount, etl_time))
def user_remain_pay_byweek(curr_week, his_week):
    """Reload bi_user_remain_pay_byweek for one pair of weeks.

    Existing MySQL rows for (data_week=his_week, remain_week=curr_week)
    are deleted. A Hive query then counts, per (appkey, appsource), the
    distinct identifiers among the new users of his_week
    (bi_newuser_byweek) that also appear in the access log of curr_week;
    pay_amount is selected as the constant 0. Each result row is inserted
    into MySQL with the current time as etl_time.

    Args:
        curr_week: week in which the users are checked for activity,
            "<year>#<week>".
        his_week: week whose new users form the cohort, "<year>#<week>".

    Exit statuses of the mysql/hive commands are not checked.
    """
    # Clear the previous result first so a re-run does not duplicate rows.
    os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data; \
delete from bi_user_remain_pay_byweek where data_week='%s' and remain_week='%s'; \
" """ % (his_week, curr_week))

    # The CASE expression labels pt_day as "<year>#<week>". The second WHEN
    # moves late-December days that already belong to week 1 into the next
    # year, mirroring the January correction in the first WHEN.
    hive_pipe = os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_newuser_byweek \
where pt_week = '%s' \
), \
curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_all_access_log \
where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) when weekofyear(pt_day)=1 and month(pt_day)=12 then concat(year(pt_day)+1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
group by appsource,appkey,identifier,RadixChange(uid,16,10)) \
select a1.appkey,a1.appsource,count(distinct a2.identifier) remain_cnt,0 pay_amount \
from his_new_user a1 \
inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
group by a1.appkey,a1.appsource \
;" \
""" % (his_week, curr_week))
    try:
        hive_lines = hive_pipe.readlines()
    finally:
        # Always release the pipe, even if reading fails.
        hive_pipe.close()

    for raw_line in hive_lines:
        fields = raw_line.replace('\n', '').split('\t')
        if len(fields) < 4:
            # Blank or non-tabular output cannot be a result row; skip it
            # instead of failing with IndexError half-way through the load.
            continue
        appkey, appsource, remain_cnt, pay_amount = fields[0], fields[1], fields[2], fields[3]
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
        os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data; \
insert into bi_user_remain_pay_byweek(data_week,appsource,appkey,remain_week,remain_cnt,pay_amount,etl_time) \
select '%s','%s','%s','%s','%s','%s','%s'; \
" """ % (his_week, appsource, appkey, curr_week, remain_cnt, pay_amount, etl_time))
# Batch Test
# curr_week = '2016#6'
# his_week = '2015#46'
# user_remain_payamount_byweek(curr_week, his_week)
# user_remain_pay_byweek(curr_week, his_week)
# batch_week = '2015#46'
# newuser_byweek_proc(batch_week)
另:供打印sql測試的代碼
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/xx.py
# -*- coding=utf-8 -*-
import time
import os
import re
from WeekCalc import *
warnings.filterwarnings("ignore")
def newuser_byweek_proc(batch_week):
    """Print the hive command that rebuilds bi_newuser_byweek for one week.

    Dry-run helper: the command is only formatted and written to stdout,
    nothing is executed.

    Args:
        batch_week: week label "<year>#<week>", e.g. "2015#46".
    """
    week_start = getWeekFristday(batch_week)
    # The CASE expression labels pt_day as "<year>#<week>". The second WHEN
    # moves late-December days that already belong to week 1 into the next
    # year, mirroring the January correction in the first WHEN.
    sql_text = """/usr/lib/hive-current/bin/hive -e " \
alter table bi_newuser_byweek drop if exists partition(pt_week='%s'); \
alter table bi_newuser_byweek add partition(pt_week='%s'); \
insert into table bi_newuser_byweek partition (pt_week='%s') \
select a1.appsource,a1.appkey,a1.identifier,a1.uid from ( \
select appsource,appkey,identifier,uid \
from bi_all_access_log \
where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) when weekofyear(pt_day)=1 and month(pt_day)=12 then concat(year(pt_day)+1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
group by appsource,appkey,identifier,uid) a1 \
left join \
(select appsource,appkey,identifier,uid \
from bi_all_access_log \
where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource and a1.uid=a2.uid \
where a2.identifier is null \
;" \
""" % (batch_week, batch_week, batch_week, batch_week, week_start)
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(sql_text)
def user_remain_payamount_byweek(curr_week, his_week):
    """Print the hive command used to compute pay amounts for a week pair.

    Dry-run helper: the command is only formatted and written to stdout,
    nothing is executed.

    Args:
        curr_week: week whose activity and payments are measured,
            "<year>#<week>".
        his_week: week whose new users form the cohort, "<year>#<week>".
    """
    # The CASE expressions label pt_day as "<year>#<week>". The second WHEN
    # moves late-December days that already belong to week 1 into the next
    # year, mirroring the January correction in the first WHEN.
    sql_text = """source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_newuser_byweek \
where pt_week = '%s' \
), \
curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_all_access_log \
where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) when weekofyear(pt_day)=1 and month(pt_day)=12 then concat(year(pt_day)+1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
group by appsource,appkey,identifier,RadixChange(uid,16,10)), \
curr_week_pay as (select uid,sum(amount) amount \
from data_chushou_pay_info \
where state=0 and \
case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) when weekofyear(pt_day)=1 and month(pt_day)=12 then concat(year(pt_day)+1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
group by uid) \
select b1.appkey,b1.appsource,sum(b2.amount) pay_amount from \
(select a1.appkey,a1.appsource,a1.uid \
from his_new_user a1 \
inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
group by a1.appkey,a1.appsource,a1.uid) b1 \
left join curr_week_pay b2 on b1.uid=b2.uid \
group by b1.appkey,b1.appsource \
;" \
""" % (his_week, curr_week, curr_week)
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(sql_text)
def user_remain_pay_byweek(curr_week, his_week):
    """Print the hive command used to count retained users for a week pair.

    Dry-run helper: the command is only formatted and written to stdout,
    nothing is executed.

    Args:
        curr_week: week in which the users are checked for activity,
            "<year>#<week>".
        his_week: week whose new users form the cohort, "<year>#<week>".
    """
    # The CASE expression labels pt_day as "<year>#<week>". The second WHEN
    # moves late-December days that already belong to week 1 into the next
    # year, mirroring the January correction in the first WHEN.
    sql_text = """source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_newuser_byweek \
where pt_week = '%s' \
), \
curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_all_access_log \
where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) when weekofyear(pt_day)=1 and month(pt_day)=12 then concat(year(pt_day)+1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
group by appsource,appkey,identifier,RadixChange(uid,16,10)) \
select a1.appkey,a1.appsource,count(distinct a2.identifier) remain_cnt,0 pay_amount \
from his_new_user a1 \
inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
group by a1.appkey,a1.appsource \
;" \
""" % (his_week, curr_week)
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(sql_text)
# Batch Test
# curr_week = '2016#6'
# his_week = '2015#46'
# user_remain_payamount_byweek(curr_week, his_week)
# user_remain_pay_byweek(curr_week, his_week)
# batch_week = '2015#46'
# newuser_byweek_proc(batch_week)
3、多線程批調度
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/BatchThread.py
# -*- coding=utf-8 -*-
import threadpool
from Hive_remain_byWeek_proc import *
# from xx import *
warnings.filterwarnings("ignore")
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "當前時間是:",now_time
# 新用戶數據先跑出來
last_week = [getNowYearWeek()]
request_newuser_byweek_proc = threadpool.makeRequests(newuser_byweek_proc, last_week)
frist_pool = threadpool.ThreadPool(8)
[frist_pool.putRequest(req) for req in request_newuser_byweek_proc]
frist_pool.wait()
# 然後再執行用戶留存和充值金額數據
if True:
batch_week_list = RuningWeekList()
requests = []
request_user_remain_payamount_byweek = threadpool.makeRequests(user_remain_payamount_byweek, batch_week_list)
request_user_remain_pay_byweek = threadpool.makeRequests(user_remain_pay_byweek, batch_week_list)
requests.extend(request_user_remain_payamount_byweek)
requests.extend(request_user_remain_pay_byweek)
main_pool = threadpool.ThreadPool(8)
[main_pool.putRequest(req) for req in requests]
if __name__ == '__main__':
while True:
try:
time.sleep(960)
main_pool.poll()
except KeyboardInterrupt:
print("**** Interrupted!")
break
except threadpool.NoResultsPending:
break
if main_pool.dismissedWorkers:
print("Joining all dismissed worker threads...")
main_pool.joinAllDismissedWorkers()
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "當前時間是:",now_time