mvp-factory-openhands/test-scripts/openhands-pty.py

115 lines
3.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
OpenHands CLI wrapper with pty for proper TTY emulation
Usage: python3 /home/bam/openhands-pty.py "task description"
"""
import pty
import os
import sys
import select
import time
def _spawn_on_pty(executable, argv):
    """Fork a child whose stdin/stdout/stderr are a new pseudo-terminal.

    Returns ``(pid, master_fd)`` in the parent. The child never returns from
    this function: it either becomes *executable* or exits with status 127.
    """
    master, slave = pty.openpty()
    try:
        pid = os.fork()
    except OSError:
        # Do not leak the pty pair when the fork itself fails.
        os.close(master)
        os.close(slave)
        raise

    if pid == 0:  # Child process
        try:
            os.close(master)
            for fd in (0, 1, 2):  # stdin, stdout, stderr
                os.dup2(slave, fd)
            if slave > 2:
                # Only close the original descriptor when it is not itself
                # one of the standard streams we just set up.
                os.close(slave)
            os.execv(executable, argv)
        except Exception as exc:
            try:
                # fd 2 is the pty by now, so the parent relays this message.
                os.write(2, f"Failed to start {executable}: {exc}\n".encode())
            except OSError:
                pass
        finally:
            # Never let the forked child unwind into the parent's code path
            # (it would run the caller's exception handlers a second time).
            os._exit(127)

    os.close(slave)
    return pid, master


def _reap_child(pid, force=False, grace=2.0):
    """Collect the wait status of *pid*; return ``None`` if it is already gone.

    Without *force* this blocks until the child exits. With *force* the child
    gets *grace* seconds to exit, then SIGTERM, another *grace* seconds, then
    SIGKILL, after which the final wait blocks until it is reaped.
    """
    import signal

    try:
        if force:
            for sig in (signal.SIGTERM, signal.SIGKILL):
                give_up = time.monotonic() + grace
                while True:
                    reaped, status = os.waitpid(pid, os.WNOHANG)
                    if reaped:
                        return status
                    if time.monotonic() >= give_up:
                        break
                    time.sleep(0.05)
                os.kill(pid, sig)
        return os.waitpid(pid, 0)[1]
    except (ChildProcessError, ProcessLookupError):
        return None


def _exit_code(status):
    """Translate a raw wait status into an exit code (negative for signals)."""
    if status is None:
        return None
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    return None


def run_openhands_with_pty(task, executable='/home/bam/.local/bin/openhands', timeout=60):
    """Run OpenHands in a pseudo-TTY and answer its interactive prompts.

    The child is started as ``openhands -t <task>`` with a pty as its
    stdin/stdout/stderr. Everything it prints is echoed to our stdout. When
    the text "Type your message" appears, the task is typed in after 1 second.
    When "Choose an option:" appears, the "Always proceed" answer is typed
    after 2 seconds, followed 5 seconds later by ``exit``.

    Args:
        task: Task description passed on the command line and typed at the
            prompt.
        executable: Path of the program to exec. ``argv[0]`` is always
            ``openhands``.
        timeout: Seconds before ``exit`` is sent unconditionally; the child
            then gets a short grace period before it is terminated. ``None``
            disables the timeout.

    Returns:
        The child's exit code, the negated signal number if it was killed by
        a signal, or ``None`` if the status could not be collected.
    """
    import codecs

    print(f"Running OpenHands with task: {task}")
    pid, master = _spawn_on_pty(executable, ['openhands', '-t', task])

    # Incremental decoding keeps multi-byte characters intact when they are
    # split across two reads.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
    recent = ''    # tail of the output, so prompts split across reads match
    pending = []   # FIFO of (due_time, payload, note) keystrokes to send
    task_queued = False
    confirmation_queued = False
    timed_out = False
    reaped = False
    status = None
    deadline = float('inf') if timeout is None else time.monotonic() + timeout

    def queue_write(delay, payload, note):
        # Chain each delay after the previously queued write so the spacing
        # matches back-to-back sleeps, without blocking the output relay.
        start = pending[-1][0] if pending else time.monotonic()
        pending.append((start + delay, payload, note))

    def read_output():
        # Read, echo and return one decoded chunk; None once the pty is closed.
        try:
            chunk = os.read(master, 1024)
        except OSError:
            # Raised (EIO on Linux) once the slave side has been closed.
            return None
        if not chunk:
            return None
        text = decoder.decode(chunk)
        # Flush so prompts without a trailing newline show up immediately.
        print(text, end='', flush=True)
        return text

    try:
        while True:
            now = time.monotonic()

            if now >= deadline:
                if timed_out:
                    # Grace period is over; the finally block terminates it.
                    break
                print("\nTimeout reached, sending exit command")
                timed_out = True
                pending.clear()
                try:
                    os.write(master, b"exit\n")
                except OSError:
                    break
                # Keep relaying output while the child shuts down.
                deadline = now + 5

            try:
                while pending and pending[0][0] <= now:
                    _, payload, note = pending.pop(0)
                    os.write(master, payload)
                    print(note)
            except OSError:
                break

            # Wake up for whichever comes first: output, the next queued
            # write, the deadline, or the 1-second child-exit poll.
            next_wake = min(deadline, pending[0][0]) if pending else deadline
            wait = min(1.0, max(0.0, next_wake - now))
            readable, _, _ = select.select([master], [], [], wait)

            if readable:
                text = read_output()
                if text is None:
                    break
                recent = (recent + text)[-4096:]

                if not timed_out:
                    if not task_queued and "Type your message" in recent:
                        queue_write(1, f"{task}\n".encode(), f"\n✓ Sent task: {task}")
                        task_queued = True
                    if not confirmation_queued and "Choose an option:" in recent:
                        queue_write(2, b"Always proceed (don't ask again)\n",
                                    "✓ Sent auto-confirmation")
                        queue_write(5, b"exit\n", "✓ Sent exit command")
                        confirmation_queued = True

            # Check if the child process has exited.
            try:
                reaped_pid, wait_status = os.waitpid(pid, os.WNOHANG)
            except ChildProcessError:
                reaped = True
                break
            if reaped_pid != 0:
                status, reaped = wait_status, True
                # Relay whatever the child wrote just before exiting.
                while select.select([master], [], [], 0)[0]:
                    if read_output() is None:
                        break
                break
    finally:
        os.close(master)
        if not reaped:
            # Reap exactly once; only force termination after a timeout.
            status = _reap_child(pid, force=timed_out)

    return _exit_code(status)
if __name__ == '__main__':
    # Script entry point: the first positional argument is the task text;
    # any further arguments are ignored.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python3 /home/bam/openhands-pty.py 'task description'")
        sys.exit(1)

    failed = False
    try:
        run_openhands_with_pty(cli_args[0])
    except KeyboardInterrupt:
        # Ctrl-C is reported on stdout and treated as a failure.
        print("\nInterrupted")
        failed = True
    except Exception as err:
        # Top-level boundary: report any other error on stderr.
        print(f"Error: {err}", file=sys.stderr)
        failed = True

    if failed:
        sys.exit(1)