better detection

Gaetan Hurel 2025-06-30 07:58:13 +02:00
parent 7346f0739f
commit 3b2e641137
No known key found for this signature in database
10 changed files with 601 additions and 73 deletions

SSH_FIX_IMPLEMENTATION.md Normal file
View File

@@ -0,0 +1,86 @@
# SSH Banner Error Fix Implementation
## Problem
The multi-agent supervisor system was creating multiple SSH connections simultaneously, causing "Error reading SSH protocol banner" errors. This happened because each agent that needed SSH access was creating its own connection to the remote server.
## Root Cause
- Multiple agents attempting to establish SSH connections in parallel
- SSH server or network infrastructure rejecting rapid connection attempts
- No connection pooling or sharing mechanism
## Solution Implemented
### 1. SSH Connection Manager (`ssh_connection_manager.py`)
- **Singleton pattern** to manage shared SSH connections
- **Thread-safe connection pooling** to prevent multiple connections to the same host
- **Global execution lock** to serialize SSH operations across all agents
- **Automatic connection cleanup** on exit
Key features:
- One connection per unique host/user/port combination
- 200ms delay between operations to prevent rapid connections
- Proper cleanup of connections on exit
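A minimal usage sketch of the manager API (the same `get_session`/`execute_with_lock` calls the updated SSH tool uses internally; connection details are placeholders):
```python
from custom_tools import ssh_manager

# Repeated calls with the same user/host/port return one pooled session
session = ssh_manager.get_session(host="example.com", username="admin", port=22)

# All SSH traffic funnels through a single global lock, so concurrent
# callers are serialized instead of opening parallel connections
output = ssh_manager.execute_with_lock(session, ["uname -a", "uptime"])
print(output)
```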
### 2. Updated SSH Tool (`ssh_tool.py`)
- Added `use_shared_connection` parameter (defaults to `True`)
- Integration with the connection manager
- Thread-safe execution through the connection manager's lock
- Backward compatibility for non-shared connections
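A minimal sketch of opting in or out of the pool (connection details are placeholders):
```python
from custom_tools.ssh_tool import SSHTool

tool = SSHTool(
    host="example.com",          # placeholder server
    username="admin",            # placeholder user
    use_shared_connection=True,  # default: route through the shared pool and lock
)
print(tool._run("hostname"))
```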
### 3. Updated Configuration (`__init__.py`)
- Pre-configured SSH tool now uses shared connections
- Import and export of the SSH connection manager
- Clear documentation of the shared connection feature
### 4. Enhanced Supervisor (`main-multi-agent.py`)
- Updated prompt to emphasize **sequential execution** over parallel
- Added proper SSH connection cleanup on exit
- Improved error handling and resource management
### 5. Sequential Executor (`sequential_executor.py`)
- Additional layer of protection against parallel execution
- 300ms delay between agent executions
- Comprehensive logging for debugging
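A sketch of wrapping an agent call in the executor (signature from `sequential_executor.py` below; the `agent` object and import path are assumptions):
```python
from langchain_core.messages import HumanMessage
from sequential_executor import sequential_executor

result = sequential_executor.execute_agent_safely(
    agent_func=agent.invoke,  # `agent`: any LangGraph agent (assumed to exist)
    messages=[HumanMessage(content="Check the nginx logs")],
    agent_name="logs_analyzer",
)
```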
## Key Benefits
1. **Eliminates SSH Banner Errors**: Only one connection per server
2. **Improved Reliability**: Prevents connection flooding
3. **Better Resource Management**: Shared connections reduce overhead
4. **Thread Safety**: Proper locking prevents race conditions
5. **Graceful Cleanup**: Connections are properly closed on exit
## Configuration
The system is now configured to:
- Use shared SSH connections by default
- Execute agent operations sequentially when SSH is involved
- Automatically clean up connections on exit
- Provide clear error messages if issues occur
## Testing
A test script (`test_ssh_sharing.py`) has been created to verify:
- Connection sharing is working correctly
- Sequential execution is enforced
- Cleanup works properly
## Usage
The system now works exactly as before from the user's perspective, but with improved reliability:
```bash
cd /Users/ghsioux/tmp/langgraph-pard0x/multi-agent-supervisor
python main-multi-agent.py
```
Users can query the system normally, and the SSH operations will be handled reliably in the background.
## Technical Details
- **Connection Key**: `username@host:port` uniquely identifies connections
- **Execution Lock**: Global thread lock ensures sequential SSH operations
- **Delay Strategy**: Small delays prevent rapid connection attempts
- **Cleanup Strategy**: Automatic cleanup on normal exit and SIGINT
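For example, with the server configured in this commit, two lookups resolve to the same pooled session:
```python
from custom_tools import ssh_manager

s1 = ssh_manager.get_session(host="157.90.211.119", username="g", port=8081)
s2 = ssh_manager.get_session(host="157.90.211.119", username="g", port=8081)
assert s1 is s2  # both resolve to the key "g@157.90.211.119:8081"
```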
This implementation resolves the SSH banner errors while maintaining the full functionality of the multi-agent system.

agents/__init__.py
View File

@@ -3,9 +3,11 @@
from .os_detector import create_os_detector_worker
from .logs_analyzer import create_logs_analyzer_worker
from .performance_analyzer import create_performance_analyzer_worker
from .service_discovery import create_service_discovery_worker
__all__ = [
"create_os_detector_worker",
"create_logs_analyzer_worker",
"create_performance_analyzer_worker"
"create_performance_analyzer_worker",
"create_service_discovery_worker"
]

agents/os_detector.py
View File

@@ -18,9 +18,9 @@ def create_os_detector_worker():
Your capabilities:
1. **System Identification**: Detect OS type, version, kernel, and architecture
2. **Environment Analysis**: Identify running services, installed packages, and system configuration
2. **Environment Analysis**: Identify virtualization and containerization platforms
3. **Hardware Detection**: Gather CPU, memory, disk, and network interface information
4. **Security Assessment**: Check for security tools, firewall status, and platform-specific security features
4. **Platform Detection**: Identify container runtimes and orchestration tools
OS-Specific Commands:
**Universal:**
@@ -30,37 +30,42 @@ OS-Specific Commands:
**Linux:**
- `/etc/os-release`, `lsb_release -a` - OS version details
- `systemctl list-units --type=service` - Active services
- `dpkg -l` (Debian/Ubuntu) or `rpm -qa` (RHEL/CentOS) - Installed packages
- Check SELinux/AppArmor status
- Container detection: `which docker podman lxc lxd incus kubectl`
- Virtualization: `systemd-detect-virt`
**Container Platform Detection:**
- Docker: `docker version`
- Podman: `podman version`
- LXC/LXD: `lxc version` or `lxd version`
- Incus: `incus version` (important: newer systems use Incus instead of LXD)
- Kubernetes: `kubectl version`
**macOS:**
- `sw_vers` - macOS version information
- `system_profiler SPSoftwareDataType` - Detailed system info
- `launchctl list` - Running services (not systemctl!)
- `pkgutil --pkgs` - Installed packages
- `csrutil status` - System Integrity Protection status
- `spctl --status` - Gatekeeper status
**Windows (if applicable):**
- `systeminfo` - System information
- `Get-ComputerInfo` (PowerShell) - Detailed system info
- `Get-Service` - Running services
- Container detection: `which docker podman`
Detection Strategy:
1. Start with `uname -s` to identify the kernel/OS type
2. Use OS-specific commands based on the result:
- Linux: Check `/etc/os-release` or `/etc/*release` files
- macOS: Use `sw_vers` and `system_profiler`
- Windows: Use `systeminfo` or PowerShell cmdlets
3. Adapt service and package detection commands accordingly
4. Check for containerization (Docker, Kubernetes, LXC) and virtualization
2. Detect container/virtualization platforms available
3. Note which commands are available for the Service Discovery agent
4. Check for containerization (Docker, Incus, Kubernetes)
5. Identify any special project/namespace configurations
Safety guidelines:
- Only run read-only commands for detection
- Never modify system configurations
- Avoid commands that could impact performance
- Always check OS type before running OS-specific commands
IMPORTANT for Service Discovery coordination:
When detecting container platforms, note:
- If Incus is present (common on modern systems)
- If there are multiple projects (e.g., "default", "gta", etc.)
- Authentication requirements (sudo needed?)
- Available commands and their exact syntax
Remember: You can also use the poem tool to boost morale when the debugging gets tough!""",
Output should include:
- OS details
- Available container platforms
- Command availability (docker, incus, lxc, kubectl, etc.)
- Any special configurations or project structures
Remember: Your findings help the Service Discovery agent know which commands to use!""",
name="os_detector"
)
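The strategy above reduces to a small probe routine; a sketch where `run(cmd)` is an assumed helper that executes a command remotely and returns stdout:
```python
def detect_os(run):
    """Probe order from the prompt: kernel first, then OS-specific details."""
    kernel = run("uname -s").strip()
    if kernel == "Linux":
        return {
            "os": run("cat /etc/os-release"),
            "container_platforms": run("which docker podman lxc lxd incus kubectl"),
            "virtualization": run("systemd-detect-virt"),
        }
    if kernel == "Darwin":
        return {"os": run("sw_vers")}
    return {"os": run("systeminfo")}  # Windows, per the prompt
```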

agents/service_discovery.py Normal file
View File

@@ -0,0 +1,152 @@
"""Service Discovery Agent for comprehensive service enumeration across platforms."""

from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

from custom_tools import configured_remote_server


def create_service_discovery_worker():
    """Create a service discovery agent that finds all services across different platforms."""
    tools = [configured_remote_server]

    return create_react_agent(
        model=ChatOpenAI(model="gpt-4o-mini", temperature=0),
        tools=tools,
        prompt="""You are an expert Service Discovery Agent specialized in finding ALL services running on a system, regardless of their deployment method.
Your mission: Discover and catalog EVERY service running on the system, including:
- System services (systemd, init.d, launchd, etc.)
- Containerized services (Docker, Podman, LXC, LXD, Incus)
- Virtual machines (KVM, VirtualBox, VMware)
- Process-based services (standalone binaries)
- Kubernetes pods/deployments
- Snap packages
- AppImage applications
DISCOVERY STRATEGY:
1. **Container Platforms Detection**:
- Docker: `docker ps --format json` or `docker ps -a`
- Podman: `podman ps --format json`
- LXC/LXD: `lxc list`
- Incus: `incus list --format json` (newer LXD fork)
- Kubernetes: `kubectl get pods -A -o json`
- Check for container commands: `which docker podman lxc incus kubectl`
2. **For Incus/LXD Specifically**:
- List all projects: `incus project list`
- List containers per project: `incus list --project <project_name>`
- Default project: `incus list --project default`
- Get container details: `incus list --format json --project <project>`
- Check logs: `incus exec <container> --project <project> -- journalctl -n 50`
- Alternative logs: `incus exec <container> --project <project> -- cat /var/log/syslog`
3. **System Services**:
- Linux: `systemctl list-units --type=service --all --no-pager`
- macOS: `launchctl list`
- BSD: `service -l` or `rcctl ls all`
- Init.d: `ls /etc/init.d/`
4. **Running Processes**:
- `ps aux | grep -E "(nginx|apache|mysql|postgres|redis|mongo|elastic)"`
- `netstat -tlnp` or `ss -tlnp` (listening services)
- `lsof -i -P -n | grep LISTEN`
5. **Package-based Services**:
- Snap: `snap list`
- Flatpak: `flatpak list`
- AppImage: Check common directories
OUTPUT FORMAT:
You must return a comprehensive JSON structure with ALL discovered services:
```json
{
  "discovery_summary": {
    "total_services": 0,
    "by_type": {
      "system_services": 0,
      "docker_containers": 0,
      "incus_containers": 0,
      "kubernetes_pods": 0,
      "standalone_processes": 0
    },
    "container_projects": ["default", "custom1", "custom2"]
  },
  "services": [
    {
      "name": "nginx",
      "type": "incus_container",
      "status": "running",
      "platform": "incus",
      "project": "default",
      "details": {
        "container_name": "web",
        "ip_addresses": ["10.18.54.166"],
        "cpu_limit": "2",
        "memory_limit": "8GiB"
      },
      "commands": {
        "logs": "incus exec web --project default -- journalctl -n 100",
        "enter": "incus exec web --project default -- bash",
        "status": "incus info web --project default",
        "restart": "incus restart web --project default"
      },
      "interesting_facts": [
        "Running Debian bookworm",
        "Has 7 snapshots",
        "Daily snapshot schedule enabled"
      ]
    },
    {
      "name": "postgresql",
      "type": "system_service",
      "status": "active",
      "platform": "systemd",
      "details": {
        "pid": "1234",
        "memory_usage": "256MB",
        "uptime": "5 days",
        "listening_ports": ["5432"]
      },
      "commands": {
        "logs": "journalctl -u postgresql -n 100",
        "enter": "sudo -u postgres psql",
        "status": "systemctl status postgresql",
        "restart": "systemctl restart postgresql"
      },
      "interesting_facts": [
        "Version 15.2",
        "Listening on all interfaces",
        "5 active connections"
      ]
    }
  ],
  "discovery_issues": [
    "Permission denied for Docker socket",
    "Kubernetes not installed"
  ]
}
```
IMPORTANT BEHAVIORS:
1. **Always check for Incus**: Many modern systems use Incus instead of LXC/LXD
2. **Project awareness**: Incus/LXD uses projects - always check multiple projects
3. **Don't assume**: Test which commands are available before using them
4. **Comprehensive checks**: Don't stop at the first platform - check ALL platforms
5. **Error handling**: Note when commands fail but continue discovery
6. **Format consistency**: Always return valid JSON in the specified format
DISCOVERY SEQUENCE:
1. First detect which container/virtualization platforms are installed
2. For each platform, enumerate all services/containers
3. Check system services (systemd, init.d, etc.)
4. Scan for standalone processes on network ports
5. Compile everything into the JSON format
Remember: Be thorough! Users often have services running in unexpected places.""",
name="service_discovery"
)
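A downstream consumer can load the report directly; a sketch assuming the agent's final JSON reply is in `raw_output`:
```python
import json

report = json.loads(raw_output)  # raw_output: the agent's JSON reply (assumed)
summary = report["discovery_summary"]
print(f"{summary['total_services']} services, projects: {summary['container_projects']}")
for svc in report["services"]:
    print(f"- {svc['name']} [{svc['type']}] logs: {svc['commands']['logs']}")
```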

custom_tools/__init__.py
View File

@@ -2,17 +2,19 @@
from .poem_tool import print_poem
from .ssh_tool import SSHTool
from .ssh_connection_manager import ssh_manager
from langchain_community.tools.shell.tool import ShellTool
# Pre-configured SSH tool for your server - only connects when actually used
# Pre-configured SSH tool for your server - uses shared connection to prevent SSH banner errors
# TODO: Update these connection details for your actual server
configured_remote_server = SSHTool(
host="157.90.211.119", # Replace with your server
port=8081,
username="g", # Replace with your username
key_filename="/Users/ghsioux/.ssh/id_rsa_hetzner", # Replace with your key path
ask_human_input=True # Safety confirmation
ask_human_input=True, # Safety confirmation
use_shared_connection=True # Use shared connection pool to prevent banner errors
)
__all__ = ["print_poem", "SSHTool", "ShellTool", "configured_remote_server"]
__all__ = ["print_poem", "SSHTool", "ShellTool", "configured_remote_server", "ssh_manager"]

custom_tools/ssh_connection_manager.py Normal file
View File

@@ -0,0 +1,89 @@
"""SSH Connection Manager for preventing multiple simultaneous connections."""

import logging
import threading
import time
from typing import Optional

from .ssh_tool import SSHSession

logger = logging.getLogger(__name__)


class SSHConnectionManager:
    """Manages shared SSH connections to prevent connection flooding."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: re-test inside the lock so only one
        # thread ever constructs the singleton.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._connections = {}
                    cls._instance._execution_lock = threading.Lock()
        return cls._instance

    def _get_connection_key(self, host: str, username: str, port: int) -> str:
        """Generate a unique key for the connection."""
        return f"{username}@{host}:{port}"

    def get_session(self, host: str, username: str, port: int = 22,
                    key_filename: Optional[str] = None,
                    password: Optional[str] = None) -> SSHSession:
        """Get or create a shared SSH session."""
        connection_key = self._get_connection_key(host, username, port)
        with self._lock:
            if connection_key not in self._connections:
                logger.info(f"Creating new shared SSH session to {connection_key}")
                session = SSHSession(
                    host=host,
                    username=username,
                    port=port,
                    key_filename=key_filename,
                    password=password
                )
                # Don't connect immediately - let it connect on first use
                self._connections[connection_key] = session
            return self._connections[connection_key]

    def execute_with_lock(self, session: SSHSession, commands) -> str:
        """Execute commands with a global lock to prevent parallel SSH operations."""
        with self._execution_lock:
            logger.debug("Acquired SSH execution lock")
            try:
                result = session.run_commands(commands)
                # Add a small delay to prevent rapid successive connections
                time.sleep(0.2)  # 200ms delay
                return result
            finally:
                logger.debug("Released SSH execution lock")

    def close_all(self):
        """Close all managed connections."""
        with self._lock:
            for connection_key, session in self._connections.items():
                try:
                    session.close()
                    logger.info(f"Closed SSH connection: {connection_key}")
                except Exception as e:
                    logger.error(f"Error closing connection {connection_key}: {e}")
            self._connections.clear()

    def close_connection(self, host: str, username: str, port: int = 22):
        """Close a specific connection."""
        connection_key = self._get_connection_key(host, username, port)
        with self._lock:
            if connection_key in self._connections:
                try:
                    self._connections[connection_key].close()
                    del self._connections[connection_key]
                    logger.info(f"Closed SSH connection: {connection_key}")
                except Exception as e:
                    logger.error(f"Error closing connection {connection_key}: {e}")


# Global connection manager instance
ssh_manager = SSHConnectionManager()

custom_tools/ssh_tool.py
View File

@@ -173,9 +173,11 @@ class SSHTool(BaseTool):
port: int = Field(default=22, description="SSH port")
key_filename: Optional[str] = Field(default=None, description="SSH key path")
password: Optional[str] = Field(default=None, description="SSH password")
ask_human_input: bool = Field(default=False, description="Ask for human confirmation")
# Session management
session: Optional[SSHSession] = Field(default=None, exclude=True)
use_shared_connection: bool = Field(default=True, description="Use shared SSH connection")
model_config = {
"arbitrary_types_allowed": True
@@ -184,20 +186,41 @@ class SSHTool(BaseTool):
def __init__(self, **kwargs):
"""Initialize SSH tool."""
super().__init__(**kwargs)
# Create session but don't connect yet
self.session = SSHSession(
host=self.host,
username=self.username,
port=self.port,
key_filename=self.key_filename,
password=self.password
)
if self.use_shared_connection:
# Import here to avoid circular dependency
from .ssh_connection_manager import ssh_manager
# Use the shared connection manager
self.session = ssh_manager.get_session(
host=self.host,
username=self.username,
port=self.port,
key_filename=self.key_filename,
password=self.password
)
else:
# Create individual session but don't connect yet
self.session = SSHSession(
host=self.host,
username=self.username,
port=self.port,
key_filename=self.key_filename,
password=self.password
)
def _run(self, commands: Union[str, List[str]], **kwargs) -> str:
"""Execute commands on remote server."""
try:
print(f"Executing on {self.username}@{self.host}:{self.port}")
return self.session.run_commands(commands)
if self.use_shared_connection:
# Use the connection manager's execution lock
from .ssh_connection_manager import ssh_manager
return ssh_manager.execute_with_lock(self.session, commands)
else:
# Direct execution without shared lock
return self.session.run_commands(commands)
except Exception as e:
logger.error(f"SSH execution error: {e}")
return f"Error: {str(e)}"

main-multi-agent.py
View File

@@ -13,7 +13,8 @@ from langgraph_supervisor import create_supervisor
from agents import (
create_os_detector_worker,
create_logs_analyzer_worker,
create_performance_analyzer_worker
create_performance_analyzer_worker,
create_service_discovery_worker
)
from custom_tools import print_poem, configured_remote_server
@@ -30,6 +31,7 @@ def print_welcome():
print(" • 🖥️ OS Detector - System identification and environment analysis (local & remote)")
print(" • 📊 Logs Analyzer - Log investigation and error diagnosis (local & remote)")
print(" • ⚡ Performance Analyzer - Resource monitoring and optimization (local & remote)")
print(" • 🔍 Service Discovery - Comprehensive service enumeration across all platforms")
print(" • 🎭 Morale Booster - Motivational poems for tough debugging sessions!")
print("\n🌐 Remote Server Access: My agents can execute commands on both:")
print(" • Local machine via shell commands")
@@ -41,6 +43,8 @@ def print_examples():
"""Print example queries."""
print("\n💡 Example queries you can try:")
print(" - 'What operating system is this server running?'")
print(" - 'Find all services running on the system'")
print(" - 'Discover all containers and their services'")
print(" - 'Check the system logs for any errors in the last hour'")
print(" - 'Analyze current system performance and identify bottlenecks'")
print(" - 'Compare disk usage between local and remote server'")
@@ -60,10 +64,11 @@ def create_sysadmin_supervisor():
os_detector = create_os_detector_worker()
logs_analyzer = create_logs_analyzer_worker()
performance_analyzer = create_performance_analyzer_worker()
service_discovery = create_service_discovery_worker()
# Create the supervisor with our agents
supervisor = create_supervisor(
agents=[os_detector, logs_analyzer, performance_analyzer],
agents=[os_detector, logs_analyzer, performance_analyzer, service_discovery],
model=model,
prompt="""You are Pard0x, an expert System Administrator Supervisor coordinating a team of specialized agents.
@@ -71,6 +76,7 @@ Your team consists of:
1. **OS Detector**: Identifies system information, environment, and configuration
2. **Logs Analyzer**: Investigates system and application logs for issues
3. **Performance Analyzer**: Monitors and diagnoses performance problems
4. **Service Discovery**: Comprehensively finds ALL services across all platforms (containers, VMs, system services, etc.)
Your role:
1. **Task Analysis**: Understand the user's request and determine which agent(s) to engage
@@ -79,22 +85,31 @@ Your role:
4. **Direct Action**: Handle simple tasks yourself without delegation
5. **Morale Boost**: Use the poem tool to encourage users during tough debugging sessions
IMPORTANT: To prevent SSH connection issues, delegate tasks SEQUENTIALLY, not in parallel.
Wait for one agent to complete their SSH tasks before starting the next one.
Multiple agents should not execute SSH commands simultaneously.
Decision guidelines:
- For system identification or environment questions → OS Detector
- For finding services, containers, or what's running → Service Discovery
- For error investigation or log analysis → Logs Analyzer
- For performance issues or resource problems → Performance Analyzer
- For complex issues, engage multiple agents in sequence
- For complex issues, engage multiple agents in sequence (not parallel)
- For simple queries or when agents aren't needed, respond directly
Special notes on Service Discovery:
- The Service Discovery agent is expert at finding services in containers (Docker, Incus, LXD, etc.)
- It knows how to navigate container projects and namespaces
- It returns structured JSON with comprehensive service information
- Use it when users ask about what's running, containers, or service enumeration
Communication style:
- Be professional yet approachable
- Provide clear explanations of your delegation decisions
- Synthesize agent findings into actionable recommendations
- Add a touch of humor when appropriate (especially with poems!)
Remember: Your goal is to solve system problems efficiently by leveraging your team's specialized skills while maintaining a positive debugging experience!
IMPORTANT: You do NOT have direct access to shell commands or SSH. You must delegate all system tasks to your specialized agents. If you don't know which OS is running, use the OS Detector agent first.""",
Remember: Your goal is to solve system problems efficiently by leveraging your team's specialized skills while maintaining a positive debugging experience!""",
tools=[print_poem] # Supervisor only has poem tool - no shell/SSH access
)
@@ -154,41 +169,50 @@ def main():
# Initialize conversation history
conversation_history = []
# Interactive loop
# Interactive loop with proper cleanup
print("💬 Enter your queries below (type 'exit' to quit, 'help' for examples):\n")
while True:
try:
query = input("You: ").strip()
if not query:
continue
try:
while True:
try:
query = input("You: ").strip()
if query.lower() in ['exit', 'quit', 'q']:
print("\n👋 Thanks for using Pard0x! Stay curious and keep debugging!")
if not query:
continue
if query.lower() in ['exit', 'quit', 'q']:
print("\n👋 Thanks for using Pard0x! Stay curious and keep debugging!")
break
if query.lower() in ['help', 'h', '?']:
print_examples()
continue
if query.lower() in ['history', 'show history']:
print("\n📜 Conversation History:")
print("-" * 40)
for i, msg in enumerate(conversation_history):
role = "You" if msg["role"] == "user" else "Assistant"
print(f"{i+1}. {role}: {msg['content'][:100]}{'...' if len(msg['content']) > 100 else ''}")
print("-" * 40)
continue
process_query(app, query, conversation_history)
except KeyboardInterrupt:
print("\n\n👋 Goodbye! Keep those systems running smoothly!")
break
if query.lower() in ['help', 'h', '?']:
print_examples()
continue
if query.lower() in ['history', 'show history']:
print("\n📜 Conversation History:")
print("-" * 40)
for i, msg in enumerate(conversation_history):
role = "You" if msg["role"] == "user" else "Assistant"
print(f"{i+1}. {role}: {msg['content'][:100]}{'...' if len(msg['content']) > 100 else ''}")
print("-" * 40)
continue
process_query(app, query, conversation_history)
except KeyboardInterrupt:
print("\n\n👋 Goodbye! Keep those systems running smoothly!")
break
except Exception as e:
print(f"\n❌ Unexpected error: {str(e)}")
print("Please try again with a different query.")
finally:
# Clean up SSH connections on exit
try:
from custom_tools import ssh_manager
ssh_manager.close_all()
print("\n🔌 SSH connections closed.")
except Exception as e:
print(f"\n❌ Unexpected error: {str(e)}")
print("Please try again with a different query.")
print(f"Warning: Error closing SSH connections: {e}")
if __name__ == "__main__":

sequential_executor.py Normal file
View File

@@ -0,0 +1,36 @@
"""Sequential execution wrapper to prevent parallel SSH connections."""

import logging
import threading
import time
from typing import Any, Callable, Dict, List

from langchain_core.messages import BaseMessage

logger = logging.getLogger(__name__)


class SequentialExecutor:
    """Ensures agents execute sequentially when using SSH to prevent connection flooding."""

    def __init__(self):
        self._execution_lock = threading.Lock()
        self._logger = logging.getLogger(__name__)

    def execute_agent_safely(self, agent_func: Callable, messages: List[BaseMessage],
                             agent_name: str = "unknown") -> Dict[str, Any]:
        """Execute an agent function with thread safety to prevent parallel SSH operations."""
        with self._execution_lock:
            self._logger.info(f"Executing agent: {agent_name}")
            try:
                result = agent_func({"messages": messages})
                # Add a small delay to prevent rapid successive SSH connections
                time.sleep(0.3)  # 300ms delay between agent executions
                return result
            except Exception as e:
                self._logger.error(f"Error executing agent {agent_name}: {e}")
                raise
            finally:
                self._logger.info(f"Completed execution of agent: {agent_name}")


# Global sequential executor instance
sequential_executor = SequentialExecutor()

test_ssh_sharing.py Normal file
View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
Test script to verify SSH connection sharing works properly.
This script simulates multiple agents trying to use SSH simultaneously.
"""

import threading
import time

from custom_tools import configured_remote_server, ssh_manager
from custom_tools.ssh_tool import SSHTool


def test_ssh_connection_sharing():
    """Test that SSH connection sharing prevents multiple connections."""
    print("🧪 Testing SSH Connection Sharing...")
    print("=" * 50)

    # Test 1: Verify shared connection is being used
    print("\n1. Testing shared connection mechanism...")

    # Create multiple SSH tool instances with shared connection
    ssh_tool1 = SSHTool(
        host="157.90.211.119",
        port=8081,
        username="g",
        key_filename="/Users/ghsioux/.ssh/id_rsa_hetzner",
        use_shared_connection=True
    )
    ssh_tool2 = SSHTool(
        host="157.90.211.119",
        port=8081,
        username="g",
        key_filename="/Users/ghsioux/.ssh/id_rsa_hetzner",
        use_shared_connection=True
    )

    # Verify they share the same session
    if ssh_tool1.session is ssh_tool2.session:
        print("✅ SSH tools are sharing the same session instance")
    else:
        print("❌ SSH tools are NOT sharing the same session instance")

    # Test 2: Test sequential execution
    print("\n2. Testing sequential execution...")

    def run_command(tool, command, name):
        """Run a command with timing info."""
        start_time = time.time()
        try:
            result = tool._run(command)
            end_time = time.time()
            print(f" {name}: Completed in {end_time - start_time:.2f}s")
            return result
        except Exception as e:
            end_time = time.time()
            print(f" {name}: Failed in {end_time - start_time:.2f}s - {e}")
            return f"Error: {e}"

    # Test commands that should run sequentially
    commands = [
        ("whoami", "Agent 1"),
        ("date", "Agent 2"),
        ("pwd", "Agent 3")
    ]

    threads = []
    results = {}
    for cmd, agent_name in commands:
        thread = threading.Thread(
            target=lambda c=cmd, n=agent_name: results.update({n: run_command(configured_remote_server, c, n)})
        )
        threads.append(thread)

    # Start all threads (they should execute sequentially due to our lock)
    print(" Starting multiple SSH operations...")
    start_time = time.time()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    total_time = time.time() - start_time
    print(f" Total execution time: {total_time:.2f}s")

    # Test 3: Verify connection cleanup
    print("\n3. Testing connection cleanup...")
    print(" Current connections:", len(ssh_manager._connections))

    # Close all connections
    ssh_manager.close_all()
    print(" Connections after cleanup:", len(ssh_manager._connections))

    print("\n" + "=" * 50)
    print("🎉 SSH Connection Sharing Test Complete!")
    return results


if __name__ == "__main__":
    try:
        results = test_ssh_connection_sharing()
        print("\nTest Results:")
        for agent, result in results.items():
            print(f" {agent}: {result[:50]}{'...' if len(result) > 50 else ''}")
    except Exception as e:
        print(f"Test failed: {e}")