"""Sequential execution wrapper to prevent parallel SSH connections.""" import threading import logging from typing import Any, Dict, List, Callable from langchain_core.messages import BaseMessage logger = logging.getLogger(__name__) class SequentialExecutor: """Ensures agents execute sequentially when using SSH to prevent connection flooding.""" def __init__(self): self._execution_lock = threading.Lock() self._logger = logging.getLogger(__name__) def execute_agent_safely(self, agent_func: Callable, messages: List[BaseMessage], agent_name: str = "unknown") -> Dict[str, Any]: """Execute an agent function with thread safety to prevent parallel SSH operations.""" with self._execution_lock: self._logger.info(f"Executing agent: {agent_name}") try: result = agent_func({"messages": messages}) # Add a small delay to prevent rapid successive SSH connections import time time.sleep(0.3) # 300ms delay between agent executions return result except Exception as e: self._logger.error(f"Error executing agent {agent_name}: {e}") raise finally: self._logger.info(f"Completed execution of agent: {agent_name}") # Global sequential executor instance sequential_executor = SequentialExecutor()