Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import json
- import os
- import sys
- from contextlib import suppress
- from functools import partial
- from typing import Optional, Union
- import aiofiles
- import aiopg
- import asyncio_redis
- import asyncssh
- import magic
- import psycopg2
- from aiohttp import web
- DSN = 'dbname={db} user={user} password={password} host={host}'.format(
- **dict(db=DATABASES['default'].get('NAME'), user=DATABASES['default'].get('USER'),
- password=DATABASES['default'].get('PASSWORD'), host=DATABASES['default'].get('HOST'))
- )
- class UploadError(Exception):
- pass
- async def get_file_settings(redis_pool: 'asyncio_redis.Pool', token: str) -> Optional[dict]:
- """
- Метод получения параметров загружаемого файла из Redis.
- :param redis_pool: пул соединений с Redis
- :param token: уникальный идентификатор для получения информации для загрузки файла
- :return: параметры файла
- """
- try:
- key = UPLOAD_SETTING_PATTERN.format(token)
- settings = await redis_pool.get(key)
- if settings is None or not token:
- raise web.HTTPBadRequest(
- body=json.dumps(error_message('Token is invalid.')),
- content_type='application/json')
- settings = json.loads(settings)
- return settings
- except asyncio_redis.exceptions.Error:
- raise web.HTTPInternalServerError(
- body=json.dumps(error_message('Failed to verify token.')),
- content_type='application/json'
- )
- def get_file_class(db_table: str, *args, **kwargs) -> str:
- return TABLES_TO_CLASS_MAP.get(db_table, "")
- def get_logging_extra(file_id: str, db_table: str, *args, **kwargs) -> dict:
- """
- Метод получения дополнительной информации для логирования.
- :param file_id: уникальный идентификатор файла в БД
- :param db_table: таблица в БД
- :param args: дополнительные позиционные аргументы
- :param kwargs: дополнительные key=value аргументы
- """
- return dict(user='system', entity_uuid=[file_id], entity_class=[get_file_class(db_table)])
- async def get_ssh_connection(auth_ssh: dict, storage_ip: str) -> 'asyncssh.SSHClientConnection':
- """
- Метод соединения по SSH.
- :param auth_ssh: параметры для подключения по SSH
- :param storage_ip: IP-адрес хранилища
- """
- conn, _ = await asyncio.wait_for(
- asyncssh.create_connection(asyncssh.SSHClient, host=storage_ip, known_hosts=None,
- **auth_ssh),
- UPLOADER_SSH_TIMEOUT
- )
- return conn
- async def select_handler(auth_ssh: dict, file_settings: dict, logging_extra: dict) -> tuple:
- """
- Метод выбора корректного обработчика для записи файла в хранилище.
- :param auth_ssh: параметры для подключения по SSH
- :param file_settings: параметры файла
- :param logging_extra: дополнительная информация для логирования
- """
- if file_settings['destination'] == 'controller':
- handler, connection = controller_file_upload_handler(**file_settings)
- else:
- handler, connection = await node_file_upload_handler(auth_ssh,
- logging_extra=logging_extra,
- **file_settings)
- return handler, connection
- def controller_file_upload_handler(file_path: str, *args, **kwargs) -> tuple:
- """
- Метод получения обработчика для записи файла на контроллер (локально).
- :param file_path: путь к файлу
- :param args: дополнительные позиционные аргументы
- :param kwargs: дополнительные key=value аргументы
- """
- if not os.path.exists(os.path.dirname(file_path)):
- os.mkdir(os.path.dirname(file_path))
- return aiofiles, None
- async def node_file_upload_handler(auth_ssh: dict, storage_ip: str, file_path: str, file_id: str,
- logging_extra: dict, *args, **kwargs) -> tuple:
- """
- Метод получения обработчика для записи файла на удаленное хранилище, используя SFTP.
- :param auth_ssh: параметры для подключения по SSH
- :param storage_ip: IP-адрес хранилища
- :param file_path: путь к файлу
- :param file_id: уникальный идентификатор файла в БД
- :param logging_extra: дополнительная информация для логирования
- :param args: дополнительные позиционные аргументы
- :param kwargs: дополнительные key=value аргументы
- """
- try:
- conn = await get_ssh_connection(auth_ssh, storage_ip=storage_ip)
- except (asyncio.TimeoutError, OSError):
- LOG.error(
- 'Failed upload file {file_id} ({file_path}): the storage on ip-address {storage_ip} '
- 'is not available.'.format(file_path=file_path, file_id=file_id, storage_ip=storage_ip),
- extra=logging_extra)
- return web.json_response(data=error_message('The storage is not available.'),
- status=500)
- sftp = await conn.start_sftp_client()
- exists = await sftp.exists(os.path.dirname(file_path))
- if not exists:
- await sftp.mkdir(os.path.dirname(file_path))
- return sftp, conn
- def check_max_file_size(file_magic: str, size: int) -> None:
- """
- Метод проверки максимального размера загружаемого файла.
- Реализована проверка только Keytab файлов.
- Максимальный размер указан в переменной settings.CTRL_KEYTAB_MAX_SIZE в Mb.
- :param file_magic: тип файла
- :param size: размер в байтах
- """
- if 'Kerberos Keytab file' in file_magic:
- if size > CTRL_KEYTAB_MAX_SIZE * 1024 * 1024:
- msg = 'Max file size exceeded. Keytab max size - 1Mb.'
- raise UploadError(msg)
- def check_magic_header(chunk: bytes, db_table: str, file_id: str, file_path: str,
- logging_extra: dict, *args, **kwargs) -> Optional[str]:
- """
- Метод проверки соответствия типа загружаемого файла.
- :param chunk: фрагмент файла
- :param db_table: таблица в БД
- :param file_id: уникальный идентификатор файла в БД
- :param file_path: путь к файлу
- :param logging_extra: дополнительная информация для логирования
- :param args: дополнительные позиционные аргументы
- :param kwargs: дополнительные key=value аргументы
- """
- file_magic = magic.from_buffer(chunk)
- if not any(magic_header in file_magic for magic_header in
- TABLES_TO_MAGIC_MAP.get(db_table)):
- LOG.error(
- 'Failed upload file {file_id} ({file_path}): '
- 'bad file format'.format(file_id=file_id, file_path=file_path),
- extra=logging_extra
- )
- return None
- return file_magic
- async def update_file_status(pgsql_pool: 'aiopg.Pool', db_table: str, file_id: str, status: str,
- additional_flags: dict = None, file_size: int = None,
- **kwargs) -> None:
- """
- Метод выставления статуса файла в БД.
- :param pgsql_pool: пул соединений с БД
- :param db_table: таблица в БД
- :param file_id: уникальный идентификатор файла в БД
- :param status: статус
- :param additional_flags: дополнительные флаги
- :param file_size: размер файла
- :param kwargs: дополнительные key=value аргументы
- """
- async with pgsql_pool.acquire() as conn:
- async with conn.cursor() as cur:
- await cur.execute(
- "UPDATE {table} SET status=%s, size=%s "
- "WHERE id=%s".format(table=db_table), (status, file_size, file_id))
- if additional_flags:
- await cur.execute(
- "UPDATE {table} SET additional_flags=%s WHERE id=%s".format(table=db_table),
- (json.dumps(additional_flags), file_id)
- )
- # Методы выставления ACTIVE/FAILED статуса файла
- file_active = partial(update_file_status, status='ACTIVE')
- file_failed = partial(update_file_status, status='FAILED')
- async def remove_file(handler: Union['aiofiles', 'asyncssh.SFTPClient'],
- file_settings: dict) -> None:
- """
- Метод удаления файла.
- :param handler: обработчик для записи файла
- :param file_settings: параметры файла
- """
- if file_settings['destination'] == 'controller':
- with suppress(FileNotFoundError):
- os.remove(file_settings['file_path'])
- else:
- handler.remove(file_settings['file_path'])
- def error_message(message: str) -> dict:
- """
- Метод форматирования сообщения об ошибке в соответствии с соглашением об API.
- :param message: сообщение об ошибке
- """
- return {
- 'errors': {
- 'detail': [message]
- }
- }
- async def give_handle(request):
- """
- Метод выгрузки файла
- """
- token = request.query.get('token')
- redis_pool = request.app['redis_pool']
- pgsql_pool = request.app['pgsql_pool']
- auth_ssh = request.app['auth_ssh_settings']
- file_settings = await get_file_settings(redis_pool, token)
- logging_extra = get_logging_extra(**file_settings)
- handler = None
- connection = None
- LOG.info(
- 'Started to upload file {file_id} ({file_path}).'.format(**file_settings),
- extra=logging_extra
- )
- try:
- handler, connection = await select_handler(auth_ssh, file_settings, logging_extra)
- async with handler.open(file_settings['file_path'], 'rb') as f:
- response = web.StreamResponse()
- response.content_type = 'application/octet-stream'
- response.headers['CONTENT-DISPOSITION'] = 'filename="{}"'.format(file_settings['filename'])
- response.content_length = file_settings['size']
- await response.prepare(request)
- try:
- while True:
- chunk = f.read(102400)
- if not chunk:
- LOG.info(
- 'Successful downloaded file {file_path} ({file_id}).'.format(**file_settings),
- extra=logging_extra
- )
- break
- response.write(chunk)
- finally:
- await response.write_eof()
- return response
- except asyncio.CancelledError:
- LOG.error(
- 'Download file {file_path} ({file_id}) canceled by user.'.format(**file_settings),
- extra=logging_extra
- )
- return web.HTTPOk()
- except asyncssh.DisconnectError as e:
- LOG.error(
- 'Error occured while connecting to '
- '{storage_ip}: {error}'.format(storage_ip=file_settings.get('storage_ip', ''),
- error=e),
- extra=logging_extra)
- return web.json_response(
- data=error_message('Unable to connect to the storage. {}'.format(e)), status=500
- )
- except Exception as e:
- LOG.error(
- 'Error occurred while uploading file {file_path} '
- '({file_id}): {error}'.format(**file_settings, error=e),
- extra=logging_extra
- )
- async def store_handle(request):
- """
- Основной метод загрузки файла.
- """
- token = request.query.get('token')
- redis_pool = request.app['redis_pool']
- pgsql_pool = request.app['pgsql_pool']
- auth_ssh = request.app['auth_ssh_settings']
- file_settings = await get_file_settings(redis_pool, token)
- logging_extra = get_logging_extra(**file_settings)
- additional_flags = None
- handler = None
- connection = None
- size = 0
- LOG.info(
- 'Started to upload file {file_id} ({file_path}).'.format(**file_settings),
- extra=logging_extra
- )
- reader = await request.multipart()
- upload_file = await reader.next()
- try:
- handler, connection = await select_handler(auth_ssh, file_settings, logging_extra)
- async with handler.open(file_settings['file_path'], 'wb') as f:
- first_chunk = True
- while True:
- chunk = await upload_file.read_chunk(size=102400)
- if chunk and first_chunk:
- file_magic = check_magic_header(chunk, logging_extra=logging_extra,
- **file_settings)
- if not file_magic:
- await file_failed(pgsql_pool, **file_settings)
- return web.json_response(data=error_message('Bad file format.'), status=500)
- additional_flags = get_additional_flags(file_magic)
- first_chunk = False
- if not chunk and upload_file.at_eof():
- LOG.info(
- 'Successful upload file {file_path} ({file_id}).'.format(**file_settings),
- extra=logging_extra
- )
- break
- size += len(chunk)
- check_max_file_size(file_magic, size)
- try:
- await f.write(chunk)
- except (asyncssh.sftp.SFTPError, IOError) as e:
- msg = 'There is not enough space on the storage. {}'.format(e)
- raise UploadError(msg)
- await file_active(pgsql_pool, **file_settings,
- additional_flags=additional_flags, file_size=size)
- return web.json_response(data={'id': file_settings['file_id']}, status=200)
- except asyncio.CancelledError:
- LOG.error(
- 'Upload file {file_path} ({file_id}) canceled by user.'.format(**file_settings),
- extra=logging_extra
- )
- await remove_file(handler, file_settings)
- await file_failed(pgsql_pool, **file_settings)
- return web.HTTPOk()
- except asyncssh.DisconnectError as e:
- LOG.error(
- 'Error occured while connecting to '
- '{storage_ip}: {error}'.format(storage_ip=file_settings.get('storage_ip', ''),
- error=e),
- extra=logging_extra)
- await file_failed(pgsql_pool, **file_settings)
- return web.json_response(
- data=error_message('Unable to connect to the storage. {}'.format(e)), status=500
- )
- except Exception as e:
- LOG.error(
- 'Error occurred while uploading file {file_path} '
- '({file_id}): {error}'.format(**file_settings, error=e),
- extra=logging_extra
- )
- await remove_file(handler, file_settings)
- await file_failed(pgsql_pool, **file_settings)
- return web.json_response(data=error_message(str(e)), status=500)
- finally:
- # Закрываем соединения в любом случае
- if hasattr(handler, 'exit'):
- handler.exit()
- if hasattr(connection, 'close'):
- connection.close()
- async def init_app(application: 'web.Application') -> None:
- """
- Метод инициализации приложения.
- :param application: объект приложения
- """
- try:
- application['redis_pool'] = await asyncio_redis.Pool.create(
- host=REDIS_HOST,
- port=REDIS_PORT,
- password=REDIS_PASSWORD,
- poolsize=10)
- application['pgsql_pool'] = await aiopg.create_pool(DSN)
- application['auth_ssh_settings'] = {'username': WORKER_USER,
- 'client_keys': CTRL_SSH_PRIVATE_KEY}
- except psycopg2.OperationalError as e:
- LOG.error('Error occured while connecting to database: {error}.'.format(error=e))
- await application.shutdown()
- sys.exit(1)
- async def on_start(application: 'web.Application') -> None:
- LOG.debug('loader daemon started.')
- async def on_shutdown(application: 'web.Application') -> None:
- """
- Метод определяет действия при завершении приложения.
- :param application: объект приложения
- """
- if application.get('redis_pool'):
- application['redis_pool'].close()
- if application.get('pgsql_pool'):
- application['pgsql_pool'].close()
- await application['pgsql_pool'].wait_closed()
- LOG.debug('loader daemon stopped.')
- LOG = get_async_logger()
- app = web.Application()
- app.router.add_post('/upload', store_handle)
- app.router.add_get('/download', give_handle)
- app.on_startup.append(on_start)
- app.on_shutdown.append(on_shutdown)
- loop = asyncio.get_event_loop()
- loop.run_until_complete(init_app(app))
- if __name__ == "__main__":
- web.run_app(app, host="127.0.0.1", port=UPLOAD_PORT)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement