218 lines
9.2 KiB
Python
218 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Pard0x Multi-Agent System Administrator Assistant
|
|
A supervisor-based system that coordinates specialized agents for system administration tasks.
|
|
"""
|
|
|
|
import sys
|
|
import warnings
|
|
import readline # Enable arrow key support and command history in input()
|
|
from langchain_openai import ChatOpenAI
|
|
from langchain_core.messages import HumanMessage
|
|
from langgraph_supervisor import create_supervisor
|
|
from agents import (
|
|
create_os_detector_worker,
|
|
create_logs_analyzer_worker,
|
|
create_performance_analyzer_worker,
|
|
create_service_discovery_worker
|
|
)
|
|
from custom_tools import configured_remote_server
|
|
from langchain_community.tools.shell.tool import ShellTool
|
|
|
|
# Suppress the shell tool warning since worker agents use it intentionally for sysadmin tasks.
# Only warnings whose text matches this message are ignored; all other warnings still surface.
warnings.filterwarnings("ignore", message="The shell tool has no safeguards by default. Use at your own risk.")
|
|
|
|
|
|
def print_welcome(remote_server: str = "g@157.90.211.119:8081") -> None:
    """Print the welcome banner and a summary of the system's capabilities.

    Args:
        remote_server: SSH target shown on the "Remote server via SSH" line.
            The default is the address that used to be hard-coded in the
            banner, so calling ``print_welcome()`` with no arguments prints
            exactly the same text as before.
    """
    print("\n" + "=" * 80)
    print("🤖 Welcome to Pard0x Multi-Agent System Administrator Assistant!")
    print("=" * 80)
    print("\nI coordinate a team of specialized agents to help you with system administration tasks:")
    print(" • 🖥️ OS Detector - System identification and environment analysis (local & remote)")
    print(" • 📊 Logs Analyzer - Log investigation and error diagnosis (local & remote)")
    print(" • ⚡ Performance Analyzer - Resource monitoring and optimization (local & remote)")
    print(" • 🔍 Service Discovery - Comprehensive service enumeration across all platforms")
    print("\n🌐 Remote Server Access: My agents can execute commands on both:")
    print(" • Local machine via shell commands")
    # The target is interpolated rather than baked into the literal so the
    # banner can reflect whichever server is actually configured.
    print(f" • Remote server via SSH ({remote_server})")
    print("\n" + "-" * 80)
|
|
|
|
|
|
def print_examples():
    """Show a short list of sample queries, followed by a divider line."""
    sample_queries = (
        "What operating system is this server running?",
        "Find all services running on the system",
        "Discover all containers and their services",
        "Check the system logs for any errors in the last hour",
        "Analyze current system performance and identify bottlenecks",
        "Compare disk usage between local and remote server",
        "Check if services are running on both systems",
        "My web server is down, help me troubleshoot",
    )

    print("\n💡 Example queries you can try:")
    # Each sample is rendered as a single-quoted bullet item.
    for sample in sample_queries:
        print(f" - '{sample}'")
    print("\n" + "-" * 80)
|
|
|
|
|
|
def create_sysadmin_supervisor():
    """Build and compile the supervisor graph that routes work to the worker agents.

    Returns:
        The object returned by ``compile()`` on the supervisor built from the
        four worker agents, the chat model, the system prompt and a shell tool.
    """
    # Chat model that drives the supervisor itself.
    llm = ChatOpenAI(model="gpt-4.1", temperature=0)

    # Worker agents, instantiated in the same order they are handed to the supervisor.
    workers = [
        create_os_detector_worker(),
        create_logs_analyzer_worker(),
        create_performance_analyzer_worker(),
        create_service_discovery_worker(),
    ]

    # The system prompt is assembled line by line; empty entries are blank lines.
    supervisor_prompt = "\n".join((
        "You are Pard0x, an expert System Administrator Supervisor coordinating a team of specialized agents.",
        "",
        "Your team consists of:",
        "1. **OS Detector**: Identifies system information, environment, and configuration",
        "2. **Logs Analyzer**: Investigates system and application logs for issues",
        "3. **Performance Analyzer**: Monitors and diagnoses performance problems",
        "4. **Service Discovery**: Comprehensively finds ALL services across all platforms (containers, VMs, system services, etc.)",
        "",
        "Your role:",
        "1. **Task Analysis**: Understand the user's request and determine which agent(s) to engage",
        "2. **Coordination**: Delegate tasks to appropriate agents based on their specialties",
        "3. **Synthesis**: Combine insights from multiple agents into coherent solutions",
        "4. **Direct Action**: Handle simple tasks yourself using shell commands when appropriate",
        "",
        "IMPORTANT: To prevent SSH connection issues, delegate tasks SEQUENTIALLY, not in parallel.",
        "Wait for one agent to complete their SSH tasks before starting the next one.",
        "Multiple agents should not execute SSH commands simultaneously.",
        "",
        "Decision guidelines:",
        "- For system identification or environment questions → OS Detector",
        "- For finding services, containers, or what's running → Service Discovery",
        "- For error investigation or log analysis → Logs Analyzer",
        "- For performance issues or resource problems → Performance Analyzer",
        "- For complex issues, engage multiple agents in sequence (not parallel)",
        "- For simple queries or when agents aren't needed, respond directly",
        "",
        "Special notes on Service Discovery:",
        "- The Service Discovery agent is expert at finding services in containers (Docker, Incus, LXD, etc.)",
        "- It knows how to navigate container projects and namespaces",
        "- It returns structured JSON with comprehensive service information",
        "- Use it when users ask about what's running, containers, or service enumeration",
        "",
        "Communication style:",
        "- Be professional yet approachable",
        "- Provide clear explanations of your delegation decisions",
        "- Synthesize agent findings into actionable recommendations",
        "- Use shell commands directly when it's more efficient than delegation",
        "",
        "Remember: Your goal is to solve system problems efficiently by leveraging your team's specialized skills while maintaining a positive debugging experience!",
    ))

    # The supervisor also gets a shell tool so it can run commands directly.
    graph = create_supervisor(
        agents=workers,
        model=llm,
        prompt=supervisor_prompt,
        tools=[ShellTool()],
    )
    return graph.compile()
|
|
|
|
|
|
def process_query(app, query: str, conversation_history: list) -> None:
    """Run one user query through the supervisor and record the exchange.

    The prior conversation plus the new query are sent to ``app.stream`` and
    the newest message of every streamed state is pretty-printed as it
    arrives. Afterwards the query, and the last non-empty AI reply if there
    was one, are appended to ``conversation_history`` in place.

    Args:
        app: Compiled graph exposing ``stream(state, stream_mode=...)`` that
            yields dicts with a ``"messages"`` list.
        query: The user's new request.
        conversation_history: List of ``{"role", "content"}`` dicts; mutated
            in place. Any role other than ``"user"`` is sent as ``"assistant"``.

    Returns:
        None.
    """
    print(f"\n🔄 Processing your request: '{query}'")
    print("-" * 80)

    # Send every message in the same role/content dict form instead of mixing
    # message objects with raw dicts.
    messages = [
        {
            "role": "user" if entry["role"] == "user" else "assistant",
            "content": entry["content"],
        }
        for entry in conversation_history
    ]
    messages.append({"role": "user", "content": query})

    final_reply = None
    for chunk in app.stream({"messages": messages}, stream_mode="values"):
        state_messages = chunk["messages"]
        # Guard before indexing: an empty message list would otherwise raise
        # IndexError on [-1].
        if not state_messages:
            continue

        latest = state_messages[-1]
        latest.pretty_print()

        # Keep only AI messages that carry text, so an empty turn (for
        # example one that only requests a tool call) never overwrites a
        # real answer.
        if getattr(latest, "type", None) == "ai" and latest.content:
            final_reply = latest.content

    # Record the exchange; the assistant entry is skipped when no reply text
    # was produced.
    conversation_history.append({"role": "user", "content": query})
    if final_reply is not None:
        conversation_history.append({"role": "assistant", "content": final_reply})
|
|
|
|
|
|
def main():
    """Run the interactive prompt loop, keeping conversation history.

    Prints the welcome banner and examples, builds the supervisor (exiting
    with status 1 if that fails), then reads queries until the user quits.
    Built-in commands: ``exit``/``quit``/``q``, ``help``/``h``/``?`` and
    ``history``/``show history``. SSH connections are closed on the way out,
    however the loop ends.
    """
    print_welcome()
    print_examples()

    # Create the supervisor system
    print("\n🚀 Initializing the multi-agent system...")
    try:
        app = create_sysadmin_supervisor()
        print("✅ System ready!\n")
    except Exception as e:
        print(f"❌ Failed to initialize system: {e}")
        sys.exit(1)

    conversation_history = []

    print("💬 Enter your queries below (type 'exit' to quit, 'help' for examples):\n")

    try:
        while True:
            try:
                query = input("You: ").strip()
                if not query:
                    continue

                command = query.lower()

                if command in ('exit', 'quit', 'q'):
                    print("\n👋 Thanks for using Pard0x! Stay curious and keep debugging!")
                    break

                if command in ('help', 'h', '?'):
                    print_examples()
                    continue

                if command in ('history', 'show history'):
                    print("\n📜 Conversation History:")
                    print("-" * 40)
                    for number, msg in enumerate(conversation_history, start=1):
                        role = "You" if msg["role"] == "user" else "Assistant"
                        content = msg["content"]
                        # Show at most 100 characters, marking truncation.
                        suffix = "..." if len(content) > 100 else ""
                        print(f"{number}. {role}: {content[:100]}{suffix}")
                    print("-" * 40)
                    continue

                process_query(app, query, conversation_history)

            except (KeyboardInterrupt, EOFError):
                # EOFError is raised by input() on Ctrl-D or when stdin is
                # closed. It must end the session: if it fell through to the
                # generic handler below, every retry would hit EOF again and
                # the loop would never terminate.
                print("\n\n👋 Goodbye! Keep those systems running smoothly!")
                break
            except Exception as e:
                # Top-level boundary: report the failure and keep the session alive.
                print(f"\n❌ Unexpected error: {e}")
                print("Please try again with a different query.")
    finally:
        # Clean up SSH connections on exit. Best effort: a cleanup failure is
        # reported but never masks the reason the loop ended.
        try:
            from custom_tools import ssh_manager
            ssh_manager.close_all()
            print("\n🔌 SSH connections closed.")
        except Exception as e:
            print(f"Warning: Error closing SSH connections: {e}")
|
|
|
|
|
|
# Start the interactive assistant only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|