Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pygame
- import sys
- import argparse
- from PIL import Image
- import logging
- import time
- import threading
- # Configure logging
- logging.basicConfig(level=logging.INFO)
- class SimpleEmulator:
def __init__(self):
    """Initialise emulator state: display geometry, memory, registers, console thread.

    No window is opened here; ``setup_display`` creates the pygame surfaces.
    """
    # Geometry of the emulated framebuffer (pixels).
    self.display_width = 320
    self.display_height = 200
    # Flat memory, one list slot per address.
    self.memory = [0] * 0x6900000
    # Base address of the framebuffer. The rendering helpers use the same
    # 0xA0000 literal, and the STOSB handler reads this attribute, which
    # previously was never initialised.
    self.video_memory_start = 0xA0000
    self.frame_buffer = [[(0, 0, 0)] * self.display_width for _ in range(self.display_height)]
    self.registers = {
        # 32-bit general purpose registers.
        'eax': 0, 'ebx': 0, 'ecx': 0, 'edx': 0,
        'esi': 0, 'edi': 0, 'esp': 0xFFFF, 'ebp': 0,
        # 8-bit registers; stored independently, not aliased onto eax etc.
        'al': 0, 'ah': 0, 'bl': 0, 'bh': 0,
        'cl': 0, 'ch': 0, 'dl': 0, 'dh': 0,
        # Segment registers (ds, es) plus 16-bit registers used by the
        # string and BIOS-style handlers.
        'ds': 0, 'es': 0, 'cx': 0, 'si': 0, 'di': 0, 'ax': 0, 'bx': 0,
    }
    self.ip = 0x7C00  # Instruction pointer; execution starts here
    self.running = True
    # Both surfaces are created by setup_display(); declared here so the
    # attributes always exist.
    self.screen = None
    self.frame_surface = None
    self.maxShades = 5  # Change this if needed for performance
    self.keyboard_buffer = []  # Keys queued by add_keypress()
    self.boot_data = bytearray(b'\x00' * 512)  # To initialize the virtual hard drive
    # Console thread; daemon so it never keeps the process alive on exit.
    self.command_thread = threading.Thread(target=self.command_input)
    self.command_thread.daemon = True
    self.devices = []
    # Either "Render" to show actual rendering or "Text" to only display text data.
    self.vga_mode = "Text"
    # FPS bookkeeping used by calculate_fps().
    self.fps = 0
    self.frame_count = 0
    self.last_time = pygame.time.get_ticks()
def calculate_fps(self):
    """Count one frame and return the most recently computed frames-per-second.

    The stored value is refreshed once at least 1000 ms have passed since
    the previous refresh; between refreshes the old value is returned.
    """
    now = pygame.time.get_ticks()
    self.frame_count += 1
    elapsed_ms = now - self.last_time
    if elapsed_ms >= 1000:
        # Publish the frame tally for the finished window and start a new one.
        self.fps, self.frame_count, self.last_time = self.frame_count, 0, now
    return self.fps
- def log(self, text):
- args = parser.parse_args()
- #print(args.verbose)
- if args.verbose == "-v":
- logging.info(text)
- def warn(self, text):
- args = parser.parse_args()
- #print(args.verbose)
- if args.verbose == "-v":
- logging.warning(text)
- def dump_memory(self, device_name, start, end):
- device = next((d for d in self.devices if d.name == device_name), None)
- if not device:
- print(f"Device '{device_name}' not found.")
- return
- #if start < 0 or end >= device.size:
- # print("Invalid address range.")
- # return
- print(f"Memory dump of '{device_name}' from {start:04X} to {end:04X}:")
- for addr in range(start, end + 1):
- if (addr - start) % 16 == 0:
- print("\n{:04X}: ".format(addr), end="")
- print(f"{device.memory[addr]:02X} ", end="")
- print()
- def handle_interrupt(self, interrupt_number):
- if interrupt_number == 0x10: # INT 10h for video services
- self.handle_video_interrupt()
- def add_keypress(self, key):
- """Simulate a key press by adding it to the keyboard buffer."""
- self.keyboard_buffer.append(key)
- self.log(f"Key '{chr(key)}' added to keyboard buffer.")
def handle_video_interrupt(self):
    """Handle INT 10h by dispatching on the function number held in ``ah``.

    Function 0x00 calls ``set_video_mode`` and 0x0C calls ``write_pixel``;
    any other value is reported through ``warn``.
    """
    function = self.registers['ah']
    if function == 0x00:  # Set Video Mode
        self.set_video_mode()
        return
    if function == 0x0C:  # Write Pixel
        self.write_pixel()
        return
    self.warn(f"Unhandled INT 10h function: {function:02X}")
- def command_input(self):
- """Function to handle user command input."""
- while self.running:
- command = input("root$ ")
- self.process_command(command)
- def write_device(self, device_name, address, data):
- """Write data to a specific address on a specific device."""
- device = next((d for d in self.devices if d.name == device_name), None)
- if device is None:
- print(f"Device '{device_name}' not found.")
- return
- if isinstance(data, str):
- # Write each character in the string to consecutive memory addresses
- for i, char in enumerate(data):
- device.write(address + i, ord(char))
- elif isinstance(data, int):
- # Write a single integer value
- device.write(address, data)
- else:
- print(f"Unsupported data type: {data}")
- def read_device(self, device_name, address):
- """Read data from a specific address on a specific device."""
- device = next((d for d in self.devices if d.name == device_name), None)
- if device:
- return device.read(address)
- else:
- print(f"Device '{device_name}' not found.")
- return None
- def process_command(self, command):
- """Process the command entered by the user."""
- parts = command.split()
- cmd = parts[0].lower()
- if cmd == "exit":
- self.running = False
- elif cmd == "registers" or cmd == 'reg':
- for reg, value in self.registers.items():
- print(f"{reg}: {value:#04x}")
- elif cmd.startswith("set"):
- if len(parts) == 3:
- reg_name = parts[1].lower()
- value = int(parts[2], 16) # Assume hexadecimal input
- if reg_name in self.registers:
- self.registers[reg_name] = value
- self.log(f"Set {reg_name} to {value:#04x}")
- else:
- self.warn(f"Unknown register: {reg_name}")
- else:
- self.warn("Usage: set <register> <value>")
- elif cmd == "mem":
- address = int(parts[1], 16) if len(parts) > 1 else 0
- print(f"Memory at {address:#04x}: {self.memory[address]:#04x}")
- elif cmd == "send":
- address = int(parts[1], 16) if len(parts) > 1 else 0
- print(f"Sending {address:#04x} at {self.ip}")
- self.handle_opcode(address)
- elif cmd == "ip":
- print(f"Current IP: {self.ip}")
- elif cmd == None:
- print(f"Please enter a command")
- elif cmd == "device":
- name = parts[2].lower()
- type = parts[1].lower()
- if type != "usb" and type != "hd" and type != "cd" and type != "ps2":
- print(f"Unsupported type: '{type}'")
- return
- #self.ip = 0x7C00
- print(f"Adding a new: {type} with name: '{name}' at IP: {self.ip}")
- newdevice = VirtualDevice(name)
- self.devices.append(newdevice)
- #pygame.display.set_caption(f"Rah.py - {file}")
- elif cmd == "key":
- key = parts[1].lower()
- print(f"Sent key: {key} as {int(chr(key))}")
- self.add_keypress(chr(key))
- elif cmd == "write":
- if len(parts) < 4:
- print("Usage: write <device_name> <address> <data>")
- else:
- device_name = parts[1]
- address = int(parts[2], 16) # Assume address is in hex
- data = int(parts[3]) # Assume data is in decimal
- self.write_device(device_name, address, data)
- elif cmd == "dump":
- if len(parts) < 4:
- print("Usage: dump <device> <address1> <address2>")
- else:
- device_name = parts[1]
- address = int(parts[2], 16) # Assume address is in hex
- address2 = int(parts[3], 16) # Assume data is in decimal
- self.dump_memory(device_name, address, address2)
- elif cmd == "read":
- if len(parts) < 3:
- print("Usage: read <device_name> <address>")
- else:
- device_name = parts[1]
- address = int(parts[2], 16) # Assume address is in hex
- data = self.read_device(device_name, address)
- if data is not None:
- print(f"Data at {address:04X} on '{device_name}': {data}")
- elif cmd == "memw":
- address = int(parts[1], 16) if len(parts) > 1 else 0
- value = int(parts[2], 16)
- oldv = self.memory[address]
- if oldv == 235:
- print("You're trying to edit the boot sector data")
- print("editing it might result into breaking the vm PERMANENTLY")
- awns = input("Continue (Y/N)")
- if awns.lower() == "y":
- pass
- else:
- print("Canceling")
- return
- self.memory[address] = value
- print(f"Memory edited from: {oldv} to {value}")
- elif cmd == "help":
- print(f"-- Rah.py CPU Commands --")
- print(f"Lists of all the commands:")
- print(f"reg or registers - shows the registers and their data.")
- print(f"set [register] [value] - sets the specified register to the specified value.")
- print(f"mem [address] - shows the value of the specified address.")
- print(f"memw [address] [new value] - edits the value of the address in the rom. USE AT YOUR OWN RISK")
- print(f"send [opcode] - sends an opcode and handles it.")
- print(f"ip - shows the current ip in memory.")
- print(f"device [type] [name] - adds a device to the list")
- print(f"write [device] [address] [value: int] - writes data at a specific address")
- print(f"read [device] [address] - reads data at a specific address")
- print(f"dump [device] [address1] [address2] - dumps the data between the adresses")
- print(f"help - shows this list.")
- elif cmd == "ds1": # Pixel placement debugging
- self.debugPixelPlacement()
- elif cmd == "ds2": # Checkboard test
- self.checkboardTest()
- elif cmd == "ds3": # Stripes test 1
- self.vertical_stripes(10)
- elif cmd == "ds4": # Stripes test 2
- self.horizontal_stripes(10)
- elif cmd == "qds": # Update screen
- self.update_display()
- elif cmd == "rds":
- self.display_raw_memory()
- else:
- print(f"Unknown command: {cmd}")
- def load_file(self, file_path, address):
- """Load a file into memory."""
- try:
- with open(file_path, 'rb') as f:
- data = f.read()
- for i, byte in enumerate(data):
- self.memory[address + i] = byte
- self.log(f"Loaded file '{file_path}' into memory at address {address:04X}")
- except FileNotFoundError:
- logging.error(f"Error: File '{file_path}' not found.")
- except Exception as e:
- logging.error(f"Error loading file: {e}")
- def load_test_data(self):
- """Load some test data into memory for rendering."""
- # Example: Filling memory with a pattern of color indices
- for y in range(self.display_height):
- for x in range(self.display_width):
- self.memory[y * self.display_width + x] = (x + y) % 256 # Cycle through 256 colors
def execute(self, file):
    """Start the console thread, then run ``main_loop`` until ``running`` is cleared.

    *file* is passed through to ``main_loop``.
    """
    self.command_thread.start()
    while True:
        if not self.running:
            break
        self.main_loop(file)
def main_loop(self, file):
    """Run one emulator iteration: events, one opcode, caption and redraw.

    A window-close event clears ``running``. When the instruction pointer
    is beyond memory it is reset to 0x7C00 instead of executing.
    """
    close_requested = any(event.type == pygame.QUIT for event in pygame.event.get())
    if close_requested:
        self.running = False

    if self.ip >= len(self.memory):
        self.warn(f"Instruction pointer out of bounds: {self.ip}")
        self.ip = 0x7C00
    else:
        opcode = self.memory[self.ip]
        self.log(f"Executing instruction: {opcode:02X} at IP: {self.ip:04X}")
        self.handle_opcode(opcode)

    self.calculate_fps()
    pygame.display.set_caption(f"Rah.py - {file} - {self.fps} FPS")
    self.update_display()
    # Short sleep between iterations to limit CPU usage.
    time.sleep(0.001)
def setup_display(self, file):
    """Initialise pygame and create the window and the off-screen frame surface.

    The window is twice the emulated resolution in each dimension; the
    frame surface has the emulated resolution. *file* appears in the
    window caption.
    """
    pygame.init()
    window_size = (self.display_width * 2, self.display_height * 2)
    window_flags = pygame.SRCALPHA | pygame.HWSURFACE
    self.screen = pygame.display.set_mode(window_size, window_flags)
    native_size = (self.display_width, self.display_height)
    self.frame_surface = pygame.Surface(native_size)
    pygame.display.set_caption(f"Rah.py - {file} - Loading...")
- def handle_int_10(self, registers):
- """Handle BIOS interrupt 0x10 for video services."""
- function = registers['AL']
- if function == 0x00: # Set Video Mode
- mode = registers['AH']
- if mode == 0x13: # Set 320x200x256 mode
- self.set_video_mode_13()
- elif mode == 0x03: # Set 80x25 text mode
- self.set_video_mode_03()
- else:
- print(f"Unsupported video mode: {mode}")
- elif function == 0x0E: # Write Character and Attribute
- char = registers['AL']
- attr = registers['AH']
- self.write_character(char, attr)
- # Add other functions as needed...
def update_display(self):
    """Redraw the window from the framebuffer region starting at 0xA0000.

    Each memory cell is treated as a palette index and converted with
    ``get_color``; addresses beyond the end of memory render as index 0.
    The image is drawn into a fresh surface and blitted at the window's
    top-left corner.
    """
    canvas = pygame.Surface((self.display_width, self.display_height))
    pixels = pygame.PixelArray(canvas)
    for row in range(self.display_height):
        for col in range(self.display_width):
            address = 0xA0000 + (row * self.display_width + col)
            in_range = address < len(self.memory)
            palette_index = self.memory[address] if in_range else 0
            red, green, blue = self.get_color(palette_index)
            # Pack as 0xRRGGBB.
            pixels[col, row] = (red << 16) | (green << 8) | blue
    # Dropping the PixelArray releases its lock on the surface.
    del pixels
    self.screen.blit(canvas, (0, 0))
    pygame.display.flip()
- def load_image_as_vga_memory(self, image_path):
- # Load an image file and convert it to grayscale VGA memory format
- #img = Image.open(image_path).convert("L").resize((self.display_width, self.display_height))
- #img_data = list(img.getdata())
- #return img_data # Return the pixel data as a list of grayscale color indices
- pass # This function was only for debug its useless for the emulator itself
- def horizontal_stripes(self, stripe_height):
- """Create horizontal stripes across the screen."""
- vga_start = 0xA0000
- for y in range(self.display_height):
- for x in range(self.display_width):
- mem_offset = vga_start + (y * self.display_width + x)
- if mem_offset < len(self.memory):
- if (y // stripe_height) % 2 == 0:
- self.memory[mem_offset] = 0 # Set to black
- else:
- self.memory[mem_offset] = 15 # Set to white
- def vertical_stripes(self, stripe_width):
- """Create vertical stripes across the screen."""
- vga_start = 0xA0000
- for y in range(self.display_height):
- for x in range(self.display_width):
- mem_offset = vga_start + (y * self.display_width + x)
- if mem_offset < len(self.memory):
- if (x // stripe_width) % 2 == 0:
- self.memory[mem_offset] = 0 # Set to black
- else:
- self.memory[mem_offset] = 15 # Set to white
- def checkboardTest(self):
- for y in range(200): # Assuming height is 200
- for x in range(320): # Assuming width is 320
- # Alternate between two color indices for a checkerboard effect
- if (x // 32) % 2 == (y // 32) % 2: # Change 32 for the size of each square
- color_index = 0 # Black
- else:
- color_index = 15 # White (assuming 15 is a valid index for white in your palette)
- # Set the color index in VGA memory
- self.memory[0xA0000 + (y * 320 + x)] = color_index
- def debugPixelPlacement(self):
- # Initialize VGA memory with a simple color pattern for testing
- for i in range(320 * 200): # Fill a 320x200 framebuffer
- self.memory[0xA0000 + i] = i % 256 # Cycling through color indices 0-255
def display_raw_memory(self):
    """Draw video memory into ``frame_surface`` pixel by pixel and show it.

    Cells starting at 0xA0000 are converted with ``get_color``; addresses
    beyond the end of memory leave the corresponding pixel untouched.
    """
    base = 0xA0000
    for row in range(self.display_height):
        for col in range(self.display_width):
            address = base + (row * self.display_width + col)
            if address >= len(self.memory):
                continue
            red, green, blue = self.get_color(self.memory[address])
            self.frame_surface.set_at((col, row), (red, green, blue))
    self.screen.blit(self.frame_surface, (0, 0))
    pygame.display.flip()
def get_color(self, index):
    """Translate a palette index into an ``(r, g, b)`` tuple.

    Indices below 64 are gray, 64-127 red, 128-191 green, and 192 upward
    blue; within a band the intensity is four times the offset into it.
    """
    if index >= 192:  # Blue shades
        return (0, 0, int((index - 192) * 4))
    if index >= 128:  # Green shades
        return (0, int((index - 128) * 4), 0)
    if index >= 64:  # Red shades
        return (int((index - 64) * 4), 0, 0)
    # Grayscale shades
    level = int(index * 4)
    return (level, level, level)
- def handle_opcode(self, instruction): # This handles a custom made cpu, it should mostly work even on new oses but i wouldnt raccomend using it
- # Basic MOV instruction
- if instruction == 0xB8: # MOV EAX, imm32
- self.registers['eax'] = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8) |
- (self.memory[self.ip + 3] << 16) |
- (self.memory[self.ip + 4] << 24)
- )
- self.ip += 5
- elif instruction == 0xE5: # MOV al, [address]
- address = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8)
- )
- self.registers['al'] = self.memory[address] # Move the value from memory to al
- self.ip += 3 # Increment IP by 3 to account for address bytes
- elif instruction == 0xCD: # INT instruction
- self.handle_interrupt(self.memory[self.ip + 1])
- self.ip += 2
- elif instruction == 0xE9: # JMP (absolute short)
- offset = self.memory[self.ip + 1] + (self.memory[self.ip + 2] << 8)
- self.ip += offset + 3
- elif instruction == 0xEB: # JMP short (relative jump)
- offset = self.memory[self.ip + 1]
- self.ip += offset + 2
- elif instruction == 0x00: # NOP
- self.ip += 1
- elif instruction == 0xF4: # HLT
- self.log("Execution ending")
- self.running = False
- elif instruction == 0xF3: # REP prefix (not implemented)
- self.ip += 1
- elif instruction == 0xAA: # STOSB (store byte)
- color = self.registers['al']
- for _ in range(self.registers['cx']): # Store CX times
- if self.registers['edi'] < self.video_memory_start + (self.display_height * self.display_width):
- self.memory[self.video_memory_start + self.registers['edi']] = color
- self.registers['edi'] += 1
- self.ip += 1
- # New Instructions
- elif instruction == 0x04: # ADD al, imm8
- immediate_value = self.memory[self.ip + 1]
- self.registers['al'] += immediate_value
- self.ip += 2
- elif instruction == 0x05: # ADD EAX, imm32
- immediate_value = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8) |
- (self.memory[self.ip + 3] << 16) |
- (self.memory[self.ip + 4] << 24)
- )
- self.registers['eax'] += immediate_value
- self.ip += 5
- elif instruction == 0x2C: # SUB al, imm8
- immediate_value = self.memory[self.ip + 1]
- self.registers['al'] -= immediate_value
- self.ip += 2
- elif instruction == 0x2D: # SUB EAX, imm32
- immediate_value = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8) |
- (self.memory[self.ip + 3] << 16) |
- (self.memory[self.ip + 4] << 24)
- )
- self.registers['eax'] -= immediate_value
- self.ip += 5
- elif instruction == 0x24: # AND al, imm8
- immediate_value = self.memory[self.ip + 1]
- self.registers['al'] &= immediate_value
- self.ip += 2
- elif instruction == 0x25: # AND EAX, imm32
- immediate_value = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8) |
- (self.memory[self.ip + 3] << 16) |
- (self.memory[self.ip + 4] << 24)
- )
- self.registers['eax'] &= immediate_value
- self.ip += 5
- elif instruction == 0x0C: # OR al, imm8
- immediate_value = self.memory[self.ip + 1]
- self.registers['al'] |= immediate_value
- self.ip += 2
- elif instruction == 0x0D: # OR EAX, imm32
- immediate_value = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8) |
- (self.memory[self.ip + 3] << 16) |
- (self.memory[self.ip + 4] << 24)
- )
- self.registers['eax'] |= immediate_value
- self.ip += 5
- elif instruction == 0x34: # XOR al, imm8
- immediate_value = self.memory[self.ip + 1]
- self.registers['al'] ^= immediate_value
- self.ip += 2
- elif instruction == 0x35: # XOR EAX, imm32
- immediate_value = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8) |
- (self.memory[self.ip + 3] << 16) |
- (self.memory[self.ip + 4] << 24)
- )
- self.registers['eax'] ^= immediate_value
- self.ip += 5
- elif instruction == 0x40: # INC EAX
- self.registers['eax'] += 1
- self.ip += 1
- elif instruction == 0x48: # DEC EAX
- self.registers['eax'] -= 1
- self.ip += 1
- elif instruction == 0x6A: # PUSH imm8
- value = self.memory[self.ip + 1]
- self.memory[self.registers['esp']] = value
- self.registers['esp'] -= 1
- self.ip += 2
- elif instruction == 0x68: # PUSH imm32
- immediate_value = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8) |
- (self.memory[self.ip + 3] << 16) |
- (self.memory[self.ip + 4] << 24)
- )
- self.memory[self.registers['esp']] = immediate_value
- self.registers['esp'] -= 4
- self.ip += 5
- elif instruction == 0x58: # POP EAX
- self.registers['eax'] = self.memory[self.registers['esp'] + 1]
- self.registers['esp'] += 4
- self.ip += 1
- elif instruction == 0x74: # JZ (jump if zero)
- offset = self.memory[self.ip + 1]
- if self.registers['eax'] == 0:
- self.ip += offset + 2
- else:
- self.ip += 2
- elif instruction == 0x75: # JNZ (jump if not zero)
- offset = self.memory[self.ip + 1]
- if self.registers['eax'] != 0:
- self.ip += offset + 2
- else:
- self.ip += 2
- elif instruction == 0x3D: # CMP EAX, imm32
- immediate_value = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8) |
- (self.memory[self.ip + 3] << 16) |
- (self.memory[self.ip + 4] << 24)
- )
- self.registers['zero_flag'] = self.registers['eax'] == immediate_value
- self.registers['eax'] -= immediate_value # Set flags based on the result
- self.ip += 5
- elif instruction == 0xA8: # TesT al, imm8
- immediate_value = self.memory[self.ip + 1]
- self.registers['zero_flag'] = (self.registers['al'] & immediate_value) == 0
- self.ip += 2
- elif instruction == 0x0E: # PUSH CS
- # Push the current value of the CS register onto the stack
- # CS is typically 0 in a simple emulator without segmentation
- cs_value = 0 # Set to the value of CS (0 in this case)
- self.memory[self.registers['esp']] = cs_value & 0xFF # Push low byte
- self.memory[self.registers['esp'] + 1] = (cs_value >> 8) & 0xFF # Push high byte
- self.registers['esp'] -= 2 # Decrement stack pointer by 2 bytes
- self.ip += 1 # Move to the next instruction
- elif instruction == 0x1F: # POP DS
- # Pop the top value from the stack into the DS register
- self.registers['ds'] = (
- self.memory[self.registers['esp'] + 1] << 8 | # High byte
- self.memory[self.registers['esp']] # Low byte
- )
- self.registers['esp'] += 2 # Increment esP to discard the popped value
- self.ip += 1
- elif instruction == 0xB9: # MOV ECX, imm32
- self.registers['ecx'] = (
- self.memory[self.ip + 1] |
- (self.memory[self.ip + 2] << 8) |
- (self.memory[self.ip + 3] << 16) |
- (self.memory[self.ip + 4] << 24)
- )
- self.ip += 5 # Move IP to the next instruction
- if instruction == 0xFE:
- sub_instruction = self.memory[self.ip + 1]
- if sub_instruction == 0xC0: # INC al
- self.registers['al'] += 1
- self.ip += 2
- elif sub_instruction == 0xC1: # INC CL
- self.registers['cl'] += 1
- self.ip += 2
- elif sub_instruction == 0xC2: # INC DL
- self.registers['dl'] += 1
- self.ip += 2
- elif sub_instruction == 0xC3: # INC BL
- self.registers['bl'] += 1
- self.ip += 2
- elif sub_instruction == 0xC4: # INC ah
- self.registers['ah'] += 1
- self.ip += 2
- elif sub_instruction == 0xC5: # INC CH
- self.registers['ch'] += 1
- self.ip += 2
- elif sub_instruction == 0xC6: # INC DH
- self.registers['dh'] += 1
- self.ip += 2
- elif sub_instruction == 0xC7: # INC BH
- self.registers['bh'] += 1
- self.ip += 2
- elif sub_instruction == 0xFC: # DEC al
- self.registers['al'] -= 1
- self.ip += 2
- else:
- self.warn(f"Unknown FE sub-instruction: {sub_instruction:02X}")
- self.ip += 1
- #self.running = False
- if instruction == 0xA4: # MOVSB
- # Move byte from DS:SI to es:DI
- byte_to_move = self.memory[self.registers['ds'] + self.registers['si']]
- self.memory[self.registers['es'] + self.registers['di']] = byte_to_move
- # Increment SI and DI
- self.registers['si'] += 1
- self.registers['di'] += 1
- # Increment IP to point to the next instruction
- self.ip += 1
- self.log(f"MOVSB: Moved byte {byte_to_move:02X} from DS:SI to es:DI")
- if instruction == 0xEA: # Far JMP
- # Read the offset and segment from memory
- offset_low = self.memory[self.ip + 1]
- offset_high = self.memory[self.ip + 2]
- segment_low = self.memory[self.ip + 3]
- segment_high = self.memory[self.ip + 4]
- # Combine low and high bytes to form full offset and segment
- offset = (offset_high << 8) | offset_low
- segment = (segment_high << 8) | segment_low
- # Set CS and IP for the far jump
- self.registers['cs'] = segment
- self.ip = offset
- # Increment IP to point to the next instruction after the JMP
- self.ip += 5 # Move past the current instruction and its operands
- self.log(f"Far JMP to segment: {segment:04X}, offset: {offset:04X}")
- elif instruction == 0x7D: # JGE (Jump if Greater or Equal)
- offset = self.memory[self.ip + 1] # Get the offset for the jump
- # Check if the jump condition is met based on the Sign Flag (SF) and Overflow Flag (OF)
- if self.registers.get('sf', 0) == self.registers.get('of', 0):
- self.ip += offset + 2 # Jump to the relative address
- else:
- self.ip += 2 # Continue to the next instruction if condition not met
- elif instruction == 0xFD: # STD (Set Direction Flag)
- self.registers['df'] = 1 # Set the Direction Flag to 1
- self.ip += 1 # Move to the next instruction
- self.log("Direction Flag set to 1 (decrement)")
- elif instruction == 0xFF:
- sub_instruction = self.memory[self.ip + 1]
- if sub_instruction == 0x00: # INC r/m32
- address = self.memory[self.ip + 2] | (self.memory[self.ip + 3] << 8) | (self.memory[self.ip + 4] << 16) | (self.memory[self.ip + 5] << 24)
- self.memory[address] += 1
- self.ip += 6
- elif sub_instruction == 0x01: # DEC r/m32
- address = self.memory[self.ip + 2] | (self.memory[self.ip + 3] << 8) | (self.memory[self.ip + 4] << 16) | (self.memory[self.ip + 5] << 24)
- self.memory[address] -= 1
- self.ip += 6
- elif sub_instruction == 0x02: # CalL r/m32
- address = self.memory[self.ip + 2] | (self.memory[self.ip + 3] << 8) | (self.memory[self.ip + 4] << 16) | (self.memory[self.ip + 5] << 24)
- # Push current IP onto stack
- self.memory[self.registers['esp']] = self.ip + 6 # Save the return address
- self.registers['esp'] -= 4 # Decrement stack pointer
- self.ip = address # Jump to the address
- elif sub_instruction == 0x03: # JMP r/m32
- address = self.memory[self.ip + 2] | (self.memory[self.ip + 3] << 8) | (self.memory[self.ip + 4] << 16) | (self.memory[self.ip + 5] << 24)
- self.ip = address # Jump to the address
- elif sub_instruction == 0x04: # POP r/m32
- address = self.memory[self.ip + 2] | (self.memory[self.ip + 3] << 8) | (self.memory[self.ip + 4] << 16) | (self.memory[self.ip + 5] << 24)
- # Pop value from stack into memory
- self.registers['esp'] += 4 # Increment stack pointer
- self.memory[address] = self.memory[self.registers['esp']] # Load the popped value into the address specified
- self.ip += 6
- else:
- self.warn(f"Unknown FF sub-instruction: {sub_instruction:02X}")
- self.ip += 2 # Increment IP to skip the unknown instruction
- elif instruction == 0x0A:
- mod_rm_byte = self.memory[self.ip + 1]
- # Decode the MOD-REG-R/M byte
- mod = (mod_rm_byte >> 6) & 0b11 # Bits 6-7
- reg = (mod_rm_byte >> 3) & 0b111 # Bits 3-5
- rm = mod_rm_byte & 0b111 # Bits 0-2
- # Depending on the addressing mode (mod), fetch the operands
- if mod == 0b00:
- # No displacement (direct addressing)
- if rm == 0b110: # Addressing mode 110 means direct memory access
- address = self.memory[self.ip + 2] | (self.memory[self.ip + 3] << 8) # Example for 16-bit addressing
- operand1 = self.memory[address] # Memory operand
- operand2 = self.registers[reg] # Register operand
- result = operand1 | operand2
- self.memory[address] = result # Store result back to memory
- self.ip += 4 # Move instruction pointer to next instruction
- else:
- # Handle other R/M addressing modes
- self.warn(f"Unsupported R/M mode for OR: {rm:03b}")
- self.ip += 2 # Skip this instruction
- elif mod == 0b01:
- # Displacement of 8 bits
- if rm == 0b110:
- address = self.memory[self.ip + 2] + self.memory[self.ip + 3] # Example for 8-bit displacement
- operand1 = self.memory[address]
- operand2 = self.registers[reg]
- result = operand1 | operand2
- self.memory[address] = result
- self.ip += 4
- elif mod == 0b10:
- # Displacement of 16 bits (or 32 bits, depending on your architecture)
- if rm == 0b110:
- address = self.memory[self.ip + 2] | (self.memory[self.ip + 3] << 8) # Example for 16-bit addressing
- operand1 = self.memory[address]
- operand2 = self.registers[reg]
- result = operand1 | operand2
- self.memory[address] = result
- self.ip += 4
- else:
- self.warn(f"Unknown MOD value: {mod:02b}")
- self.ip += 2 # Increment IP to skip the unknown instruction
- elif instruction == 0x0F:
- # Fetch the next byte for the extended opcode
- extended_opcode = self.memory[self.ip + 1]
- if extended_opcode == 0x01: # Example: LAR (Load Access Rights)
- mod_rm_byte = self.memory[self.ip + 2]
- # Decode the MOD-REG-R/M byte
- mod = (mod_rm_byte >> 6) & 0b11 # Bits 6-7
- reg = (mod_rm_byte >> 3) & 0b111 # Bits 3-5
- rm = mod_rm_byte & 0b111 # Bits 0-2
- # Handle addressing modes similar to the previous examples
- if mod == 0b00: # No displacement
- if rm == 0b110:
- address = self.memory[self.ip + 3] | (self.memory[self.ip + 4] << 8)
- # Load access rights (assuming some value from a descriptor table)
- operand = self.memory[address] # Example to fetch value
- self.registers[reg] = operand # Store the result in the register
- self.ip += 5 # Move instruction pointer forward
- elif mod == 0b01: # 8-bit displacement
- # Implement this case accordingly
- pass
- elif mod == 0b10: # 16-bit displacement
- # Implement this case accordingly
- pass
- else:
- self.warn(f"Unknown MOD value: {mod:02b}")
- self.ip += 3 # Increment IP to skip the unknown instruction
- elif extended_opcode == 0x20: # Example: MOVAPS
- # Handle MOVAPS self.logic here
- pass
- elif extended_opcode == 0x22: # Example: MOVAPD
- # Handle MOVAPD self.logic here
- pass
- # Add more extended opcodes as needed
- else:
- self.warn(f"Unsupported extended opcode: {extended_opcode:02x}")
- self.ip += 2 # Increment IP to skip this instruction
- elif instruction == 0x11: # ADC (Add with Carry)
- mod_rm_byte = self.memory[self.ip + 1]
- # Decode the MOD-REG-R/M byte
- mod = (mod_rm_byte >> 6) & 0b11 # Bits 6-7
- reg = (mod_rm_byte >> 3) & 0b111 # Bits 3-5
- rm = mod_rm_byte & 0b111 # Bits 0-2
- if mod == 0b00: # No displacement
- self.log("0bit")
- if rm == 0b110: # Direct addressing mode (address specified)
- address = self.memory[self.ip + 2] | (self.memory[self.ip + 3] << 8)
- value = self.memory[address] # Load value from memory
- result = self.registers[reg] + value + self.flags['carry'] # Add with carry
- self.registers[reg] = result & 0xFFFF # Store result back to register, assuming 16-bit
- self.update_flags(result) # Update flags based on result
- self.ip += 4 # Move instruction pointer forward
- self.ip += 2
- elif mod == 0b01: # 8-bit displacement
- self.log("8bit")
- displacement = self.memory[self.ip + 2] # Fetch the displacement
- address = self.registers[rm] + displacement
- value = self.memory[address] # Load value from memory
- result = self.registers[reg] + value + self.flags['carry'] # Add with carry
- self.registers[reg] = result & 0xFFFF # Store result back to register, assuming 16-bit
- self.update_flags(result) # Update flags based on result
- self.ip += 3 # Move instruction pointer forward
- elif mod == 0b10: # 16-bit displacement
- self.log("16bit")
- displacement = self.memory[self.ip + 2] | (self.memory[self.ip + 3] << 8) # Fetch the displacement
- address = self.registers[rm] + displacement
- value = self.memory[address] # Load value from memory
- result = self.registers[reg] + value + self.flags['carry'] # Add with carry
- self.registers[reg] = result & 0xFFFF # Store result back to register, assuming 16-bit
- self.update_flags(result) # Update flags based on result
- self.ip += 4 # Move instruction pointer forward
- else:
- self.warn(f"Unknown MOD value: {mod:02b}")
- self.ip += 2 # Increment IP to skip the unknown instruction
- elif instruction == 0x13: # now there are all the interrupt calls it took so long to make them fr
- self.int_13h()
- elif instruction == 0x14:
- self.int_14h()
- self.ip += 1
- elif instruction == 0x15:
- self.int_15h()
- self.ip += 1
- elif instruction == 0x16:
- self.int_16h()
- self.ip += 1
- elif instruction == 0x17:
- self.int_17h()
- self.ip += 1
- elif instruction == 0x18:
- self.int_18h()
- self.ip += 1
- elif instruction == 0x19:
- self.int_19h() # it can finally boot
- #self.ip += 1
- elif instruction == 0x1A:
- self.int_1ah() # Setup system clock lox
- else:
- self.ip += 1
def int_13h(self):
    """Simulate INT 13h by copying the emulated disk data into memory.

    Inputs are taken from the register file:
      * ``ax`` - number of 256-byte slots to fill.
      * ``bx`` - destination offset (only the low 16 bits are used).

    The same ``self.disk_data`` payload is written into every slot. At most
    one slot's worth of bytes is copied per slot, so the size of
    ``self.memory`` never changes, whatever the length of ``disk_data``.

    On success ``ah`` is cleared to 0 and ``ip`` is advanced by one. If a
    slot would start or end outside ``self.memory``, an error is logged,
    ``self.running`` is set to False, and the method returns without
    touching ``ah`` or ``ip`` (slots copied before the failure stay written).
    """
    slot_size = 256
    sector_count = self.registers['ax']
    destination_offset = self.registers['bx'] & 0xFFFF

    # Truncate the payload to one slot: assigning a slice of a different
    # length would silently grow or shrink the emulated memory.
    payload = list(self.disk_data[:slot_size])
    memory_size = len(self.memory)

    for index in range(sector_count):
        start = destination_offset + index * slot_size
        end = start + len(payload)
        if start >= memory_size or end > memory_size:
            logging.error("Attempting to write out of memory bounds.")
            self.running = False
            return
        self.memory[start:end] = payload

    # No error conditions are modelled beyond the bounds check above.
    self.registers['ah'] = 0  # Success (0)
    self.ip += 1
def int_14h(self):
    """Emulate the serial-port services reached through INT 14h.

    The service is selected by ``ah``:
      * 0x00 - "read": a fixed byte (0x41, ASCII 'A') is placed in ``al``.
      * 0x01 - "write": the byte in ``al`` is logged as a character.
      * anything else - a warning is emitted.

    ``ah`` is set to 0 after a supported service and to 0xFF otherwise.
    No real serial port or buffer is modelled.
    """
    regs = self.registers
    service = regs['ah']

    if service == 0x00:
        # There is no receive buffer, so every read yields the same byte.
        regs['al'] = 0x41
        regs['ah'] = 0
        return

    if service == 0x01:
        char_to_send = regs['al']
        self.log(f"Sending character: {chr(char_to_send)}")
        regs['ah'] = 0
        return

    self.warn("Unsupported ah value for INT 14h")
    regs['ah'] = 0xFF
def int_15h(self):
    """Emulate the extended services reached through INT 15h.

    The service is selected by ``ah``; each supported service stores a
    fixed value in ``ax`` and logs a message:
      * 0x00 - ``ax`` = 0x0000.
      * 0x86 - ``ax`` = 0xFFFF.
      * 0x87 - ``ax`` = 64.

    ``ah`` is left untouched by the supported services. For any other
    value a warning is emitted and ``ah`` is set to 0xFF.
    """
    regs = self.registers
    service = regs['ah']

    if service == 0x00:
        regs['ax'] = 0x0000
        self.log("INT 15h: BIOS Information returned.")
        return

    if service == 0x86:
        extended_memory_size = 0xFFFF
        regs['ax'] = extended_memory_size
        self.log(f"INT 15h: Extended memory size is {extended_memory_size} KB.")
        return

    if service == 0x87:
        extended_memory_size_kb = 64
        regs['ax'] = extended_memory_size_kb
        self.log(f"INT 15h: Extended memory size in KB is {extended_memory_size_kb} KB.")
        return

    self.warn("Unsupported ah value for INT 15h")
    regs['ah'] = 0xFF
def int_16h(self):
    """Emulate the keyboard services reached through INT 16h.

    The service is selected by ``ah``:
      * 0x00 - remove the oldest entry from ``self.keyboard_buffer`` and
        return it in ``al`` with ``ah`` = 0; if the buffer is empty,
        ``ah`` is set to 0x01 and a warning is emitted.
      * 0x01 - report whether a key is waiting: ``registers['zf']`` is 0
        when the buffer is non-empty and 1 when it is empty.
      * anything else - a warning is emitted and ``ah`` is set to 0xFF.
    """
    regs = self.registers
    service = regs['ah']

    if service == 0x00:
        if not self.keyboard_buffer:
            regs['ah'] = 0x01  # Nothing to read
            self.warn("INT 16h: No character in buffer.")
            return
        # Keys are consumed oldest-first.
        char = self.keyboard_buffer.pop(0)
        regs['al'] = char
        regs['ah'] = 0x00
        self.log(f"INT 16h: Read character '{chr(char)}'.")
        return

    if service == 0x01:
        key_waiting = bool(self.keyboard_buffer)
        # NOTE(review): the zero flag is stored as registers['zf'], while
        # other code in this file reads flags from self.flags - confirm
        # that consumers of this result look in the register dict.
        regs['zf'] = 0 if key_waiting else 1
        self.log("INT 16h: Key pressed." if key_waiting else "INT 16h: No key pressed.")
        return

    self.warn("Unsupported ah value for INT 16h")
    regs['ah'] = 0xFF
def int_17h(self):
    """Emulate the printer services reached through INT 17h.

    The service is selected by ``ah``:
      * 0x00 - "print": the value in ``al`` is logged (as a number, not
        converted to a character) and ``ah`` is set to 0.
      * 0x01 - "status": ``al`` is set to 0x01; ``ah`` is left unchanged.
      * anything else - a warning is emitted and ``ah`` is set to 0xFF.

    No printer is modelled; output only goes to the log.
    """
    regs = self.registers
    service = regs['ah']

    if service == 0x00:
        char = regs['al']
        self.log(f"INT 17h: Printing character '{(char)}'.")
        regs['ah'] = 0x00
        return

    if service == 0x01:
        regs['al'] = 0x01  # Always reported as ready
        self.log("INT 17h: Printer is ready.")
        return

    self.warn("Unsupported ah value for INT 17h")
    regs['ah'] = 0xFF
def int_18h(self):
    """Simulate INT 18h as a dummy diskette read.

    Only service ``ah`` == 0x00 is supported. It reads:
      * ``al`` - number of bytes of dummy data to write (one per "sector"),
      * ``es`` - destination segment,
      * ``bx`` - destination offset,
    and stores the values 0, 1, 2, ... at ``es * 16 + bx`` onwards in
    ``self.memory``. On success ``ah`` is set to 0.

    The whole destination range is validated before anything is written.
    If it does not fit inside ``self.memory``, an error is logged,
    ``self.running`` is set to False, and memory is left untouched.

    For any other ``ah`` a warning is emitted and ``ah`` is set to 0xFF.
    """
    regs = self.registers

    if regs['ah'] != 0x00:
        self.warn("Unsupported ah value for INT 18h")
        regs['ah'] = 0xFF  # Indicate an error
        return

    sectors_to_read = regs['al']  # Number of sectors to read
    buffer_segment = regs['es']  # Segment where to store data
    buffer_offset = regs['bx']  # Offset where to store data
    self.log(f"INT 18h: Reading {sectors_to_read} sectors into segment:{buffer_segment} offset:{buffer_offset}.")

    base = buffer_segment * 16 + buffer_offset
    # Validate up front so a bad address cannot raise IndexError or leave
    # a half-written buffer behind.
    if sectors_to_read > 0 and (base < 0 or base + sectors_to_read > len(self.memory)):
        logging.error("INT 18h: Attempting to write out of memory bounds.")
        self.running = False
        return

    for i in range(sectors_to_read):
        self.memory[base + i] = i  # Dummy data

    regs['ah'] = 0x00  # Success
def int_19h(self):
    """Simulate INT 19h by copying the boot sector into memory.

    The destination offset is taken from ``bx``. Up to 512 bytes of
    ``self.boot_data`` are copied there; the copy is limited to one boot
    sector so the size of ``self.memory`` never changes, whatever the
    length of ``boot_data``.

    On success ``ah`` is cleared to 0 and ``ip`` is advanced by one. If
    the destination range does not fit inside ``self.memory``, an error is
    logged, ``self.running`` is set to False, and the method returns
    without modifying memory, ``ah`` or ``ip``.
    """
    boot_sector_size = 512  # Size of a boot sector
    destination_offset = self.registers['bx']

    # Truncate to one sector: assigning a slice of a different length
    # would silently grow or shrink the emulated memory.
    boot_sector = list(self.boot_data[:boot_sector_size])
    end_offset = destination_offset + len(boot_sector)
    memory_size = len(self.memory)

    if destination_offset >= memory_size or end_offset > memory_size:
        logging.error("INT 19h: Attempting to write out of memory bounds.")
        self.running = False
        return

    self.memory[destination_offset:end_offset] = boot_sector
    self.log("INT 19h: Boot sector loaded into memory at offset 0x{:04X}.".format(destination_offset))

    self.registers['ah'] = 0  # Success (0)
    self.log("INT 19h: Boot operation completed successfully.")
    # The caller does not advance ip for this interrupt; do it here.
    self.ip += 1
def int_1ah(self):
    """Simulate INT 1Ah by returning the host's current local time.

    Registers written:
      * ``ch`` - hours,
      * ``cl`` - minutes,
      * ``dh`` - seconds,
      * ``al`` - tenths of a second (0-9),
      * ``ah`` - 0 to signal success.

    ``ip`` is advanced by one afterwards.
    """
    # Take one timestamp so the sub-second part belongs to the same
    # instant as the hour/minute/second fields.
    now = time.time()
    current_time = time.localtime(now)

    regs = self.registers
    regs['ch'] = current_time.tm_hour  # Hours
    regs['cl'] = current_time.tm_min  # Minutes
    regs['dh'] = current_time.tm_sec  # Seconds
    # struct_time has no sub-second field; the tenths come from the
    # fractional part of the timestamp.
    regs['al'] = int((now % 1) * 10)

    self.log(f"INT 1Ah: Current Time Retrieved: {self.registers['ch']}:{self.registers['cl']}:{self.registers['dh']}."
             f"{self.registers['al']} tenths of seconds.")

    # Success is reported through ah (no carry flag is modelled here).
    regs['ah'] = 0
    # Simulate moving to the next instruction
    self.ip += 1
def run(self, file_path):
    """Prepare the emulator for ``file_path``, execute it, then exit.

    The steps below run in a fixed order and all receive the same path.
    This method does not return normally: it ends with ``sys.exit()``,
    which raises ``SystemExit``.

    Args:
        file_path: Path passed unchanged to the display setup, the file
            loader, the VGA image loader and ``execute``.
    """
    self.setup_display(file_path)
    self.load_test_data() # Load test data before running the emulator
    # The current instruction pointer is passed as the second argument -
    # presumably the load address; verify against load_file.
    self.load_file(file_path, self.ip)
    # NOTE(review): the same path is also handed to the image loader -
    # confirm that it is expected to accept the program file.
    self.vgaMem = self.load_image_as_vga_memory(file_path)
    self.execute(file_path)
    # Reached only once execute() returns without raising.
    pygame.quit()
    sys.exit()
class VirtualDevice:
    """A named device that owns a private 64 KiB block of byte memory."""

    def __init__(self, name):
        """Create the device and zero-fill its memory.

        Args:
            name: Label stored on the instance as ``name``.
        """
        device_memory_size = 1 << 16  # 0x10000 bytes
        self.memory = bytearray(device_memory_size)
        self.name = name

    def read(self, address):
        """Return the byte stored at ``address`` in the device memory."""
        stored = self.memory[address]
        return stored

    def write(self, address, value):
        """Store ``value`` at ``address`` in the device memory.

        The memory is a ``bytearray``, so ``value`` must be an integer in
        the range 0-255.
        """
        self.memory[address] = value
if __name__ == "__main__":
    # `parser` has to stay a module-level name: the emulator's log()/warn()
    # helpers call parser.parse_args() themselves and enable output only
    # when args.verbose == "-v".
    parser = argparse.ArgumentParser(description="Rah.py")
    parser.add_argument("file", help="Binary file to load into memory")
    # A positional argument can never receive the literal "-v" (argparse
    # treats it as an unknown option), so verbose mode is exposed as a real
    # flag that stores the exact value the logging helpers compare against.
    parser.add_argument(
        "-v", "--verbose",
        dest="verbose",
        action="store_const",
        const="-v",
        default="",
        help="Enables self.logs",
    )
    # Earlier versions required a second positional argument; keep
    # accepting (and ignoring) one so existing command lines still parse.
    parser.add_argument(
        "legacy_verbose",
        nargs="?",
        default=None,
        help="Ignored; accepted for backward compatibility (use -v instead)",
    )
    args = parser.parse_args()
    emulator = SimpleEmulator()
    emulator.run(args.file)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement