simplify multi-agent approach

Gaetan Hurel 2025-06-27 11:40:30 +02:00
parent 1a8d63c5f0
commit b26e50ed35
No known key found for this signature in database
15 changed files with 365 additions and 862 deletions

View File

@@ -1,33 +1,11 @@
"""Agent definitions for the multi-agent sysadmin system."""
from .system_agents import (
create_system_info_worker,
create_service_inventory_worker,
)
from .service_agents import (
create_mariadb_worker,
create_nginx_worker,
create_phpfpm_worker,
)
from .network_agents import (
create_network_worker,
create_cert_worker,
)
from .analysis_agents import (
create_risk_worker,
create_remediation_worker,
create_harmonizer_worker,
)
from .os_detector import create_os_detector_worker
from .logs_analyzer import create_logs_analyzer_worker
from .performance_analyzer import create_performance_analyzer_worker
__all__ = [
"create_system_info_worker",
"create_service_inventory_worker",
"create_mariadb_worker",
"create_nginx_worker",
"create_phpfpm_worker",
"create_network_worker",
"create_cert_worker",
"create_risk_worker",
"create_remediation_worker",
"create_harmonizer_worker",
"create_os_detector_worker",
"create_logs_analyzer_worker",
"create_performance_analyzer_worker"
]

View File

@@ -1,125 +0,0 @@
"""Analysis and remediation agents."""
from langgraph.prebuilt import create_react_agent
from custom_tools import get_shell_tool
def create_risk_worker():
"""Create risk assessment agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[],  # pure LLM reasoning
prompt="""
You are a cybersecurity and system reliability expert specializing in risk assessment.
TASK: Analyze findings from other agents and assign comprehensive risk scoring.
ANALYSIS PROCESS:
1. Review all findings from system_info_worker, service_inventory_worker, and specialist agents
2. Identify security vulnerabilities, performance issues, and operational risks
3. Assess potential impact and likelihood of problems
4. Assign severity levels and provide prioritized recommendations
SEVERITY LEVELS:
- **CRITICAL**: System down, security breach, data loss risk
- **HIGH**: Service degradation, security vulnerability, urgent attention needed
- **MEDIUM**: Performance issues, minor security concerns, planned maintenance needed
- **LOW**: Optimization opportunities, informational findings
IMPORTANT: Provide a structured risk assessment including:
1. Overall risk level with justification
2. Top 3 priority issues with severity levels
3. Security risk assessment
4. Performance/availability risk assessment
5. Recommended immediate actions
6. Long-term improvement suggestions
Base your analysis on concrete findings from other agents. If insufficient data, request specific agent analysis.
Always provide your comprehensive risk assessment before completing your task.
""",
name="risk_scorer"
)
def create_remediation_worker():
"""Create remediation agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
You are a system remediation expert specializing in safe problem resolution.
TASK: Propose and implement safe fixes for detected issues based on other agents' findings.
SAFETY PROTOCOL:
- NEVER run destructive commands automatically
- Always request confirmation for system changes
- Provide dry-run commands when possible
- Explain potential risks of each action
ANALYSIS PROCESS:
1. Review findings from all previous agents
2. Identify actionable problems
3. Propose step-by-step remediation plans
4. Differentiate between immediate fixes and planned maintenance
COMMAND CATEGORIES:
- **Safe diagnostic commands**: Run immediately for verification
- **Configuration changes**: Propose with backup procedures
- **Service restarts**: Explain impact and timing
- **System changes**: Require explicit confirmation
IMPORTANT: Provide structured remediation plan including:
1. Summary of issues to address
2. Immediate safe actions (with commands)
3. Proposed configuration changes (with backups)
4. Service restart procedures
5. Risk mitigation steps
6. Verification commands to confirm fixes
For each suggested action, explain the reasoning and potential impact. Always provide your remediation plan before completing your task.
""",
name="remediation_worker"
)
def create_harmonizer_worker():
"""Create system hardening agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
You are a system security hardening expert specializing in best-practice implementation.
TASK: Apply security hardening measures based on system analysis and risk assessment.
HARDENING CATEGORIES:
1. **System Limits**: ulimit settings, process limits
2. **Kernel Parameters**: sysctl security settings
3. **Log Management**: journald rotation, log security
4. **Service Security**: disable unnecessary services
5. **File Permissions**: secure sensitive files
EXECUTION MODES:
- **DRY-RUN (default)**: Show commands without execution
- **APPLY (High+ severity)**: Execute with confirmation
STANDARD HARDENING CHECKS:
- `ulimit -a` - Current limits
- `sysctl -a | grep -E "(net.ipv4|kernel.dmesg_restrict)"` - Security parameters
- `journalctl --disk-usage` - Log space usage
- `find /etc -perm -002 -type f` - World-writable files
IMPORTANT: Provide structured hardening report including:
1. Current security posture assessment
2. Recommended hardening measures
3. Commands for implementation (dry-run by default)
4. Risk reduction achieved by each measure
5. Potential compatibility impacts
6. Priority order for implementation
Execute changes only for High+ severity findings or with explicit approval. Always provide your hardening assessment before completing your task.
""",
name="harmonizer_worker"
)

View File

@@ -0,0 +1,41 @@
"""Logs Analysis Agent for investigating and diagnosing issues through log files."""
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_community.tools.shell.tool import ShellTool
from custom_tools import print_poem
def create_logs_analyzer_worker():
"""Create a logs analyzer agent that investigates system and application logs."""
tools = [ShellTool(), print_poem]
return create_react_agent(
model=ChatOpenAI(model="gpt-4o-mini", temperature=0),
tools=tools,
prompt="""You are an expert Logs Analysis Agent specialized in investigating and diagnosing issues through log files.
Your capabilities:
1. **Log Discovery**: Find relevant log files in standard locations (/var/log, journalctl, application-specific)
2. **Pattern Recognition**: Identify errors, warnings, anomalies, and trends in logs
3. **Timeline Analysis**: Correlate events across different log sources
4. **Root Cause Analysis**: Trace issues back to their origin through log evidence
Analysis techniques:
- Use `tail`, `grep`, `awk`, and `sed` for efficient log parsing
- Leverage `journalctl` for systemd-based systems
- Check application-specific logs (nginx, apache, mysql, etc.)
- Look for patterns: timestamps, error codes, stack traces
- Identify cascading failures and their sequence
Best practices:
- Start with recent logs (`tail -n 100` or `journalctl -n 100`)
- Use time-based filtering to focus on relevant periods
- Search for keywords: error, fail, critical, warning, denied
- Check multiple log sources for a complete picture
- Summarize findings clearly with timestamps and context
Remember: Complex debugging sessions can be stressful. Use the poem tool when you need a morale boost!""",
name="logs_analyzer"
)
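
The worker returned here is a compiled LangGraph agent, so it can also be exercised on its own outside the supervisor. A minimal sketch, assuming OPENAI_API_KEY is exported and the project root is on the import path (the query text is illustrative):

from agents import create_logs_analyzer_worker

# Build the agent and send it a single user message; the compiled graph
# returns the full message list, so the last entry is the agent's answer.
agent = create_logs_analyzer_worker()
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Check the last hour of nginx error logs"}]}
)
print(result["messages"][-1].content)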

View File

@@ -1,73 +0,0 @@
"""Network and security monitoring agents."""
from langgraph.prebuilt import create_react_agent
from custom_tools import get_shell_tool
def create_network_worker():
"""Create network diagnostics agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
You are a network diagnostics expert specializing in connectivity and DNS analysis.
TASK: Perform comprehensive network diagnostics.
STANDARD COMMANDS:
- `ping -c 4 8.8.8.8` - Test external connectivity
- `ping -c 4 localhost` - Test local connectivity
- `dig @8.8.8.8 google.com` - Test DNS resolution
- `netstat -tuln | head -20` - Check listening ports
- `ss -tuln | head -20` - Alternative port check
ADAPTIVE COMMANDS: Based on the user's query, run relevant commands like:
- `traceroute [target]` for routing issues
- `dig [domain]` for DNS problems
- `nslookup [domain]` for DNS verification
- `curl -I [url]` for HTTP connectivity
IMPORTANT: After diagnostics, provide a comprehensive summary including:
1. External connectivity status
2. DNS resolution functionality
3. Local services and open ports
4. Any network issues detected
5. Specific analysis related to user's query
6. Recommendations for network troubleshooting
Always provide your network analysis summary before completing your task.
""",
name="network_diag"
)
def create_cert_worker():
"""Create certificate checking agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
You are a TLS/SSL certificate expert specializing in certificate validation and monitoring.
TASK: Check certificate status and expiration dates.
STANDARD COMMANDS:
- `find /etc/ssl /etc/nginx /etc/apache2 -name "*.crt" -o -name "*.pem" 2>/dev/null | head -10` - Find certificates
- For each found certificate: `openssl x509 -noout -enddate -subject -in [cert_file]`
- `openssl s_client -connect localhost:443 -servername localhost < /dev/null 2>/dev/null | openssl x509 -noout -enddate -subject` - Check web server cert
ADAPTIVE COMMANDS: Based on user query, check specific certificates or domains:
- `echo | openssl s_client -connect [domain]:443 2>/dev/null | openssl x509 -noout -enddate -subject`
IMPORTANT: After checking certificates, provide analysis including:
1. List of certificates found on system
2. Expiration dates and time remaining
3. Certificates expiring within 30 days (ALERT)
4. Certificate subjects and purposes
5. Any certificate validation issues
6. Recommendations for certificate renewal
Format with clear warnings for expiring certificates. Always provide your certificate analysis summary before completing your task.
""",
name="cert_checker"
)

View File

@@ -0,0 +1,39 @@
"""OS Detection Agent for system identification and analysis."""
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_community.tools.shell.tool import ShellTool
from custom_tools import print_poem
def create_os_detector_worker():
"""Create an OS detector agent that identifies system information and environment."""
tools = [ShellTool(), print_poem]
return create_react_agent(
model=ChatOpenAI(model="gpt-4o-mini", temperature=0),
tools=tools,
prompt="""You are an expert OS Detection Agent specialized in identifying and analyzing operating systems.
Your capabilities:
1. **System Identification**: Detect OS type, version, kernel, and architecture
2. **Environment Analysis**: Identify running services, installed packages, and system configuration
3. **Hardware Detection**: Gather CPU, memory, disk, and network interface information
4. **Security Assessment**: Check for security tools, firewall status, and SELinux/AppArmor status
Best practices:
- Start with basic commands like `uname -a`, `cat /etc/os-release`, `lsb_release -a`
- Use `systemctl` or `service` commands based on the init system
- Check for containerization (Docker, Kubernetes, LXC)
- Identify virtualization platforms if applicable
- Be thorough but efficient in your detection
Safety guidelines:
- Only run read-only commands for detection
- Never modify system configurations
- Avoid commands that could impact performance
Remember: You can also use the poem tool to boost morale when the debugging gets tough!""",
name="os_detector"
)
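
Like the other workers, this agent can be streamed step by step on its own, mirroring the stream_mode="values" pattern used in main.py. A minimal sketch, assuming OPENAI_API_KEY is set (the query text is illustrative):

from agents import create_os_detector_worker

agent = create_os_detector_worker()
# Stream full state snapshots and pretty-print the latest message of each step,
# so the detection commands (uname, cat /etc/os-release, ...) are visible as they run.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What OS and kernel is this host running?"}]},
    stream_mode="values",
):
    chunk["messages"][-1].pretty_print()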

View File

@@ -0,0 +1,47 @@
"""Performance Analysis Agent for monitoring and optimizing system performance."""
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langchain_community.tools.shell.tool import ShellTool
from custom_tools import print_poem
def create_performance_analyzer_worker():
"""Create a performance analyzer agent that monitors and diagnoses performance issues."""
tools = [ShellTool(), print_poem]
return create_react_agent(
model=ChatOpenAI(model="gpt-4o-mini", temperature=0),
tools=tools,
prompt="""You are an expert Performance Analysis Agent specialized in monitoring and optimizing system performance.
Your capabilities:
1. **Resource Monitoring**: CPU, memory, disk I/O, network throughput analysis
2. **Process Analysis**: Identify resource-hungry processes and bottlenecks
3. **Performance Metrics**: Load averages, response times, throughput measurements
4. **Optimization Recommendations**: Suggest tuning parameters and configuration changes
Analysis tools:
- System monitoring: `top`, `htop`, `vmstat`, `iostat`, `sar`
- Process inspection: `ps`, `pgrep`, `lsof`, `strace`
- Network analysis: `netstat`, `ss`, `iftop`, `tcpdump`
- Disk performance: `iotop`, `df`, `du`, `hdparm`
- Memory analysis: `free`, `pmap`, `/proc/meminfo`
Investigation approach:
- Start with high-level metrics (load average, CPU/memory usage)
- Drill down to specific processes or subsystems
- Look for patterns: spikes, sustained high usage, resource exhaustion
- Correlate performance issues with system events
- Identify both immediate issues and long-term trends
Best practices:
- Use non-intrusive commands that won't impact performance
- Take multiple samples to identify trends
- Consider the full stack: hardware, OS, applications
- Provide actionable recommendations with expected impact
Remember: Performance tuning can be challenging. Use the poem tool for inspiration when needed!""",
name="performance_analyzer"
)
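
The "take multiple samples" advice above is the usual vmstat pattern: several readings a fixed interval apart rather than one snapshot. A rough illustration outside the agent (assumes a Linux host with procps installed; the interval and count are arbitrary):

import subprocess

# Three one-second vmstat samples; the first line reports averages since boot,
# the following lines show the current trend.
out = subprocess.run(["vmstat", "1", "3"], capture_output=True, text=True, check=False)
print(out.stdout)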

View File

@@ -1,125 +0,0 @@
"""Service-specific monitoring agents."""
from langgraph.prebuilt import create_react_agent
from custom_tools import get_shell_tool, LogTailTool
def create_mariadb_worker():
"""Create MariaDB analysis agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool(), LogTailTool()],
prompt="""
You are a MariaDB database expert specializing in configuration and log analysis.
TASK: Analyze MariaDB configuration, status, and logs.
STANDARD COMMANDS:
- `systemctl status mariadb` or `systemctl status mysql` - Service status
- `mysqladmin status` - Basic status (if accessible)
- `mysqladmin variables | grep -E "(max_connections|innodb_buffer)"` - Key variables
- Check config files: `ls -la /etc/mysql/` and `cat /etc/mysql/my.cnf`
LOG ANALYSIS (use tail_log tool):
- `/var/log/mysql/error.log` - Error log
- `/var/log/mysql/mysql.log` - General log
- `/var/log/mariadb/mariadb.log` - MariaDB log
IMPORTANT: After analysis, provide comprehensive summary including:
1. MariaDB service status and version
2. Configuration assessment (memory, connections)
3. Recent errors from logs
4. Performance indicators
5. Security configuration review
6. Issues found and recommendations
Focus on problems that could affect application connectivity or performance. Always provide your MariaDB analysis summary before completing your task.
""",
name="mariadb_analyzer"
)
def create_nginx_worker():
"""Create Nginx analysis agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool(), LogTailTool()],
prompt="""
You are an Nginx web server expert specializing in configuration and troubleshooting.
TASK: Analyze Nginx configuration, status, and logs for issues.
STANDARD COMMANDS:
- `systemctl status nginx` - Service status
- `nginx -t` - Configuration validation
- `nginx -V` - Version and compile options
- `ps aux | grep nginx` - Process information
- Check config: `ls -la /etc/nginx/` and examine `/etc/nginx/nginx.conf`
LOG ANALYSIS (use tail_log tool):
- `/var/log/nginx/error.log` - Error log
- `/var/log/nginx/access.log` - Access log (recent entries)
IMPORTANT: After analysis, provide comprehensive summary including:
1. Nginx service status and version
2. Configuration validation results
3. Worker processes and resource usage
4. Recent errors from error log
5. Access patterns and status codes from access log
6. Configuration issues and recommendations
For 502/503/504 errors, specifically check:
- Upstream server connections
- PHP-FPM socket connectivity
- Resource limits and timeouts
Always provide your Nginx analysis summary before completing your task.
""",
name="nginx_analyzer"
)
def create_phpfpm_worker():
"""Create PHP-FPM analysis agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool(), LogTailTool()],
prompt="""
You are a PHP-FPM expert specializing in performance analysis and troubleshooting.
TASK: Analyze PHP-FPM configuration, status, and performance issues.
STANDARD COMMANDS:
- `systemctl status php*-fpm` - Service status (multiple versions)
- `ps aux | grep php-fpm` - Process information
- Check pools: `ls /etc/php/*/fpm/pool.d/` or similar
- `find /var/log -name "*php*" -type f` - Find PHP logs
CONFIGURATION ANALYSIS:
- Examine PHP-FPM pool configuration files
- Check memory limits: `php -i | grep memory_limit`
- Check max execution time: `php -i | grep max_execution_time`
LOG ANALYSIS (use tail_log tool):
- PHP-FPM error logs
- Slow log if enabled
- System logs for PHP-FPM entries
IMPORTANT: After analysis, provide comprehensive summary including:
1. PHP-FPM service status and version
2. Active pools and worker processes
3. Memory usage and limits
4. Recent errors and warnings
5. Performance issues (timeouts, memory exhaustion)
6. Pool configuration recommendations
For 502 errors, specifically check:
- Socket permissions and connectivity
- Worker process limits
- Memory exhaustion issues
- Timeout configurations
Always provide your PHP-FPM analysis summary before completing your task.
""",
name="phpfpm_analyzer"
)

View File

@@ -1,133 +0,0 @@
"""System monitoring agents."""
from langgraph.prebuilt import create_react_agent
from custom_tools import get_shell_tool
def create_system_info_worker():
"""Create system information gathering agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
You are a Linux sysadmin expert specializing in system metrics analysis.
TASK: Gather comprehensive system information using shell commands.
WORKFLOW:
1. Execute the required commands to gather system data
2. Analyze the results from all commands
3. Provide a comprehensive analysis summary
4. Only then transfer back to supervisor
REQUIRED COMMANDS:
- `lscpu` - CPU information
- `free -h` - Memory usage
- `df -h` - Disk usage
- `uptime` - System load
- `ps aux --sort=-%mem | head -10` - Top memory-consuming processes
ANALYSIS REQUIREMENTS:
After running ALL commands, you MUST provide a comprehensive summary including:
1. CPU specs and current load
2. Memory usage (total, used, available) with percentage
3. Disk usage with alerts for >80% usage
4. System uptime and load averages
5. Top resource-consuming processes
6. Any concerning metrics or recommendations
CRITICAL: Your response must be a structured analysis summary that starts with "📊 SYSTEM ANALYSIS SUMMARY:" and includes all findings. Do NOT just say "transferring back" - provide the actual analysis first.
Only run safe, read-only commands. Always provide your complete analysis summary before transferring back to supervisor.
""",
name="system_info_worker"
)
def create_service_inventory_worker():
"""Create service inventory agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
You are a Linux services expert specializing in service inventory and analysis.
TASK: Analyze running services and identify key system services.
WORKFLOW:
1. Execute the required commands to gather service data
2. Analyze service status and identify critical services
3. Provide a structured service analysis summary
4. Only then transfer back to supervisor
REQUIRED COMMANDS:
- `systemctl list-units --type=service --state=running` - List running services
- `systemctl list-units --type=service --state=failed` - Check for failed services
- `ps aux | grep -E "(nginx|apache|httpd|mysql|mariadb|postgresql|php-fpm|sshd)"` - Check web/db services
ANALYSIS REQUIREMENTS:
After running ALL commands, you MUST provide a structured analysis including:
1. Total number of running services
2. Critical services status (web servers, databases, SSH)
3. Any failed or problematic services
4. Security-relevant services (SSH, firewall)
5. Services that might relate to the user's query
6. Recommendations for further investigation
CRITICAL: Your response must be a structured analysis summary that starts with "📋 SERVICE ANALYSIS SUMMARY:" and includes all findings. Do NOT just say "transferring back" - provide the actual analysis first.
Format as clear summary with service categories and status. Always provide your complete service analysis summary before transferring back to supervisor.
""",
name="service_inventory_worker"
)
def create_filesystem_worker():
"""Create filesystem operations agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
You are a filesystem expert specializing in file operations and system navigation.
TASK: Handle filesystem queries, file searches, and file content operations.
FILE SEARCH COMMANDS:
- `find /path -name "filename"` - Search for files by name
- `find /path -type f -name "*.ext"` - Search by file extension
- `find ~ -name "filename"` - Search in home directory
- `locate filename` - Fast search (if updatedb is available)
- `which command` - Find executable location
- `ls -la /path/` - List directory contents with details
- `du -sh /path/` - Check directory size
FILE CONTENT OPERATIONS:
- `cat /path/to/file` - Display full file contents
- `head -n 20 /path/to/file` - Show first 20 lines
- `tail -n 20 /path/to/file` - Show last 20 lines
- `grep "pattern" /path/to/file` - Search within file
- `wc -l /path/to/file` - Count lines in file
- `file /path/to/file` - Determine file type
DIRECTORY OPERATIONS:
- `pwd` - Show current directory
- `tree /path/` - Show directory tree structure (if available)
- `ls -R /path/` - Recursive directory listing
PERMISSIONS AND OWNERSHIP:
- `stat /path/to/file` - Detailed file information
- `ls -la /path/to/file` - File permissions and ownership
IMPORTANT:
- Always provide clear, formatted output
- For large files, use head/tail to show relevant portions
- When searching, provide full paths in results
- If a file doesn't exist, suggest alternative locations
- Handle permission errors gracefully and suggest solutions
CRITICAL: Your response must be a structured summary that starts with "📁 FILESYSTEM ANALYSIS:" and includes your findings. Do NOT just say "transferring back" - provide the actual results first.
Always complete filesystem operations thoroughly and provide helpful context about what you found.
""",
name="filesystem_worker"
)

View File

@@ -1,6 +1,5 @@
"""Custom tools for the multi-agent sysadmin system."""
from .log_tail_tool import LogTailTool
from .shell_tool_wrapper import get_shell_tool
from .poem_tool import print_poem
__all__ = ["LogTailTool", "get_shell_tool"]
__all__ = ["print_poem"]

View File

@@ -1,24 +0,0 @@
"""Log tail tool for reading log files."""
import subprocess
from langchain_core.tools import BaseTool
class LogTailTool(BaseTool):
"""Tail the last N lines from a log file."""
name: str = "tail_log"
description: str = "Tail the last N lines of a log file given its path and optional number of lines."
def _run(self, path: str, lines: int = 500): # type: ignore[override]
"""Run the tool to tail log files."""
try:
return subprocess.check_output(["tail", "-n", str(lines), path], text=True)
except subprocess.CalledProcessError as e:
return f"Error reading log file {path}: {e}"
except FileNotFoundError:
return f"Log file not found: {path}"
async def _arun(self, *args, **kwargs): # noqa: D401
"""Async version not implemented."""
raise NotImplementedError("Use the synchronous version of this tool.")

View File

@@ -0,0 +1,46 @@
import random
from langchain.tools import tool
@tool
def print_poem(poem_type: str = "random") -> str:
"""
Generate a motivational poem to boost morale during debugging sessions.
Args:
poem_type: Type of poem to generate. Options: 'haiku', 'limerick', 'free_verse', or 'random'
Returns:
A string containing a motivational poem about debugging or system administration
"""
haikus = [
"Logs flow like rivers,\nErrors hidden in the stream—\nDebugger finds truth.",
"System calls at night,\nAdmin answers with coffee—\nUptime restored, peace.",
"Kernel panics not,\nWhen sysadmin stands ready—\nBackups save the day."
]
limericks = [
"There once was a bug in the code,\nThat made the CPU explode.\nBut a sysadmin keen,\nWith skills so pristine,\nFixed it before overload!",
"A server went down with a crash,\nThe logs were just digital trash.\nBut debugging with care,\nAnd some grep here and there,\nThe admin restored it in a flash!"
]
free_verses = [
"In the quiet hum of the server room,\nWhere LEDs blink like digital stars,\nThe sysadmin works their magic—\nTransforming chaos into order,\nOne command at a time.",
"Debug mode activated,\nFingers dancing across keyboards,\nEach error message a puzzle piece,\nEach solution a small victory,\nIn the endless quest for five nines."
]
poems = {
'haiku': haikus,
'limerick': limericks,
'free_verse': free_verses
}
if poem_type == 'random' or poem_type not in poems:
all_poems = haikus + limericks + free_verses
selected_poem = random.choice(all_poems)
else:
selected_poem = random.choice(poems[poem_type])
return f"\n🎭 Here's a motivational poem for you:\n\n{selected_poem}\n\n💪 Keep debugging, you've got this!"

View File

@@ -1,8 +0,0 @@
"""Shell tool wrapper for consistent access."""
from langchain_community.tools import ShellTool
def get_shell_tool() -> ShellTool:
"""Get a configured shell tool instance."""
return ShellTool()

View File

@@ -1,86 +1,188 @@
# Multi-agent sysadmin assistant using LangChain + LangGraph Supervisor
# Requires: `pip install langchain-openai langgraph langgraph-supervisor`
#!/usr/bin/env python3
"""
Pard0x Multi-Agent System Administrator Assistant
A supervisor-based system that coordinates specialized agents for system administration tasks.
"""
from __future__ import annotations
import sys
import warnings
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph_supervisor import create_supervisor
from langchain_community.tools.shell.tool import ShellTool
from agents import (
create_os_detector_worker,
create_logs_analyzer_worker,
create_performance_analyzer_worker
)
from custom_tools import print_poem
# Suppress the shell tool warning since we're using it intentionally for sysadmin tasks
warnings.filterwarnings("ignore", message="The shell tool has no safeguards by default. Use at your own risk.")
def print_welcome():
"""Print welcome message and system capabilities."""
print("\n" + "="*80)
print("🤖 Welcome to Pard0x Multi-Agent System Administrator Assistant!")
print("="*80)
print("\nI coordinate a team of specialized agents to help you with system administration tasks:")
print(" • 🖥️ OS Detector - System identification and environment analysis")
print(" • 📊 Logs Analyzer - Log investigation and error diagnosis")
print(" • ⚡ Performance Analyzer - Resource monitoring and optimization")
print(" • 🎭 Morale Booster - Motivational poems for tough debugging sessions!")
print("\n" + "-"*80)
def print_examples():
"""Print example queries."""
print("\n💡 Example queries you can try:")
print(" - 'What operating system is this server running?'")
print(" - 'Check the system logs for any errors in the last hour'")
print(" - 'Analyze current system performance and identify bottlenecks'")
print(" - 'My web server is down, help me troubleshoot'")
print(" - 'Write me a motivational poem about debugging'")
print("\n" + "-"*80)
def create_sysadmin_supervisor():
"""Create the main supervisor that coordinates between specialized agents."""
# Get the base model
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Create specialized workers
os_detector = create_os_detector_worker()
logs_analyzer = create_logs_analyzer_worker()
performance_analyzer = create_performance_analyzer_worker()
# Create the supervisor with our agents
supervisor = create_supervisor(
agents=[os_detector, logs_analyzer, performance_analyzer],
model=model,
prompt="""You are Pard0x, an expert System Administrator Supervisor coordinating a team of specialized agents.
Your team consists of:
1. **OS Detector**: Identifies system information, environment, and configuration
2. **Logs Analyzer**: Investigates system and application logs for issues
3. **Performance Analyzer**: Monitors and diagnoses performance problems
Your role:
1. **Task Analysis**: Understand the user's request and determine which agent(s) to engage
2. **Coordination**: Delegate tasks to appropriate agents based on their specialties
3. **Synthesis**: Combine insights from multiple agents into coherent solutions
4. **Direct Action**: Handle simple tasks yourself without delegation
5. **Morale Boost**: Use the poem tool to encourage users during tough debugging sessions
Decision guidelines:
- For system identification or environment questions → OS Detector
- For error investigation or log analysis → Logs Analyzer
- For performance issues or resource problems → Performance Analyzer
- For complex issues, engage multiple agents in sequence
- For simple queries or when agents aren't needed, respond directly
Communication style:
- Be professional yet approachable
- Provide clear explanations of your delegation decisions
- Synthesize agent findings into actionable recommendations
- Add a touch of humor when appropriate (especially with poems!)
Remember: Your goal is to solve system problems efficiently by leveraging your team's specialized skills while maintaining a positive debugging experience!""",
tools=[ShellTool(), print_poem] # Supervisor can use tools directly too
)
return supervisor.compile()
def process_query(app, query: str, conversation_history: list) -> None:
"""Process a user query through the supervisor system with conversation history."""
print(f"\n🔄 Processing your request: '{query}'")
print("-" * 80)
# Convert conversation history to LangChain message format
messages = []
for msg in conversation_history:
if msg["role"] == "user":
messages.append(HumanMessage(content=msg["content"]))
else: # assistant
# Create an AI message - LangGraph will handle this format
messages.append({"role": "assistant", "content": msg["content"]})
# Add the new user message
messages.append(HumanMessage(content=query))
# Stream the response
collected_responses = []
for chunk in app.stream(
{"messages": messages},
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
# Collect AI responses
if chunk["messages"] and hasattr(chunk["messages"][-1], 'type') and chunk["messages"][-1].type == "ai":
collected_responses.append(chunk["messages"][-1].content)
# Add both user and assistant messages to history
conversation_history.append({"role": "user", "content": query})
if collected_responses:
# Use the last response (most complete)
conversation_history.append({"role": "assistant", "content": collected_responses[-1]})
def main():
"""Main interaction loop with conversation history."""
print_welcome()
print_examples()
# Create the supervisor system
print("\n🚀 Initializing the multi-agent system...")
try:
app = create_sysadmin_supervisor()
print("✅ System ready!\n")
except Exception as e:
print(f"❌ Failed to initialize system: {str(e)}")
sys.exit(1)
# Initialize conversation history
conversation_history = []
# Interactive loop
print("💬 Enter your queries below (type 'exit' to quit, 'help' for examples):\n")
while True:
try:
query = input("You: ").strip()
if not query:
continue
if query.lower() in ['exit', 'quit', 'q']:
print("\n👋 Thanks for using Pard0x! Stay curious and keep debugging!")
break
if query.lower() in ['help', 'h', '?']:
print_examples()
continue
if query.lower() in ['history', 'show history']:
print("\n📜 Conversation History:")
print("-" * 40)
for i, msg in enumerate(conversation_history):
role = "You" if msg["role"] == "user" else "Assistant"
print(f"{i+1}. {role}: {msg['content'][:100]}{'...' if len(msg['content']) > 100 else ''}")
print("-" * 40)
continue
process_query(app, query, conversation_history)
except KeyboardInterrupt:
print("\n\n👋 Goodbye! Keep those systems running smoothly!")
break
except Exception as e:
print(f"\n❌ Unexpected error: {str(e)}")
print("Please try again with a different query.")
from supervisor import create_sysadmin_supervisor
from utils import print_step_info, explain_supervisor_pattern
if __name__ == "__main__":
# Create the supervisor
supervisor = create_sysadmin_supervisor()
# Interactive conversation loop
messages = []
print("Welcome to the multi-agent sysadmin assistant!")
print("Type your sysadmin question below. Type 'exit' to quit.")
print("\n💡 Note: When agents execute shell commands, you may see command output")
print(" appear between the structured step logs. This is normal behavior.")
print(" The output belongs to the agent that was most recently active.")
while True:
user_input = input("\n📝 User: ")
if user_input.strip().lower() == 'exit':
print("Goodbye!")
break
messages.append({"role": "user", "content": user_input})
query = {"messages": messages}
print("\n=== Processing with detailed step-by-step analysis ===")
step_count = 0
max_steps = 20 # Prevent infinite loops
final_result = None
try:
chunks_processed = []
for chunk in supervisor.stream(query):
step_count += 1
chunks_processed.append(chunk)
print_step_info(step_count, chunk)
# Store the final result for conversation history
if isinstance(chunk, dict):
for agent_name, agent_data in chunk.items():
if 'messages' in agent_data and agent_data['messages']:
last_msg = agent_data['messages'][-1]
if hasattr(last_msg, 'content') and last_msg.content:
final_result = last_msg.content
# Safety check to prevent infinite loops
if step_count >= max_steps:
print(f"\n⚠️ Reached maximum steps ({max_steps}), stopping stream...")
break
print(f"\n✅ Analysis completed with {step_count} steps")
# Add the assistant's reply to the conversation history
if final_result:
messages.append({"role": "assistant", "content": final_result})
print(f"\n📊 FINAL SUMMARY:")
print("-" * 60)
if final_result:
print(final_result)
else:
print("Analysis completed - check the detailed steps above for results")
print("-" * 60)
except Exception as e:
print(f"\n❌ Streaming error after {step_count} steps: {e}")
print("💡 Falling back to basic invoke method...")
try:
result = supervisor.invoke(query)
final_result = result["messages"][-1].content
messages.append({"role": "assistant", "content": final_result})
print(f"\n📊 FINAL RESULT:")
print("-" * 40)
print(final_result)
print("-" * 40)
except Exception as fallback_error:
print(f"❌ Fallback also failed: {fallback_error}")
continue
# Ask if the user wants to continue
cont = input("\nWould you like to continue the conversation? (y/n): ")
if cont.strip().lower() not in ('y', 'yes'):
print("Session ended.")
break
main()
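
For scripted use, the same wiring main.py builds interactively can be collapsed into a single invoke call. A minimal sketch, assuming OPENAI_API_KEY is set; the supervisor prompt here is abbreviated and the query is illustrative:

from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
from agents import (
    create_os_detector_worker,
    create_logs_analyzer_worker,
    create_performance_analyzer_worker,
)

# Same three workers as main.py, run one-shot instead of in the interactive loop.
app = create_supervisor(
    agents=[
        create_os_detector_worker(),
        create_logs_analyzer_worker(),
        create_performance_analyzer_worker(),
    ],
    model=ChatOpenAI(model="gpt-4o-mini", temperature=0),
    prompt="You are Pard0x, a sysadmin supervisor. Delegate to your specialists and summarize their findings.",
).compile()

result = app.invoke({"messages": [{"role": "user", "content": "Is disk usage on this host healthy?"}]})
print(result["messages"][-1].content)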

View File

@@ -1,96 +0,0 @@
"""Multi-agent supervisor for sysadmin tasks."""
from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
from agents.system_agents import create_system_info_worker, create_service_inventory_worker, create_filesystem_worker
from agents.service_agents import create_mariadb_worker, create_nginx_worker, create_phpfpm_worker
from agents.network_agents import create_network_worker, create_cert_worker
from agents.analysis_agents import create_risk_worker, create_remediation_worker, create_harmonizer_worker
def get_base_model():
"""Get the base LLM model configuration."""
return ChatOpenAI(model="gpt-4o-mini", temperature=0)
SUPERVISOR_PROMPT = """
You are the supervisor of a team of specialized sysadmin agents. Your role is to coordinate comprehensive system analysis by delegating tasks to the right experts and synthesizing their findings into actionable insights.
IMPORTANT: You do NOT have direct access to the file system. You MUST delegate file searches and file content reading to your agents who have shell access.
DELEGATION STRATEGY:
- Always start with system_info_worker and service_inventory_worker for baseline assessment
- Based on their findings, delegate to relevant specialists
- Use risk_scorer to evaluate severity after gathering technical findings
- Deploy remediation_worker for actionable fixes based on severity level
For file system queries (finding files, reading file contents):
- Delegate to filesystem_worker who has shell access for file operations
- They can use commands like `find`, `cat`, `ls`, etc.
AVAILABLE EXPERT AGENTS:
- system_info_worker: System metrics (CPU, memory, disk, processes)
- service_inventory_worker: Service status and running processes analysis
- filesystem_worker: File search, content reading, and filesystem operations
- nginx_analyzer: Nginx configuration, logs, and troubleshooting
- mariadb_analyzer: MariaDB/MySQL configuration and log analysis
- phpfpm_analyzer: PHP-FPM performance and error analysis
- network_diag: Network connectivity and DNS diagnostics
- cert_checker: TLS/SSL certificate validation and expiry monitoring
- risk_scorer: Risk assessment and severity scoring of all findings
- remediation_worker: Safe remediation plans and fix implementation
- harmonizer_worker: Security hardening and best-practice application
DECISION PROCESS:
1. Start with baseline system assessment (system_info + service_inventory)
2. Based on user query and baseline findings, call relevant specialists
3. Use risk_scorer to evaluate cumulative findings
4. Deploy remediation_worker for actionable solutions
5. Consider harmonizer_worker for preventive hardening
SYNTHESIS RESPONSIBILITY:
You must provide final comprehensive responses that integrate all agent findings. Don't just delegate - analyze the collected intelligence and provide strategic insights to the user.
FINAL RESPONSE FORMAT:
Your final response to the user MUST include TWO sections:
1. **ANSWER TO YOUR QUERY:**
[Provide the comprehensive answer based on all agent findings]
2. **ANALYSIS WORKFLOW SUMMARY:**
[List each agent called, in order, with a brief explanation of why it was called and what it found]
Example:
- Called system_info_worker: To assess baseline system health → Found high memory usage (85%)
- Called nginx_analyzer: User mentioned 502 errors → Found upstream timeout issues
- Called phpfpm_analyzer: To investigate upstream service → Found PHP-FPM memory exhaustion
- Called remediation_worker: To provide fixes → Suggested increasing PHP memory limits
"""
def create_sysadmin_supervisor():
"""Create a supervisor that coordinates sysadmin agents."""
# Create all the specialized agents
agents = [
create_system_info_worker(),
create_service_inventory_worker(),
create_filesystem_worker(),
create_mariadb_worker(),
create_nginx_worker(),
create_phpfpm_worker(),
create_network_worker(),
create_cert_worker(),
create_risk_worker(),
create_remediation_worker(),
create_harmonizer_worker(),
]
# Create and return the supervisor
supervisor = create_supervisor(
agents=agents,
model=get_base_model(),
prompt=SUPERVISOR_PROMPT
)
return supervisor.compile()

View File

@@ -1,165 +0,0 @@
"""Utility functions for the multi-agent system."""
def explain_supervisor_pattern():
"""Explain how the LangGraph supervisor pattern works."""
print("🏗️ MULTI-AGENT SUPERVISOR PATTERN EXPLANATION:")
print("=" * 60)
print("1. 🎯 SUPERVISOR: Receives user query and decides which agent to delegate to")
print("2. 🔄 TRANSFER: Uses transfer tools (e.g., transfer_to_system_info_worker)")
print("3. 🤖 AGENT: Specialized agent executes its task with its own prompt/tools")
print("4. 🔙 RETURN: Agent uses transfer_back_to_supervisor when done")
print("5. 🧠 DECISION: Supervisor analyzes results and decides next agent or final response")
print()
print("📋 WHAT 'Successfully transferred' MEANS:")
print(" - It's the response from a transfer tool call")
print(" - Indicates control handoff between supervisor and agent")
print(" - Each agent gets the full conversation context")
print(" - Agent's prompt guides how it processes that context")
print()
print("🔍 SUPERVISOR PROMPT (from config.py):")
print(" - Defines available agents and their specialties")
print(" - Guides delegation strategy (start with system_info & service_inventory)")
print(" - Agent prompts are in agents/*.py files")
print("=" * 60)
print()
def print_step_info(step_count: int, chunk):
"""Print formatted step information during streaming with clear agent actions."""
print(f"\n{'='*60}")
print(f"STEP {step_count}")
print(f"{'='*60}")
try:
if isinstance(chunk, dict):
# Look for agent names in the chunk keys
agent_names = [key for key in chunk.keys() if key in [
'system_info_worker', 'service_inventory_worker', 'mariadb_analyzer',
'nginx_analyzer', 'phpfpm_analyzer', 'network_diag', 'cert_checker',
'risk_scorer', 'remediation_worker', 'harmonizer_worker', 'supervisor'
]]
if agent_names:
current_agent = agent_names[0].upper()
agent_data = chunk[agent_names[0]]
if 'messages' in agent_data and agent_data['messages']:
last_message = agent_data['messages'][-1]
message_type = type(last_message).__name__
# Handle different message types with clear formatting
if message_type == 'HumanMessage':
# This is typically the user query or supervisor instruction
content = getattr(last_message, 'content', '')
if current_agent == 'SUPERVISOR':
print(f"[ SUPERVISOR ] received user query: {content[:100]}{'...' if len(content) > 100 else ''}")
else:
print(f"[ {current_agent} ] received prompt from supervisor: {content[:100]}{'...' if len(content) > 100 else ''}")
elif message_type == 'ToolMessage':
# Result of tool execution
tool_name = getattr(last_message, 'name', 'unknown')
content = getattr(last_message, 'content', '')
if "Successfully transferred" in content:
if tool_name.startswith('transfer_to_'):
target_agent = tool_name.replace('transfer_to_', '').upper()
print(f"[ SUPERVISOR ] successfully transferred control to {target_agent}")
print(f"[ SUPERVISOR ] {target_agent} will now analyze the situation and execute necessary commands")
print(f"[ SUPERVISOR ] (Any shell command output below belongs to {target_agent})")
elif tool_name == 'transfer_back_to_supervisor':
print(f"[ {current_agent} ] completed analysis and transferred control back to supervisor")
print(f"[ {current_agent} ] (Any shell command output above was from {current_agent})")
# Show the result being sent back to supervisor
# Look for the last AIMessage before this transfer to get the result
if 'messages' in agent_data and len(agent_data['messages']) > 1:
print(f"[ DEBUG ] {current_agent} has {len(agent_data['messages'])} messages")
# Look for the most recent AIMessage with content
found_result = False
for i, msg in enumerate(reversed(agent_data['messages'][:-1])): # Exclude current ToolMessage
msg_type = type(msg).__name__
print(f"[ DEBUG ] Message {i}: {msg_type}, has_content: {hasattr(msg, 'content')}")
if msg_type == 'AIMessage' and hasattr(msg, 'content') and msg.content:
result_content = msg.content.strip()
if result_content and not result_content.startswith("I'll") and "transfer" not in result_content.lower():
found_result = True
if len(result_content) > 300:
preview = result_content[:300] + "..."
print(f"[ {current_agent} ] 📊 ANALYSIS SUMMARY (preview): {preview}")
print(f"[ {current_agent} ] (full result length: {len(result_content)} characters)")
else:
print(f"[ {current_agent} ] 📊 ANALYSIS SUMMARY: {result_content}")
break
else:
print(f"[ DEBUG ] Skipping AIMessage: '{result_content[:100]}...'")
if not found_result:
print(f"[ WARNING ] {current_agent} transferred back without providing analysis summary!")
print(f"[ WARNING ] This agent may need prompt improvements")
else:
print(f"[ WARNING ] {current_agent} has no message history to analyze")
else:
# Other tool execution result
if len(content) > 200:
preview = content[:200] + "..."
print(f"[ {current_agent} ] tool result preview: {preview}")
print(f"[ {current_agent} ] (full result length: {len(content)} characters)")
else:
print(f"[ {current_agent} ] tool result: {content}")
elif message_type == 'AIMessage':
# Agent is responding or making tool calls
content = getattr(last_message, 'content', '')
tool_calls = getattr(last_message, 'tool_calls', [])
if tool_calls:
for tool_call in tool_calls:
tool_name = getattr(tool_call, 'name', 'unknown')
if tool_name.startswith('transfer_to_'):
target_agent = tool_name.replace('transfer_to_', '').upper()
args = getattr(tool_call, 'args', {})
context = str(args)[:150] + "..." if len(str(args)) > 150 else str(args)
print(f"[ SUPERVISOR ] calling {target_agent} with context: {context}")
elif tool_name == 'transfer_back_to_supervisor':
print(f"[ {current_agent} ] completed task, transferring back to supervisor")
else:
print(f"[ {current_agent} ] using tool: {tool_name}")
args = getattr(tool_call, 'args', {})
if args:
args_preview = str(args)[:100] + "..." if len(str(args)) > 100 else str(args)
print(f"[ {current_agent} ] tool arguments: {args_preview}")
elif content:
# Final response from agent
if len(content) > 200:
preview = content[:200] + "..."
print(f"[ {current_agent} ] response preview: {preview}")
print(f"[ {current_agent} ] (full response length: {len(content)} characters)")
else:
print(f"[ {current_agent} ] response: {content}")
else:
print(f"[ {current_agent} ] {message_type}: {getattr(last_message, 'content', 'No content')[:100]}")
else:
print(f"[ {current_agent} ] no message data available")
else:
print("[ SYSTEM ] processing chunk with keys:", list(chunk.keys())[:3])
else:
print(f"[ SYSTEM ] received {type(chunk).__name__}: {str(chunk)[:100]}{'...' if len(str(chunk)) > 100 else ''}")
except Exception as e:
print(f"[ ERROR ] processing step {step_count}: {e}")
print(f"[ DEBUG ] chunk type: {type(chunk)}")
if hasattr(chunk, '__dict__'):
print(f"[ DEBUG ] chunk attributes: {list(chunk.__dict__.keys())}")
print(f"{'='*60}")
print(f"NOTE: Shell command output may appear below before the next step")