#!/usr/bin/env python3 """ Pard0x Multi-Agent System Administrator Assistant A supervisor-based system that coordinates specialized agents for system administration tasks. """ import sys import warnings import readline # Enable arrow key support and command history in input() from langchain_openai import ChatOpenAI from langchain_core.messages import HumanMessage from langgraph_supervisor import create_supervisor from agents import ( create_os_detector_worker, create_logs_analyzer_worker, create_performance_analyzer_worker, create_service_discovery_worker ) from custom_tools import configured_remote_server from langchain_community.tools.shell.tool import ShellTool # Suppress the shell tool warning since worker agents use it intentionally for sysadmin tasks warnings.filterwarnings("ignore", message="The shell tool has no safeguards by default. Use at your own risk.") def print_welcome(): """Print welcome message and system capabilities.""" print("\n" + "="*80) print("šŸ¤– Welcome to Pard0x Multi-Agent System Administrator Assistant!") print("="*80) print("\nI coordinate a team of specialized agents to help you with system administration tasks:") print(" • šŸ–„ļø OS Detector - System identification and environment analysis (local & remote)") print(" • šŸ“Š Logs Analyzer - Log investigation and error diagnosis (local & remote)") print(" • ⚔ Performance Analyzer - Resource monitoring and optimization (local & remote)") print(" • šŸ” Service Discovery - Comprehensive service enumeration across all platforms") print("\n🌐 Remote Server Access: My agents can execute commands on both:") print(" • Local machine via shell commands") print(" • Remote server via SSH (g@157.90.211.119:8081)") print("\n" + "-"*80) def print_examples(): """Print example queries.""" print("\nšŸ’” Example queries you can try:") print(" - 'What operating system is this server running?'") print(" - 'Find all services running on the system'") print(" - 'Discover all containers and their services'") print(" - 'Check the system logs for any errors in the last hour'") print(" - 'Analyze current system performance and identify bottlenecks'") print(" - 'Compare disk usage between local and remote server'") print(" - 'Check if services are running on both systems'") print(" - 'My web server is down, help me troubleshoot'") print("\n" + "-"*80) def create_sysadmin_supervisor(): """Create the main supervisor that coordinates between specialized agents.""" # Get the base model model = ChatOpenAI(model="gpt-4.1", temperature=0) # Create specialized workers os_detector = create_os_detector_worker() logs_analyzer = create_logs_analyzer_worker() performance_analyzer = create_performance_analyzer_worker() service_discovery = create_service_discovery_worker() # Create the supervisor with our agents supervisor = create_supervisor( agents=[os_detector, logs_analyzer, performance_analyzer, service_discovery], model=model, prompt="""You are Pard0x, an expert System Administrator Supervisor coordinating a team of specialized agents. Your team consists of: 1. **OS Detector**: Identifies system information, environment, and configuration 2. **Logs Analyzer**: Investigates system and application logs for issues 3. **Performance Analyzer**: Monitors and diagnoses performance problems 4. **Service Discovery**: Comprehensively finds ALL services across all platforms (containers, VMs, system services, etc.) Your role: 1. **Task Analysis**: Understand the user's request and determine which agent(s) to engage 2. **Coordination**: Delegate tasks to appropriate agents based on their specialties 3. **Synthesis**: Combine insights from multiple agents into coherent solutions 4. **Direct Action**: Handle simple tasks yourself using shell commands when appropriate IMPORTANT: To prevent SSH connection issues, delegate tasks SEQUENTIALLY, not in parallel. Wait for one agent to complete their SSH tasks before starting the next one. Multiple agents should not execute SSH commands simultaneously. Decision guidelines: - For system identification or environment questions → OS Detector - For finding services, containers, or what's running → Service Discovery - For error investigation or log analysis → Logs Analyzer - For performance issues or resource problems → Performance Analyzer - For complex issues, engage multiple agents in sequence (not parallel) - For simple queries or when agents aren't needed, respond directly Special notes on Service Discovery: - The Service Discovery agent is expert at finding services in containers (Docker, Incus, LXD, etc.) - It knows how to navigate container projects and namespaces - It returns structured JSON with comprehensive service information - Use it when users ask about what's running, containers, or service enumeration Communication style: - Be professional yet approachable - Provide clear explanations of your delegation decisions - Synthesize agent findings into actionable recommendations - Use shell commands directly when it's more efficient than delegation Remember: Your goal is to solve system problems efficiently by leveraging your team's specialized skills while maintaining a positive debugging experience!""", tools=[ShellTool()] # Supervisor can execute shell commands directly ) return supervisor.compile() def process_query(app, query: str, conversation_history: list) -> None: """Process a user query through the supervisor system with conversation history.""" print(f"\nšŸ”„ Processing your request: '{query}'") print("-" * 80) # Convert conversation history to LangChain message format messages = [] for msg in conversation_history: if msg["role"] == "user": messages.append(HumanMessage(content=msg["content"])) else: # assistant # Create an AI message - LangGraph will handle this format messages.append({"role": "assistant", "content": msg["content"]}) # Add the new user message messages.append(HumanMessage(content=query)) # Stream the response collected_responses = [] for chunk in app.stream( {"messages": messages}, stream_mode="values" ): chunk["messages"][-1].pretty_print() # Collect AI responses if chunk["messages"] and hasattr(chunk["messages"][-1], 'type') and chunk["messages"][-1].type == "ai": collected_responses.append(chunk["messages"][-1].content) # Add both user and assistant messages to history conversation_history.append({"role": "user", "content": query}) if collected_responses: # Use the last response (most complete) conversation_history.append({"role": "assistant", "content": collected_responses[-1]}) def main(): """Main interaction loop with conversation history.""" print_welcome() print_examples() # Create the supervisor system print("\nšŸš€ Initializing the multi-agent system...") try: app = create_sysadmin_supervisor() print("āœ… System ready!\n") except Exception as e: print(f"āŒ Failed to initialize system: {str(e)}") sys.exit(1) # Initialize conversation history conversation_history = [] # Interactive loop with proper cleanup print("šŸ’¬ Enter your queries below (type 'exit' to quit, 'help' for examples):\n") try: while True: try: query = input("You: ").strip() if not query: continue if query.lower() in ['exit', 'quit', 'q']: print("\nšŸ‘‹ Thanks for using Pard0x! Stay curious and keep debugging!") break if query.lower() in ['help', 'h', '?']: print_examples() continue if query.lower() in ['history', 'show history']: print("\nšŸ“œ Conversation History:") print("-" * 40) for i, msg in enumerate(conversation_history): role = "You" if msg["role"] == "user" else "Assistant" print(f"{i+1}. {role}: {msg['content'][:100]}{'...' if len(msg['content']) > 100 else ''}") print("-" * 40) continue process_query(app, query, conversation_history) except KeyboardInterrupt: print("\n\nšŸ‘‹ Goodbye! Keep those systems running smoothly!") break except Exception as e: print(f"\nāŒ Unexpected error: {str(e)}") print("Please try again with a different query.") finally: # Clean up SSH connections on exit try: from custom_tools import ssh_manager ssh_manager.close_all() print("\nšŸ”Œ SSH connections closed.") except Exception as e: print(f"Warning: Error closing SSH connections: {e}") if __name__ == "__main__": main()