Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import socket
- import time
- import random
- import threading
- import sys
- import os
- from enum import Enum
- # BGBLinkCable class by TheZZAZZGlitch
- class BGBLinkCable():
- def __init__(self,ip,port):
- self.ip = ip
- self.port = port
- self.ticks = 0
- self.frames = 0
- self.received = 0
- self.sent = 0
- self.transfer = -1
- self.lock = threading.Lock()
- self.exchangeHandler = None
- def start(self):
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.connect((self.ip, self.port))
- threading.Thread(target=self.networkLoop, daemon=True).start()
- def queryStatus(self):
- status = [0x6a,0,0,0,0,0,0,0]
- self.ticks += 1
- self.frames += 8
- status[2] = self.ticks % 256
- status[3] = (self.ticks // 256) % 256
- status[5] = self.frames % 256
- status[6] = (self.frames // 256) % 256
- status[7] = (self.frames // 256 // 256) % 256
- return bytes(status)
- def getStatus(self):
- return (self.frames, self.ticks, self.received, self.sent)
- def networkLoop(self):
- while True:
- try:
- data = bytearray(self.sock.recv(8))
- except KeyboardInterrupt:
- raise
- if len(data) == 0:
- break
- if data[0] == 0x01:
- self.sock.send(data)
- self.sock.send(b'\x6c\x03\x00\x00\x00\x00\x00\x00')
- continue
- if data[0] == 0x6C:
- self.sock.send(b'\x6c\x01\x00\x00\x00\x00\x00\x00')
- self.sock.send(self.queryStatus())
- continue
- if data[0] == 0x65:
- continue
- if data[0] == 0x6A:
- self.sock.send(self.queryStatus())
- continue
- if (data[0] == 0x69 or data[0] == 0x68):
- self.received+=1
- self.sent+=1
- data[1] = self.exchangeHandler(data[1], self)
- self.sock.send(data)
- self.sock.send(self.queryStatus())
- continue
- print("Unknown command " + hex(data[0]))
- print(data)
- def setExchangeHandler(self, ex):
- self.exchangeHandler = ex
- pass
- # Mobile Adapter GB implementation by Háčky
- class TransferState(Enum):
- Waiting = 0 # Waiting for the first byte of the preamble (0x99).
- Preamble = 1 # Expecting the second byte of the preamble (0x66).
- PacketStart = 2 # Expecting the packet start.
- Packet01 = 3 # Expecting packet offset 0x01 (unused?)
- Packet02 = 4 # Expecting packet offset 0x02 (unused?)
- PacketLen = 5 # Expecting the packet length.
- PacketBody = 6 # Expecting the packet body.
- Checksum1 = 7 # Expecting the first byte of the checksum.
- Checksum2 = 8 # Expecting the second byte of the checksum.
- DeviceID = 9 # Expecting the device ID.
- StatusByte = 10 # Expecting the status byte (0x00 for sender, 0x80 ^ packetID for receiver)
- adapter_state = TransferState.Waiting
- is_sender = False
- packet_data = {'id': 0, 'size': 0, 'data': [], 'checksum': 0}
- line_busy = False
- port = 0
- http_ready = True
- pop_session_begun = False
- def mobileAdapter(b, obj):
- global adapter_state, is_sender, packet_data
- if(is_sender):
- # This does not handle errors flagged by Game Boy!
- if(adapter_state == TransferState.Waiting):
- adapter_state = TransferState.Preamble
- return 0x99
- elif(adapter_state == TransferState.Preamble):
- adapter_state = TransferState.PacketStart
- return 0x66
- elif(adapter_state == TransferState.PacketStart):
- adapter_state = TransferState.Packet01
- return packet_data['id']
- elif(adapter_state == TransferState.Packet01):
- adapter_state = TransferState.Packet02
- return 0x00
- elif(adapter_state == TransferState.Packet02):
- adapter_state = TransferState.PacketLen
- return 0x00
- elif(adapter_state == TransferState.PacketLen):
- if(packet_data['size'] > 0):
- adapter_state = TransferState.PacketBody
- else:
- adapter_state = TransferState.Checksum1
- return packet_data['size']
- elif(adapter_state == TransferState.PacketBody):
- packet_data['size'] -= 1
- if(packet_data['size'] == 0):
- adapter_state = TransferState.Checksum1
- return packet_data['data'][-1 - packet_data['size']]
- elif(adapter_state == TransferState.Checksum1):
- adapter_state = TransferState.Checksum2
- return packet_data['checksum'] >> 8
- elif(adapter_state == TransferState.Checksum2):
- adapter_state = TransferState.DeviceID
- return packet_data['checksum'] & 0xFF
- elif(adapter_state == TransferState.DeviceID):
- adapter_state = TransferState.StatusByte
- return 0x88
- elif(adapter_state == TransferState.StatusByte):
- adapter_state = TransferState.Waiting
- is_sender = False
- return 0x00
- else: # adapter is receiving
- if(adapter_state == TransferState.Waiting):
- if(b == 0x99):
- adapter_state = TransferState.Preamble
- packet_data = {'id': 0, 'size': 0, 'data': bytearray(), 'checksum': 0} # reset
- elif(adapter_state == TransferState.Preamble):
- if(b == 0x66):
- adapter_state = TransferState.PacketStart
- else: # fail
- adapter_state = TransferState.Waiting
- return 0xf1
- elif(adapter_state == TransferState.PacketStart):
- packet_data['id'] = b
- adapter_state = TransferState.Packet01
- elif(adapter_state == TransferState.Packet01):
- adapter_state = TransferState.Packet02
- elif(adapter_state == TransferState.Packet02):
- adapter_state = TransferState.PacketLen
- elif(adapter_state == TransferState.PacketLen):
- packet_data['size'] = b
- if(packet_data['size'] > 0):
- adapter_state = TransferState.PacketBody
- else:
- adapter_state = TransferState.Checksum1
- elif(adapter_state == TransferState.PacketBody):
- packet_data['data'].append(b)
- packet_data['size'] -= 1
- if(packet_data['size'] == 0):
- adapter_state = TransferState.Checksum1
- elif(adapter_state == TransferState.Checksum1):
- packet_data['checksum'] = b << 8
- adapter_state = TransferState.Checksum2
- elif(adapter_state == TransferState.Checksum2):
- packet_data['checksum'] += b
- adapter_state = TransferState.DeviceID
- elif(adapter_state == TransferState.DeviceID):
- adapter_state = TransferState.StatusByte
- return 0x88
- elif(adapter_state == TransferState.StatusByte):
- adapter_state = TransferState.Waiting
- is_sender = True
- return craftResponsePacket()
- # if nothing else, send 0x4B
- return 0x4B
- def craftResponsePacket():
- global packet_data, configuration_data, line_busy, port, http_ready, pop_session_begun, response_text
- rval = 0x80 ^ packet_data['id']
- if(packet_data['id'] == 0x10):
- print('>> 10 %s' % packet_data['data'].decode())
- print('<< 10 %s' % packet_data['data'].decode())
- port = 0
- # Echo that packet
- elif(packet_data['id'] == 0x11):
- print('>> 11 Closing session')
- print('<< 11 Closing session\n\n')
- port = 0
- line_busy = False
- # Echo that packet
- elif(packet_data['id'] == 0x12):
- print('>> 12 Dial %s' % packet_data['data'][1:].decode())
- print('<< 12 Dialed')
- # Empty response
- packet_data['data'] = bytearray()
- line_busy = True
- elif(packet_data['id'] == 0x13):
- print('>> 13 Hang up')
- print('<< 13 Hung up')
- line_busy = False
- # Echo that packet
- elif(packet_data['id'] == 0x15):
- if(port == 110): # POP
- if(len(packet_data['data']) <= 1):
- print('>> 15 No POP traffic to send')
- else:
- print('>> 15 Sending POP traffic:')
- print(packet_data['data'][1:].decode())
- packet_data['id'] = 0x95
- packet_data['data'] = bytearray(b'\x00') + craftPOPResponse()
- if(len(packet_data['data']) <= 1):
- print('<< 95 No POP traffic received')
- else:
- print('<< 95 POP response received:')
- print(packet_data['data'][1:].decode())
- elif(port == 80): # HTTP
- if(len(packet_data['data']) <= 1):
- print('>> 15 No HTTP traffic to send')
- else:
- print('>> 15 Send HTTP traffic:')
- try:
- print(packet_data['data'][1:].decode())
- except UnicodeDecodeError:
- hexDump(packet_data['data'][1:])
- if(http_ready or len(response_text) > 0):
- packet_data['id'] = 0x95
- packet_data['data'] = bytearray(b'\x00') + craftHTTPResponse()
- if(len(packet_data['data']) <= 1):
- print('<< 95 No HTTP traffic received')
- else:
- print('<< 95 HTTP response received:')
- try:
- print(packet_data['data'][1:].decode())
- except UnicodeDecodeError:
- hexDump(packet_data['data'][1:])
- else:
- packet_data['id'] = 0x9F
- packet_data['data'] = bytearray(b'\x00')
- print('<< 9F HTTP server closed connection')
- else:
- print('>> 15 Unknown protocol')
- print('<< 15 Echoing data')
- elif(packet_data['id'] == 0x17):
- print('>> 17 Check telephone line')
- if line_busy:
- print('<< 17 Line busy')
- packet_data['data'] = bytearray(b'\x05')
- else:
- print('<< 17 Line free')
- packet_data['data'] = bytearray(b'\x00')
- elif(packet_data['id'] == 0x19):
- offset = packet_data['data'][0]
- length = packet_data['data'][1]
- print('>> 19 Read %s bytes from offset %s of configuration data' % (length, offset))
- print('<< 19 Reading configuration data:')
- hexDump(configuration_data[offset : offset + length])
- packet_data['data'] = bytearray([offset]) + configuration_data[offset : offset + length]
- elif(packet_data['id'] == 0x1A):
- offset = packet_data['data'][0]
- length = len(packet_data['data']) - 1
- print('>> 1A Write %s bytes at offset %s of configuration data:' % (length, offset))
- hexDump(packet_data['data'][1:])
- configuration_data[offset : offset + length] = packet_data['data'][1:]
- print('<< 1A Configuration data written')
- # Null response
- packet_data['data'] = bytearray()
- elif(packet_data['id'] == 0x21):
- print('>> 21 Log in to DION')
- print('<< 21 Logged in')
- packet_data['data'] = bytearray(b'\x00')
- elif(packet_data['id'] == 0x22):
- print('>> 22 Log out of DION')
- print('<< 22 Logged out')
- port = 0
- # Echo that packet
- elif(packet_data['id'] == 0x23):
- port = (packet_data['data'][4] << 8) + packet_data['data'][5]
- print('>> 23 Connect to %s.%s.%s.%s:%s' %
- (packet_data['data'][0], packet_data['data'][1], packet_data['data'][2], packet_data['data'][3], port))
- print('<< A3 Connected')
- packet_data['id'] = 0xA3
- packet_data['data'] = bytearray(b'\xFF')
- http_ready = True
- pop_session_begun = False
- elif(packet_data['id'] == 0x24):
- print('>> 24 Close connection to server')
- print('<< 24 Connection closed')
- port = 0
- # Echo that packet
- elif(packet_data['id'] == 0x28):
- print('>> 28 DNS query for %s' % packet_data['data'].decode())
- print('<< 28 Use fake IP address 250.202.222.0')
- packet_data['data'] = bytearray(b'\xFA\xCA\xDE\x00')
- else:
- print('>> %02x Unknown packet' % packet_data['id'])
- print('<< %02x Echoing that packet' % packet_data['id'])
- packet_data['size'] = len(packet_data['data'])
- checksum = packet_data['id'] + packet_data['size']
- for byte in packet_data['data']:
- checksum += byte
- packet_data['checksum'] = checksum
- return rval
- # This e-mail will be read by Pokémon Crystal’s Trade Corner
- email = (
- b'From: MISSINGNO.\r\n' # The game requires the From and Date headers to be present, even though it doesn’t read them
- b'Date: Sat, 27 Jan 2001 12:34:56 +0900\r\n'
- b'X-Game-code: CGB-BXTJ-00\r\n'
- b'X-Game-result: 1 8abe3fd7 0210 0310 1\r\n' # Change this to match your TID/SID and the trade you’re making
- b'X-GBmail-type: exclusive\r\n' # This tells Mobile Trainer not to mess with it
- b'\r\n'
- # Base64-encoded Pokémon data
- b'meOQroYQtyEcEACKvgAGTAAAAAAAAAAAAAB2OyMOIwBHAI4SDgAAACYAJgATABEA'
- b'FQARABGZ45CuhkOsQ1BQoRmBpouMkp8L4xrjyX9/f05DrEOg46bJf5KMk6CsjeML'
- b'meOQroaKvhC3\r\n')
- response_text = bytearray()
- def craftPOPResponse():
- global packet_data, pop_session_begun, response_text, email
- pop_text = bytearray()
- if(len(response_text) == 0):
- if(len(packet_data['data']) > 1):
- pop_text = packet_data['data'][1:]
- if(pop_text.find(b'STAT') == 0 or pop_text.find(b'LIST 1') == 0):
- response_text += ('+OK 1 %s\r\n' % len(email)).encode()
- elif(pop_text.find(b'LIST ') == 0):
- response_text += b'-ERR\r\n'
- elif(pop_text.find(b'LIST') == 0):
- response_text += ('+OK\r\n1 %s\r\n.\r\n' % len(email)).encode()
- elif(pop_text.find(b'TOP 1 0') == 0):
- response_text += b'+OK\r\n' + email.split(b'\r\n\r\n')[0] + b'\r\n\r\n.\r\n' # e-mail headers only
- elif(pop_text.find(b'RETR 1') == 0):
- response_text += b'+OK\r\n' + email + b'\r\n.\r\n'
- elif(len(pop_text) > 0 or not pop_session_begun): # Reply +OK at start of session or to any other command
- pop_session_begun = True
- response_text += b'+OK\r\n'
- else: # something went wrong?
- response_text += b'-ERR\r\n'
- bytes_to_send = min(254, len(response_text)) # Can’t send more than 254 bytes at once
- text_to_send = response_text[:bytes_to_send]
- response_text = response_text[bytes_to_send:]
- return text_to_send
- http_text = bytearray()
- http_responses = {
- # Trade Corner
- b'GET /cgb/download?name=/01/CGB-BXTJ/exchange/index.txt HTTP/1.0':
- { 'response': b'HTTP/1.0 200 OK', 'headers': {},
- 'content': (
- b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi\r\n'
- b'http://gameboy.datacenter.ne.jp/cgb/upload?name=/01/CGB-BXTJ/exchange/cancel.cgi\r\n'
- )},
- b'GET /cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi HTTP/1.0':
- { 'response': b'HTTP/1.0 401 Unauthorized', 'headers': {'Gb-Auth-ID': b'HAIL GIOVANNI'}, 'content': b'' },
- b'POST /cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi HTTP/1.0':
- { 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': b'' },
- b'POST /cgb/upload?name=/01/CGB-BXTJ/exchange/cancel.cgi HTTP/1.0':
- { 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': b'' },
- # Egg Ticket event
- b'GET /cgb/download?name=/01/CGB-BXTJ/tamago/index.txt HTTP/1.0':
- { 'response': b'HTTP/1.0 200 OK', 'headers': {},
- 'content': (
- b'http://gameboy.datacenter.ne.jp/cgb/download?name=/01/CGB-BXTJ/tamago/tamagoXX.pkm\r\n'
- b'0ccc170a2e1447ad5eb778518ccca147b0a3bfffd1eae3d6f0a2ffff\r\n'
- )}
- }
- odd_eggs = {
- 0: bytes.fromhex('AC 00 54 CC 92 00 00 08 00 00 7D 00 00 00 00 00' # Pichu
- '00 00 00 00 00 00 00 1E 14 0A 00 14 00 00 00 05'
- '00 00 00 00 00 11 00 09 00 06 00 0B 00 08 00 08'
- '8F 9D 09 50 50 50'),
- 1: bytes.fromhex('AC 00 54 CC 92 00 00 01 00 00 7D 00 00 00 00 00'
- '00 00 00 00 00 2A AA 1E 14 0A 00 14 00 00 00 05'
- '00 00 00 00 00 11 00 09 00 07 00 0C 00 09 00 09'
- '8F 9D 09 50 50 50'),
- 2: bytes.fromhex('AD 00 01 CC 92 00 00 10 00 00 7D 00 00 00 00 00' # Cleffa
- '00 00 00 00 00 00 00 23 14 0A 00 14 00 00 00 05'
- '00 00 00 00 00 14 00 07 00 07 00 06 00 09 00 0A'
- '8F 9D 09 50 50 50'),
- 3: bytes.fromhex('AD 00 01 CC 92 00 00 03 00 00 7D 00 00 00 00 00'
- '00 00 00 00 00 2A AA 23 14 0A 00 14 00 00 00 05'
- '00 00 00 00 00 14 00 07 00 08 00 07 00 0A 00 0B'
- '8F 9D 09 50 50 50'),
- 4: bytes.fromhex('AE 00 2F CC 92 00 00 10 00 00 7D 00 00 00 00 00' # Igglybuff
- '00 00 00 00 00 00 00 0F 14 0A 00 14 00 00 00 05'
- '00 00 00 00 00 18 00 08 00 06 00 06 00 09 00 07'
- '8F 9D 09 50 50 50'),
- 5: bytes.fromhex('AE 00 2F CC 92 00 00 03 00 00 7D 00 00 00 00 00'
- '00 00 00 00 00 2A AA 0F 14 0A 00 14 00 00 00 05'
- '00 00 00 00 00 18 00 08 00 07 00 07 00 0A 00 08'
- '8F 9D 09 50 50 50'),
- 6: bytes.fromhex('EE 00 01 7A 92 00 00 0E 00 00 7D 00 00 00 00 00' # Smoochum
- '00 00 00 00 00 00 00 23 1E 0A 00 14 00 00 00 05'
- '00 00 00 00 00 13 00 08 00 06 00 0B 00 0D 00 0B'
- '8F 9D 09 50 50 50'),
- 7: bytes.fromhex('EE 00 01 7A 92 00 00 02 00 00 7D 00 00 00 00 00'
- '00 00 00 00 00 2A AA 23 1E 0A 00 14 00 00 00 05'
- '00 00 00 00 00 13 00 08 00 07 00 0C 00 0E 00 0C'
- '8F 9D 09 50 50 50'),
- 8: bytes.fromhex('F0 00 34 92 00 00 00 0A 00 00 7D 00 00 00 00 00' # Magby
- '00 00 00 00 00 00 00 19 0A 00 00 14 00 00 00 05'
- '00 00 00 00 00 13 00 0C 00 08 00 0D 00 0C 00 0A'
- '8F 9D 09 50 50 50'),
- 9: bytes.fromhex('F0 00 34 92 00 00 00 02 00 00 7D 00 00 00 00 00'
- '00 00 00 00 00 2A AA 19 0A 00 00 14 00 00 00 05'
- '00 00 00 00 00 13 00 0C 00 09 00 0E 00 0D 00 0B'
- '8F 9D 09 50 50 50'),
- 10: bytes.fromhex('EF 00 62 2B 92 00 00 0C 00 00 7D 00 00 00 00 00' # Elekid
- '00 00 00 00 00 00 00 1E 1E 0A 00 14 00 00 00 05'
- '00 00 00 00 00 13 00 0B 00 08 00 0E 00 0B 00 0A'
- '8F 9D 09 50 50 50'),
- 11: bytes.fromhex('EF 00 62 2B 92 00 00 02 00 00 7D 00 00 00 00 00'
- '00 00 00 00 00 2A AA 1E 1E 0A 00 14 00 00 00 05'
- '00 00 00 00 00 13 00 0B 00 09 00 0F 00 0C 00 0B'
- '8F 9D 09 50 50 50'),
- 12: bytes.fromhex('EC 00 21 92 00 00 00 0A 00 00 7D 00 00 00 00 00' # Tyrogue
- '00 00 00 00 00 00 00 23 0A 00 00 14 00 00 00 05'
- '00 00 00 00 00 12 00 08 00 08 00 08 00 08 00 08'
- '8F 9D 09 50 50 50'),
- 13: bytes.fromhex('EC 00 21 92 00 00 00 01 00 00 7D 00 00 00 00 00'
- '00 00 00 00 00 2A AA 23 0A 00 00 14 00 00 00 05'
- '00 00 00 00 00 12 00 08 00 09 00 09 00 09 00 09'
- '8F 9D 09 50 50 50')
- }
- for i, odd_egg in odd_eggs.items():
- http_responses[('GET /cgb/download?name=/01/CGB-BXTJ/tamago/tamago%02x.pkm HTTP/1.0' % i).encode()] = {
- 'response': b'HTTP/1.0 200 OK', 'headers': {}, 'content': odd_egg
- }
- def craftHTTPResponse():
- global packet_data, http_text, http_responses, http_ready, response_text
- if(len(response_text) == 0):
- if(len(packet_data['data']) > 1):
- http_text += packet_data['data'][1:]
- http_data = parseHTTPRequest(http_text)
- if('request' in http_data):
- # if this is a POST request, is it done?
- if(http_data['request'].find(b'POST') == 0):
- if('Content-Length' in http_data['headers']):
- content_length = int(http_data['headers']['Content-Length'])
- if(len(http_data['content']) >= content_length):
- http_ready = False # request is done
- else: # this is a GET request, so we’re definitely done
- http_ready = False
- if(not http_ready):
- # Clear http_text before the next request
- http_text = bytearray()
- response = http_responses.get(bytes(http_data['request']))
- if(http_data['request'] == b'GET /cgb/upload?name=/01/CGB-BXTJ/exchange/10upload.cgi HTTP/1.0'):
- if('Gb-Auth-ID' in http_data['headers']):
- response = { 'response': b'HTTP/1.0 200 OK', 'headers': {'Gb-Auth-ID': b'HAIL GIOVANNI'}, 'content': b'\r\n' }
- if(response is None):
- print('No response known for %s' % http_data['request'].decode())
- response_text = b'HTTP/1.0 404 Not Found\r\n\r\n'
- else:
- response_text = response['response'] + b'\r\n'
- for header, value in response['headers'].items():
- response_text += header.encode() + b': ' + value + b'\r\n'
- response_text += b'\r\n' + response['content']
- bytes_to_send = min(254, len(response_text)) # Can’t send more than 254 bytes at once
- text_to_send = response_text[:bytes_to_send]
- response_text = response_text[bytes_to_send:]
- return text_to_send
- def parseHTTPRequest(x):
- http_data = {}
- if(b'\r\n\r\n' in x): # if this is a complete request
- http_data['request'] = x.split(b'\r\n')[0]
- http_data['headers'] = {}
- if(x.find(b'\r\n') < x.find(b'\r\n\r\n')): # if there are headers
- headers = x[x.find(b'\r\n') + 2 : x.find(b'\r\n\r\n')]
- headers = headers.split(b'\r\n')
- for header in headers:
- header = header.split(b': ')
- http_data['headers'][header[0].decode()] = header[1]
- http_data['content'] = x[x.find(b'\r\n\r\n') + 4:]
- return http_data
- def hexDump(x):
- for i in range(0, len(x), 16):
- print(' ' + ''.join('%02x ' % j for j in x[i : min(len(x), i + 16)]))
- configuration_data = bytearray()
- try:
- with open('mobilegb.cfg', 'rb') as f:
- configuration_data = bytearray(f.read())
- f.closed
- except FileNotFoundError:
- pass
- if(len(configuration_data) != 192):
- print("Configuration data file 'mobilegb.cfg' is invalid or does not exist.")
- print("Creating a blank configuration.\n")
- configuration_data = bytearray([0] * 192)
- try:
- link = BGBLinkCable('127.0.0.1',8765)
- link.setExchangeHandler(mobileAdapter)
- link.start()
- while True:
- time.sleep(10)
- except KeyboardInterrupt:
- print("Saving configuration to 'mobilegb.cfg'.")
- with open('mobilegb.cfg', 'wb') as out:
- out.write(configuration_data)
- out.closed
Add Comment
Please, Sign In to add comment