当前位置: 移动技术网 > IT编程>脚本编程>Python > Python使用脚本进行用户信用评分体系计算的案例教程


2018年12月12日  | 移动技术网IT编程  | 我要评论

七乐彩条件 一休彩票1xcp,企业名字,定额宝














# -*- coding=utf-8 -*-
import warnings
import datetime
import calendar


def getnowday():
    """Return today's local date as a 'YYYY-MM-DD' string."""
    # %Y (4-digit year) is needed: the dates handled in this file look like
    # '2017-09-19'; %y would yield a 2-digit year such as '17-09-19'.
    return datetime.datetime.today().strftime('%Y-%m-%d')

def getyesterday():
    """Return yesterday's local date (today minus one day) as 'YYYY-MM-DD'."""
    one_day_back = datetime.datetime.today() - datetime.timedelta(days=1)
    # %Y, not %y: a 4-digit year is required for the 'YYYY-MM-DD' layout.
    return one_day_back.strftime('%Y-%m-%d')

def getdaysxago(pt_day, days=7):
    """Return the date `days` days before `pt_day`.

    :param pt_day: date string in 'YYYY-MM-DD' form, e.g. '2017-12-01'.
    :param days: how many days to go back; defaults to 7, the value that
        was previously hard-coded.
    :return: the earlier date as a 'YYYY-MM-DD' string.
    :raises ValueError: if `pt_day` does not match 'YYYY-MM-DD'.
    """
    fmt = '%Y-%m-%d'  # %Y: 4-digit year; %y cannot parse '2017-12-01'
    earlier = datetime.datetime.strptime(pt_day, fmt) - datetime.timedelta(days=days)
    return earlier.strftime(fmt)

def getdays1ago(pt_day):
    """Return the day before `pt_day`.

    :param pt_day: date string in 'YYYY-MM-DD' form.
    :return: the previous calendar day as a 'YYYY-MM-DD' string.
    :raises ValueError: if `pt_day` does not match 'YYYY-MM-DD'.
    """
    fmt = '%Y-%m-%d'  # %Y: 4-digit year; %y cannot parse '2017-12-01'
    previous = datetime.datetime.strptime(pt_day, fmt) - datetime.timedelta(days=1)
    return previous.strftime(fmt)

def getallmonthofyear():
    """Return the twelve month numbers as zero-padded strings '01'..'12'."""
    months = []
    for month_no in range(1, 13):
        months.append('%02d' % month_no)
    return months

def getallhourofday():
    """Return the 24 hours of a day as zero-padded strings '00'..'23'."""
    hours = []
    for hour_no in range(24):
        hours.append('%02d' % hour_no)
    return hours

def getdayofmonth(year, month):
    """Return every day number of the given month, in order.

    :param year: calendar year as an int, e.g. 2017.
    :param month: month number as an int, 1..12.
    :return: list of day numbers from the 1st to the last day of the month.

    NOTE(review): the original loop body was missing, so the element format
    is an assumption. Zero-padded 2-character strings are used to match
    getallmonthofyear()/getallhourofday() -- confirm against callers.
    """
    # monthrange() returns (weekday_of_first_day, number_of_days).
    last_day = calendar.monthrange(year, month)[1]
    return ['{num:02d}'.format(num=day) for day in range(1, last_day + 1)]

def daterange(begindate, enddate):
    """Return every date from `begindate` to `enddate`, both inclusive.

    :param begindate: first date, 'YYYY-MM-DD'.
    :param enddate: last date, 'YYYY-MM-DD'.
    :return: list of 'YYYY-MM-DD' strings in ascending order; empty when
        `begindate` is later than `enddate`.
    :raises ValueError: if either argument does not match 'YYYY-MM-DD'.
    """
    fmt = "%Y-%m-%d"  # %Y: 4-digit year; callers slice [0:4] as the year
    current = datetime.datetime.strptime(begindate, fmt)
    last = datetime.datetime.strptime(enddate, fmt)
    one_day = datetime.timedelta(days=1)

    dates = []
    while current <= last:
        # Collect each day; previously nothing was ever added to the result.
        dates.append(current.strftime(fmt))
        current += one_day
    return dates

# for date in daterange('2017-02-01', '2017-03-14'):
#     year = date[0:4]
#     month = date[5:7]
#     day = date[8:10]
#     print year, month, day

# print getallmonthofyear()
# print getallhourofday()
# print getnowday(),getyesterday()
# print getdayofmonth(year=2017, month=10)
# print daterange(begindate='2017-09-01', enddate='2017-11-19')
# print getdaysxago('2017-12-01')


# -*- coding=utf-8 -*-
import time
import datetime
import os
import re
import warnings


def cspayactiveproc(pt_day):
    """Rebuild the MySQL table xcs_pay_active from Hive pay data.

    Drops and recreates funnyai_data.xcs_pay_active, runs a Hive query that
    keeps the users whose summed pay amount on `pt_day` is above that day's
    per-user average, and inserts the rows into MySQL in batches of 1000.

    :param pt_day: partition day substituted into the Hive filter
        ``pt_day = '...'`` (e.g. '2017-09-19').
    """
    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    mysql_cmd = ('source /etc/profile; '
                 '/usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass '
                 '-e "use funnyai_data; {sql}"')

    os.system(mysql_cmd.format(sql=(
        "drop table if exists xcs_pay_active; "
        "create table xcs_pay_active(pt_day varchar(10),uid bigint(20),pay_active_flag int,etl_time datetime,key idx_uid (uid)) engine=innodb default charset=utf8;")))

    calcdata = os.popen("""source /etc/profile; \
        /usr/lib/hive-current/bin/hive -e " \
        with tab_pay_sum as( \
        select pt_day,uid,sum(amount) pay_amount \
        from data_chushou_pay_info \
        where state=0 and pt_day = '{pt_day}' \
        group by pt_day,uid) \
        select a1.pt_day,a1.uid,case when a1.pay_amount>a2.pay_amount_per then 1 else 0 end pay_active_flag \
        from tab_pay_sum a1 \
        left join (select pt_day,sum(pay_amount)/count(distinct uid) pay_amount_per \
        from tab_pay_sum \
        group by pt_day) a2 on a1.pt_day=a2.pt_day \
        where a1.pay_amount>a2.pay_amount_per;" \
        """.format(pt_day=pt_day)).readlines()

    insert_head = "insert into xcs_pay_active(pt_day,uid,pay_active_flag,etl_time) values "
    pending = []
    for line in calcdata:
        fields = line.replace('\n', '').split('\t')
        if len(fields) < 3:
            # Blank or malformed output line: nothing to load.
            continue
        etl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        pending.append("( '%s','%s','%s','%s')" % (fields[0], fields[1], fields[2], etl_time))

        if len(pending) == 1000:
            os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))
            pending = []

    # Flush the last partial batch; skipped when empty so that no invalid
    # "insert ... values;" statement is sent.
    if pending:
        os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))

def csviewactiveproc(pt_day):
    """Rebuild the MySQL table xcs_view_active from Hive view data.

    Drops and recreates funnyai_data.xcs_view_active, runs a Hive query that
    keeps the users whose summed view_time on `pt_day` is above that day's
    per-user average, and inserts the rows into MySQL in batches of 1000.

    :param pt_day: partition day substituted into the Hive filter
        ``pt_day = '...'`` (e.g. '2017-09-19').
    """
    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    mysql_cmd = ('source /etc/profile; '
                 '/usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass '
                 '-e "use funnyai_data; {sql}"')

    os.system(mysql_cmd.format(sql=(
        "drop table if exists xcs_view_active; "
        "create table xcs_view_active(pt_day varchar(10),uid bigint(20),view_active_flag int,etl_time datetime,key idx_uid (uid)) engine=innodb default charset=utf8;")))

    calcdata = os.popen("""source /etc/profile; \
        /usr/lib/hive-current/bin/hive -e " \
        with tab_view_sum as( \
        select pt_day,uid,sum(view_time) view_time \
        from recommend_data_view \
        where pt_day = '{pt_day}' \
        group by pt_day,uid) \
        select a1.pt_day,a1.uid,case when a1.view_time>a2.view_time_per then 1 else 0 end view_active_flag \
        from tab_view_sum a1 \
        left join (select pt_day,sum(view_time)/count(distinct uid) view_time_per \
        from tab_view_sum \
        group by pt_day) a2 on a1.pt_day=a2.pt_day \
        where a1.view_time>a2.view_time_per;" \
        """.format(pt_day=pt_day)).readlines()

    insert_head = "insert into xcs_view_active(pt_day,uid,view_active_flag,etl_time) values "
    pending = []
    for line in calcdata:
        fields = line.replace('\n', '').split('\t')
        if len(fields) < 3:
            # Blank or malformed output line: nothing to load.
            continue
        etl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        pending.append("( '%s','%s','%s','%s')" % (fields[0], fields[1], fields[2], etl_time))

        if len(pending) == 1000:
            os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))
            pending = []

    # Flush the last partial batch; skipped when empty so that no invalid
    # "insert ... values;" statement is sent.
    if pending:
        os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))

def csmessageactiveproc(pt_day):
    """Rebuild the MySQL table xcs_message_active from Hive message data.

    Drops and recreates funnyai_data.xcs_message_active, runs a Hive query
    that keeps the users whose message count on `pt_day` is above that day's
    per-user average, and inserts the rows into MySQL in batches of 1000.

    :param pt_day: partition day substituted into the Hive filter
        ``pt_day = '...'`` (e.g. '2017-09-19').
    """
    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    mysql_cmd = ('source /etc/profile; '
                 '/usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass '
                 '-e "use funnyai_data; {sql}"')

    os.system(mysql_cmd.format(sql=(
        "drop table if exists xcs_message_active; "
        "create table xcs_message_active(pt_day varchar(10),uid bigint(20),message_active_flag int,etl_time datetime,key idx_uid (uid)) engine=innodb default charset=utf8;")))

    calcdata = os.popen("""source /etc/profile; \
        /usr/lib/hive-current/bin/hive -e " \
        with tab_message_sum as( \
        select pt_day,uid,count(*) message_cnt \
        from oss_chushou_message_send \
        where pt_day = '{pt_day}' \
        group by pt_day,uid) \
        select a1.pt_day,a1.uid,case when a1.message_cnt>a2.message_cnt_per then 1 else 0 end message_active_flag \
        from tab_message_sum a1 \
        left join (select pt_day,sum(message_cnt)/count(distinct uid) message_cnt_per \
        from tab_message_sum \
        group by pt_day) a2 on a1.pt_day=a2.pt_day \
        where a1.message_cnt>a2.message_cnt_per;" \
        """.format(pt_day=pt_day)).readlines()

    insert_head = "insert into xcs_message_active(pt_day,uid,message_active_flag,etl_time) values "
    pending = []
    for line in calcdata:
        fields = line.replace('\n', '').split('\t')
        if len(fields) < 3:
            # Blank or malformed output line: nothing to load.
            continue
        etl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        pending.append("( '%s','%s','%s','%s')" % (fields[0], fields[1], fields[2], etl_time))

        if len(pending) == 1000:
            os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))
            pending = []

    # Flush the last partial batch; skipped when empty so that no invalid
    # "insert ... values;" statement is sent.
    if pending:
        os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))

def csgiftactiveproc(pt_day):
    """Rebuild the MySQL table xcs_gift_active from Hive gift records.

    Drops and recreates funnyai_data.xcs_gift_active, runs a Hive query that
    keeps the room creators whose summed gift_point on `pt_day` is above
    that day's per-creator average, and inserts the rows into MySQL in
    batches of 1000.

    :param pt_day: partition day substituted into the Hive filter
        ``pt_day = '...'`` (e.g. '2017-09-19').
    """
    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    mysql_cmd = ('source /etc/profile; '
                 '/usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass '
                 '-e "use funnyai_data; {sql}"')

    os.system(mysql_cmd.format(sql=(
        "drop table if exists xcs_gift_active; "
        "create table xcs_gift_active(pt_day varchar(10),room_creator_uid bigint(20),gift_active_flag int,etl_time datetime,key idx_uid (room_creator_uid)) engine=innodb default charset=utf8;")))

    calcdata = os.popen("""source /etc/profile; \
        /usr/lib/hive-current/bin/hive -e " \
        with tab_gift_sum as( \
        select pt_day,room_creator_uid,sum(gift_point) gift_point \
        from honeycomb_all_gift_record \
        where pt_day = '{pt_day}' \
        group by pt_day,room_creator_uid) \
        select a1.pt_day,a1.room_creator_uid,case when a1.gift_point>a2.gift_point_per then 1 else 0 end gift_active_flag \
        from tab_gift_sum a1 \
        left join (select pt_day,sum(gift_point)/count(distinct room_creator_uid) gift_point_per \
        from tab_gift_sum \
        group by pt_day) a2 on a1.pt_day=a2.pt_day \
        where a1.gift_point>a2.gift_point_per;" \
        """.format(pt_day=pt_day)).readlines()

    insert_head = "insert into xcs_gift_active(pt_day,room_creator_uid,gift_active_flag,etl_time) values "
    pending = []
    for line in calcdata:
        fields = line.replace('\n', '').split('\t')
        if len(fields) < 3:
            # Blank or malformed output line: nothing to load.
            continue
        etl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        pending.append("( '%s','%s','%s','%s')" % (fields[0], fields[1], fields[2], etl_time))

        if len(pending) == 1000:
            os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))
            pending = []

    # Flush the last partial batch; skipped when empty so that no invalid
    # "insert ... values;" statement is sent.
    if pending:
        os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))

def csliveactiveproc(pt_day):
    """Rebuild the MySQL table xcs_live_active from Hive live-history data.

    Drops and recreates funnyai_data.xcs_live_active. The Hive query sums
    live minutes per room for `pt_day`, joins rooms to their creator through
    yesterday's oss_room_v2 partition, keeps the creators whose live time is
    above the per-creator average, and the rows are inserted into MySQL in
    batches of 1000.

    :param pt_day: partition day substituted into the Hive filter
        ``pt_day = '...'`` (e.g. '2017-09-19').
    """
    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    mysql_cmd = ('source /etc/profile; '
                 '/usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass '
                 '-e "use funnyai_data; {sql}"')

    os.system(mysql_cmd.format(sql=(
        "drop table if exists xcs_live_active; "
        "create table xcs_live_active(pt_day varchar(10),room_creator_uid bigint(20),live_active_flag int,etl_time datetime,key idx_uid (room_creator_uid)) engine=innodb default charset=utf8;")))

    # The room table partition is always yesterday relative to the run time,
    # not relative to pt_day. %Y gives the 4-digit year used by pt_day values.
    yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')

    calcdata = os.popen("""source /etc/profile; \
        /usr/lib/hive-current/bin/hive -e " \
        with tab_live_sum as( \
        select a1.pt_day,a2.creator_uid,a1.anthor_live_time \
        from (select a1.pt_day,a1.roomid,sum(live_minute) anthor_live_time \
        from (select pt_day,room_id roomid,hour(time_interval)*60+minute(time_interval)+second(time_interval)/60 live_minute from ( \
        select pt_day,room_id,cast(updated_time as timestamp)-cast(switch_time as timestamp) time_interval \
        from honeycomb_all_live_history_status \
        where pt_day = '{pt_day}') x) a1 \
        group by a1.pt_day,a1.roomid) a1 \
        left join oss_room_v2 a2 on a1.roomid=a2.id \
        where a2.pt_day='{yesterday}') \
        select a1.pt_day,a1.creator_uid,case when a1.anthor_live_time>a2.anthor_live_time_per then 1 else 0 end live_active_flag \
        from tab_live_sum a1 \
        left join (select pt_day,sum(anthor_live_time)/count(distinct creator_uid) anthor_live_time_per \
        from tab_live_sum \
        group by pt_day) a2 on a1.pt_day=a2.pt_day \
        where a1.anthor_live_time>a2.anthor_live_time_per;" \
        """.format(pt_day=pt_day, yesterday=yesterday)).readlines()

    insert_head = "insert into xcs_live_active(pt_day,room_creator_uid,live_active_flag,etl_time) values "
    pending = []
    for line in calcdata:
        fields = line.replace('\n', '').split('\t')
        if len(fields) < 3:
            # Blank or malformed output line: nothing to load.
            continue
        etl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        pending.append("( '%s','%s','%s','%s')" % (fields[0], fields[1], fields[2], etl_time))

        if len(pending) == 1000:
            os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))
            pending = []

    # Flush the last partial batch; skipped when empty so that no invalid
    # "insert ... values;" statement is sent.
    if pending:
        os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))

def csusersilentproc(pt_day):
    """Rebuild the MySQL table xcs_user_silent from the silent_user table.

    Drops and recreates funnyai_data.xcs_user_silent, reads the distinct
    (day, uid) pairs created on `pt_day` from jellyfish_server.silent_user
    with a constant flag of 1, and inserts them in batches of 1000.

    :param pt_day: day compared against ``substr(created_time,1,10)``
        (e.g. '2017-09-19').
    """
    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    mysql_cmd = ('source /etc/profile; '
                 '/usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass '
                 '-e "use funnyai_data; {sql}"')

    os.system(mysql_cmd.format(sql=(
        "drop table if exists xcs_user_silent; "
        "create table xcs_user_silent(pt_day varchar(10),uid bigint(20),user_silent_flag int,etl_time datetime,key idx_uid (uid)) engine=innodb default charset=utf8;")))

    # -N suppresses the column-name header so it is not loaded as a data row.
    calcdata = os.popen("""source /etc/profile; \
        /usr/bin/mysql -hmysqlhost -P50506 -umysqluser -pmysqlpass -N -e "use jellyfish_server; \
        select substr(created_time,1,10) pt_day,uid,1 user_silent_flag \
        from silent_user \
        where substr(created_time,1,10)='{pt_day}' \
        group by substr(created_time,1,10),uid;" \
        """.format(pt_day=pt_day)).readlines()

    insert_head = "insert into xcs_user_silent(pt_day,uid,user_silent_flag,etl_time) values "
    pending = []
    for line in calcdata:
        fields = line.replace('\n', '').split('\t')
        if len(fields) < 3:
            # Blank or malformed output line: nothing to load.
            continue
        etl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        pending.append("( '%s','%s','%s','%s')" % (fields[0], fields[1], fields[2], etl_time))

        if len(pending) == 1000:
            os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))
            pending = []

    # Flush the last partial batch; skipped when empty so that no invalid
    # "insert ... values;" statement is sent.
    if pending:
        os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))

def csuppermanageproc(pt_day):
    """Rebuild the MySQL table xcs_upper_manage from the room_manager table.

    Drops and recreates funnyai_data.xcs_upper_manage, reads the distinct
    (day, uid, room_id) triples created on `pt_day` from
    jellyfish_server.room_manager, and inserts them in batches of 1000.

    :param pt_day: day compared against ``substr(created_time,1,10)``
        (e.g. '2017-09-19').
    """
    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    mysql_cmd = ('source /etc/profile; '
                 '/usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass '
                 '-e "use funnyai_data; {sql}"')

    os.system(mysql_cmd.format(sql=(
        "drop table if exists xcs_upper_manage; "
        "create table xcs_upper_manage(pt_day varchar(10),uid bigint(20),room_id bigint(20),etl_time datetime,key idx_uid (uid)) engine=innodb default charset=utf8;")))

    # -N suppresses the column-name header so it is not loaded as a data row.
    calcdata = os.popen("""source /etc/profile; \
        /usr/bin/mysql -hmysqlhost -P50506 -umysqluser -pmysqlpass -N -e "use jellyfish_server; \
        select substr(created_time,1,10) pt_day,uid,room_id \
        from room_manager \
        where substr(created_time,1,10)='{pt_day}' \
        group by substr(created_time,1,10),uid,room_id;" \
        """.format(pt_day=pt_day)).readlines()

    insert_head = "insert into xcs_upper_manage(pt_day,uid,room_id,etl_time) values "
    pending = []
    for line in calcdata:
        fields = line.replace('\n', '').split('\t')
        if len(fields) < 3:
            # Blank or malformed output line: nothing to load.
            continue
        etl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        pending.append("( '%s','%s','%s','%s')" % (fields[0], fields[1], fields[2], etl_time))

        if len(pending) == 1000:
            os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))
            pending = []

    # Flush the last partial batch; skipped when empty so that no invalid
    # "insert ... values;" statement is sent.
    if pending:
        os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))

# pt_day = '2017-09-19'
# cspayactiveproc(pt_day)
# csviewactiveproc(pt_day)
# csmessageactiveproc(pt_day)
# csgiftactiveproc(pt_day)
# csliveactiveproc(pt_day)
# csusersilentproc(pt_day)
# csuppermanageproc(pt_day)



# -*- coding=utf-8 -*-
import os

from datecalc import *


def getparameterinfo(date_start, date_end):
    """Build one parameter tuple per (server, day) for the server log copy.

    :param date_start: first day, passed to daterange().
    :param date_end: last day, passed to daterange().
    :return: list of tuples (accesskeyid, accesskeysecret, bucket, endpoint,
        server_name, year, month, day), where year/month/day are the slices
        [0:4], [5:7] and [8:10] of each date string.
    """
    accesskeyid = "accesskeyid-string"
    accesskeysecret = "accesskeysecret-string"
    bucket = "tv-hz"
    endpoint = "oss-cn-hangzhou-internal.aliyuncs.com"
    server_list = ["sz-121-210", "sz-129-97", "sz-135-45", "sz-76-100", "sz-147-189", "sz-152-217", "sz-153-240", "sz-155-163", "sz-155-212", "sz-155-217", "sz-158-99", "sz-162-5", "sz-168-212", "sz-18-122", "sz-191-14", "sz-192-253", "sz-199-103", "sz-201-3", "sz-21-92", "sz-73-130", "sz-212-123", "sz-214-62", "sz-218-105", "sz-218-165", "sz-233-217", "sz-237-167", "sz-240-45", "sz-25-63", "sz-26-159", "sz-27-218", "sz-27-65", "sz-27-91", "sz-42-99", "sz-54-148", "sz-58-102", "sz-62-113", "sz-97-233", "sz-64-227", "sz-65-64", "sz-68-199", "sz-130-97", "sz-78-112", "sz-135-198", "sz-79-58", "sz-98-72", "sz-159-238", "sz-130-89", "sz-129-70", "sz-131-90"]

    parameterlist = []
    for server_name in server_list:
        # Servers form the outer loop, so tuples are grouped by server.
        parameterlist.extend(
            (accesskeyid, accesskeysecret, bucket, endpoint, server_name,
             one_date[0:4], one_date[5:7], one_date[8:10])
            for one_date in daterange(date_start, date_end)
        )
    return parameterlist

def getparameterinfo2(date_start, date_end):
    """Build one parameter tuple per (server, day) for the console log copy.

    Same tuple layout as getparameterinfo(), but for the single server
    "sz-217-183".

    :param date_start: first day, passed to daterange().
    :param date_end: last day, passed to daterange().
    :return: list of tuples (accesskeyid, accesskeysecret, bucket, endpoint,
        server_name, year, month, day).
    """
    accesskeyid = "accesskeyid-string"
    accesskeysecret = "accesskeysecret-string"
    bucket = "tv-hz"
    endpoint = "oss-cn-hangzhou-internal.aliyuncs.com"
    server_list = ["sz-217-183"]

    parameterlist = []
    for server_name in server_list:
        parameterlist.extend(
            (accesskeyid, accesskeysecret, bucket, endpoint, server_name,
             one_date[0:4], one_date[5:7], one_date[8:10])
            for one_date in daterange(date_start, date_end)
        )
    return parameterlist

def getossfile2hdfs(accesskeyid, accesskeysecret, bucket, endpoint, server_name, year, month, day):
    """Copy one server's user_credit_operator log for one day from OSS to HDFS.

    Runs ``hadoop distcp`` from the OSS object under jellyfishlogs/ to a file
    below /tmp/nisj/creditscore/<date>/ on HDFS.

    :param accesskeyid: OSS access key id, embedded in the oss:// URL.
    :param accesskeysecret: OSS access key secret, embedded in the oss:// URL.
    :param bucket: OSS bucket name.
    :param endpoint: OSS endpoint host.
    :param server_name: server directory name inside the bucket.
    :param year: year string.
    :param month: month string.
    :param day: day string.
    """
    date = "-".join([year, month, day])
    ossfilename = "/".join(["jellyfishlogs", server_name, year, month, day, "jellyfish-server", "user_credit_operator.log_" + date])

    # Target name: <server>#user_credit_operator_server.<date>.txt
    hdfsfilename = "/tmp/nisj/creditscore/" + date + "/" + server_name + "#user_credit_operator_server." + date + ".txt"

    copy_args = (accesskeyid, accesskeysecret, bucket, endpoint, ossfilename, hdfsfilename)
    os.system("""source /etc/profile; \
                hadoop distcp oss://%s:%s@%s.%s/%s hdfs:%s""" % copy_args)

def getossfile2hdfs2(accesskeyid, accesskeysecret, bucket, endpoint, server_name, year, month, day):
    """Copy one console server's user_credit_operator log for one day from OSS to HDFS.

    Runs ``hadoop distcp`` from the OSS object under jellyfishconsolelogs/ to
    a file below /tmp/nisj/creditscore/<date>/ on HDFS.

    :param accesskeyid: OSS access key id, embedded in the oss:// URL.
    :param accesskeysecret: OSS access key secret, embedded in the oss:// URL.
    :param bucket: OSS bucket name.
    :param endpoint: OSS endpoint host.
    :param server_name: server directory name inside the bucket.
    :param year: year string.
    :param month: month string.
    :param day: day string.
    """
    date = "-".join([year, month, day])
    ossfilename = "/".join(["jellyfishconsolelogs", server_name, year, month, day, "user_credit_operator.log_" + date])

    # Target name: <server>#user_credit_operator_console.<date>.txt
    hdfsfilename = "/tmp/nisj/creditscore/" + date + "/" + server_name + "#user_credit_operator_console." + date + ".txt"

    copy_args = (accesskeyid, accesskeysecret, bucket, endpoint, ossfilename, hdfsfilename)
    os.system("""source /etc/profile; \
                hadoop distcp oss://%s:%s@%s.%s/%s hdfs:%s""" % copy_args)

def downloadfilefromhdfs(year, month, day):
    """Copy every file of one day's HDFS credit-score directory to local disk.

    Lists /tmp/nisj/creditscore/<date>/ on HDFS, takes the 8th
    space-separated column of each listing line as the path, and copies each
    path into the local tmplogdir directory.

    :param year: year string.
    :param month: month string.
    :param day: day string.
    """
    date = year + "-" + month + "-" + day
    # awk needs upper-case -F to set the field separator; lower-case -f would
    # try to read the awk program from a file named ' '.
    listing = os.popen("""source /etc/profile; \
                hadoop dfs -ls /tmp/nisj/creditscore/%s/ | awk  -F ' '  '{print $8}'""" % (date)).readlines()
    for dir_path in listing:
        dir_path = dir_path.strip()
        if not dir_path:
            # Listing lines with no 8th column (e.g. "Found N items") yield
            # an empty path.
            continue
        # Hadoop shell commands are case-sensitive: -copyToLocal.
        os.system("""source /etc/profile; \
                hadoop dfs -copyToLocal {dir_path} /home/hadoop/nisj/automationdemand/creditscore/tmplogdir/""".format(dir_path=dir_path))

def logload2mysql(year, month, day):
    """Load one day's downloaded credit-operator logs into xcs_log_record.

    Extracts the second awk field of every matching local log file into
    log_record_<date>.txt, deletes the existing rows for that day from
    funnyai_data.xcs_log_record, bulk-loads the file ('|'-separated columns
    uid,type,subtype,expiredtype,reason) and stamps the new rows with pt_day.

    :param year: year string.
    :param month: month string.
    :param day: day string.
    """
    date = year+"-"+month+"-"+day
    # awk needs upper-case -F to set the field separator; lower-case -f would
    # try to read the awk program from a file.
    # NOTE(review): the separator text ' info : ' is case-sensitive; verify
    # its casing against real log lines (it may need to be ' INFO : ').
    os.system("""source /etc/profile; \
                awk  -F ' info : ' '{print $2}' /home/hadoop/nisj/automationdemand/creditscore/tmplogdir/*user_credit_operator*.%s.txt > /home/hadoop/nisj/automationdemand/creditscore/tmplogdir/log_record_%s.txt """ % (date, date))
    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    os.system("""source /etc/profile; \
                /usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass -e "use funnyai_data; \
                delete from xcs_log_record where pt_day='{date}'; \
                load data local infile '/home/hadoop/nisj/automationdemand/creditscore/tmplogdir/log_record_{date}.txt' ignore into table xcs_log_record character set utf8 fields terminated by '|' enclosed by '' lines terminated by '\n' (uid,type,subtype,expiredtype,reason); \
                update xcs_log_record set pt_day='{date}' where pt_day is null;" """.format(date=date))

def logfromoss2local(runday):
    """Fetch one day's credit-operator logs from OSS and load them into MySQL.

    Empties the local tmplogdir, copies the server logs and the console logs
    for `runday` from OSS to HDFS, then downloads them from HDFS and loads
    them into MySQL.

    :param runday: the day to process; used as both start and end of the
        date range.
    """
    date_start = date_end = runday

    # Start from a clean local staging directory.
    os.system("""source /etc/profile; \
                rm -rf /home/hadoop/nisj/automationdemand/creditscore/tmplogdir/*""")

    # Each tuple holds the eight positional arguments of the copy functions.
    for parameter in getparameterinfo(date_start, date_end):
        getossfile2hdfs(*parameter[0:8])

    for parameter in getparameterinfo2(date_start, date_end):
        getossfile2hdfs2(*parameter[0:8])

    for one_date in daterange(date_start, date_end):
        year, month, day = one_date[0:4], one_date[5:7], one_date[8:10]
        downloadfilefromhdfs(year, month, day)
        logload2mysql(year, month, day)

# test code
# runday = '2017-09-19'
# logfromoss2local(runday)


# -*- coding=utf-8 -*-
import time
import os
import re
from datecalc import *


def csuserinfoproc(pt_day):
    """Rebuild the MySQL table xcs_user_info from the Hive user profile.

    Drops and recreates funnyai_data.xcs_user_info, selects
    uid/nickname/last_login_time from yesterday's oss_chushou_user_profile
    partition for users with state=0 whose last_login_time lies between
    seven days before `pt_day` and `pt_day`, and inserts the rows into MySQL
    in batches of 1000.

    :param pt_day: upper bound of the last_login_time window
        (e.g. '2017-09-19').
    """
    # These two values were referenced but never defined in this function;
    # they are derived here from the datecalc helpers imported by this file.
    yesterday = getyesterday()
    daysxago = getdaysxago(pt_day)

    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    mysql_cmd = ('source /etc/profile; '
                 '/usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass '
                 '--default-character-set=utf8 -e "use funnyai_data; {sql}"')

    os.system(mysql_cmd.format(sql=(
        "drop table if exists xcs_user_info; "
        "create table xcs_user_info ( "
        "uid bigint(20), "
        "nickname varchar(2000), "
        "last_login_time varchar(19), "
        "etl_time datetime, "
        "key idx_uid (uid) "
        ") engine=innodb default charset=utf8;")))

    calcdata = os.popen("""source /etc/profile; \
            /usr/lib/hive-current/bin/hive -e " \
            select uid,nickname,last_login_time \
            from oss_chushou_user_profile \
            where pt_day='{yesterday}' and state=0 and last_login_time between '{daysxago}' and '{pt_day}';" \
            """.format(pt_day=pt_day, yesterday=yesterday, daysxago=daysxago)).readlines()

    insert_head = "insert into xcs_user_info(uid,nickname,last_login_time,etl_time) values "
    pending = []
    for line in calcdata:
        fields = line.replace('\n', '').split('\t')
        if len(fields) < 3:
            # Blank or malformed output line: nothing to load.
            continue

        # Nicknames are user-supplied text that ends up inside a
        # double-quoted shell argument and a single-quoted SQL literal, so
        # strip the characters that would break out of either ...
        nickname = str(fields[1])
        for unsafe_char in ('`', "'", '"', '\\'):
            nickname = nickname.replace(unsafe_char, '')
        # ... and escape '$' so the shell does not expand $VAR or $(command).
        nickname = nickname.replace('$', '\\$')

        etl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        pending.append("( '%s','%s','%s','%s')" % (fields[0], nickname, fields[2], etl_time))

        if len(pending) == 1000:
            os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))
            pending = []

    # Flush the last partial batch; skipped when empty so that no invalid
    # "insert ... values;" statement is sent.
    if pending:
        os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))

def csroompartinfo(pt_day):
    """Rebuild the MySQL table xcs_room_part_info with room subscriber counts.

    Drops and recreates funnyai_data.xcs_room_part_info, reads the distinct
    room ids recorded in xcs_upper_manage for `pt_day`, looks up
    id/subcriber_count for those rooms in yesterday's oss_room_v2 Hive
    partition, and inserts the rows into MySQL in batches of 1000.

    :param pt_day: day used to select room ids from xcs_upper_manage
        (e.g. '2017-09-19').
    """
    # This value was referenced but never defined in this function; it is
    # derived here from the datecalc helper imported by this file.
    yesterday = getyesterday()

    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    mysql_cmd = ('source /etc/profile; '
                 '/usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass '
                 '--default-character-set=utf8 -e "use funnyai_data; {sql}"')

    os.system(mysql_cmd.format(sql=(
        "drop table if exists xcs_room_part_info; "
        "create table xcs_room_part_info ( "
        "room_id bigint(20), "
        "subcriber_count bigint(20), "
        "etl_time datetime, "
        "key idx_uid (room_id) "
        ") engine=innodb default charset=utf8;")))

    # -N suppresses the column-name header so "room_id" does not end up in
    # the id list below.
    roompartdata = os.popen("""source /etc/profile; \
                /usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass --default-character-set=utf8 -N -e "use funnyai_data; \
                    select room_id from xcs_upper_manage where pt_day='{pt_day}' group by room_id;" """.format(pt_day=pt_day)).readlines()
    room_ids = [row.strip() for row in roompartdata if row.strip()]
    if not room_ids:
        # No rooms for this day: "id in()" would be invalid Hive SQL, and
        # there is nothing to insert into the freshly created table.
        return
    roomidcon = ",".join(room_ids)

    calcdata = os.popen("""source /etc/profile; \
                /usr/lib/hive-current/bin/hive -e " \
                select id,subcriber_count \
                from oss_room_v2 \
                where pt_day='{yesterday}' and id in({roomidcon}) \
                ;" \
                """.format(yesterday=yesterday, roomidcon=roomidcon)).readlines()

    insert_head = "insert into xcs_room_part_info(room_id,subcriber_count,etl_time) values "
    pending = []
    for line in calcdata:
        fields = line.replace('\n', '').split('\t')
        if len(fields) < 2:
            # Blank or malformed output line: nothing to load.
            continue
        etl_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        pending.append("( '%s','%s','%s')" % (fields[0], fields[1], etl_time))

        if len(pending) == 1000:
            os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))
            pending = []

    # Flush the last partial batch; skipped when empty so that no invalid
    # "insert ... values;" statement is sent.
    if pending:
        os.system(mysql_cmd.format(sql=insert_head + ",".join(pending) + ";"))

# pt_day = '2017-09-19'
# csuserinfoproc(pt_day)
# csroompartinfo(pt_day)

# -*- coding=utf-8 -*-
import os
from datecalc import *


def csresulttabcreate():
    """Drop and recreate the result table funnyai_data.xcs_user_credit_score.

    The table holds one row per (pt_day, uid) with the individual score
    columns, the day's increment score and the day's total score; it is
    indexed on (pt_day, uid). Any existing table and its data are dropped.
    """
    # -P is the mysql port option; a lower-case -p6603 would be taken as a
    # password and then overridden by -pmysqlpass.
    os.system("""source /etc/profile; \
                /usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass --default-character-set=utf8 -e "use funnyai_data; \
                drop table if exists xcs_user_credit_score; \
                create table xcs_user_credit_score ( \
                  pt_day varchar(10), \
                  uid bigint(20) default null, \
                  nickname varchar(2000) default null, \
                  last_login_time varchar(19) default null, \
                  seqing_score int(10) not null default '0', \
                  jubao_score int(10) default null, \
                  chengpaopao_score int(10) not null default '0', \
                  weifan_score int(10) default null, \
                  pay_active_score int(10) not null default '0', \
                  view_active_score int(10) not null default '0', \
                  message_active_score int(10) not null default '0', \
                  gift_active_score int(10) not null default '0', \
                  live_active_score int(10) not null default '0', \
                  upper_manage_score decimal(48,4) default null, \
                  user_silent_score int(10) not null default '0', \
                  user_currday_increment_score decimal(48,4) default null, \
                  user_currday_score decimal(48,4) default '0', \
                  etl_time datetime default current_timestamp, \
                  key idx_pt_day_uid (pt_day,uid) \
                ) engine=innodb default charset=utf8;" """)

def csresultdatainsertandupdate(pt_day):
    """Compute and store the credit scores of every user for ``pt_day``.

    Four ``mysql`` client invocations are run in order against the
    ``funnyai_data`` database:

    1. Delete any rows already stored for ``pt_day`` in
       ``xcs_user_credit_score`` and insert one row per user of
       ``xcs_user_info`` holding the individual component scores and their
       sum (``user_currday_increment_score``).
    2. For users that also have a row for the previous day, set
       ``user_currday_score`` to the previous day's score plus today's
       increment. Users without a previous-day row are not touched by this
       join and keep the value left by the insert.
    3. For users whose component scores are all zero today and whose
       previous-day score is positive, set the score to the previous-day
       score minus 10, never going below 0.
    4. For users with no penalty component today (seqing, silent, jubao,
       chengpaopao and weifan scores all zero) and a negative current score,
       add 10 to the score, never going above 0.

    Args:
        pt_day: Day to process, formatted as ``YYYY-MM-DD``.

    Returns:
        None. The exit statuses of the shell commands are not checked.

    Raises:
        ValueError: If ``pt_day`` is not a valid ``YYYY-MM-DD`` date. This is
            raised before any SQL is executed.
    """
    # Function-scope import so the block does not depend on what the module's
    # star imports happen to re-export.
    import datetime

    # The previous day is needed by the three update statements below. It was
    # never defined before, which raised NameError only after the delete and
    # insert had already run. Parsing up front also rejects a malformed
    # pt_day before it is interpolated into any SQL.
    parsed_day = datetime.datetime.strptime(pt_day, '%Y-%m-%d')
    days1ago = (parsed_day - datetime.timedelta(days=1)).strftime('%Y-%m-%d')

    # Step 1: rebuild today's rows (component scores and their sum).
    # The port is passed with an upper-case -P; a lower-case -p is the mysql
    # client's password option and would leave the port at its default.
    os.system("""source /etc/profile; \
                /usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass --default-character-set=utf8 -e "use funnyai_data; \
                delete from xcs_user_credit_score where pt_day='{pt_day}'; \
                insert into xcs_user_credit_score(pt_day,uid,nickname,last_login_time,seqing_score,jubao_score,chengpaopao_score,weifan_score,pay_active_score,view_active_score,message_active_score,gift_active_score,live_active_score,upper_manage_score,user_silent_score,user_currday_increment_score) \
                select '{pt_day}' pt_day,uid,nickname,last_login_time,seqing_score,jubao_score,chengpaopao_score,weifan_score,pay_active_score,view_active_score,message_active_score,gift_active_score,live_active_score,upper_manage_score,user_silent_score, \
                seqing_score+jubao_score+chengpaopao_score+weifan_score+pay_active_score+view_active_score+message_active_score+gift_active_score+live_active_score+upper_manage_score+user_silent_score user_currday_increment_score \
                from (select a1.uid,a1.nickname,a1.last_login_time, \
                case when a2.uid is not null then -10000 else 0 end seqing_score, \
                case when a3.uid is not null then -50*a3.jubao_cnt else 0 end jubao_score, \
                case when a4.uid is not null then -2000 else 0 end chengpaopao_score, \
                case when a5.uid is not null then a5.weifan_score else 0 end weifan_score, \
                case when a6.uid is not null then 50 else 0 end pay_active_score, \
                case when a7.uid is not null then 20 else 0 end view_active_score, \
                case when a8.uid is not null then 10 else 0 end message_active_score, \
                case when a9.room_creator_uid is not null then 30 else 0 end gift_active_score, \
                case when a10.room_creator_uid is not null then 20 else 0 end live_active_score, \
                case when a11.uid is not null then a11.upper_manage_score else 0 end upper_manage_score, \
                case when a12.uid is not null then -300 else 0 end user_silent_score \
                from xcs_user_info a1 \
                left join (select uid from xcs_log_record where pt_day='{pt_day}' and type=1 group by uid) a2 on a1.uid=a2.uid \
                left join (select uid,count(*) jubao_cnt from xcs_log_record where pt_day='{pt_day}' and type=2 group by uid) a3 on a1.uid=a3.uid \
                left join (select uid from xcs_log_record where pt_day='{pt_day}' and type=3 group by uid) a4 on a1.uid=a4.uid \
                left join (select uid,sum(case when subtype in(1,2) then -50 when subtype in(5,6) then -500 when subtype=0 then -2000 else 0 end) weifan_score from xcs_log_record where pt_day='{pt_day}' and type=4 group by uid) a5 on a1.uid=a5.uid \
                left join xcs_pay_active a6 on a1.uid=a6.uid \
                left join xcs_view_active a7 on a1.uid=a7.uid \
                left join xcs_message_active a8 on a1.uid=a8.uid \
                left join xcs_gift_active a9 on a1.uid=a9.room_creator_uid \
                left join xcs_live_active a10 on a1.uid=a10.room_creator_uid \
                left join (select a11_1.uid,sum(a11_2.subcriber_count/1000) upper_manage_score \
                             from xcs_upper_manage a11_1 \
                             left join xcs_room_part_info a11_2 on a11_1.room_id=a11_2.room_id \
                            group by a11_1.uid) a11 on a1.uid=a11.uid \
                left join xcs_user_silent a12 on a1.uid=a12.uid) x \
                ;" """.format(pt_day=pt_day))

    # Step 2: running total = previous day's score + today's increment.
    os.system("""source /etc/profile; \
                /usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass --default-character-set=utf8 -e "use funnyai_data; \
                    update xcs_user_credit_score a1,xcs_user_credit_score a2 \
                       set a1.user_currday_score=ifnull(a1.user_currday_increment_score+a2.user_currday_score,0) \
                     where a1.pt_day='{pt_day}' and a2.pt_day='{days1ago}' \
                       and a1.uid=a2.uid;" """.format(pt_day=pt_day, days1ago=days1ago))

    # Step 3: no scoring event at all today -> a positive score decays by 10,
    # floored at 0.
    os.system("""source /etc/profile; \
                /usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass --default-character-set=utf8 -e "use funnyai_data; \
                update xcs_user_credit_score a1,xcs_user_credit_score a2 \
                   set a1.user_currday_score=ifnull((case when a2.user_currday_score-10>0 then a2.user_currday_score-10 else 0 end),0) \
                 where a1.pt_day='{pt_day}' and a2.pt_day='{days1ago}' \
                   and a1.uid=a2.uid \
                   and (a1.seqing_score=0 and a1.user_silent_score=0 and a1.jubao_score=0 and a1.chengpaopao_score=0 and a1.weifan_score=0 and a1.upper_manage_score=0 and a1.pay_active_score=0 and a1.view_active_score=0 and a1.message_active_score=0 and a1.gift_active_score=0 and a1.live_active_score=0) \
                   and a2.user_currday_score>0 \
                ;" """.format(pt_day=pt_day, days1ago=days1ago))

    # Step 4: no penalty today -> a negative score recovers by 10, capped at 0.
    os.system("""source /etc/profile; \
                /usr/bin/mysql -hmysqlhost -P6603 -umysqluser -pmysqlpass --default-character-set=utf8 -e "use funnyai_data; \
                update xcs_user_credit_score a1,xcs_user_credit_score a2 \
                   set a1.user_currday_score=ifnull((case when a1.user_currday_score+10<0 then a1.user_currday_score+10 else 0 end),0) \
                 where a1.pt_day='{pt_day}' and a2.pt_day='{days1ago}' \
                   and a1.uid=a2.uid \
                   and (a1.seqing_score=0 and a1.user_silent_score=0 and a1.jubao_score=0 and a1.chengpaopao_score=0 and a1.weifan_score=0) \
                   and a1.user_currday_score<0 \
                ;" """.format(pt_day=pt_day, days1ago=days1ago))

# pt_day = '2017-09-19'
# csresultdatainsertandupdate(pt_day)

# -*- coding=utf-8 -*-
from tabbasicdataproc import *
from logbasicdataproc import *
from userbasicdataproc import *
from resultdataproc import *


def creditscore_ctl(pt_day):
    """Run the credit-score pipeline for a single day.

    The function body previously contained only comments, which is a syntax
    error in Python. The calls below are the pipeline steps whose names are
    visible in this file; the remaining loader steps are flagged for review.

    Args:
        pt_day: Day to process, formatted as ``YYYY-MM-DD``.
    """
    # Load table data.
    # NOTE(review): the loader calls for this step (presumably provided by
    # tabbasicdataproc) are missing and must be restored - confirm the names.

    # Load tracking-log data.
    # NOTE(review): the loader calls for this step (presumably provided by
    # logbasicdataproc) are missing and must be restored - confirm the names.

    # Load basic user and room information.
    csuserinfoproc(pt_day)
    csroompartinfo(pt_day)

    # Compute and update the final results.
    # csresulttabcreate() stays disabled: it drops and recreates the result
    # table, so it is only meant for the initial setup.
    # csresulttabcreate()
    csresultdatainsertandupdate(pt_day)

# batch code
# Process yesterday's data. Without the creditscore_ctl() call the script
# only computed the date and exited without doing any work.
pt_day = getyesterday()
creditscore_ctl(pt_day)

# Manual backfill for specific days (kept disabled):
# pt_day_list=['2017-09-21', '2017-09-22', '2017-09-23']
# for pt_day in pt_day_list:
#     print pt_day
#     creditscore_ctl(pt_day)

[hadoop@emr-worker-9 creditscore]$ crontab -l
10 13 * * * python /home/hadoop/nisj/automationdemand/creditscore/creditscore_ctl.py >> /home/hadoop/nisj/automationdemand/creditscore/creditscore_ctl.log 2>&1

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

