#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import subprocess
import filecmp
import json
import datetime

from openai import OpenAI

# -------------------------------------------------------------------------
# 1) Configure your DeepSeek credentials
# -------------------------------------------------------------------------
DEEPSEEK_API_KEY = "YOURDEEPSEEKAPIKEY"
client = OpenAI(api_key=DEEPSEEK_API_KEY, base_url="https://api.deepseek.com")
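
# NOTE: "YOURDEEPSEEKAPIKEY" above is a placeholder; a real key must be supplied
# before running. A common alternative (an assumption, not part of the original
# paste) is to read the key from the environment instead of hard-coding it, e.g.:
#   DEEPSEEK_API_KEY = os.environ["DEEPSEEK_API_KEY"]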

# -------------------------------------------------------------------------
# 2) Base system prompt for the LLM
# -------------------------------------------------------------------------
BASE_SYSTEM_PROMPT = {
    "role": "system",
    "content": (
        "You are an advanced optimization assistant. Your ONLY goal is "
        "to optimize the user's Python code to reduce runtime as much as possible, "
        "while producing an identical final_output.ppm image bit-for-bit. "
        "Do NOT focus on readability or comments. Do not break the code. "
        "If you introduce new dependencies, ensure they don't cause errors. "
        "Return a JSON object with the following format:\n"
        "{\n"
        '  "code": "... optimized code ...",\n'
        '  "changes": "Brief description of key optimizations",\n'
        '  "estimated_impact": "Expected performance impact"\n'
        "}"
    )
}
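
# Illustrative shape of the reply the system prompt asks for (the values below
# are invented for illustration, not actual model output):
# {
#   "code": "#!/usr/bin/env python3\n... optimized render script ...",
#   "changes": "Hoisted invariant math out of the per-pixel loop",
#   "estimated_impact": "Roughly 20-30% less runtime"
# }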

# -------------------------------------------------------------------------
# 3) Helper functions
# -------------------------------------------------------------------------
def write_code_to_file(code_str, filename):
    """Write the given code string to a file, using UTF-8 encoding."""
    with open(filename, "w", encoding="utf-8") as f:
        f.write(code_str)

def run_code_and_measure(pyfile):
    """
    Run the given Python script and measure how long it takes.
    Returns: (time_taken, success, error_msg).
    """
    start_time = time.time()
    try:
        print(f"[DEBUG] Running '{pyfile}' to measure performance.")
        subprocess.check_output(["python", pyfile], stderr=subprocess.STDOUT)
        end_time = time.time()
        return (end_time - start_time, True, "")
    except subprocess.CalledProcessError as e:
        end_time = time.time()
        error_msg = e.output.decode("utf-8", errors="ignore")
        return (end_time - start_time, False, error_msg)
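
# A hedged variant of run_code_and_measure (not wired into the loop below):
# using sys.executable instead of the bare "python" string runs the candidate
# with the same interpreter as this script, and a timeout guards against an
# "optimized" version that hangs. The function name and the default 600 s limit
# are illustrative assumptions, not part of the original paste.
def run_code_and_measure_with_timeout(pyfile, timeout_s=600):
    import sys  # local import so the sketch stays self-contained
    start_time = time.time()
    try:
        subprocess.run(
            [sys.executable, pyfile],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            timeout=timeout_s,
        )
        return (time.time() - start_time, True, "")
    except subprocess.TimeoutExpired:
        return (time.time() - start_time, False, f"Timed out after {timeout_s}s")
    except subprocess.CalledProcessError as e:
        return (time.time() - start_time, False, e.output.decode("utf-8", errors="ignore"))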

def compare_ppm_files(file1="final_output.ppm", file2="original_output.ppm"):
    """Byte-for-byte check of two PPM files."""
    if not (os.path.exists(file1) and os.path.exists(file2)):
        return False
    return filecmp.cmp(file1, file2, shallow=False)
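
# Note on the call above: filecmp.cmp(..., shallow=False) compares actual file
# contents; the default shallow=True only compares os.stat() signatures
# (type, size, mtime), which is not a bit-for-bit guarantee.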

def read_original_script():
    """Read the original 'render.py' content as UTF-8 safely."""
    if not os.path.exists("render.py"):
        print("No 'render.py' found. Exiting.")
        return None
    with open("render.py", "r", encoding="latin-1") as f:
        return f.read().encode("utf-8", errors="ignore").decode("utf-8")

# -------------------------------------------------------------------------
# 4) Main optimization loop
# -------------------------------------------------------------------------
def main_optimization_loop(rounds=10):
    # Create output directory and set log file path ONCE at the start
    output_dir = "./loopOptimizer"
    os.makedirs(output_dir, exist_ok=True)
    log_file = os.path.join(output_dir, "optimization_results.txt")

    # (A) Read the baseline code from 'render.py'
    original_content = read_original_script()
    if not original_content:
        return

    # We'll run the baseline to measure time and produce original_output.ppm if needed
    baseline_file = "render_baseline.py"
    write_code_to_file(original_content, baseline_file)
    print("=== Running baseline code (render.py) ===")
    orig_time, orig_success, orig_err = run_code_and_measure(baseline_file)
    if not orig_success:
        print("[DEBUG] Baseline code error:")
        print(orig_err)
        return

    if not os.path.exists("original_output.ppm"):
        if os.path.exists("final_output.ppm"):
            os.rename("final_output.ppm", "original_output.ppm")
            print("Saved reference image as 'original_output.ppm'.")
        else:
            print("ERROR: 'final_output.ppm' not found; cannot create reference.")
            return
    else:
        print("=== Using existing 'original_output.ppm' as reference ===")

    best_time = orig_time
    current_code = original_content

    # (B) Prepare conversation messages
    messages = [BASE_SYSTEM_PROMPT]

    # We'll keep a local performance history in memory, e.g. a list of dicts:
    # [{'round': 1, 'time': 10.5, 'match': False, 'success': True}, ...]
    performance_history = []

    # Initial header write
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(log_file, "a", encoding="utf-8") as lf:
        lf.write(f"{'=' * 50}\n")
        lf.write(f"Optimization Run: {timestamp}\n")
        lf.write(f"{'=' * 50}\n\n")
        lf.write("=== STARTING OPTIMIZATION ===\n")
        lf.write(f"Original time: {orig_time:.2f}s\n\n")

    # Track failed attempts per round
    failed_attempts = {}  # {round_num: num_fails}

    for iteration in range(1, rounds + 1):
        # Calculate the current round number, with retries labelled as e.g. "3.1"
        if iteration in failed_attempts:
            round_num = f"{iteration}.{failed_attempts[iteration]}"
            failed_attempts[iteration] += 1
        else:
            round_num = str(iteration)
            failed_attempts[iteration] = 1

        # Path for this round's candidate version
        version_file = os.path.join(output_dir, f"render_v{round_num}.py")
        abs_version_file = os.path.abspath(version_file)

        print(f"\n=== OPTIMIZATION ROUND {iteration} ===")

        # Build a short summary of performance so far,
        # e.g. "Round1: 10.20s, Round2: 9.50s..."
        perf_summary = []
        for rec in performance_history:
            perf_summary.append(f"Round{rec['round']}: {rec['time']:.2f}s")
        perf_string = ", ".join(perf_summary) if perf_summary else "No prior performance data yet."

        # Feed the last known best time plus the performance summary to the LLM
        user_prompt = (
            f"Performance history so far: {perf_string}\n\n"
            f"Last known best run time: {best_time:.2f} seconds.\n"
            "Here is the current code:\n[ ... ]\n\n"  # the code is redacted in debug output
            "Please optimize further while preserving EXACT output.\n"
            "You MUST return a JSON object with 'code', 'changes', and 'estimated_impact'."
        )
        # The full code IS still sent to the LLM (appended below); only the debug
        # prints show the redacted "[ ... ]" placeholder instead.
        full_user_prompt = user_prompt + f"\n\nActual code content:\n{current_code}"

        # Debug print: show the conversation so far without dumping full code
        print("[DEBUG] Building conversation for LLM:")
        for idx, msg in enumerate(messages, 1):
            # Never print the entire content if it is too large
            role = msg["role"].upper()
            truncated = msg["content"][:250] + ("..." if len(msg["content"]) > 250 else "")
            print(f"  [{idx}] {role}: {truncated}")

        # Add the user prompt to messages
        messages.append({"role": "user", "content": full_user_prompt})

        # Debug print: show just the user prompt without the full code
        prompt_truncated = user_prompt[:400] + ("..." if len(user_prompt) > 400 else "")
        print(f"[DEBUG] SENDING USER PROMPT: {prompt_truncated}")

        # Now call the LLM
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=messages,
                response_format={"type": "json_object"},  # no max_tokens
                stream=False
            )
        except Exception as e:
            print(f"[DEBUG] Error calling LLM: {str(e)}")
            # Log the error (to the same log file as everything else)
            with open(log_file, "a", encoding="utf-8") as lf:
                lf.write(f"\n[ROUND {iteration}]\nTime: N/A (Error calling LLM)\n")
                lf.write("Output Match: False\nChanges: Unknown\nEstimated Impact: Unknown\n")
                lf.write(f"ERROR: {str(e)}\n")
            continue

        # Attempt to parse JSON from the LLM's reply
        raw_json = response.choices[0].message.content
        print(f"[DEBUG] Raw LLM response (500 chars max): {raw_json[:500]}{'...' if len(raw_json) > 500 else ''}")

        new_code = ""
        changes = ""
        impact = ""
        success = False
        match = False
        time_taken = 0.0
        error_msg = ""

        try:
            result = json.loads(raw_json)
            new_code = result.get("code", "")
            changes = result.get("changes", "")
            impact = result.get("estimated_impact", "")
            success = True  # means we got valid JSON
        except json.JSONDecodeError as e:
            print("[DEBUG] Failed to parse JSON, continuing to next round.")
            parse_fail_msg = (
                "Your output was invalid JSON. Please return valid JSON next time "
                "using the specified format with code, changes, and estimated_impact."
            )
            messages.append({"role": "user", "content": parse_fail_msg})
            # Log the parse failure
            with open(log_file, "a", encoding="utf-8") as lf:
                lf.write(f"\n[ROUND {iteration}]\nTime: N/A (JSON parse failed)\n")
                lf.write("Output Match: False\nChanges: Unknown\nEstimated Impact: Unknown\n")
                lf.write(f"ERROR: JSON parse error: {str(e)}\n")
            performance_history.append({"round": iteration, "time": 0.0, "match": False, "success": False})
            continue

        # Add the entire successful JSON reply as assistant content for the next iteration
        messages.append({"role": "assistant", "content": raw_json})

        # (E) Run & measure this new code (written to the version path chosen above)
        write_code_to_file(new_code, version_file)
        time_taken, code_success, error_msg = run_code_and_measure(version_file)
        if code_success:
            match = compare_ppm_files("final_output.ppm", "original_output.ppm")

        # (F) Log results in optimization_results.txt
        with open(log_file, "a", encoding="utf-8") as lf:
            log_entry = (
                f"{'=' * 20} Round {round_num} {'=' * 20}\n"
                f"{abs_version_file}\n"
            )
            if code_success:
                time_diff_pct = ((time_taken - orig_time) / orig_time) * 100
                log_entry += f"Time: {time_taken:.2f}s ({time_diff_pct:+.1f}% vs original)\n"
            else:
                log_entry += "Time: N/A (Script failed to run)\n"
            log_entry += (
                f"Output Match: {match}\n"
                f"Changes: {changes}\n"
                f"Expected Impact: {impact}\n"
            )
            if not code_success:
                log_entry += f"ERROR: {error_msg}\n"
            lf.write(log_entry + "\n")
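
        # Example of a single entry as written by the block above (the numbers
        # and text are illustrative, not real measurements):
        #   ==================== Round 3 ====================
        #   /abs/path/loopOptimizer/render_v3.py
        #   Time: 8.41s (-17.9% vs original)
        #   Output Match: True
        #   Changes: Vectorized the inner loop
        #   Expected Impact: ~20% faster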

        # (G) If the code fails or the output mismatches, tell the LLM
        if not code_success:
            short_err = error_msg[:600]
            fail_msg = (
                f"Your code threw an error:\n\n{short_err}\n\n"
                "Please fix this and ensure the output matches exactly next time."
            )
            messages.append({"role": "user", "content": fail_msg})
            print(f"[DEBUG] Round {iteration} code run ERROR, time={time_taken:.2f}\n{error_msg}")
        elif not match:
            mismatch_msg = (
                "Your code ran but final_output.ppm != original_output.ppm.\n"
                "Please fix this so the output is bit-for-bit identical."
            )
            messages.append({"role": "user", "content": mismatch_msg})
            print(f"[DEBUG] Round {iteration} MISMATCH, time={time_taken:.2f}.")
        else:
            # Code succeeded and output matched => adopt the new code
            current_code = new_code
            best_time = time_taken
            print(f"[DEBUG] Round {iteration}: SUCCESS. time={time_taken:.2f}, output matched.")

        # (H) Record this round's performance in memory
        performance_history.append({
            "round": iteration,
            "time": time_taken if code_success else 0.0,
            "match": match,
            "success": code_success
        })

    print(f"\n=== Optimization loop finished. Check '{log_file}' for details. ===")


if __name__ == "__main__":
    main_optimization_loop(rounds=10)
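
# Expected layout, as implied by the paths used above: this script sits next to
# 'render.py', which must write its image to 'final_output.ppm'. On the first run
# that image is renamed to 'original_output.ppm' and kept as the bit-for-bit
# reference; the baseline copy 'render_baseline.py' lands in the current directory,
# while candidate versions and 'optimization_results.txt' go to ./loopOptimizer/.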