用Python实现邮件发送Hive明细数据,python邮件发送hive,代码地址如下:htt


代码地址如下:
http://www.demodashi.com/demo/12673.html

一、需求描述

客户需要每周周一接收特定的活动数据,生成Excel或是CSV文件,并通过邮件发送给指定接收者。需求初步分析得出如下结论:

1.客户所要的数据并不太复杂,无须通过特殊处理,可以简单的认为就是SQL查询结果输出2.查询结果输出CSV文件,及邮件发送技术相对成熟,通用性强3.Linux系统的Crond服务支持定时任务

二、系统环境要求

Linux CentOS6.xHadoop2.xPython2.7

三、Python依赖库

PyHiveppytools2thriftthrift-saslsasl

四、工作流程

五、代码实现

项目结构图如下:

关键实现步骤如下:

1.全局配置settings.py

# -*- coding: utf-8 -*-# __author__ = ‘[email protected]‘from ppytools.cfgreader import ConfReaderimport logging‘‘‘日志输出格式‘‘‘logging.basicConfig(level=logging.INFO,                    encoding=‘UTF-8‘, format=‘%(asctime)s [%(levelname)s] {%(name)-10s} - %(message)s‘)class ProjectConfig(object):    """ProjectConfig    """    def __init__(self, *conf_paths):        self.cr = ConfReader(*conf_paths)    def getHiveConf(self):        return self.cr.getValues(‘HiveServer‘)    def getEmailServer(self):        return self.cr.getValues(‘EmailServer‘)    def getJobInfo(self):        return self.cr.getValues(‘JobInfo‘)    def getCSVFolder(self):        return self.cr.getValues(‘CSVFolder‘)[‘folder‘]    def getCSVFile(self):        return self.cr.getValues(‘CSVFile‘)    def getCSVHead(self):        return self.cr.getValues(‘CSVHead‘)    def getHQLScript(self):        return self.cr.getValues(‘HQLScript‘)    def getEmailInfo(self):        return self.cr.getValues(‘EmailInfo‘)

2.核心代码 main.py

# -*- coding: utf-8 -*-# __author__ = ‘[email protected]‘from hiveemailjob.settings import ProjectConfigfrom ppytools.csvhelper import writefrom ppytools.emailclient import EmailClientfrom ppytools.hiveclient import HiveClientfrom ppytools.lang.timerhelper import timeMeterimport datetimeimport loggingimport sysimport timelogger = logging.getLogger(__name__)def build_rk(ts):    """    Build HBase row key value    :param ts: date time    :return: row key    """    return hex(int(time.mktime(ts.timetuple())*1000))[2:]def email_att(folder, name):    return ‘{}/{}_{}.csv‘.format(folder, name, datetime.datetime.now().strftime(‘%Y%m%d%H%M%S‘))@timeMeter()def run(args):    """    Email Job program execute entrance    :param args:    1. job file file path    2. start time, format: 2018-01-30 17:09:38 (not require)    3. stop time (not require)    :return: Empty    """    ‘‘‘    Read system args start    ‘‘‘    args_len = len(args)    if args_len is not 2 and args_len is not 4:        logger.error(‘Enter args is error. Please check!!!‘)        logger.error(‘1: job file path.‘)        logger.error(‘2: start time, format: 2018-01-30 17:09:38(option)‘)        logger.error(‘3: stop time(option)‘)        sys.exit(1)    elif args == 4:        try:            start_time = datetime.datetime.strptime(args[2], ‘%Y-%m-%d %H:%M:%S‘)            stop_time = datetime.datetime.strptime(args[3], ‘%Y-%m-%d %H:%M:%S‘)        except Exception, e:            raise RuntimeError(‘Parse start or stop time failed!!!\n‘, e)    else:        stop_time = datetime.date.today()        start_time = stop_time - datetime.timedelta(days=1)    job_file = args[1]    start_rk = build_rk(start_time)    stop_rk = build_rk(stop_time)    ‘‘‘System settings files (hard code)    ‘‘‘    hive_conf = ‘/etc/pythoncfg/hive.ini‘    email_conf = ‘/etc/pythoncfg/email.ini‘    sets = ProjectConfig(hive_conf, email_conf, job_file)    job_info = sets.getJobInfo()    csv_folder = sets.getCSVFolder()    logger.info(‘Now running %s Email Job...‘, job_info[‘title‘])    logger.info(‘Start time: %s‘, start_time)    logger.info(‘Stop time: %s‘, stop_time)    hc = HiveClient(**sets.getHiveConf())    csv_file = sets.getCSVFile().items()    csv_file.sort()    file_list = []    logger.info(‘File name list: ‘)    for (k, v) in csv_file:        logging.info(‘%s: %s‘, k, v)        file_list.append(v)    csv_head = sets.getCSVHead().items()    csv_head.sort()    head_list = []    logger.info(‘CSV file head list: ‘)    for (k, v) in csv_head:        logging.info(‘%s: %s‘, k, v)        head_list.append(v)    hql_scripts = sets.getHQLScript().items()    hql_scripts.sort()    email_atts = []    index = 0    for (k, hql) in hql_scripts:        logging.info(‘%s: %s‘, k, hql)        ‘‘‘Please instance of your logic in here.        ‘‘‘        result, size = hc.execQuery(hql.format(start_rk, stop_rk))        if size is 0:            logging.info(‘The above HQL script not found any data!!!‘)        else:            csv_file = email_att(csv_folder, file_list[index])            email_atts.append(csv_file)            write(csv_file, head_list[index].split(‘,‘), result)        index += 1    ‘‘‘Flush Hive Server connected.    ‘‘‘    hc.closeConn()    email_sub = sets.getEmailInfo()[‘subject‘] % start_time    email_body = sets.getEmailInfo()[‘body‘]    email_to = sets.getEmailInfo()[‘to‘].split(‘;‘)    email_cc = sets.getEmailInfo()[‘cc‘].split(‘;‘)    if len(email_atts) == 0:        email_body = ‘抱歉当前未找到任何数据。\n\n‘ + email_body    ec = EmailClient(**sets.getEmailServer())    ec.send(email_to, email_cc, email_sub, email_body, email_atts, False)    ec.quit()    logger.info(‘Finished %s Email Job.‘, job_info[‘title‘])

3.系统配置文件hive.ini与email.ini

# /etc/pythoncfg/hive.ini[HiveServer]host=127.0.0.1port=10000user=hivedb=default# /etc/pythoncfg/email.ini[EmailServer]server=mail.163.comport=25[email protected]passwd=xxxxxxmode=TSL

注意: 需要在指定的目录/etc/pythoncfg/下配置上述两个文件。

4.邮件Job配置参考emailjob.ini

[JobInfo]title=邮件报表任务测试[CSVFolder]folder=/opt/csv_files/# Please notice that CSVFile,CSVHead,HQLScript must be same length.# And suggest that use prefix+number to flag and write.[CSVFile]file1=省份分组统计file2=城市分组统计[CSVHead]head1=省份,累计head2=省份,城市,累计[HQLScript]script1=select cn_state,count(1) m from ext_act_ja1script2=select cn_state,cn_city,count(1) m from ext_act_ja2[EmailInfo][email protected];[email protected];# %s it will replace as the start date.subject=%s区域抽奖统计[测试]body=此邮件由系统自动发送,请勿回复,谢谢!

注意: CSVFile,CSVHead,HQLScript的数量要保持一致,也包括顺序,建议使用前缀+数字的格式命名。

5.Bin文件hive-emailjob.py

#! /usr/bin/env python# -*- coding: utf-8 -*-# __author__ = ‘[email protected]‘from hiveemailjob import mainimport sysif __name__ == ‘__main__‘:    main.run(sys.argv)

6.执行效果

在系统终端中敲入python -u bin/hive_email_job.py即可,输出如下:

2018-02-20 16:28:21,561 [INFO] {__main__  } - Now running 邮件报表任务测试 Email Job...2018-02-20 16:28:21,561 [INFO] {__main__  } - Start time: 2018-02-222018-02-20 16:28:21,562 [INFO] {__main__  } - Stop time: 2018-02-202018-02-20 16:28:21,691 [INFO] {pyhive.hive} - USE `default`2018-02-20 16:28:21,731 [INFO] {ppytools.hive_client} - Hive server connect is ready. Transport open: True2018-02-20 16:28:31,957 [INFO] {ppytools.email_client} - Email SMTP server connect ready.2018-02-20 16:28:31,957 [INFO] {root      } - File name list:2018-02-20 16:28:31,957 [INFO] {root      } - file1: 省份分组统计2018-02-20 16:28:31,957 [INFO] {root      } - file2: 城市分组统计2018-02-20 16:28:31,957 [INFO] {root      } - CSV file head list:2018-02-20 16:28:31,957 [INFO] {root      } - head1: 省份,累计2018-02-20 16:28:31,957 [INFO] {root      } - head2: 省份,城市,累计2018-02-20 16:28:31,957 [INFO] {root      } - script1: select cn_state,count(1) m from ext_act_ja22018-02-20 16:28:31,958 [INFO] {pyhive.hive} - select cn_state,count(1) m from ext_act_ja22018-02-20 16:29:04,258 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 312018-02-20 16:29:04,259 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.hive_client.execQuery> method cost 32.3012499809 seconds.2018-02-20 16:29:04,261 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/省份分组统计_20180223162904.csv2018-02-20 16:29:04,262 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.csv_helper.write> method cost 0.00222992897034 seconds.2018-02-20 16:29:04,262 [INFO] {root      } - script2: select cn_state,cn_city,count(1) m from ext_act_ja22018-02-20 16:29:04,262 [INFO] {pyhive.hive} - select cn_state,cn_city,count(1) m from ext_act_ja22018-02-20 16:29:23,462 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 3672018-02-20 16:29:23,463 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.hive_client.execQuery> method cost 19.2005498409 seconds.2018-02-20 16:29:23,465 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/城市分组统计_20180223162923.csv2018-02-20 16:29:23,465 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.csv_helper.write> method cost 0.00227284431458 seconds.2018-02-20 16:29:23,669 [INFO] {ppytools.email_client} - Send email[2018-02-22区域抽奖统计[测试]] success. To users: [email protected]2018-02-20 16:29:23,669 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.email_client.send> method cost 0.204078912735 seconds.2018-02-20 16:29:23,714 [INFO] {__main__  } - Finished 邮件报表任务测试 Email Job.2018-02-20 16:29:23,715 [INFO] {ppytools.lang.timer_helper} - Execute <emailjob.main.run> method cost 62.1566159725 seconds.

OK,一个可以通用的数据文件同步程序至此就完成啦。
用Python实现邮件发送Hive明细数据

代码地址如下:
http://www.demodashi.com/demo/12673.html

注:本文著作权归作者,由demo大师代发,拒绝转载,转载需要作者授权

用Python实现邮件发送Hive明细数据

评论关闭