Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- # Version alfa 2.0
- # Creado en python 3.9.1
- # Requiere requests 2.26.0
- from io import FileIO
- import re
- import string
- import os.path as patos
- import pickle
- from typing import BinaryIO
- import requests
- import random
- import time
- from requests.models import Response
- class BoardData:
- """
- Clase que almacena los datos asociados a un tablon
- """
- # Tamaño maximo del archivo en bytes
- MAX_FILE_SIZE = 25165824
- # Tamaño maximo del mensaje
- MAX_MESSAGE_LENGTH = 8000
- def __init__(self, name:str, fname:str='', files:list[str]=None, spoiler=False,
- sfw=True, **kwargs):
- """
- Constructor
- name: El nombre del tablon mostrado en la url
- fname: El nombre completo del tablon, opcional
- files: Una lista con las extenciones de los archivos permitidos, opcional
- spoiler: Indica si es posible utilizar una imagen con spoiler, defecto False
- sfw: Indica si el tablon es SFW, defecto True
- kwargs
- fsize: Indica el tamaño maximo de archivo para este tablon, defecto MAX_FILE_SIZE
- """
- if name is None:
- raise ValueError('Argument "name" is None')
- if name.isspace() or len(name) == 0:
- raise ValueError('Argument "name" is an empty or blank string')
- self.name = name
- self.full_name = fname
- self.can_spoiler = spoiler
- self.allowed_files = files
- self.is_sfw = sfw
- self.max_file_size = kwargs.get('fsize', self.MAX_FILE_SIZE)
- def accepts_file(self, p):
- """Determina si es posible postear un archivo en el tablon dada su ruta"""
- if not patos.isfile(p) or patos.islink(p) or patos.getsize(p) > self.max_file_size:
- return False
- f, ext = patos.splitext(p)
- return ext in self.allowed_files
- def get_url(self):
- return 'https://www.hispachan.org/' + self.name
- class FileExts:
- """
- Almacena las extensiones de archivo permitidas para varios tablones
- """
- IMAGE = ['.gif', '.jpg', '.png']
- VIDEO = ['.mp4', '.webm']
- AUDIO = ['.mp3', '.ogg']
- DOCUMENT = ['.pdf']
- COMIC = ['.cbr', '.cbz']
- OTHER = ['.swf']
- AUDIOVISUAL = IMAGE + VIDEO + AUDIO
- STANDARD = AUDIOVISUAL + DOCUMENT + OTHER
- ALL = STANDARD + COMIC
- BOARDS = {
- 'ac': BoardData('ac', 'Animacion y comics', FileExts.ALL, True, True),
- 'art': BoardData('art', 'Arte', FileExts.STANDARD, False, False),
- 'a': BoardData('a', 'Anime, manga y cultura japonesa', FileExts.STANDARD, True),
- 'b': BoardData('b', 'Balcón', FileExts.STANDARD, False, False),
- 'i': BoardData('i', 'Adictos a internet', FileExts.STANDARD, False, True),
- 'pl': BoardData('pl', 'Plaza', FileExts.STANDARD, False, False),
- 't': BoardData('t', 'Tecnología', FileExts.STANDARD, False, True),
- 'v': BoardData('v', 'Videojuegos', FileExts.STANDARD, True, True),
- }
- class PostPath:
- def __init__(self) -> None:
- pass
- class PostData:
- __thread_format = 'https://www.hispachan.org/{0}/res/{1}.html'
- OPTION_SAGE = 'sage'
- OPTION_BUMP = ''
- def __init__(self, pid:int=0, board:BoardData=None, msg='', opt='', fname='', **kw):
- """
- Crea un objeto que almacena la informacion de un post
- pid: El id del hilo al que este post pertenece
- board: El tablon al cual pertenece el hilo
- msg: El mensaje del post
- opt: Sage o bump
- fname: La ruta del archivo adjuntado al post
- kwargs
- spoiler: Indica si postear el archivo con la opcion de spoiler
- id: Indica el numero de este post, defecto -1
- """
- self.parent_id = pid
- self.board = board
- self.post_id = kw.get('id', -1)
- self.message = msg
- self.option = opt
- self.file = fname
- self.spoiler = False
- def is_thread(self):
- """
- Indica si este post es un hilo
- """
- return self.parent_id == 0
- def set_parent_path(self, p: str):
- """
- Fija el valor de board y thread basandose en una cadena con formato /board/thread
- Ejemplos: /a/12345 /b/3333 /c/1212 /d/1144
- """
- splited = p.split('/')
- try:
- tn = int(splited[2])
- except:
- raise ValueError('Could not get the post number from the given path')
- if tn <= 0:
- raise ValueError('Invalid value for post number')
- self.parent_id = tn
- # El primer elemento del array despues de invocar a split con el formato
- # /board/thread es una cadena vacia
- if splited[1] in BOARDS:
- self.board = BOARDS[splited[1]]
- else:
- raise ValueError('Board does not exist')
- def get_parent_path(self) -> str:
- return str.format('/{0}/{1}', self.board.name, self.parent_id)
- parent_path = property(get_parent_path, set_parent_path)
- def get_thread_url(self):
- return self.__thread_format.format(self.board.name, self.parent_id)
- def get_url(self):
- return self.__thread_format.format(self.board.name, self.post_id)
- class Poster:
- __pwdchars = string.ascii_letters + string.digits
- __pwdlen = 8
- __fail_urls = frozenset()
- __board_php = 'https://www.hispachan.org/board.php'
- __lpost_re = re.compile('(?P<board>[a-zA-Z]+)ZETA(?P<id>\d+)')
- def __init__(self, abort=True, tries=2, try_dealy=8, cookiesfile='./hemuhi.cookies'):
- self.__session = None
- self.on_posted = None
- self.on_post_fail = None
- self.on_try_fail = None
- self.abort_on_fail = abort
- self.cookies_file = cookiesfile
- self.tries = tries
- self.try_delay = try_dealy
- def begin_postintg(self):
- """
- Carga las cookies desde el archivo e inicia la session de requests
- """
- self.__session = requests.Session()
- # Carga las cookies desde el archivo, si no existen __check_cookies las crea
- if patos.exists(self.cookies_file):
- with open(self.cookies_file, mode='rb') as file:
- self.__session.cookies = pickle.load(file, encoding='utf-8')
- self.__check_cookies()
- def end_posting(self):
- """
- Guarda las cookies en el archivo y cierra la session de requests
- """
- open_mod = 'wb' if patos.exists(self.cookies_file) else 'xb'
- with open(self.cookies_file, mode=open_mod) as file:
- pickle.dump(self.__session.cookies, protocol=0, file=file)
- self.__session.close()
- def __enter__(self):
- self.begin_postintg()
- return self
- def __exit__(self, exc_type, exc_value, traceback):
- self.end_posting()
- @classmethod
- def __generate_password(cls):
- """Genera un password para el post, es la funcion get_password de kusaba.js"""
- return ''.join(random.sample(cls.__pwdchars, cls.__pwdlen))
- @staticmethod
- def __form_data_from_post(post: PostData) -> tuple[dict[str], dict[str, BinaryIO]]:
- """
- Obtiene la representacion de un post en forma de un diccionario con la form data
- para ser enviado
- """
- fdata = {
- 'board': post.board.name, # El tablon al cual pertenece el hilo
- 'replythread': post.parent_id, # El hilo al cual se responde
- 'MAX_FILE_SIZE': post.board.max_file_size, # Tamaño maximo de archivo (24MB)
- 'em': post.option, # La oocuib del post (bump, noko, sage, ...)?
- 'message': post.message, # El mensaje del post
- 'email': '' # Campo de email, posiblemete legado de futabas
- }
- files = {}
- # Agrega la opcion de spoiler si la activo y el tablon lo permite
- if post.spoiler and post.board.can_spoiler:
- fdata['spoiler'] = 'on'
- # Si no se desea postear un archivo
- if post.file is None or post.file == '':
- return (fdata, files)
- try:
- files['imagefile'] = open(post.file, 'rb')
- # Aun se desconoce si otro tipo de archivos utilizan otro nombre de campo
- except:
- raise ValueError('Could not open file')
- return (fdata, files)
- def __check_cookies(self):
- """
- Verifica si las contraseñas y 'PHPSESSID' estan almacenadas en la session.
- Si no existen, genera las contraseñas y obtiene PHPSESSID
- """
- if 'PHPSESSID' not in self.__session.cookies:
- response = self.__session.get(self.__board_php)
- # Visita requerida para obtener la cookie 'PHPSESSID'
- # Solo fui capaz de obtenerla a traves de esta url
- if not (response.ok and 'PHPSESSID' in response.cookies):
- raise Exception('Could not get the PHPSESSID cookie!')
- if 'undefined' not in self.__session.cookies:
- self.__session.cookies['undefined'] = self.__generate_password()
- if 'postpassword' not in self.__session.cookies:
- self.__session.cookies['postpassword'] = self.__generate_password()
- def _posted(self, post, response):
- """
- Funcion invocada cuando un post es posteado exitosamente
- """
- if self.on_posted is not None:
- self.on_posted(post, response)
- def _post_failed(self, post, response):
- """
- Funcion invocada cuando no fue posible postear un post
- """
- if self.on_post_fail is not None:
- self.on_post_fail(post, response)
- def _try_failed(self, post, response, number):
- """
- Funcion invocada cuando falla uno de los intentos para postear un post
- """
- if self.on_try_fail is not None:
- self.on_try_fail(post, response, number)
- def __succesfull_post(self, response: Response, post: PostData) -> bool:
- """
- Determina si un post fue posteado correctamente
- """
- if not response.ok or response.url in self.__fail_urls:
- return False
- board_url = post.board.get_url()
- # Las respuestas exitosas redirigen a https://www.hispachan.org/abc/ (no noko)
- # o a https://www.hispachan.org/abc/res/12345.html (noko)
- return board_url == response.url or board_url in response.url
- def __last_post_from_cookies(self):
- """
- Intenta obtener el id del ultimo post almacenado en la cookie last_post
- """
- m = self.__lpost_re.match(self.__session.cookies['last_post'])
- post_id = int(m['id'])
- board_name = m['board']
- return (board_name, post_id)
- def __post(self, post: PostData):
- """
- Envia el post y verifica que haya sido posteado correctamente
- Regresa el numero del post si fue exitoso o -1 si fallo
- """
- current_try = 1
- post_id = -1
- response = None
- data, files = self.__form_data_from_post(post)
- # minetras haya intentos y el post no haya sido posteado
- while current_try <= self.tries and post_id == -1:
- response = self.__session.post(self.__board_php, data=data, files=files)
- if self.__succesfull_post(response, post):
- board, post_id = self.__last_post_from_cookies()
- post.post_id = post_id
- self._posted(post, response)
- else:
- # Si el itento fallo aumentar el valor del intento e invocar _try_failed
- self._try_failed(post, response, current_try)
- current_try += 1
- time.sleep(self.try_delay)
- # Cerrar el archivo abierto
- for fname in files:
- if not files[fname].closed:
- files[fname].close()
- # Invocar _post_failed si fallo
- if post_id == -1:
- self._post_failed(post, response)
- return post_id
- def post(self, post: PostData) -> int:
- """
- Crea un post en un hilo y regresa su id o -1 si no fue enviado,
- actualmente no se puede crear un hilo con este metodo
- """
- return self.__post(post)
- def multi_post(self, posts: PostData, delay: float=1.0) -> list[int]:
- """
- Crea multiples posts en un hilo,
- regresa una lista con los ids de los posts creados
- """
- ids = []
- for post in posts:
- post_id = self.__post(post)
- if self.abort_on_fail and post_id == -1:
- return posts
- ids.append(post_id)
- time.sleep(delay)
- return posts
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement