from datetime import datetime
import json
import urllib.request
import pymysql as pms
import pandas as pd
import pandas as pd
from sqlalchemy import create_engine
import datetime
from pyhive import hive
from impala.dbapi import connect as impala_connect
from impala.util import as_pandas
# 连接impala数据库
def impala_db(sql):
conn = impala_connect(host='#地址 ', port=#端口)
cursor = conn.cursor()
cursor.execute(sql)
results = as_pandas(cursor)
cursor.close()
conn.close()
return results
# 日期
today = datetime.date.today()
last_day = today + datetime.timedelta(days=-1)
last_day = datetime.datetime.strftime(last_day,"%Y%m%d")
engine = create_engine('mysql+pymysql://tableau:Tableau@253.com.Ta@103.28.213.198:53306/tableau_report')
df_sql ='''
select a, b, c FROM table limit 5;
'''
#调用方法 impala_db()
df = impala_db(sql = df_sql)
df.columns = [' 字段A',' 字段B',' 字段C']
#格式
def sg_md_deal(data):
sig = '|'
sig_md_str = '### {} 查询结果展示: \n'.format(last_day)
cols_list =list(data.columns)
sig_cols = sig+sig.join(('**'+i +'**' for i in cols_list))+sig +' \n '
# | --- | --- |
sig_tab = sig + sig.join(['---:' for i in range(len(cols_list))]) +sig +' \n '
sig_md_str = sig_md_str + sig_cols +sig_tab
for value in data.values:
sig_val = sig+sig.join([' '+str(i) for i in list(value)])+sig +' \n '
sig_md_str = sig_md_str + sig_val
return sig_md_str
#你的钉钉机器人url
global myurl
my_url =" 你的钉钉机器人url "
# 自定义传入url和内容发送请求
def send_request(url, datas):
#传入url和内容发送请求
# 构建一下请求头部
header = {
"Content-Type": "application/json",
"Charset": "UTF-8"
}
sendData = json.dumps(datas) # 将字典类型数据转化为json格式
sendDatas = sendData.encode("utf-8") # python3的Request要求data为byte类型
# 发送请求
request = urllib.request.Request(url=url, data=sendDatas, headers=header)
# 将请求发回的数据构建成为文件格式
opener = urllib.request.urlopen(request)
# 打印返回的结果
print(opener.read())
# 自定义钉钉模型
def get_ddmodel_datas(type):
#返回钉钉模型数据,1:文本;2:markdown所有人;3:markdown带图片,@接收人;4:link类型
if type == 1:
my_data = {
"msgtype": "text",
"text": {
"content":"test】我就是我, 是不一样的烟火"
},
"at": {
"atMobiles": [
"17317386472"
],
"isAtAll":True
}
}
elif type == 3:
my_data = {
"msgtype": "markdown",
"markdown": {"title":" ",
"text":" "
},
"at": {
"atMobiles": [
"13761329116"
],
"isAtAll":True
}
}
return my_data
#主函数
if __name__ == "__main__":
# print('Main! The time is: %s' % datetime.now())
#3.Markdown(带图片@对象)
my_data = get_ddmodel_datas(3)
my_data["markdown"]["title"] = " 查询结果:"
my_data["markdown"]["text"] = sg_md_deal(df)
send_request(my_url, my_data)
推送结果:
自动化推送:
在跳板机上执行:
#Linux crontab是用来定期执行程序的命令。
crontab -e
#编辑如下
# oozie failure schedule
30 8 * * * ${PYTHON_HOME}/python ${DINGDING_REPORT_DIR}/dingding_push.py