Advertisement
Guest User

Untitled

a guest
Oct 23rd, 2017
117
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.41 KB | None | 0 0
  1. import asyncio, aioodbc, websockets
  2. import datetime
  3. from asyncio import Queue as AQueue
  4.  
  5. import concurrent.futures
  6.  
  7. from multiprocessing import Manager as MMan, Queue as MpQueue
  8. from time import sleep
  9. import pyodbc
  10.  
  # Registry of packet consumers, keyed by name. zmain() stores the two
  # database worker queues under 'db_inserter' / 'db_updater'; ws_handler()
  # adds one asyncio queue per connected '/s' websocket under repr(websocket).
  consumers = {}

  # Column order of one '@'-separated record; the numeric indexes used in the
  # SQL templates and in the worker functions refer to positions in this list.
  fieldnames = [
      'lognode', 'logyear', 'logmonth', 'logday', 'loghour', 'logminute',
      'logsec', 'logmsec', 'loggroup', 'logsubgr1', 'logsubgr2', 'logsubgr3',
      'logcode', 'logtext', 'logpost', 'logtype', 'logvalue', 'logevent',
      'lognow', 'logshift', 'logreportdate',
  ]
  # One positional placeholder per column, derived from fieldnames so the two
  # can never get out of step.
  paramsym = ['?'] * len(fieldnames)

  # NOTE(review): server name and credentials are hard-coded here; consider
  # moving them to configuration. The backslash is written escaped because
  # '\S' is not a valid escape sequence in a normal string literal.
  mydsn = 'Driver={SQL Server};Server=STATION4\\SQL2008R2;Database=ssd6;uid=sa;pwd=123'

  command_insert = "INSERT INTO ssddata({}) VALUES ({})".format(
      ",".join(fieldnames), ",".join(paramsym))

  # Updates logtext / lognow / logreportdate of the row matched by
  # logcode + lognode.
  command_update = (
      "update currentrecords set {13}=(?), {18}=(?), {20}=(?) "
      "where {12}=(?) and {0}=(?)"
  ).format(*fieldnames)
  25.  
  26.  
  def db_upd(jq):
      """
      Blocking worker that applies queued packets to ``currentrecords``.

      Connects through ``pyodbc`` and then loops forever: take one job from
      ``jq``, split ``job[0]`` on CRLF into records, split each record on '@'
      (dropping the last piece) and run ``command_update`` for it, committing
      after every successful statement. When a statement or commit fails the
      current job is finished, the connection is closed and, after a 5 second
      pause, a new connection is opened.

      NOTE(review): the nesting was reconstructed from a paste that had lost
      its indentation - confirm the per-record commit and the placement of the
      reconnect check against the original file.

      :param jq: job queue providing ``get()`` and ``task_done()``; every job
          is indexable and ``job[0]`` is the packet text.
      :return: never returns normally.
      :raises Exception: whatever ``pyodbc.connect`` raises when a connection
          cannot be opened (this ends the worker).
      """
      print('Start updater')
      # NOTE(review): the value written to the third placeholder (lognow) is a
      # fixed timestamp; the code that parsed it from record field 18 was
      # commented out in the original. Kept as-is - confirm whether intended.
      fixed_lognow = datetime.datetime.strptime('04.04.2017 20:00:19',
                                                "%d.%m.%Y %H:%M:%S")
      while True:
          # Pre-bind so the cleanup below is safe even if connect() fails.
          conu = None
          curu = None
          mustdiscon = False
          try:
              conu = pyodbc.connect(mydsn)
              curu = conu.cursor()
              print('Connect updater')
              while True:
                  # get() stays outside the try so task_done() is only called
                  # for a job that was actually received.
                  upddata = jq.get()
                  try:
                      if upddata:
                          for item in upddata[0].split('\r\n'):
                              if not item:
                                  continue
                              xlist = item.split('@')[:-1]
                              try:
                                  curu.execute(command_update,
                                               (xlist[13],
                                                fixed_lognow,
                                                datetime.datetime.now(),
                                                xlist[12],
                                                xlist[0]))
                              except Exception as exc:
                                  # Also covers a record too short to index.
                                  print('Some Strange3 Error: {!r}'.format(exc))
                                  mustdiscon = True
                              else:
                                  try:
                                      curu.commit()
                                  except Exception as exc:
                                      print('Some Strange4 Error: {!r}'.format(exc))
                                      mustdiscon = True
                      if mustdiscon:
                          break
                      sleep(0.05)
                  finally:
                      jq.task_done()
              # Back off before reconnecting.
              sleep(5)
          finally:
              for handle in (curu, conu):
                  if handle is None:
                      continue
                  try:
                      handle.close()
                  except pyodbc.Error as exc:
                      # A broken connection must not prevent the reconnect.
                      print('Close failed: {!r}'.format(exc))
              print('Disconnect')
  85.  
  86.  
  def db_ins(jq):
      """
      Blocking worker that inserts queued packets into ``ssddata``.

      Connects through ``pyodbc`` and then loops forever: take one job from
      ``jq``, split ``job[0]`` on CRLF into records, split each record on '@'
      (dropping the last piece), convert field 19 to ``int``, parse field 18
      as ``"%d.%m.%Y %H:%M:%S"``, overwrite field 20 with the current time and
      run ``command_insert``, committing after every successful statement.
      When a statement or commit fails the current job is finished, the
      connection is closed and, after a 5 second pause, a new one is opened.

      NOTE(review): the nesting was reconstructed from a paste that had lost
      its indentation - confirm the per-record commit and the placement of the
      reconnect check against the original file.

      :param jq: job queue providing ``get()`` and ``task_done()``; every job
          is indexable and ``job[0]`` is the packet text.
      :return: never returns normally.
      :raises Exception: whatever ``pyodbc.connect`` raises when a connection
          cannot be opened (this ends the worker).
      """
      print(repr(jq))
      print('Start inserter')
      while True:
          # Pre-bind so the cleanup below is safe even if connect() fails.
          coni = None
          curi = None
          mustdiscon = False
          try:
              coni = pyodbc.connect(mydsn)
              curi = coni.cursor()
              print('Connect inserter')
              while True:
                  # get() stays outside the try so task_done() is only called
                  # for a job that was actually received.
                  insdata = jq.get()
                  try:
                      if insdata:
                          for item in insdata[0].split('\r\n'):
                              if not item:
                                  continue
                              xlist = item.split('@')[:-1]
                              try:
                                  xlist[19] = int(xlist[19])
                                  xlist[18] = datetime.datetime.strptime(
                                      xlist[18], "%d.%m.%Y %H:%M:%S")
                                  xlist[20] = datetime.datetime.now()
                              except (IndexError, ValueError) as exc:
                                  # A malformed record is a data problem, not a
                                  # connection problem: report it and move on
                                  # instead of killing the worker.
                                  print('Bad record skipped: {!r}'.format(exc))
                                  continue
                              try:
                                  curi.execute(command_insert, xlist)
                              except Exception as exc:
                                  print('Some Strange1 Error: {!r}'.format(exc))
                                  mustdiscon = True
                              else:
                                  try:
                                      curi.commit()
                                  except Exception as exc:
                                      print('Some Strange2 Error: {!r}'.format(exc))
                                      mustdiscon = True
                      if mustdiscon:
                          break
                      sleep(0.05)
                  finally:
                      jq.task_done()
              # Back off before reconnecting.
              sleep(5)
          finally:
              for handle in (curi, coni):
                  if handle is None:
                      continue
                  try:
                      handle.close()
                  except pyodbc.Error as exc:
                      # A broken connection must not prevent the reconnect.
                      print('Close failed: {!r}'.format(exc))
              print('Disconnect')
  145.  
  146.  
  async def sender_handler(websocket, jq, stationcode):
      """
      Serve one web consumer: send a snapshot, then stream queued packets.

      First reads every ``currentrecords`` row whose ``logsubgr1`` equals
      ``stationcode`` (ordered by ``logsubgr2``) and sends them as one message:
      columns joined by '@', rows joined by a newline. If the query fails an
      empty message is sent instead. Afterwards it loops forever, forwarding
      ``job[0]`` of every job taken from ``jq`` prefixed with the current
      microsecond value.

      NOTE(review): the nesting was reconstructed from a paste that had lost
      its indentation; sending the (possibly empty) snapshot unconditionally
      is the reading chosen here - confirm.

      :param websocket: connection to the consumer; must provide ``send()``.
      :param jq: asyncio queue of jobs for this consumer.
      :param stationcode: value matched against ``logsubgr1``.
      :return: never returns normally; ends by exception (for example when the
          connection is closed).
      """
      # First fetch the current train situation.
      # stationcode comes straight from the client's request path, so it is
      # bound as a parameter rather than formatted into the SQL text.
      sqlcmd = "SELECT * FROM currentrecords where logsubgr1=? order by logsubgr2"
      zzz = ''
      con = await aioodbc.connect(dsn=mydsn, loop=myloop)
      try:
          cur = await con.cursor()
          try:
              await cur.execute(sqlcmd, (stationcode,))
              rows = await cur.fetchall()
          except Exception as exc:
              # Best effort: the consumer still gets an (empty) snapshot.
              print('Snapshot query failed: {!r}'.format(exc))
          else:
              zzz = '\n'.join('@'.join(str(value) for value in row)
                              for row in rows)
          finally:
              await cur.close()
      finally:
          # Release the connection before any websocket I/O can raise.
          await con.close()
      await websocket.send(zzz)
      print(zzz)

      while True:
          # Yield to the event loop once per iteration.
          await asyncio.sleep(0)

          job = await jq.get()
          print('{1}: Send = {0}'.format(job[0], datetime.datetime.now().microsecond))
          await websocket.send(' {1} {0}'.format(job[0], datetime.datetime.now().microsecond))
  173.  
  174.  
  async def ws_handler(websocket, path):
      """
      Route a new websocket connection according to its request path.

      The path is split on '@' and must have more than two parts to be
      treated as a known operation:

          /s@<station code (logsubgr1)>@...
              web data consumer, i.e. a dynamic report
          /r@<station code (logsubgr1)>@<mode>
              data producer; mode selects DB writes and delivery to consumers:
                  0  everything: update + insert, delivered to all consumers
                  1  fresh data: delivered to all consumers, update only
                  2  old data: delivered to no consumer, insert only

      Any other path is served by ``ureceiver_handler``.

      :param websocket: the accepted connection.
      :param path: request path of the connection.
      :return: None, once the connection's handler has finished.
      """
      delimiter = '@'
      plist = path.split(delimiter)
      operation = plist[0] if len(plist) > 2 else None

      if operation == '/s':
          print('NEW CONNECTED /s webconsumer')
          key = repr(websocket)
          consumers[key] = AQueue()
          try:
              await sender_handler(websocket, consumers[key], plist[1])
          except websockets.exceptions.ConnectionClosed:
              print('WS TARGET LOST by ConnectionClosed')
          finally:
              # Always unregister, whatever ended the handler; otherwise
              # producers keep filling a queue nobody reads.
              consumers.pop(key, None)
      elif operation == '/r':
          print('NEW CONNECTED /r webproducer')
          try:
              await receiver_handler(websocket, plist[1], plist[2])
          except websockets.exceptions.ConnectionClosed:
              print('WS SOURCE LOST')
      else:
          print('NEW CONNECTion to operation=' + path)
          try:
              await ureceiver_handler(websocket)
          except websockets.exceptions.ConnectionClosed:
              print('unknown operation SOURCE LOST')
  216.  
  217.  
  async def receiver_handler(websocket, source_code, source_mode):
      """
      Receive packets from a producer and fan them out to the consumers.

      For every received packet the tuple ``(packet, 0)`` is put into the
      queues in ``consumers`` selected by ``source_mode``:

          '0'  every queue
          '1'  every queue except 'db_inserter'
          '2'  only 'db_inserter'

      Any other mode delivers to no queue. The two database queues are
      written with a non-blocking ``put``; all other queues are awaited.
      After each packet the producer is answered with '999'.

      :param websocket: producer connection; must provide ``recv()``/``send()``.
      :param source_code: not used by this function.
      :param source_mode: delivery mode string, see above.
      :return: never returns normally; ends by exception.
      """
      db_names = ('db_inserter', 'db_updater')
      while True:
          packet = await websocket.recv()
          if packet is None:
              continue
          payload = (packet, 0)
          # Put the packet into each selected queue of the consumers dict
          # (keys are unique names).
          for name, target in consumers.items():
              if source_mode == '0':  # everyone
                  selected = True
              elif source_mode == '1':  # hot data, update only
                  selected = name != 'db_inserter'
              elif source_mode == '2':  # stale data, insert only
                  selected = name == 'db_inserter'
              else:
                  selected = False
              if not selected:
                  continue
              if name in db_names:
                  target.put(payload, False)
              else:
                  await target.put(payload)
          await websocket.send('999')
  241.  
  async def run_blocking_db(qu, qi):
      """
      Run the two blocking database workers in separate processes.

      Starts ``db_ins(qi)`` and ``db_upd(qu)`` in a process pool and waits
      until both have ended. Each worker's end is reported as soon as it
      happens, including the exception that ended it, so a dead worker does
      not go unnoticed.

      :param qu: job queue handed to ``db_upd``.
      :param qi: job queue handed to ``db_ins``.
      :return: None, after both workers have ended.
      """
      executor = concurrent.futures.ProcessPoolExecutor(max_workers=3)
      loop = asyncio.get_event_loop()
      pending = {
          loop.run_in_executor(executor, db_ins, qi),
          loop.run_in_executor(executor, db_upd, qu),
      }
      try:
          while pending:
              done, pending = await asyncio.wait(
                  pending, return_when=asyncio.FIRST_COMPLETED)
              for finished in done:
                  if finished.cancelled():
                      print('DB worker cancelled')
                      continue
                  # Retrieving the exception also marks it as handled.
                  error = finished.exception()
                  if error is None:
                      print('DB worker finished')
                  else:
                      print('DB worker failed: {!r}'.format(error))
      finally:
          # Do not block the event loop while the pool winds down.
          executor.shutdown(wait=False)
  250.  
  251.  
  252.  
  async def ureceiver_handler(websocket):
      """
      Handle a connection whose path matched no known operation.

      Prints every received message and answers each one with '999'.

      :param websocket: the connection; must provide ``recv()``/``send()``.
      :return: never returns normally; ends by exception.
      """
      print("ureceiver_handler {0}".format(websocket))
      while True:
          message = await websocket.recv()
          if message is None:
              continue
          print("Принято {0}".format(message))
          await websocket.send('999')
  260.  
  async def zmain():
      """
      Create the database job queues and start the database workers.

      Registers two manager-backed queues in ``consumers`` under
      'db_inserter' and 'db_updater', then schedules ``run_blocking_db`` with
      them on the event loop without waiting for it.

      :return: None
      """
      manager = MMan()
      # Registration order is kept: inserter first, then updater.
      for worker_name in ('db_inserter', 'db_updater'):
          consumers[worker_name] = manager.Queue()
      updater_queue = consumers['db_updater']
      inserter_queue = consumers['db_inserter']
      asyncio.ensure_future(run_blocking_db(updater_queue, inserter_queue))
  266.  
  267.  
  if __name__ == '__main__':
      # myloop is also read as a module-level global by sender_handler().
      myloop = asyncio.get_event_loop()
      # Accept websocket connections on every interface, port 81.
      start_serverR = websockets.serve(ws_handler, '0.0.0.0', 81)

      # Start the DB workers and the server, then keep serving.
      myloop.run_until_complete(asyncio.gather(zmain(), start_serverR))
      myloop.run_forever()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement