37 lines
1.4 KiB
Python
37 lines
1.4 KiB
Python
"""Sequential execution wrapper to prevent parallel SSH connections."""
|
|
|
|
import logging
import threading
import time
from typing import Any, Dict, List, Callable

from langchain_core.messages import BaseMessage
|
|
|
|
# Module-level logger named after this module (logging.getLogger(__name__)).
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SequentialExecutor:
    """Run agent callables one at a time so SSH operations never overlap.

    A single lock serializes every call made through
    :meth:`execute_agent_safely`, and a short pause after each successful
    run spaces out consecutive executions.

    Args:
        delay_seconds: Pause, in seconds, applied after an agent returns
            successfully while the lock is still held. Defaults to 0.3.
            A value of 0 disables the pause.

    Raises:
        ValueError: If ``delay_seconds`` is negative.
    """

    def __init__(self, delay_seconds: float = 0.3) -> None:
        if delay_seconds < 0:
            raise ValueError(
                f"delay_seconds must be non-negative, got {delay_seconds!r}"
            )
        self._execution_lock = threading.Lock()
        self._logger = logging.getLogger(__name__)
        self._delay_seconds = delay_seconds

    def execute_agent_safely(
        self,
        agent_func: Callable,
        # Forward reference: the annotation is not evaluated at definition time.
        messages: List["BaseMessage"],
        agent_name: str = "unknown",
    ) -> Dict[str, Any]:
        """Call ``agent_func`` under the execution lock and return its result.

        The agent is invoked as ``agent_func({"messages": messages})``. Only
        one call runs at a time per executor instance; other callers block
        until the lock is released.

        Args:
            agent_func: Callable that accepts a single dict with a
                ``"messages"`` key.
            messages: Messages passed through to the agent unchanged.
            agent_name: Label used only in log output.

        Returns:
            Whatever ``agent_func`` returns.

        Raises:
            Exception: Any exception raised by ``agent_func`` is logged and
                re-raised unchanged. No pause is applied on this path.
        """
        with self._execution_lock:
            self._logger.info("Executing agent: %s", agent_name)
            try:
                result = agent_func({"messages": messages})
            except Exception as e:
                self._logger.error("Error executing agent %s: %s", agent_name, e)
                raise
            else:
                # Pause while still holding the lock so the next agent cannot
                # start immediately after this one finishes.
                if self._delay_seconds > 0:
                    time.sleep(self._delay_seconds)
                return result
            finally:
                self._logger.info("Completed execution of agent: %s", agent_name)
|
|
|
|
|
|
# Shared module-level SequentialExecutor instance, created at import time.
# Calls made through this one object all contend for the same lock.
sequential_executor = SequentialExecutor()
|