Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import sys
- import asyncio
- from funcy import *
- from telethon import TelegramClient
- from telethon.network.connection import tcpabridged
- from telethon.crypto import CdnDecrypter
- from telethon.errors import SessionPasswordNeededError, FileMigrateError
- from telethon.tl.functions.upload import GetFileRequest
- from telethon.tl.types import (DocumentAttributeAudio,
- DocumentAttributeFilename, InputDocumentFileLocation)
- from telethon.tl.types.upload import FileCdnRedirect
- from telethon.utils import get_display_name
- from telethon.tl.types import InputMessagesFilterMusic
- from telethon import utils
- from telethon import errors
- from telethon import functions
- from argparse import ArgumentParser
- import stat
- import logging
- import errno
- import llfuse
#
# Telegram API credentials.  Obtain them at https://my.telegram.org.
# The original paste left these assignments blank (a syntax error); they are
# now read from the environment so no secrets live in the source.
#
api_id = int(os.environ.get('TG_API_ID', '0'))
api_hash = os.environ.get('TG_API_HASH', '')
phone = os.environ.get('TG_PHONE', '')
session = 'session1'

# When running from an llfuse source checkout, prefer the in-tree build.
basedir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '..'))
if (os.path.exists(os.path.join(basedir, 'setup.py')) and
        os.path.exists(os.path.join(basedir, 'src', 'llfuse.pyx'))):
    sys.path.insert(0, os.path.join(basedir, 'src'))

# Dump Python tracebacks on hard crashes (SIGSEGV in FUSE callbacks etc.)
# when faulthandler is available; silently skip otherwise.
try:
    import faulthandler
except ImportError:
    pass
else:
    faulthandler.enable()

# Tuple-index constants; NOTE(review): unused in the visible code — possibly
# leftovers from an earlier (entry, msg, doc) triple layout.
ENTRY = 0
MSG = 1
DOC = 2

# Single event loop shared by the synchronous FUSE callbacks below.
loop = asyncio.get_event_loop()
logvfs = logging.getLogger('TGFS')
logtg = logging.getLogger('telethon')
def each(func, seq):
    """Apply *func* to every element of *seq* for its side effects only."""
    for element in seq:
        func(element)
class TelegramFsClient(TelegramClient):
    """Telethon client used to browse dialogs and stream document bytes.

    Adds interactive console sign-in, a display-name -> entity map of
    dialogs, random-access chunked downloads of a message's document, and
    extraction of document metadata for the FUSE layer.
    """

    def __init__(self, session_user_id, user_phone, api_id, api_hash):
        super().__init__(
            session_user_id,
            api_id,
            api_hash,
            connection=tcpabridged.ConnectionTcpAbridged,
            # update_workers=1
        )
        self.user_phone = user_phone
        self.api_id = api_id
        self.api_hash = api_hash

    async def auth(self):
        """Connect and, on first run, sign in interactively on the console.

        Prompts for the login code (and the password when two-step
        verification is enabled).  Returns None; gives up quietly after a
        second failed connect.
        """
        logtg.debug('Connecting to Telegram servers...')
        try:
            await self.connect()
        except ConnectionError:
            logtg.debug('Initial connection failed. Retrying...')
            if not await self.connect():
                logtg.debug('Could not connect to Telegram servers.')
                return
        logtg.debug('Connected')
        if not await self.is_user_authorized():
            logtg.debug('First run. Sending code request...')
            await self.sign_in(self.user_phone)
            self_user = None
            while self_user is None:
                code = input('Enter the code you just received: ')
                try:
                    self_user = await self.sign_in(code=code)
                except SessionPasswordNeededError:
                    pw = input('Two step verification is enabled. '
                               'Please enter your password: ')
                    self_user = await self.sign_in(password=pw)

    async def get_dialogs_map(self, limit=150):
        """Return {display name: entity} for up to *limit* dialogs."""
        dialogs = await self.get_dialogs(limit=limit)
        entities = walk(lambda d: d.entity, dialogs)
        return dict(zip(walk(get_display_name, entities), entities))

    async def get_file_chunk(self, msg, offset, limit):
        """Download *limit* bytes of msg's document starting at *offset*.

        *offset* must be 4096-aligned (asserted).  upload.GetFile requests
        are issued in fixed 64 KiB parts and are pulled back when a request
        would straddle a 1 MiB boundary, with the surplus leading bytes
        sliced off afterwards.  On any download error the chunk gathered so
        far (possibly empty) is returned.
        """
        input_location = InputDocumentFileLocation(
            id=msg.media.document.id,
            access_hash=msg.media.document.access_hash,
            version=msg.media.document.version)
        file_size = msg.media.document.size
        chunk = bytes()
        _offset = working_offset = offset
        assert not _offset % 4096
        # NOTE(review): the assert above already guarantees 1024-alignment,
        # so this branch looks unreachable; kept as-is pending confirmation.
        if offset % 1024 != 0:
            working_offset = (offset // 1024) * 1024
        part_size_kb = 64  # get_appropriated_part_size(size)
        part_size = int(part_size_kb * 1024)
        dc_id, input_location = utils.get_input_location(input_location)
        # Documents may live on another data centre; borrow a sender there.
        exported = dc_id and self.session.dc_id != dc_id
        if exported:
            try:
                sender = await self._borrow_exported_sender(dc_id)
            except errors.DcIdInvalidError:
                # Can't export a sender for the ID we are currently in
                config = await self(functions.help.GetConfigRequest())
                for option in config.dc_options:
                    if option.ip_address == self.session.server_address:
                        self.session.set_dc(
                            option.id, option.ip_address, option.port)
                        self.session.save()
                        break
                # TODO Figure out why the session may have the wrong DC ID
                sender = self._sender
                exported = False
        else:
            # The used sender will also change if ``FileMigrateError`` occurs
            sender = self._sender
        try:
            _offset = working_offset
            while len(chunk) < limit + (offset - working_offset):
                _offset_of_offset = 0
                # Request would cross a 1 MiB boundary: end it exactly on
                # the boundary and remember how many leading bytes to drop.
                if not _offset // (1024 * 1024) == (_offset + part_size - 1) // (1024 * 1024):
                    edge = (_offset // (1024 * 1024) + 1) * (1024 * 1024)
                    # NOTE(review): `edge - offset` mixes the caller's fixed
                    # offset with the moving _offset — confirm this is not
                    # meant to be `edge - _offset`.
                    before = edge - offset
                    temp_offset = edge - part_size
                    _offset = temp_offset
                    _offset_of_offset = before
                try:
                    result = await sender.send(functions.upload.GetFileRequest(
                        input_location, _offset, part_size
                    ))
                    # print(result)
                    if isinstance(result, FileCdnRedirect):
                        raise NotImplementedError
                    # cdn_decrypter, result = \
                    #     await CdnDecrypter.prepare_decrypter(
                    #         client, await self._get_cdn_client(result), result
                    #     )
                except FileMigrateError as e:
                    # File lives on another DC: switch senders and retry.
                    sender = await self._borrow_exported_sender(e.new_dc)
                    exported = True
                    continue
                _offset += part_size
                if not result.bytes:
                    return getattr(result, 'type', '')
                chunk += result.bytes[-_offset_of_offset:]
        except Exception:
            # Best effort: keep whatever was downloaded, but record why the
            # download stopped instead of swallowing the traceback.
            logging.debug("zero chunk", exc_info=True)
        finally:
            if exported:
                await self._return_exported_sender(sender)
            elif sender != self._sender:
                await sender.disconnect()
            # if cdn_decrypter:
            #     try:
            #         await cdn_decrypter.client.disconnect()
            #     except:
            #         pass
        # Trim the alignment padding down to exactly the bytes requested.
        ret_data = chunk[offset - working_offset:offset - working_offset + limit]
        if len(ret_data) == 0:
            logging.debug("zero chunk")
        return ret_data

    async def get_documents(self, entity, limit, offset_id=None, music=False):
        """Collect (message, metadata-dict) pairs for documents in *entity*.

        entity     -- dialog/chat/user to scan.
        limit      -- maximum number of messages to fetch.
        offset_id  -- resume from this message id.  Bug fix: the original
                      passed a hard-coded 0 and silently ignored this
                      parameter; it is now forwarded (None still maps to 0).
        music      -- restrict to audio via InputMessagesFilterMusic.
        """
        documents = []
        messages = await self.get_messages(
            entity, limit=limit, offset_id=offset_id or 0,
            filter=InputMessagesFilterMusic if music else None)
        for msg in messages:
            if getattr(msg, 'media', None):
                if getattr(msg.media, 'document', None):
                    document = msg.media.document
                    document_data = dict.fromkeys(
                        ['id', 'date', 'mime_type', 'size', 'type', 'attributes', 'download_func'])
                    document_atrributes = dict.fromkeys(['file_name', 'title', 'performer', 'duration'])
                    document_data['attributes'] = document_atrributes
                    # Curry the message in so the FUSE layer only supplies
                    # (offset, limit) when reading.
                    document_data['download_func'] = func_partial(self.get_file_chunk, msg)
                    document_data.update(id=document.id, date=document.date,
                                         mime_type=document.mime_type,
                                         size=document.size, type=None)
                    for attr in msg.media.document.attributes:
                        if isinstance(attr, DocumentAttributeAudio):
                            if getattr(attr, 'title', None):
                                document_atrributes['title'] = attr.title
                            if getattr(attr, 'performer', None):
                                document_atrributes['performer'] = attr.performer
                            if getattr(attr, 'duration', None):
                                document_atrributes['duration'] = int(attr.duration)
                        elif isinstance(attr, DocumentAttributeFilename):
                            document_atrributes['file_name'] = attr.file_name
                    documents.append((msg, document_data))
        return documents
class tgfsFile(object):
    """A single Telegram document exposed as one read-only FUSE file.

    Bundles the source message, its metadata dict, the assigned inode and
    the precomputed llfuse attribute record.
    """

    def __init__(self, msg, doc, inode=None, attr=None):
        # Fall back to a synthetic name when the document carries none.
        display_name = doc['attributes'].get('file_name') or "msg_%s_doc" % msg.id
        self.fname = display_name.encode()
        self.inode = inode
        self.msg = msg
        self.doc = doc
        self.attr = attr
class TestTelegramFs(llfuse.Operations):
    # Read-only llfuse Operations implementation exposing a flat directory:
    # every Telegram document becomes one regular file directly under the
    # filesystem root, with inodes assigned sequentially after ROOT_INODE.
    def __init__(self, documents):
        # documents: list of (message, document_data) pairs as produced by
        # TelegramFsClient.get_documents().
        super(TestTelegramFs, self).__init__()
        self.documents = documents
        self.files = self.documents_to_tgfsfiles(documents, llfuse.ROOT_INODE + 1)
        self.inodes = list(self.files.keys())
        # Secondary index: file name (bytes) -> tgfsFile, used by lookup().
        self.file_by_name = walk_keys(lambda inode: self.files[inode].fname, self.files)

    def create_tgfsfile(self, msg, doc, inode):
        # Build the llfuse attribute record for one document and wrap it in
        # a tgfsFile.
        entry = llfuse.EntryAttributes()
        entry.st_mode = (stat.S_IFREG | 0o644)  # regular file, rw-r--r--
        entry.st_size = doc.get('size')
        # All three timestamps come from the Telegram message date, in ns.
        # NOTE(review): doc is built with dict.fromkeys(), so 'date' is
        # always present and the constant fallback looks unreachable; a
        # None date would raise here — confirm against get_documents().
        stamp = int(doc['date'].timestamp() * 1e9) if 'date' in doc else int(1438467123.985654 * 1e9)
        entry.st_atime_ns = stamp
        entry.st_ctime_ns = stamp
        entry.st_mtime_ns = stamp
        entry.st_gid = os.getgid()
        entry.st_uid = os.getuid()
        entry.st_ino = inode
        file = tgfsFile(msg, doc, inode, entry)
        return file

    def documents_to_tgfsfiles(self, msg_documents, starting_inode):
        # Map consecutive inodes (starting at starting_inode) onto the
        # (message, document_data) pairs.
        files = {}
        for inode, msg in enumerate(msg_documents, starting_inode):
            files[inode] = self.create_tgfsfile(msg[0], msg[1], inode)
        return files

    def getattr(self, inode, ctx=None):
        # Root directory attributes are synthesized on the fly; file
        # attributes were precomputed in create_tgfsfile().
        if inode == llfuse.ROOT_INODE:
            entry = llfuse.EntryAttributes()
            entry.st_mode = (stat.S_IFDIR | 0o755)
            entry.st_size = 0
            stamp = int(1438467123.985654 * 1e9)  # fixed placeholder mtime
            entry.st_atime_ns = stamp
            entry.st_ctime_ns = stamp
            entry.st_mtime_ns = stamp
            entry.st_gid = os.getgid()
            entry.st_uid = os.getuid()
            entry.st_ino = llfuse.ROOT_INODE
            return entry
        elif inode in self.files:
            return self.files[inode].attr
        else:
            raise llfuse.FUSEError(errno.ENOENT)

    def lookup(self, parent_inode, name, ctx=None):
        # Only the root directory has children; name is bytes.
        if parent_inode != llfuse.ROOT_INODE or name not in self.file_by_name:
            raise llfuse.FUSEError(errno.ENOENT)
        return self.file_by_name[name].attr

    def opendir(self, inode, ctx):
        # Single-directory filesystem: the inode doubles as the dir handle.
        if inode != llfuse.ROOT_INODE:
            raise llfuse.FUSEError(errno.ENOENT)
        return inode

    def readdir(self, fh, off):
        # Yield (name, attrs, next_offset) triples; llfuse resumes an
        # interrupted listing by calling again with the last offset it saw.
        # Here the next file's inode is used as the offset token.
        assert fh == llfuse.ROOT_INODE
        logvfs.debug("readdir(%s,%s)" % (fh, off))
        if off == 0:
            others = self.inodes
        else:
            others = self.inodes[self.inodes.index(off):]
        # NOTE(review): funcy's with_next() pairs each inode with its
        # successor (None for the last), so the `if inext` guard skips the
        # final entry of the listing — verify the last file is actually
        # reachable / that this resume convention is intended.
        for inode, inext in with_next(others):
            if inext:
                file = self.files[inode]
                yield (file.fname, file.attr, inext)

    def open(self, inode, flags, ctx):
        # Files are strictly read-only; reject any write access mode.
        if flags & os.O_RDWR or flags & os.O_WRONLY:
            raise llfuse.FUSEError(errno.EPERM)
        return inode

    def read(self, fh, off, size):
        # Stream the requested byte range straight from Telegram (no local
        # cache) by running the curried async downloader on the module loop.
        print("read: %s %s %s = %s; " % (fh, off, size, off + size), end='')
        doc = self.files[fh].doc
        reading_func = doc['download_func']  # func_partial(get_file_chunk, msg)
        chunk = loop.run_until_complete(reading_func(off, size))
        print("read: %s" % len(chunk))
        return chunk
def init_logging(debug=False):
    """Wire up stream handlers for the TGFS, root and telethon loggers.

    With debug=True everything runs at DEBUG level, otherwise INFO.
    """
    # Dedicated handler for the FUSE logger, heavily indented so its lines
    # stand apart from the rest of the output.
    vfs_handler = logging.StreamHandler()
    vfs_handler.setFormatter(logging.Formatter(
        '%(asctime)s.%(msecs)03d %(threadName)s: \t\t\t\t\t\t\t\t\t\t'
        '[%(name)s] %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
    logvfs.addHandler(vfs_handler)

    root_handler = logging.StreamHandler()
    root_handler.setFormatter(logging.Formatter(
        '%(asctime)s.%(msecs)03d %(threadName)s: '
        '[%(name)s] %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
    root_logger = logging.getLogger()
    # logging.getLogger('telethon').setLevel(logging.INFO)
    level = logging.DEBUG if debug else logging.INFO
    root_handler.setLevel(level)
    root_logger.setLevel(level)
    logtg.setLevel(level)
    root_logger.addHandler(root_handler)
def parse_args():
    '''Parse command line.

    --dir and --id require each other (checked by inspecting sys.argv
    directly, since argparse has no native mutual-requirement support).
    Returns the argparse Namespace.
    '''
    parser = ArgumentParser()
    parser.add_argument('--dir', type=str, required='--id' in sys.argv,
                        help='Where to mount the file system')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Enable debugging output')
    # NOTE(review): store_true with default=True means this flag can never
    # be turned off from the command line; kept for compatibility.
    parser.add_argument('--debug-fuse', action='store_true', default=True,
                        help='Enable FUSE debugging output')
    parser.add_argument('--list-dialogs', action='store_true', default=False,
                        help='List available telegram dialogs')
    parser.add_argument('--music', action='store_true', default=False,
                        help='Only music')
    parser.add_argument('--id', default=None, required='--dir' in sys.argv,
                        help='Dialog id')
    # Bug fix: coerce --limit to int at parse time (previously a str when
    # given on the command line; callers already int()-wrapped it, so this
    # stays backward compatible).
    parser.add_argument('--limit', default=10000, type=int,
                        help='limit messages')
    return parser.parse_args()
def main():
    # Entry point: authenticate, fetch the document list for the requested
    # dialog id, then mount and serve the FUSE filesystem at options.dir.
    options = parse_args()
    init_logging(options.debug)
    client = TelegramFsClient(session, phone, api_id, api_hash)
    loop.run_until_complete(client.auth())
    if options.list_dialogs:
        # --list-dialogs: print "<display name> <id>" pairs and exit.
        dialogs = loop.run_until_complete(client.get_dialogs_map())
        for d in dialogs:
            print(d, dialogs[d].id)
        return
    else:
        # NOTE(review): this dialogs fetch is never used on this branch —
        # presumably a warm-up or leftover; confirm before removing.
        dialogs = loop.run_until_complete(client.get_dialogs_map())
    cykabot = loop.run_until_complete(client.get_entity(int(options.id)))
    documents = loop.run_until_complete(
        client.get_documents(cykabot, limit=int(options.limit), music=options.music))
    testfs = TestTelegramFs(documents)
    fuse_options = set(llfuse.default_options)
    fuse_options.add('fsname=lltest')
    if options.debug_fuse:
        fuse_options.add('debug')
    llfuse.init(testfs, options.dir, fuse_options)
    try:
        # Single worker: the shared event loop is not thread-safe.
        llfuse.main(workers=1)
    except:
        # Leave the kernel mount in place (unmount=False) so a wedged FUSE
        # process doesn't hide the error; then re-raise.
        llfuse.close(unmount=False)
        raise
    llfuse.close()


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement