自动化报表的实现

一段自动报表查询导出发送的脚本

Posted by DCL on June 15, 2016

日常工作中,业务部门经常有报表的需求。需要人工查询数据库并导出表单发送邮件。

下面的Python脚本可以自动实现报表的查询、导出并发送邮件,再设置好系统的计划任务后,这样业务部门需求的报表可以准时自动发送到其邮箱。

#!/usr/bin/evn python
# -*- coding:utf-8 -*-

# XX报表

import datetime, time
import mysql.connector
import xlwt
import sys
import os
reload(sys)
sys.setdefaultencoding('utf-8')

# 默认样式
default_style = xlwt.easyxf('''
  pattern: pattern solid;
  borders: left 1, right 1, top 1, bottom 1;
  align: horiz center''',
  num_format_str='0,000.00')

# 标题栏样式
title_style = xlwt.easyxf('''
  pattern: pattern solid, fore_colour yellow;
  font: name Times New Roman, color-index black, bold on;
  borders: left 1, right 1, top 1, bottom 1;
  align: horiz center''',
  num_format_str='0,000.00')

# 时间格式样式
time_style = xlwt.easyxf(num_format_str='YYYY-MM-DD h:mm:ss')

def get_sql():
  '''
  创建需要的sql语句
  '''
  sql = '''

# sql查询语句

'''
  return sql

def get_title(cursor):
  '''
  通过游标获得excel文件title
  '''
  return cursor.column_names

def get_select_data(cursor):
  '''
  通过游标获得数据列表(list)
  '''
  return [row for row in cursor]

def create_excel_title(work_sheet, title, title_style=None):
  '''
  生产exceltitle
  '''
  if not title_style:
    title_style = default_style
  for col_index, col_name in enumerate(title):
    work_sheet.write(0, col_index, col_name, title_style)
  return work_sheet

def create_excel_body(work_sheet, body, body_style=None):
  '''
  生成excel body信息
  '''
  if not title_style:
    body_style = default_style
  for row_num, row_data in enumerate(data, 1):
    for col_index, col_value in enumerate(row_data):
      work_sheet.write(row_num, col_index, col_value)
  return work_sheet

def get_col_max_length(data, title):
  '''
  获得数据每列最大值长度
  '''
  col_len = map(len, map(str, title))
  func = lambda x, y: y if y>x else x
  for row in data:
    row_len = map(len, map(str, row))
    col_len = map(func, col_len, row_len)
  return col_len

def set_work_sheet_col_len(work_sheet, max_len):
  '''
  设置列长度
  '''
  for col, len in enumerate(max_len):
    work_sheet.col(col).width = 256 * (len + 1)
  return work_sheet
      

if __name__ == '__main__':
  info = {
#数据库连接
    'host'    :'',
    'user'    :'',
    'password':'',
    'database':''
  }
  conn = mysql.connector.connect(**info)
  cursor = conn.cursor()
  sql = get_sql()
  cursor.execute(sql)
  # 获得excel的title
  title = get_title(cursor)
  # 获得需要的数据
  data = get_select_data(cursor)
  # 获得每一列的最大长度
  max_len = get_col_max_length(data, title)
  work_book = xlwt.Workbook(encoding='utf-8')
  # 创建一个excel模板
  work_sheet = work_book.add_sheet('查询数据')
  # 生成excel title
  work_sheet = create_excel_title(work_sheet, title, title_style)
  # 生成 excel 数据
  work_sheet = create_excel_body(work_sheet, data)
  # 设置每一列适当的长度
  work_sheet = set_work_sheet_col_len(work_sheet, max_len)
  # 保存 excel
  work_book.save('data_{time}_XX报表.xls'.format(time=time.strftime('%Y-%m-%d', time.localtime(time.time()))))

from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.utils import parseaddr, formataddr

import smtplib

def _format_addr(s):
    name, addr = parseaddr(s)
    return formataddr(( \
        Header(name, 'utf-8').encode(), \
        addr.encode('utf-8') if isinstance(addr, unicode) else addr))
#邮件发送
from_addr = ''
password = ''
to_addr = ['','']
#邮件服务器地址
smtp_server = ''

msg = MIMEMultipart()
msg['From'] = _format_addr(u'自动报表 <%s>' % from_addr)
msg['To'] = _format_addr(u'业务需求部门 <%s>' % to_addr)
msg['Subject'] = Header(u'日数据报表-XX报表', 'utf-8').encode()

# add MIMEText:
msg.attach(MIMEText('HI,all:相关报表见邮件附件,请查收!', 'plain', 'utf-8'))

# add file:
with open(('data_%s_XX报表.xls' % time.strftime('%Y-%m-%d', time.localtime(time.time()))), 'rb') as f:
    mime = MIMEBase('image', 'xls', filename=('data_%s_XX报表.xls' % time.strftime('%Y-%m-%d', time.localtime(time.time()))))
    mime.add_header('Content-Disposition', 'attachment', filename=('data_%s_XX报表.xls' % time.strftime('%Y-%m-%d', time.localtime(time.time()))))
    mime.add_header('Content-ID', '<0>')
    mime.add_header('X-Attachment-Id', '0')
    mime.set_payload(f.read())
    encoders.encode_base64(mime)
    msg.attach(mime)

server = smtplib.SMTP_SSL(smtp_server, 465) #邮件服务器发送端口
server.set_debuglevel(1)
server.login(from_addr, password)
server.sendmail(from_addr, to_addr, msg.as_string())
server.quit()