Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio, aioodbc, websockets
- import datetime
- from asyncio import Queue as AQueue
- import concurrent.futures
- from multiprocessing import Manager as MMan, Queue as MpQueue
- from time import sleep
- import pyodbc
# Registry of job queues keyed by consumer name. Other code in this file
# stores entries under 'db_inserter', 'db_updater' and repr(websocket).
consumers = dict()

# Column names of one log record, in the order used by the INSERT statement.
fieldnames = [
    'lognode', 'logyear', 'logmonth', 'logday', 'loghour', 'logminute',
    'logsec', 'logmsec', 'loggroup', 'logsubgr1', 'logsubgr2', 'logsubgr3',
    'logcode', 'logtext', 'logpost', 'logtype', 'logvalue', 'logevent',
    'lognow', 'logshift', 'logreportdate',
]

# One positional placeholder per column; derived from fieldnames so the two
# lists cannot drift apart.
paramsym = ['?'] * len(fieldnames)

# Raw string: "\S" is not a valid escape sequence, so the backslash in the
# server name must not be interpreted by the string literal.
# NOTE(review): credentials are hard-coded here - consider loading them from
# configuration or the environment.
mydsn = r'Driver={SQL Server};Server=STATION4\SQL2008R2;Database=ssd6;uid=sa;pwd=123'

# INSERT of a full record into ssddata (one parameter per column).
command_insert = "INSERT INTO ssddata({}) VALUES ({})".format(
    ",".join(fieldnames), ",".join(paramsym))

# UPDATE of logtext, lognow and logreportdate in currentrecords for the row
# matching logcode and lognode (indices refer to fieldnames).
command_update = (
    "update currentrecords set {13}=(?), {18}=(?), {20}=(?) "
    "where {12}=(?) and {0}=(?)"
).format(*fieldnames)
def db_upd(jq):
    """Worker loop that applies queued record packets to ``currentrecords``.

    Runs forever. Each outer iteration opens a pyodbc connection with
    ``mydsn`` and then consumes jobs from *jq*. The first element of a job is
    a string of CRLF-separated records; each record is split on '@' and the
    part after the last '@' is dropped. For every record ``command_update``
    is executed and committed. When the execute or commit fails, the
    connection is closed after the current job and a new one is opened five
    seconds later. A record with too few fields is reported and skipped.

    An error raised by ``pyodbc.connect`` itself is not handled here and
    ends the worker.

    :param jq: job queue providing blocking ``get()`` and ``task_done()``.
    """
    print('Start updater')
    # NOTE(review): the value written to the 'lognow' column is this fixed
    # timestamp, not field 18 of the record (which db_ins parses). It looks
    # like a debugging placeholder - confirm before changing.
    fixed_lognow = datetime.datetime.strptime('04.04.2017 20:00:19',
                                              "%d.%m.%Y %H:%M:%S")
    while True:
        # Pre-bind both names so the cleanup below cannot fail with a
        # NameError (hiding the real error) when connect() itself raises.
        conu = None
        curu = None
        try:
            mustdiscon = False
            conu = pyodbc.connect(mydsn)
            curu = conu.cursor()
            print('Connect updater')
            while True:
                # Fetched outside the try so task_done() is only called for
                # a job that was actually received.
                upddata = jq.get()
                try:
                    if upddata:
                        for item in upddata[0].split('\r\n'):
                            if not item:
                                continue
                            xlist = item.split('@')[:-1]
                            try:
                                params = (xlist[13],
                                          fixed_lognow,
                                          datetime.datetime.now(),
                                          xlist[12],
                                          xlist[0])
                            except IndexError:
                                # Bad data is not a connection problem:
                                # drop the record and keep the connection.
                                print('Malformed record skipped: {!r}'.format(item))
                                continue
                            try:
                                curu.execute(command_update, params)
                            except Exception as exc:
                                print('Some Strange3 Error', repr(exc))
                                mustdiscon = True
                            else:
                                try:
                                    curu.commit()
                                except Exception as exc:
                                    print('Some Strange4 Error', repr(exc))
                                    mustdiscon = True
                    if mustdiscon:
                        break
                    sleep(0.05)
                finally:
                    jq.task_done()
            # Pause before reconnecting after a database error.
            sleep(5)
        finally:
            if curu is not None:
                curu.close()
            if conu is not None:
                conu.close()
            print('Disconnect')
def db_ins(jq):
    """Worker loop that inserts queued record packets into ``ssddata``.

    Runs forever. Each outer iteration opens a pyodbc connection with
    ``mydsn`` and then consumes jobs from *jq*. The first element of a job is
    a string of CRLF-separated records; each record is split on '@' and the
    part after the last '@' is dropped. Before the insert, field 19 is
    converted to ``int``, field 18 is parsed as ``%d.%m.%Y %H:%M:%S`` and
    field 20 is replaced by the current time. Every record is inserted with
    ``command_insert`` and committed. When the execute or commit fails, the
    connection is closed after the current job and a new one is opened five
    seconds later. A record whose fields cannot be converted is reported and
    skipped instead of terminating the worker.

    An error raised by ``pyodbc.connect`` itself is not handled here and
    ends the worker.

    :param jq: job queue providing blocking ``get()`` and ``task_done()``.
    """
    print(repr(jq))
    print('Start inserter')
    while True:
        # Pre-bind both names so the cleanup below cannot fail with a
        # NameError (hiding the real error) when connect() itself raises.
        coni = None
        curi = None
        try:
            mustdiscon = False
            coni = pyodbc.connect(mydsn)
            curi = coni.cursor()
            print('Connect inserter')
            while True:
                # Fetched outside the try so task_done() is only called for
                # a job that was actually received.
                insdata = jq.get()
                try:
                    if insdata:
                        for item in insdata[0].split('\r\n'):
                            if not item:
                                continue
                            xlist = item.split('@')[:-1]
                            try:
                                xlist[19] = int(xlist[19])
                                xlist[18] = datetime.datetime.strptime(
                                    xlist[18], "%d.%m.%Y %H:%M:%S")
                                xlist[20] = datetime.datetime.now()
                            except (IndexError, ValueError) as exc:
                                # Bad data is not a connection problem:
                                # drop the record and keep the connection.
                                print('Malformed record skipped: {!r} ({!r})'.format(item, exc))
                                continue
                            try:
                                curi.execute(command_insert, xlist)
                            except Exception as exc:
                                print('Some Strange1 Error', repr(exc))
                                mustdiscon = True
                            else:
                                try:
                                    curi.commit()
                                except Exception as exc:
                                    print('Some Strange2 Error', repr(exc))
                                    mustdiscon = True
                    if mustdiscon:
                        break
                    sleep(0.05)
                finally:
                    jq.task_done()
            # Pause before reconnecting after a database error.
            sleep(5)
        finally:
            if curi is not None:
                curi.close()
            if coni is not None:
                coni.close()
            print('Disconnect')
async def sender_handler(websocket, jq, stationcode):
    """Send the current records of a station, then stream queued jobs.

    First selects every row of ``currentrecords`` whose ``logsubgr1`` equals
    *stationcode* (ordered by ``logsubgr2``) and sends the rows to the client
    as one message: columns joined with '@', rows joined with a newline. If
    the query fails, the error is printed and an empty message is sent.
    Afterwards it loops forever, forwarding the first element of every job
    taken from *jq* to the client.

    :param websocket: connection providing an awaitable ``send()``.
    :param jq: queue providing an awaitable ``get()``.
    :param stationcode: value matched against the ``logsubgr1`` column.
    """
    # First the consumer must receive the current train situation.
    # The station code comes from the request path, so it is passed as a
    # query parameter instead of being formatted into the SQL text.
    sqlcmd = "SELECT * FROM currentrecords where logsubgr1=? order by logsubgr2"
    zzz = ''
    con = await aioodbc.connect(dsn=mydsn, loop=myloop)
    try:
        cur = await con.cursor()
        try:
            await cur.execute(sqlcmd, stationcode)
        except Exception as exc:
            # Best effort: the client still receives an (empty) snapshot.
            print('Snapshot query failed: {!r}'.format(exc))
        else:
            zzz = '\n'.join(['@'.join((str(row) for row in col))
                             for col in await cur.fetchall()])
        finally:
            await cur.close()
    finally:
        # Released before sending so a dropped client cannot leak it.
        await con.close()
    await websocket.send(zzz)
    print(zzz)
    while True:
        await asyncio.sleep(0)
        job = await jq.get()
        print('{1}: Send = {0}'.format(job[0], datetime.datetime.now().microsecond))
        await websocket.send(' {1} {0}'.format(job[0], datetime.datetime.now().microsecond))
async def ws_handler(websocket, path):
    """Dispatch a new websocket connection according to its request path.

    The path is split on '@'. When it has more than two parts:

    * ``/s`` - path for web data consumers, i.e. dynamic reports. A queue is
      registered in ``consumers`` under ``repr(websocket)`` and
      ``sender_handler`` is run with the second path part as station code.
      The queue is unregistered again when the handler ends for any reason.
    * ``/r@<station code (logsubgr1)>@<mode>`` - path for data producers;
      ``receiver_handler`` is run with the station code and the mode.
      Mode of writing to the database and forwarding to consumers:

      * ``0`` - everyone, update + insert
      * ``1`` - fresh data: all consumers, update only
      * ``2`` - old data: no consumers, insert only

    Any path with fewer parts is handed to ``ureceiver_handler``.

    :param websocket: the accepted websocket connection.
    :param path: request path of the connection.
    :return: None
    """
    delimiter = '@'
    plist = path.split(delimiter)
    if len(plist) > 2:
        if plist[0] == '/s':
            print('NEW CONNECTED /s webconsumer')
            consumer_key = repr(websocket)
            consumers[consumer_key] = AQueue()
            try:
                await sender_handler(websocket, consumers[consumer_key], plist[1])
            except websockets.exceptions.ConnectionClosed:
                print('WS TARGET LOST by ConnectionClosed')
            finally:
                # Always unregister: a queue left behind would keep being
                # filled by receiver_handler with nobody reading it.
                consumers.pop(consumer_key, None)
        if plist[0] == '/r':
            print('NEW CONNECTED /r webproducer')
            try:
                await receiver_handler(websocket, plist[1], plist[2])
            except websockets.exceptions.ConnectionClosed:
                print('WS SOURCE LOST')
    else:
        print('NEW CONNECTion to operation=' + path)
        try:
            await ureceiver_handler(websocket)
        except websockets.exceptions.ConnectionClosed:
            print('unknown operation SOURCE LOST')
async def receiver_handler(websocket, source_code, source_mode):
    """Receive packets from a producer and fan them out to consumer queues.

    Every received packet is put, as the tuple ``(packet, 0)``, into the
    queues registered in ``consumers`` according to *source_mode*:

    * ``'0'`` - every queue;
    * ``'1'`` - every queue except ``'db_inserter'``;
    * ``'2'`` - only ``'db_inserter'``.

    The ``'db_inserter'`` and ``'db_updater'`` queues are filled with a
    non-blocking ``put``; all other queues are awaited. After each packet
    the producer is answered with ``'999'``.

    :param websocket: connection providing awaitable ``recv()``/``send()``.
    :param source_code: station code of the producer (not used here).
    :param source_mode: routing mode, one of ``'0'``, ``'1'``, ``'2'``.
    """
    database_keys = ('db_inserter', 'db_updater')
    while True:
        packet = await websocket.recv()
        if packet is None:
            continue
        # A packet was received: hand it to every consumer queue that the
        # routing mode selects (keys of the dictionary are unique names).
        job = (packet, 0)
        for name, queue in consumers.items():
            if source_mode == '0':      # everyone
                selected = True
            elif source_mode == '1':    # hot data, for update only
                selected = name != 'db_inserter'
            elif source_mode == '2':    # stale data, for insert only
                selected = name == 'db_inserter'
            else:
                selected = False
            if not selected:
                continue
            if name in database_keys:
                queue.put(job, False)
            else:
                await queue.put(job)
        await websocket.send('999')
def _report_db_worker_exit(future):
    """Print the exception, if any, that ended a database worker future."""
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        print('DB worker failed: {!r}'.format(error))


async def run_blocking_db(qu, qi):
    """Run the blocking database workers in a process pool and wait for them.

    ``db_ins`` is started with *qi* and ``db_upd`` with *qu*, each through
    ``loop.run_in_executor`` on a ``ProcessPoolExecutor``. The coroutine then
    waits until both worker futures are finished. A worker that ends with an
    exception is reported as soon as it ends; without this the exception
    would never be retrieved from the future.

    :param qu: job queue handed to ``db_upd``.
    :param qi: job queue handed to ``db_ins``.
    """
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=3)
    loop = asyncio.get_event_loop()
    blocking_tasks2 = [
        loop.run_in_executor(executor, db_ins, qi),
        loop.run_in_executor(executor, db_upd, qu),
    ]
    for worker_future in blocking_tasks2:
        worker_future.add_done_callback(_report_db_worker_exit)
    await asyncio.wait(blocking_tasks2)
async def ureceiver_handler(websocket):
    """Fallback handler: print every received message and answer '999'.

    Loops until ``recv()`` or ``send()`` raises.

    :param websocket: connection providing awaitable ``recv()``/``send()``.
    """
    print("ureceiver_handler {0}".format(websocket))
    while True:
        message = await websocket.recv()
        if message is None:
            continue
        print("Принято {0}".format(message))
        await websocket.send('999')
# Strong references to background tasks. The asyncio documentation advises
# saving a reference to a scheduled task so it cannot disappear
# mid-execution; a local variable is gone as soon as zmain() returns.
_background_tasks = set()


async def zmain():
    """Create the database queues and start the database workers.

    Two queues created by a ``multiprocessing`` manager are registered in
    ``consumers`` as ``'db_inserter'`` and ``'db_updater'``, and
    ``run_blocking_db`` is scheduled as a background task with them. The
    coroutine returns without waiting for that task.
    """
    m = MMan()
    consumers['db_inserter'] = m.Queue()
    consumers['db_updater'] = m.Queue()
    xdb_task = asyncio.ensure_future(
        run_blocking_db(consumers['db_updater'], consumers['db_inserter']))
    _background_tasks.add(xdb_task)
    # Drop the reference once the task is finished.
    xdb_task.add_done_callback(_background_tasks.discard)
if __name__ == '__main__':
    # Script entry point. `myloop` stays a module-level name because
    # sender_handler passes it to aioodbc.connect().
    myloop = asyncio.get_event_loop()

    # Websocket server on all interfaces, port 81; every connection is
    # handled by ws_handler.
    start_serverR = websockets.serve(ws_handler, '0.0.0.0', 81)
    xtuple = (start_serverR, zmain)

    # Start the database workers and the server, then serve until stopped.
    myloop.run_until_complete(
        asyncio.gather(
            zmain(),
            start_serverR,
        )
    )
    myloop.run_forever()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement