Advertisement
Guest User

Untitled

a guest
May 1st, 2017
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 14.92 KB | None | 0 0
  1. [root@testing ~]# cat /home/opt/ati_ui/script/class4_cdr_export.py
  2. #!/usr/bin/env python3
  3. import argparse
  4. import io
  5. from configparser import RawConfigParser
  6. import time
  7. import datetime
  8. import os
  9. import stat
  10. import subprocess
  11. import base64
  12.  
  13. import psycopg2
  14. import psycopg2.extras
  15.  
  16. import shutil
  17.  
  18. from lib.helper import SendMail,Helper
  19.  
  20.  
  21. def load_config(config_file_path):
  22. ini_str = open(config_file_path, 'r').read()
  23. ini_fp = io.StringIO(ini_str)
  24. config = RawConfigParser(strict=False, allow_no_value=True)
  25. config.readfp(ini_fp)
  26. return config
  27.  
  28.  
  29. def parse_args():
  30. parser = argparse.ArgumentParser(description="CDR Export")
  31. parser.add_argument('-c', '--config', required=True,
  32. dest="config", help="Config File")
  33. parser.add_argument('-i', '--log', required=True, type=int,
  34. dest='log_id', help='Log ID')
  35. parser.add_argument('-r', '--is_send_mail', type=int,
  36. dest='is_send_mail', help='is send mail',default=0)
  37. args = parser.parse_args()
  38. return args
  39.  
  40.  
  41. def export_cdr(config, log_id):
  42. logger = Helper.create_logger('cdr export')
  43. conn = psycopg2.connect(host=config.get('db', 'hostaddr'),
  44. port=config.get('db', 'port'),
  45. database=config.get('db', 'dbname'),
  46. user=config.get('db', 'user'),
  47. password=config.get('db', 'password'))
  48. conn.autocommit = True
  49. cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
  50. cur.execute("SELECT * FROM cdr_export_log WHERE id = %s", (log_id, ))
  51. cdr_export_log = cur.fetchone()
  52.  
  53. cur.execute("UPDATE cdr_export_log SET status = 1 WHERE id = %s", (log_id, ))
  54.  
  55. export_path = os.path.realpath(os.path.join(os.path.dirname(__file__), os.path.pardir, 'db_nfs_path', 'cdr_download'))
  56. if not os.path.exists(export_path):
  57. os.makedirs(export_path)
  58. try:
  59. # cdr_download_total_file = os.path.join(export_path, cdr_export_log['file_name'])
  60. os.chmod(export_path, stat.S_IRWXO+stat.S_IRWXU+stat.S_IRWXG)
  61. # os.chmod(cdr_download_total_file, stat.S_IRWXO+stat.S_IRWXU+stat.S_IRWXG)
  62. except:
  63. print ("chmod 777 failed")
  64.  
  65. cdr_start = datetime.datetime.strptime(str(cdr_export_log['cdr_start_time'])[0:19], "%Y-%m-%d %H:%M:%S")
  66. total_cdr_start = cdr_start
  67.  
  68. is_continue = False
  69. cdr_download_total_file = os.path.join(export_path, cdr_export_log['file_name'])
  70. # if cdr_export_log['stop_time'] and os.path.exists(cdr_download_total_file + '.bz2'):
  71. if cdr_export_log['stop_time'] and os.path.exists(cdr_download_total_file + '.zip'):
  72. cdr_start = datetime.datetime.strptime(str(cdr_export_log['stop_time'])[0:19], "%Y-%m-%d %H:%M:%S")
  73. is_continue = True
  74. logger.info('continue task log id ' + str(log_id))
  75.  
  76. # os.system('bzip2 -d %s' % cdr_download_total_file + '.bz2')
  77. os.chdir(export_path)
  78. os.system('unzip %s' % cdr_download_total_file + '.zip')
  79. os.system('rm -rf %s' % cdr_download_total_file + '.zip')
  80.  
  81.  
  82. cdr_end = datetime.datetime.strptime(str(cdr_export_log['cdr_end_time'])[0:19], "%Y-%m-%d %H:%M:%S")
  83.  
  84. print ("start %s end %s" % (cdr_start,cdr_end))
  85.  
  86. cur.execute("select TABLE_NAME as name from INFORMATION_SCHEMA.TABLES where TABLE_NAME like'client_cdr2%' order by TABLE_NAME limit 1")
  87. table_info = cur.fetchone()
  88. last_time_name = table_info['name'][10:]
  89. last_table_time = datetime.datetime.strptime(last_time_name, "%Y%m%d")
  90.  
  91. # print (type(last_table_time),last_table_time)
  92. # print (type(cdr_start),cdr_start)
  93.  
  94. if cdr_start < last_table_time:
  95. cdr_start = last_table_time
  96. now = datetime.datetime.now()
  97. if cdr_end > now:
  98. cdr_end = now
  99.  
  100. print ("start %s end %s" % (cdr_start,cdr_end))
  101.  
  102. total_hours = (cdr_end - total_cdr_start).days * 24 + int(((cdr_end - total_cdr_start).seconds + 3600 - 1)/3600)
  103.  
  104. if is_continue:
  105. total_row = cdr_export_log['file_rows']
  106. cur.execute("UPDATE cdr_export_log SET status = 2,total_hours = %s WHERE id = %s", (total_hours,log_id, ))
  107. else:
  108. total_row = 0
  109. cur.execute("UPDATE cdr_export_log SET status = 2,finished_hours = 0,total_hours = %s WHERE id = %s", (total_hours,log_id, ))
  110.  
  111.  
  112. this_download_path_name = str(cdr_export_log['id'])+'_'+str(int(time.time()))
  113.  
  114. log_file_path_name = os.path.join(export_path, this_download_path_name)
  115. if not os.path.exists(log_file_path_name):
  116. os.makedirs(log_file_path_name)
  117. try:
  118. os.chmod(log_file_path_name, stat.S_IRWXO+stat.S_IRWXU+stat.S_IRWXG)
  119. except:
  120. print ("chmod 777 failed")
  121. print (log_file_path_name)
  122.  
  123.  
  124.  
  125.  
  126. tmp_cdr_start = cdr_start
  127.  
  128.  
  129. is_stop = False
  130. error_flg = False
  131. while tmp_cdr_start <= cdr_end:
  132.  
  133. if tmp_cdr_start != cdr_start:
  134. tmp_cdr_start = datetime.datetime.strptime(tmp_cdr_start.strftime('%Y-%m-%d %H:00:00'), "%Y-%m-%d %H:%M:%S")
  135.  
  136. # print (tmp_cdr_start.strftime('%Y-%m-%d 23:59:59'))
  137. tmp_cdr_end = datetime.datetime.strptime(tmp_cdr_start.strftime('%Y-%m-%d %H:59:59'), "%Y-%m-%d %H:%M:%S")
  138.  
  139. if tmp_cdr_end > cdr_end:
  140. tmp_cdr_end = cdr_end
  141.  
  142. # print (tmp_cdr_start,tmp_cdr_end)
  143.  
  144. time_str = tmp_cdr_start.strftime('%Y%m%d')
  145. # print ("this time is %s time str %s" % (tmp_cdr_start,time_str))
  146. this_where = cdr_export_log['where_sql'].replace('client_cdr.','client_cdr'+time_str+'.')
  147. this_show_fields = cdr_export_log['show_fields_sql'].replace('client_cdr.','client_cdr'+time_str+'.')
  148.  
  149.  
  150. sql = "select {} FROM client_cdr{} where time between '{}' and '{}' {} order by time".format(this_show_fields,time_str,str(tmp_cdr_start),str(tmp_cdr_end),this_where)
  151. this_file_name = os.path.join(log_file_path_name, tmp_cdr_start.strftime('%Y%m%d_%H%M%S')+'.csv')
  152. logger.info('start {} end {}'.format(str(tmp_cdr_start),str(tmp_cdr_end)))
  153. # tmp_cdr_start = tmp_cdr_start + datetime.timedelta(hours=1)
  154. # continue
  155. if is_continue == False and tmp_cdr_start == cdr_start:
  156. # 第一个小时 文件需要带表头
  157. copy_sql = "COPY (%s) TO STDOUT WITH CSV HEADER " % (sql)
  158. else:
  159. # 将生成的数据 不带头 以便后面将文件追加到第一个文件中
  160. copy_sql = "COPY (%s) TO STDOUT WITH DELIMITER AS ','" % (sql)
  161.  
  162.  
  163. Canceled_flg = False
  164. try:
  165. handle = open(this_file_name, "w")
  166. except:
  167. error_flg = True
  168. error_msg = 'Download file path do not have write permissions'
  169. # cur.execute("UPDATE cdr_export_log SET status = -1 , error_msg = 'Download file path do not have write permissions' WHERE id = %s", (log_id, ))
  170. break
  171.  
  172. error_msg = ''
  173. # cur.execute(copy_sql)
  174. while (1):
  175. try:
  176. cur.copy_expert(copy_sql,handle)
  177. except (psycopg2.extensions.QueryCanceledError, psycopg2.OperationalError):
  178. # print(psycopg2.extensions.QueryCanceledError)
  179. # print(psycopg2.OperationalError)
  180. # error_msg = psycopg2.extensions.QueryCanceledError + "\n" + psycopg2.OperationalError
  181. Canceled_flg = True
  182.  
  183. except psycopg2.DatabaseError:
  184. print(psycopg2.DatabaseError)
  185. logger.error("error sql : {}".format(copy_sql))
  186. error_flg = True
  187. # error_msg = psycopg2.DatabaseError
  188. error_msg = 'Database error'
  189.  
  190. if error_flg:
  191. handle.close()
  192. break
  193. elif Canceled_flg:
  194. handle.close()
  195. logger.info('SQL canceled.Waiting for 1 minutes auto retry')
  196. time.sleep(60)
  197. stop_data = get_stop_status(cur,log_id)
  198. if stop_data is not None:
  199. is_stop = True
  200. cur.execute("UPDATE cdr_export_log SET stop_time = %s,status = -2 WHERE id = %s", (tmp_cdr_start,log_id, ))
  201. break
  202.  
  203. handle = open(this_file_name, "w")
  204. else:
  205.  
  206. handle.close()
  207. rows_cmd = "wc -l %s" % (this_file_name)
  208. rows_result = subprocess.check_output(rows_cmd, shell=True)
  209. rows = int(rows_result.decode().split( )[0])
  210. # if rows >= 10:
  211. # time.sleep(20)
  212. if is_continue is False and tmp_cdr_start == cdr_start:
  213. rows = rows - 1
  214. shutil.copyfile(this_file_name,cdr_download_total_file)
  215. logger.info('create download file')
  216. else:
  217. cmd = "cat {} >> {}".format(this_file_name,cdr_download_total_file)
  218. os.system(cmd)
  219. logger.info('add new row to download file')
  220. total_row += rows
  221.  
  222. cur.execute("UPDATE cdr_export_log SET finished_hours = finished_hours + 1,file_rows = %s WHERE id = %s", (total_row,log_id, ))
  223. break
  224.  
  225.  
  226. if error_flg == True:
  227. break
  228.  
  229. if is_stop:
  230. break
  231.  
  232. tmp_cdr_start = tmp_cdr_start + datetime.timedelta(hours=1)
  233. stop_data = get_stop_status(cur,log_id)
  234. if stop_data is not None:
  235. is_stop = True
  236. cur.execute("UPDATE cdr_export_log SET stop_time = %s,status = -2 WHERE id = %s", (tmp_cdr_start,log_id, ))
  237. break
  238. # if 'i' not in dir():
  239. # i = 0
  240. # if i > 2:
  241. # time.sleep(30)
  242. # i += 1
  243.  
  244.  
  245. # return
  246. if is_stop:
  247. logger.info('Log stop.exit')
  248. # cmd = "cat %s | bzip2 -s %s" % (cdr_download_total_file, cdr_download_total_file)
  249. cmd = "zip -m %s.zip %s" % (cdr_export_log['file_name'], cdr_export_log['file_name'])
  250. logger.info("compress file")
  251. os.system(cmd)
  252. os.chdir(export_path)
  253. os.system('rm -rf %s' % this_download_path_name)
  254. logger.info("rm tmp path")
  255. return
  256.  
  257.  
  258. if error_flg:
  259. cur.execute("UPDATE cdr_export_log SET status = -1 , error_msg = %s WHERE id = %s", (error_msg,log_id, ))
  260. cur.close()
  261. conn.close()
  262. return
  263.  
  264. cur.execute("UPDATE cdr_export_log SET status = 3, file_rows = %s WHERE id = %s", (total_row,log_id, ))
  265.  
  266. # os.chdir(export_path)
  267.  
  268. # cmd = "cat %s | bzip2 -s %s" % (cdr_download_total_file, cdr_download_total_file)
  269. os.chdir(export_path)
  270. cmd = "zip -m %s.zip %s" % (cdr_export_log['file_name'], cdr_export_log['file_name'])
  271. logger.info("compress file")
  272. os.system(cmd)
  273. os.system('rm -rf %s' % this_download_path_name)
  274. logger.info("rm tmp path")
  275.  
  276. cur.execute("UPDATE cdr_export_log SET status = 4 WHERE id = %s", (log_id, ))
  277.  
  278. if cdr_export_log['send_mail']:
  279. web_base_url = config.get('web_base','url')
  280. cdr_send_mail(cur,log_id,cdr_export_log,web_base_url)
  281. cur.close()
  282. conn.close()
  283.  
  284.  
  285. def get_stop_status(cur,log_id):
  286. sql = "select 1 from cdr_export_log where id = %s AND status = 6"
  287. cur.execute(sql,(log_id,))
  288. return cur.fetchone()
  289.  
  290.  
  291.  
  292. def get_smtp_info(cursor):
  293. sql = """SELECT smtphost as host,smtpport as port,emailusername as username,emailpassword as password,loginemail as is_auth,
  294. fromemail as from_email, smtp_secure as smtp_secure FROM system_parameter LIMIT 1"""
  295. cursor.execute(sql)
  296. smtp_setting = cursor.fetchone()
  297. return smtp_setting
  298.  
  299.  
  300. def get_smtp_info_by_send(cur,send_mail_id):
  301. sql = """SELECT smtp_host AS host, smtp_port AS port,username,password as password,loginemail as is_auth,
  302. email as from_email,name as name, secure as smtp_secure FROM mail_sender where id = %s"""
  303. cur.execute(sql,(send_mail_id,))
  304. smtp_setting = cur.fetchone()
  305. return smtp_setting
  306.  
  307.  
  308. def get_cdr_download_template(cur):
  309. sql = """SELECT download_cdr_from,download_cdr_subject,download_cdr_content,download_cdr_cc FROM mail_tmplate limit 1"""
  310. cur.execute(sql)
  311. return cur.fetchone()
  312.  
  313.  
  314. def cdr_send_mail(cur,log_id,log_info,web_base_url):
  315. send_mail = log_info['send_mail']
  316. if log_info['send_type'] == 1:
  317. template_info = get_cdr_download_template(cur)
  318. else:
  319. template_info = log_info
  320. if template_info['download_cdr_from'] == 'Default' or template_info['download_cdr_from'] == 'default':
  321. smtp_setting = get_smtp_info(cur)
  322. else:
  323. smtp_setting = get_smtp_info_by_send(cur,template_info['download_cdr_from'])
  324. if smtp_setting is None:
  325. smtp_setting = get_smtp_info(cur)
  326. mail_info = {}
  327. for (d,x) in smtp_setting.items():
  328. mail_info[d] = x
  329.  
  330. content = template_info['download_cdr_content']
  331. str_key = "key="+base64.b64encode(str(log_id).encode()).decode()
  332. tmp = base64.b64encode(str_key.encode()).decode()
  333. download_url = web_base_url+'cdr_download/index/'+ tmp
  334. download_btn = "<a href='{}'>Download Link</a>".format(download_url)
  335. if content is not None and '{download_link}' in content:
  336. content = content.replace('{download_link}',download_btn)
  337. else:
  338. content += '<br />download link is :'+download_btn
  339.  
  340. mail_info['subject'] = template_info['download_cdr_subject']
  341. mail_info['to'] = send_mail
  342. mail_info['cc'] = template_info['download_cdr_cc']
  343. mail_info['content'] = content
  344. return_info = SendMail.send_mail(mail_info)
  345. print (return_info)
  346. save_email_log(cur,return_info,mail_info)
  347.  
  348.  
  349. def save_email_log(cur,return_info,mail_info):
  350. sql = """INSERT INTO email_log (send_time,type,email_addresses,status,error,subject,content)
  351. values (current_timestamp(0),5,%s,%s,%s,%s,%s )"""
  352. if return_info['status'] == True:
  353. status = 0
  354. else:
  355. status = 1
  356. cur.execute(sql,(mail_info['to'],status,return_info['msg'],mail_info['subject'],mail_info['content']))
  357.  
  358.  
  359. def only_send_mail(config,log_id):
  360. conn = psycopg2.connect(host=config.get('db', 'hostaddr'),
  361. port=config.get('db', 'port'),
  362. database=config.get('db', 'dbname'),
  363. user=config.get('db', 'user'),
  364. password=config.get('db', 'password'))
  365. conn.autocommit = True
  366. cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
  367.  
  368. cur.execute("SELECT * FROM cdr_export_log WHERE id = %s", (log_id, ))
  369. cdr_export_log = cur.fetchone()
  370. if cdr_export_log is None:
  371. return False
  372. web_base_url = config.get('web_base','url')
  373. cdr_send_mail(cur,log_id,cdr_export_log,web_base_url)
  374.  
  375.  
  376. def main():
  377. args = parse_args()
  378. config =load_config(args.config)
  379. if args.is_send_mail == 1:
  380. only_send_mail(config, args.log_id)
  381. else:
  382. export_cdr(config, args.log_id)
  383.  
  384. if __name__ == "__main__":
  385. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement