agent-pard0x/multi-agent-supervisor/main-multi-agent.py
Gaetan Hurel 98aa3301d1
use 4.1
2025-06-30 17:13:52 +02:00

220 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""
Pard0x Multi-Agent System Administrator Assistant
A supervisor-based system that coordinates specialized agents for system administration tasks.
"""
import sys
import warnings
import readline # Enable arrow key support and command history in input()
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph_supervisor import create_supervisor
from agents import (
create_os_detector_worker,
create_logs_analyzer_worker,
create_performance_analyzer_worker,
create_service_discovery_worker
)
from custom_tools import print_poem, configured_remote_server
# Suppress the shell tool warning since worker agents use it intentionally for sysadmin tasks
warnings.filterwarnings("ignore", message="The shell tool has no safeguards by default. Use at your own risk.")
def print_welcome():
"""Print welcome message and system capabilities."""
print("\n" + "="*80)
print("🤖 Welcome to Pard0x Multi-Agent System Administrator Assistant!")
print("="*80)
print("\nI coordinate a team of specialized agents to help you with system administration tasks:")
print(" • 🖥️ OS Detector - System identification and environment analysis (local & remote)")
print(" • 📊 Logs Analyzer - Log investigation and error diagnosis (local & remote)")
print(" • ⚡ Performance Analyzer - Resource monitoring and optimization (local & remote)")
print(" • 🔍 Service Discovery - Comprehensive service enumeration across all platforms")
print(" • 🎭 Morale Booster - Motivational poems for tough debugging sessions!")
print("\n🌐 Remote Server Access: My agents can execute commands on both:")
print(" • Local machine via shell commands")
print(" • Remote server via SSH (g@157.90.211.119:8081)")
print("\n" + "-"*80)
def print_examples():
"""Print example queries."""
print("\n💡 Example queries you can try:")
print(" - 'What operating system is this server running?'")
print(" - 'Find all services running on the system'")
print(" - 'Discover all containers and their services'")
print(" - 'Check the system logs for any errors in the last hour'")
print(" - 'Analyze current system performance and identify bottlenecks'")
print(" - 'Compare disk usage between local and remote server'")
print(" - 'Check if services are running on both systems'")
print(" - 'My web server is down, help me troubleshoot'")
print(" - 'Write me a motivational poem about debugging'")
print("\n" + "-"*80)
def create_sysadmin_supervisor():
"""Create the main supervisor that coordinates between specialized agents."""
# Get the base model
model = ChatOpenAI(model="gpt-4.1", temperature=0)
# Create specialized workers
os_detector = create_os_detector_worker()
logs_analyzer = create_logs_analyzer_worker()
performance_analyzer = create_performance_analyzer_worker()
service_discovery = create_service_discovery_worker()
# Create the supervisor with our agents
supervisor = create_supervisor(
agents=[os_detector, logs_analyzer, performance_analyzer, service_discovery],
model=model,
prompt="""You are Pard0x, an expert System Administrator Supervisor coordinating a team of specialized agents.
Your team consists of:
1. **OS Detector**: Identifies system information, environment, and configuration
2. **Logs Analyzer**: Investigates system and application logs for issues
3. **Performance Analyzer**: Monitors and diagnoses performance problems
4. **Service Discovery**: Comprehensively finds ALL services across all platforms (containers, VMs, system services, etc.)
Your role:
1. **Task Analysis**: Understand the user's request and determine which agent(s) to engage
2. **Coordination**: Delegate tasks to appropriate agents based on their specialties
3. **Synthesis**: Combine insights from multiple agents into coherent solutions
4. **Direct Action**: Handle simple tasks yourself without delegation
5. **Morale Boost**: Use the poem tool to encourage users during tough debugging sessions
IMPORTANT: To prevent SSH connection issues, delegate tasks SEQUENTIALLY, not in parallel.
Wait for one agent to complete their SSH tasks before starting the next one.
Multiple agents should not execute SSH commands simultaneously.
Decision guidelines:
- For system identification or environment questions → OS Detector
- For finding services, containers, or what's running → Service Discovery
- For error investigation or log analysis → Logs Analyzer
- For performance issues or resource problems → Performance Analyzer
- For complex issues, engage multiple agents in sequence (not parallel)
- For simple queries or when agents aren't needed, respond directly
Special notes on Service Discovery:
- The Service Discovery agent is expert at finding services in containers (Docker, Incus, LXD, etc.)
- It knows how to navigate container projects and namespaces
- It returns structured JSON with comprehensive service information
- Use it when users ask about what's running, containers, or service enumeration
Communication style:
- Be professional yet approachable
- Provide clear explanations of your delegation decisions
- Synthesize agent findings into actionable recommendations
- Add a touch of humor when appropriate (especially with poems!)
Remember: Your goal is to solve system problems efficiently by leveraging your team's specialized skills while maintaining a positive debugging experience!""",
tools=[print_poem] # Supervisor only has poem tool - no shell/SSH access
)
return supervisor.compile()
def process_query(app, query: str, conversation_history: list) -> None:
"""Process a user query through the supervisor system with conversation history."""
print(f"\n🔄 Processing your request: '{query}'")
print("-" * 80)
# Convert conversation history to LangChain message format
messages = []
for msg in conversation_history:
if msg["role"] == "user":
messages.append(HumanMessage(content=msg["content"]))
else: # assistant
# Create an AI message - LangGraph will handle this format
messages.append({"role": "assistant", "content": msg["content"]})
# Add the new user message
messages.append(HumanMessage(content=query))
# Stream the response
collected_responses = []
for chunk in app.stream(
{"messages": messages},
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
# Collect AI responses
if chunk["messages"] and hasattr(chunk["messages"][-1], 'type') and chunk["messages"][-1].type == "ai":
collected_responses.append(chunk["messages"][-1].content)
# Add both user and assistant messages to history
conversation_history.append({"role": "user", "content": query})
if collected_responses:
# Use the last response (most complete)
conversation_history.append({"role": "assistant", "content": collected_responses[-1]})
def main():
"""Main interaction loop with conversation history."""
print_welcome()
print_examples()
# Create the supervisor system
print("\n🚀 Initializing the multi-agent system...")
try:
app = create_sysadmin_supervisor()
print("✅ System ready!\n")
except Exception as e:
print(f"❌ Failed to initialize system: {str(e)}")
sys.exit(1)
# Initialize conversation history
conversation_history = []
# Interactive loop with proper cleanup
print("💬 Enter your queries below (type 'exit' to quit, 'help' for examples):\n")
try:
while True:
try:
query = input("You: ").strip()
if not query:
continue
if query.lower() in ['exit', 'quit', 'q']:
print("\n👋 Thanks for using Pard0x! Stay curious and keep debugging!")
break
if query.lower() in ['help', 'h', '?']:
print_examples()
continue
if query.lower() in ['history', 'show history']:
print("\n📜 Conversation History:")
print("-" * 40)
for i, msg in enumerate(conversation_history):
role = "You" if msg["role"] == "user" else "Assistant"
print(f"{i+1}. {role}: {msg['content'][:100]}{'...' if len(msg['content']) > 100 else ''}")
print("-" * 40)
continue
process_query(app, query, conversation_history)
except KeyboardInterrupt:
print("\n\n👋 Goodbye! Keep those systems running smoothly!")
break
except Exception as e:
print(f"\n❌ Unexpected error: {str(e)}")
print("Please try again with a different query.")
finally:
# Clean up SSH connections on exit
try:
from custom_tools import ssh_manager
ssh_manager.close_all()
print("\n🔌 SSH connections closed.")
except Exception as e:
print(f"Warning: Error closing SSH connections: {e}")
if __name__ == "__main__":
main()