115 lines
3.6 KiB
Python
Executable File
115 lines
3.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
OpenHands CLI wrapper that runs the tool inside a pty for proper TTY emulation.

Usage: python3 /home/bam/openhands-pty.py "task description"
"""
|
|
|
|
import codecs
import os
import pty
import select
import signal
import sys
import time
|
|
|
|
def run_openhands_with_pty(task, *, executable='/home/bam/.local/bin/openhands',
                           timeout=60, grace=10):
    """Run OpenHands in a pseudo-TTY and answer its interactive prompts.

    The child is started as ``openhands -t <task>`` with stdin, stdout and
    stderr attached to the slave side of a freshly opened pty.  The parent
    echoes everything the child prints and reacts to three events:

    * the first "Type your message" prompt: the task text is typed in;
    * the first "Choose an option:" prompt: the "always proceed" answer is
      typed in;
    * once that confirmation has been sent: "exit" is typed in 5 seconds
      later.

    Args:
        task: Task description; passed as the ``-t`` argument and typed at
            the message prompt.
        executable: Path of the OpenHands binary to execute.
        timeout: Seconds after which "exit" is sent regardless of progress.
        grace: Seconds to wait after the timeout, and again after each
            signal, before escalating to SIGTERM and then SIGKILL.

    Returns:
        None.  The child has been waited for when the function returns.
    """
    print(f"Running OpenHands with task: {task}", flush=True)

    # Create a pseudo-TTY
    master, slave = pty.openpty()

    # Fork the process
    pid = os.fork()

    if pid == 0:  # Child process
        try:
            os.close(master)
            for fd in (0, 1, 2):  # stdin, stdout, stderr
                os.dup2(slave, fd)
            if slave > 2:  # do not close a descriptor we just installed
                os.close(slave)

            # Execute OpenHands
            os.execv(executable, ['openhands', '-t', task])
        except (OSError, ValueError) as exc:
            os.write(2, f"Failed to execute {executable}: {exc}\n".encode())
        finally:
            # execv only returns on failure.  Leave immediately so the forked
            # child never continues running the parent's code or handlers.
            os._exit(127)

    # Parent process
    os.close(slave)

    # Variables to track state
    task_sent = False
    confirmation_sent = False
    exit_sent = False
    timed_out = False
    reaped = False  # True once waitpid() has collected the child

    # Incremental decoding keeps multi-byte characters intact when they are
    # split across two reads; `recent` lets a prompt be recognised even when
    # it straddles a chunk boundary.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
    recent = ''

    # A monotonic deadline is polled instead of SIGALRM, so no process-wide
    # signal handler is installed and nothing runs asynchronously.
    deadline = time.monotonic() + timeout
    escalation = [signal.SIGTERM, signal.SIGKILL]

    try:
        while True:
            now = time.monotonic()
            if now >= deadline:
                try:
                    if not timed_out:
                        print("\nTimeout reached, sending exit command",
                              flush=True)
                        timed_out = True
                        os.write(master, b"exit\n")
                        exit_sent = True
                    elif escalation:
                        # The child ignored "exit": terminate, then kill.
                        os.kill(pid, escalation.pop(0))
                except OSError:
                    break
                deadline = now + grace

            # Check if we can read from the master
            readable, _, _ = select.select([master], [], [master], 1)

            if master in readable:
                try:
                    data = os.read(master, 1024)
                    if not data:
                        # EOF: every slave descriptor has been closed.
                        break
                    output = decoder.decode(data)
                    print(output, end='', flush=True)
                    recent = (recent + output)[-1024:]

                    # Send task if not sent yet
                    if not task_sent and "Type your message" in recent:
                        time.sleep(1)
                        os.write(master, f"{task}\n".encode())
                        task_sent = True
                        print(f"\n✓ Sent task: {task}")

                    # Send confirmation if prompted
                    if "Choose an option:" in recent and not confirmation_sent:
                        time.sleep(2)
                        os.write(master, b"Always proceed (don't ask again)\n")
                        confirmation_sent = True
                        print("✓ Sent auto-confirmation")

                    # Send exit after some time
                    if confirmation_sent and not exit_sent:
                        time.sleep(5)
                        os.write(master, b"exit\n")
                        exit_sent = True
                        print("✓ Sent exit command")

                except OSError:
                    # Process has finished (Linux reports EIO on the master
                    # once the slave side is closed).
                    break

            # Check if child process has exited
            try:
                done_pid, _ = os.waitpid(pid, os.WNOHANG)
            except ChildProcessError:
                reaped = True
                break
            if done_pid != 0:
                reaped = True
                break

        if reaped:
            # The child may have printed its last lines right before exiting.
            _drain_pty(master, decoder)

    finally:
        os.close(master)
        if not reaped:
            # Only wait when the loop has not already collected the child;
            # a second waitpid() on a reaped pid raises ChildProcessError.
            try:
                os.waitpid(pid, 0)
            except ChildProcessError:
                pass  # collected elsewhere, nothing left to wait for


def _drain_pty(master, decoder, budget=0.5):
    """Print output still buffered in the pty, for at most *budget* seconds."""
    stop_at = time.monotonic() + budget
    while time.monotonic() < stop_at:
        if not select.select([master], [], [], 0)[0]:
            return
        try:
            data = os.read(master, 1024)
        except OSError:
            return
        if not data:
            return
        print(decoder.decode(data), end='', flush=True)
|
|
|
|
if __name__ == '__main__':
    # The task description is the single required positional argument.
    if not sys.argv[1:]:
        print("Usage: python3 /home/bam/openhands-pty.py 'task description'")
        raise SystemExit(1)

    task = sys.argv[1]

    # Any failure, including Ctrl-C, is reported and mapped to exit status 1.
    try:
        run_openhands_with_pty(task)
    except KeyboardInterrupt:
        print("\nInterrupted")
        raise SystemExit(1)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        raise SystemExit(1)