Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from twisted.internet.protocol import Protocol, Factory
- from twisted.internet import reactor
- class Client(Protocol):
- ip: str = None
- login: str = None
- factory: 'Chat'
- unique_login: bool = True
- def __init__(self, factory):
- """
- Инициализация фабрики клиента
- :param factory:
- """
- self.factory = factory
- def connectionMade(self):
- """
- Обработчик подключения нового клиента
- """
- self.ip = self.transport.getHost().host
- def dataReceived(self, data: bytes):
- """
- Обработчик нового сообщения от клиента
- :param data:
- """
- message = 'login:' + data.decode().replace('\n', '')
- message = message.replace('login:', '')
- if self.login is None:
- login = message
- if login not in self.factory.logins:
- """
- Уникальный логин
- """
- self.login = login
- self.factory.clients.append(self)
- self.factory.logins.append(self.login)
- self.unique_login = True
- print(f"Client connected: {self.login} : {self.ip}")
- self.transport.write("Welcome to the chat v0.2\n***************************\n".encode())
- for i in self.factory.history:
- self.transport.write(i.encode())
- self.factory.notify_all_users(f'Client connected: {self.login}')
- self.factory.history.append(f'Client connected: {self.login}\n')
- else:
- self.transport.write("Error this login already exists!\nEnter new login >>> ".encode())
- print('New client tried connect with existed login')
- else:
- server_message = f"{self.login}: {message}"
- self.factory.notify_all_users(server_message)
- self.factory.history.append(server_message + '\n')
- print(server_message)
- def connectionLost(self, reason=None):
- """
- Обработчик отключения клиента
- :param reason:
- """
- if self.unique_login:
- self.factory.clients.remove(self)
- self.factory.logins.remove(self.login)
- print(f"Client disconnected:\n{self.login}:{self.ip}")
- class Chat(Factory):
- clients: list
- logins: list
- history: list
- def __init__(self):
- """
- Инициализация сервера
- """
- self.clients = []
- self.logins = []
- self.history = []
- print("*" * 10, "\nStart server \nCompleted [OK]")
- def startFactory(self):
- """
- Запуск процесса ожидания новых клиентов
- :return:
- """
- print("\n\nStart listening for the clients...")
- def buildProtocol(self, addr):
- """
- Инициализация нового клиента
- :param addr:
- :return:
- """
- return Client(self)
- def notify_all_users(self, data: str):
- """
- Отправка сообщений всем текущим пользователям
- :param data:
- :return:
- """
- for user in self.clients:
- user.transport.write(f"{data}\n".encode())
- if __name__ == '__main__':
- reactor.listenTCP(7410, Chat())
- reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement