implement 2 strategies

This commit is contained in:
Gaetan Hurel 2025-06-26 14:52:36 +02:00
parent 90ac5e9e82
commit 331e2e434d
No known key found for this signature in database
23 changed files with 1080 additions and 747 deletions

250
README.md

@ -1,123 +1,187 @@
# 🤖 LangGraph Sysadmin Debugging Agent
# LangGraph Sysadmin AI Agents
A LangGraph-powered AI agent designed to assist system administrators in their daily debugging tasks by analyzing log files and executing shell commands with intelligent reasoning.
This repository demonstrates two different approaches to building AI-powered system administration agents using LangGraph:
## 🛠️ Technology Stack
## Two Approaches Available
This is a **LangGraph agent** that combines:
### 1. Simple ReAct Agent (`simple-react-agent/`)
A straightforward [single-agent approach](https://langchain-ai.github.io/langgraph/agents/agents/#1-install-dependencies) using the ReAct (Reasoning and Acting) pattern.
- **LangGraph**: State-based AI agent framework for building conversational AI workflows
- **ReAct (Reasoning and Acting)**: LangChain [primitive to create ReAct agents](https://langchain-ai.github.io/langgraph/agents/overview/)
- **OpenAI GPT-4o-mini**: Large Language Model for intelligent reasoning and tool usage
- **LangChain Tools**:
- [**ShellTool** (prebuilt)](https://python.langchain.com/api_reference/community/tools/langchain_community.tools.shell.tool.ShellTool.html): Executes shell commands for system investigation
- **log_analyzer** (custom tool): Structured log file analysis with pattern recognition
- [**Loghub Dataset**](https://github.com/logpai/loghub): Comprehensive collection of real-world system logs as git submodule
**Best for:**
- Learning LangGraph fundamentals
- Simple log analysis tasks
- Resource-constrained environments
- Quick prototyping
## 🎯 Agent Goals
### 2. Multi-Agent Supervisor (`multi-agent-supervisor/`)
A sophisticated system with [multiple agents coordinated by a supervisor](https://langchain-ai.github.io/langgraph/agents/multi-agent/#supervisor).
This agent helps sysadmins by:
**Best for:**
- Complex system administration tasks
- Comprehensive system analysis
- Production environments
- When you need domain expertise
- **Log Analysis**: Automatically detect error patterns, frequency anomalies, and timeline issues
- **Shell Operations**: Execute diagnostic commands (`grep`, `awk`, `tail`, `ps`, `netstat`, etc.)
- **Pattern Recognition**: Identify common system issues across different log types
- **Interactive Debugging**: Maintain conversation context for multi-step troubleshooting
- **Knowledge Transfer**: Demonstrate best practices for log analysis and system debugging
## 🤔 Which Approach Should You Choose?
## 📊 Dataset
| Factor | Simple ReAct | Multi-Agent Supervisor |
|--------|-------------|----------------------|
| **Complexity** | Low | High |
| **Setup Time** | Quick | More involved |
| **Resource Usage** | Light | Heavy |
| **Specialization** | General purpose | Domain experts |
| **Parallel Processing** | No | Yes |
| **Risk Assessment** | Basic | Advanced |
| **Debugging** | Easy | More complex |
| **Extensibility** | Limited | Highly extensible |
The agent uses the **Loghub** repository as a git submodule, providing access to:
## 📊 Feature Comparison
- **Distributed Systems**: HDFS, Hadoop, Spark, Zookeeper, OpenStack
- **Supercomputers**: BGL, HPC, Thunderbird
- **Operating Systems**: Windows, Linux, Mac
- **Mobile Systems**: Android, HealthApp
- **Server Applications**: Apache, OpenSSH
- **Standalone Software**: Proxifier
### Simple ReAct Agent
```
✅ Single agent handles all tasks
✅ Easy to understand and debug
✅ Low resource usage
✅ Quick setup
✅ Interactive chat with streaming
❌ No specialization
❌ Sequential processing only
❌ Limited scaling for complex tasks
```
## 🚀 Setup Instructions
### Multi-Agent Supervisor
```
✅ Specialized domain experts
✅ Parallel processing
✅ Intelligent task delegation
✅ Risk assessment and severity scoring
✅ Comprehensive analysis
✅ Highly extensible
❌ More complex setup
❌ Higher resource usage
❌ Coordination overhead
```
### Prerequisites
## 🛠 Setup
- Python 3.8+
- OpenAI API key
- Git
### Installation
1. **Clone the repository with submodules:**
```bash
git clone --recurse-submodules https://github.com/your-username/langgraph-pard0x.git
cd langgraph-pard0x
```
2. **Install dependencies:**
```bash
# Using uv (recommended)
uv sync
# Or using pip
pip install -r requirements.txt
```
3. **Set up OpenAI API key:**
```bash
export OPENAI_API_KEY='your-api-key-here'
# Or create a .env file
echo "OPENAI_API_KEY=your-api-key-here" > .env
```
4. **Initialize the loghub submodule (if not cloned with --recurse-submodules):**
```bash
git submodule update --init --recursive
```
### Running the Agent
Both approaches require the same base dependencies:
```bash
# Install dependencies
pip install langchain-openai langgraph langchain-community
# For multi-agent supervisor, also install:
pip install langgraph-supervisor
# Set your OpenAI API key
export OPENAI_API_KEY="your-api-key-here"
```
## 📁 Directory Structure
```
├── simple-react-agent/ # Single ReAct agent approach
│ ├── main.py # Main application
│ ├── log_analyzer.py # Log analysis tool
│ ├── loghub/ # → symlink to ../loghub
│ └── README.md # Detailed documentation
├── multi-agent-supervisor/ # Multi-agent supervisor approach
│ ├── main-multi-agent.py # Multi-agent implementation
│ ├── loghub/ # → symlink to ../loghub
│ └── README.md # Detailed documentation
├── loghub/ # Sample log files
│ ├── Apache/
│ ├── Linux/
│ ├── Nginx/
│ └── ... (various system logs)
└── README.md # This file
```
## 🚀 Quick Start
### Try the Simple ReAct Agent
```bash
cd simple-react-agent
python main.py
```
## 💡 Usage Examples
### Multi-step, multi-tool debugging
```
User: Where is the log file named Linux_2k.log on my system?
Agent: I'll search for the file Linux_2k.log on your system and return its path.
[Executes the shell tool to run `find / -name "Linux_2k.log"`]
User: Analyze this log file and tell me if there are any issues or anomalies on my system
Agent:
[Uses the log analysis tool on Linux_2k.log]
```
### Try the Multi-Agent Supervisor
```bash
cd multi-agent-supervisor
python main-multi-agent.py
```
### Specific Analysis Types
## 💡 Example Use Cases
### Simple ReAct Agent Examples
```
User: Get a frequency analysis of Apache error patterns
Agent: [Uses analyze_log_file with analysis_type="frequency" on Apache logs]
User: Show me timeline patterns in Hadoop logs
Agent: [Uses analyze_log_file with analysis_type="timeline" on Hadoop logs]
User: Give me a summary of the Windows event logs
Agent: [Uses analyze_log_file with analysis_type="summary" on Windows logs]
"Analyze the Apache logs for error patterns"
"Check disk usage on the system"
"List all available log files"
"Find timeline patterns in Linux logs"
```
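Each of these requests maps onto a direct call of the custom tool. A minimal sketch of invoking `analyze_log_file` outside the agent (the sample path is an assumption based on the loghub 2k datasets):

```python
from log_analyzer import analyze_log_file

# analyze_log_file is a LangChain @tool, so it can be invoked directly
# with a dict of arguments; relative paths are resolved under loghub/.
result = analyze_log_file.invoke({
    "file_path": "Apache/Apache_2k.log",   # assumed loghub sample file
    "analysis_type": "frequency",
})
print(result["summary"])
```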
### Combined Approach
### Multi-Agent Supervisor Examples
```
User: Find all critical errors in the system and suggest fixes
Agent:
1. [Analyzes multiple log files for error patterns]
2. [Executes shell commands to gather system state]
3. [Provides structured analysis and recommendations]
"Nginx returns 502 Bad Gateway - diagnose the issue"
"Perform a comprehensive system health check"
"Analyze all services and provide a risk assessment"
"Check for security vulnerabilities and suggest hardening"
```
## 🔧 Available Analysis Types
## 🧪 Sample Logs Available
The `loghub/` directory contains sample logs from various systems:
- **Web Servers**: Apache, Nginx
- **Operating Systems**: Linux, Mac, Windows
- **Big Data**: Hadoop, HDFS, Spark
- **Databases**: Various database logs
- **Applications**: Health apps, mobile apps
- **Security**: SSH, authentication logs
## 🔍 Decision Guide
**Choose Simple ReAct Agent if:**
- You're new to LangGraph
- You need basic log analysis
- You have limited computational resources
- You prefer simplicity and transparency
- You're building a proof of concept
**Choose Multi-Agent Supervisor if:**
- You need comprehensive system analysis
- You're working with multiple services
- You want parallel processing
- You need risk assessment capabilities
- You're building a production system
- You want to leverage specialized expertise
## 📚 Learning Path
1. **Start with Simple ReAct** to understand LangGraph basics
2. **Examine the code** to see how agents and tools work
3. **Try both approaches** with the same queries
4. **Compare the results** and execution patterns
5. **Choose your approach** based on your specific needs
## 🤝 Contributing
Feel free to:
- Add new specialized agents to the multi-agent system
- Enhance the log analysis capabilities
- Add new tools for system administration
- Improve error handling and reliability
- Add tests and documentation
## 📝 License
This project is for educational and demonstration purposes. Modify and use as needed for your projects.
---
**Happy system administration with AI! 🤖🔧**
The custom `log_analyzer` tool supports four analysis types: `error_patterns`, `frequency`, `timeline`, and `summary`.


@ -1,142 +0,0 @@
import os
import re
from collections import Counter
from typing import List, Dict, Any
from langchain_core.tools import tool
@tool
def analyze_log_file(file_path: str, analysis_type: str = "error_patterns") -> Dict[str, Any]:
"""
Analyze log files for common sysadmin debugging patterns.
Args:
file_path: Path to the log file (relative to loghub directory)
analysis_type: Type of analysis - "error_patterns", "frequency", "timeline", or "summary"
Returns:
Dictionary with analysis results
"""
try:
# Construct full path
if not file_path.startswith('/'):
full_path = f"loghub/{file_path}"
else:
full_path = file_path
if not os.path.exists(full_path):
return {"error": f"File not found: {full_path}"}
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
if analysis_type == "error_patterns":
return _analyze_error_patterns(lines, file_path)
elif analysis_type == "frequency":
return _analyze_frequency(lines, file_path)
elif analysis_type == "timeline":
return _analyze_timeline(lines, file_path)
elif analysis_type == "summary":
return _analyze_summary(lines, file_path)
else:
return {"error": f"Unknown analysis type: {analysis_type}"}
except Exception as e:
return {"error": f"Error analyzing file: {str(e)}"}
def _analyze_error_patterns(lines: List[str], file_path: str) -> Dict[str, Any]:
"""Analyze error patterns in log lines."""
error_keywords = ['error', 'fail', 'exception', 'critical', 'fatal', 'denied', 'refused', 'timeout']
error_lines = []
error_counts = Counter()
for i, line in enumerate(lines, 1):
line_lower = line.lower()
for keyword in error_keywords:
if keyword in line_lower:
error_lines.append(f"Line {i}: {line.strip()}")
error_counts[keyword] += 1
break
return {
"file": file_path,
"analysis_type": "error_patterns",
"total_lines": len(lines),
"error_lines_count": len(error_lines),
"error_keywords_frequency": dict(error_counts.most_common()),
"sample_errors": error_lines[:10], # First 10 error lines
"summary": f"Found {len(error_lines)} error-related lines out of {len(lines)} total lines"
}
def _analyze_frequency(lines: List[str], file_path: str) -> Dict[str, Any]:
"""Analyze frequency patterns in logs."""
# Extract common patterns (simplified)
patterns = Counter()
for line in lines:
# Remove timestamps and specific values for pattern matching
cleaned = re.sub(r'\d+', 'NUM', line)
cleaned = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', 'IP', cleaned)
cleaned = re.sub(r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', 'UUID', cleaned)
patterns[cleaned.strip()] += 1
return {
"file": file_path,
"analysis_type": "frequency",
"total_lines": len(lines),
"unique_patterns": len(patterns),
"most_common_patterns": [{"pattern": p, "count": c} for p, c in patterns.most_common(10)],
"summary": f"Found {len(patterns)} unique patterns in {len(lines)} lines"
}
def _analyze_timeline(lines: List[str], file_path: str) -> Dict[str, Any]:
"""Analyze timeline patterns in logs."""
timestamps = []
# Try to extract timestamps (simplified for demo)
timestamp_patterns = [
r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})', # Jun 14 15:16:01
r'(\[\w{3}\s+\w{3}\s+\d{2}\s+\d{2}:\d{2}:\d{2}\s+\d{4}\])', # [Sun Dec 04 04:47:44 2005]
]
for line in lines[:100]: # Sample first 100 lines for demo
for pattern in timestamp_patterns:
match = re.search(pattern, line)
if match:
timestamps.append(match.group(1))
break
return {
"file": file_path,
"analysis_type": "timeline",
"total_lines": len(lines),
"timestamps_found": len(timestamps),
"sample_timestamps": timestamps[:10],
"summary": f"Extracted {len(timestamps)} timestamps from first 100 lines"
}
def _analyze_summary(lines: List[str], file_path: str) -> Dict[str, Any]:
"""Provide a general summary of the log file."""
total_lines = len(lines)
# Basic statistics
avg_line_length = sum(len(line) for line in lines) / total_lines if total_lines > 0 else 0
empty_lines = sum(1 for line in lines if not line.strip())
# Sample content
sample_lines = [line.strip() for line in lines[:5] if line.strip()]
return {
"file": file_path,
"analysis_type": "summary",
"total_lines": total_lines,
"empty_lines": empty_lines,
"average_line_length": round(avg_line_length, 2),
"sample_content": sample_lines,
"summary": f"Log file with {total_lines} lines, average length {avg_line_length:.1f} characters"
}

213
main.py

@ -1,213 +0,0 @@
import os
from langchain.chat_models import init_chat_model
from langchain_community.tools.shell.tool import ShellTool
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
from log_analyzer import analyze_log_file
def create_agent():
"""Create and return a ReAct agent with shell and log analysis capabilities."""
# Initialize the chat model (OpenAI GPT-4o-mini)
# Make sure you have set your OPENAI_API_KEY environment variable
llm = init_chat_model("openai:gpt-4o-mini")
# Define the tools available to the agent
shell_tool = ShellTool()
tools = [shell_tool, analyze_log_file]
# Create a ReAct agent with system prompt
system_prompt = """You are a helpful assistant with access to shell commands and log analysis capabilities.
You can:
1. Execute shell commands using the shell tool to interact with the system
2. Analyze log files using the analyze_log_file tool to help with debugging and system administration tasks
The log analyzer can process files in the loghub directory with different analysis types:
- "error_patterns": Find and categorize error messages
- "frequency": Analyze frequency of different log patterns
- "timeline": Show chronological patterns of events
- "summary": Provide an overall summary of the log file
When helping users:
- Be thorough in your analysis
- Explain what you're doing and why
- Use appropriate tools based on the user's request
- If analyzing logs, suggest which analysis type might be most helpful
- Always be cautious with shell commands and explain what they do
Available log files are in the loghub directory with subdirectories for different systems like:
Android, Apache, BGL, Hadoop, HDFS, HealthApp, HPC, Linux, Mac, OpenSSH, OpenStack, Proxifier, Spark, Thunderbird, Windows, Zookeeper
"""
# Create the ReAct agent
agent = create_react_agent(
llm,
tools,
prompt=system_prompt
)
return agent
def stream_agent_updates(agent, user_input: str, conversation_history: list):
"""Stream agent updates for a user input with conversation history."""
# Create a human message
message = HumanMessage(content=user_input)
# Add the new message to conversation history
conversation_history.append(message)
print("\nAgent: ", end="", flush=True)
# Use the agent's stream method to get real-time updates with full conversation
final_response = ""
tool_calls_made = False
for event in agent.stream({"messages": conversation_history}, stream_mode="updates"):
for node_name, node_output in event.items():
if node_name == "agent" and "messages" in node_output:
last_message = node_output["messages"][-1]
# Check if this is a tool call
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
tool_calls_made = True
for tool_call in last_message.tool_calls:
print(f"\n🔧 Using tool: {tool_call['name']}")
if tool_call.get('args'):
print(f" Args: {tool_call['args']}")
# Check if this is the final response (no tool calls)
elif hasattr(last_message, 'content') and last_message.content and not getattr(last_message, 'tool_calls', None):
final_response = last_message.content
elif node_name == "tools" and "messages" in node_output:
# Show tool results
for msg in node_output["messages"]:
if hasattr(msg, 'content'):
print(f"\n📋 Tool result: {msg.content[:200]}{'...' if len(msg.content) > 200 else ''}")
# Print the final response
if final_response:
if tool_calls_made:
print(f"\n\n{final_response}")
else:
print(final_response)
# Add the agent's response to conversation history
from langchain_core.messages import AIMessage
conversation_history.append(AIMessage(content=final_response))
else:
print("No response generated.")
print() # Add newline
def visualize_agent(agent):
"""Display the agent's graph structure."""
try:
print("\n📊 Agent Graph Structure:")
print("=" * 40)
# Get the graph and display its structure
graph = agent.get_graph()
# Print nodes
print("Nodes:")
for node_id in graph.nodes:
print(f" - {node_id}")
# Print edges
print("\nEdges:")
for edge in graph.edges:
print(f" - {edge}")
print("=" * 40)
print("This agent follows the ReAct (Reasoning and Acting) pattern:")
print("1. Receives user input")
print("2. Reasons about what tools to use")
print("3. Executes tools when needed")
print("4. Provides final response")
print("=" * 40)
except Exception as e:
print(f"Could not visualize agent: {e}")
def main():
# Check if required API keys are set
if not os.getenv("OPENAI_API_KEY"):
print("Please set your OPENAI_API_KEY environment variable.")
print("You can set it by running: export OPENAI_API_KEY='your-api-key-here'")
return
print("🤖 LangGraph Log Analysis Agent")
print("Type 'quit', 'exit', or 'q' to exit the chat.")
print("Type 'help' or 'h' for help and examples.")
print("Type 'graph' to see the agent structure.")
print("Type 'clear' or 'reset' to clear conversation history.")
print("⚠️ WARNING: This agent has shell access - use with caution!")
print("📊 Available log analysis capabilities:")
print(" - Analyze log files in the loghub directory")
print(" - Execute shell commands for system administration")
print(" - Help with debugging and troubleshooting")
print("-" * 60)
# Create the agent
try:
agent = create_agent()
print("✅ Log Analysis Agent initialized successfully!")
print("💡 Try asking: 'Analyze the Apache logs for error patterns'")
print("💡 Or: 'List the available log files in the loghub directory'")
# Show agent structure
visualize_agent(agent)
except Exception as e:
print(f"❌ Error initializing agent: {e}")
return
# Start the chat loop
conversation_history = [] # Initialize conversation history
while True:
try:
user_input = input("\nUser: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("👋 Goodbye!")
break
elif user_input.lower() in ["help", "h"]:
print("\n🆘 Help:")
print("Commands:")
print(" - quit/exit/q: Exit the agent")
print(" - help/h: Show this help")
print(" - graph: Show agent structure")
print("\nExample queries:")
print(" - 'Analyze the Apache logs for error patterns'")
print(" - 'Show me a summary of the HDFS logs'")
print(" - 'List all available log files'")
print(" - 'Find error patterns in Linux logs'")
print(" - 'Check disk usage on the system'")
print(" - 'clear': Clear conversation history")
continue
elif user_input.lower() in ["graph", "structure"]:
visualize_agent(agent)
continue
elif user_input.lower() in ["clear", "reset"]:
conversation_history = []
print("🗑️ Conversation history cleared!")
continue
if user_input.strip():
stream_agent_updates(agent, user_input, conversation_history)
else:
print("Please enter a message.")
except KeyboardInterrupt:
print("\n👋 Goodbye!")
break
except Exception as e:
print(f"❌ Error: {e}")
if __name__ == "__main__":
main()


@ -0,0 +1,90 @@
# Multi-Agent Sysadmin Assistant
A modular multi-agent system for system administration tasks using LangChain and LangGraph.
## Architecture
The system is organized into several modules for better maintainability:
### 📁 Project Structure
```
multi-agent-supervisor/
├── main-multi-agent.py # Main entry point
├── config.py # Configuration and settings
├── supervisor.py # Supervisor orchestration
├── utils.py # Utility functions
├── requirements.txt # Dependencies
├── custom_tools/ # Custom tool implementations
│ ├── __init__.py
│ ├── log_tail_tool.py # Log reading tool
│ └── shell_tool_wrapper.py # Shell tool wrapper
└── agents/ # Agent definitions
├── __init__.py
├── system_agents.py # System monitoring agents
├── service_agents.py # Service-specific agents
├── network_agents.py # Network and security agents
└── analysis_agents.py # Analysis and remediation agents
```
## Agents
### System Agents
- **System Info Worker**: Gathers CPU, RAM, and disk usage
- **Service Inventory Worker**: Lists running services
### Service Agents
- **MariaDB Analyzer**: Checks MariaDB configuration and logs
- **Nginx Analyzer**: Validates Nginx configuration and logs
- **PHP-FPM Analyzer**: Monitors PHP-FPM status and performance
### Network Agents
- **Network Diagnostics**: Uses ping, traceroute, and dig
- **Certificate Checker**: Monitors TLS certificate expiration
### Analysis Agents
- **Risk Scorer**: Aggregates findings and assigns severity levels
- **Remediation Worker**: Proposes safe fixes for issues
- **Harmonizer Worker**: Applies system hardening best practices
## Benefits of Modular Architecture
1. **Separation of Concerns**: Each module has a single responsibility
2. **Reusability**: Tools and agents can be easily reused across projects
3. **Maintainability**: Easy to update individual components
4. **Testability**: Each module can be tested independently
5. **Scalability**: Easy to add new agents or tools
6. **Code Organization**: Clear structure makes navigation easier
## Usage
```python
from supervisor import create_sysadmin_supervisor
# Create supervisor with all agents
supervisor = create_sysadmin_supervisor()
# Run analysis
query = {
"messages": [
{
"role": "user",
"content": "Check if my web server is running properly"
}
]
}
result = supervisor.invoke(query)
```
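The returned state is a LangGraph message dict; the last entry is the supervisor's synthesized answer (a sketch consistent with `main-multi-agent.py`):

```python
# Print the supervisor's final, synthesized response and a message count
print(result["messages"][-1].content)
print(f"Total messages exchanged: {len(result['messages'])}")
```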
## Adding New Agents
1. Create agent function in appropriate module under `agents/` (see the sketch below)
2. Import and add to supervisor in `supervisor.py`
3. Update supervisor prompt in `config.py`
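For step 1, a minimal sketch mirroring the existing workers (the Redis agent and its module are hypothetical):

```python
"""agents/cache_agents.py -- hypothetical module for step 1."""
from langgraph.prebuilt import create_react_agent
from custom_tools import get_shell_tool

def create_redis_worker():
    """Create a hypothetical Redis analysis agent."""
    return create_react_agent(
        model="openai:gpt-4o-mini",
        tools=[get_shell_tool()],
        prompt="""
You are a Redis expert. Check `redis-cli info` and inspect /var/log/redis for errors.
Only run safe, read-only commands.
""",
        name="redis_analyzer",
    )
```

Steps 2 and 3 then add `create_redis_worker()` to the `agents` list in `supervisor.py` and mention `redis_analyzer` in `SUPERVISOR_PROMPT` in `config.py`.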
## Adding New Tools
1. Create tool class in `custom_tools/` (see the sketch below)
2. Export from `custom_tools/__init__.py`
3. Import and use in agent definitions
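And for a new tool, a sketch following the `LogTailTool` pattern (the `DiskUsageTool` name and module path are hypothetical):

```python
"""custom_tools/disk_usage_tool.py -- hypothetical module for step 1."""
import subprocess
from langchain_core.tools import BaseTool

class DiskUsageTool(BaseTool):
    """Summarize disk usage for a directory with `du -sh`."""
    name: str = "disk_usage"
    description: str = "Report disk usage (du -sh) for a given directory path."

    def _run(self, path: str = "/var/log") -> str:
        try:
            return subprocess.check_output(["du", "-sh", path], text=True)
        except subprocess.CalledProcessError as e:
            return f"Error checking disk usage for {path}: {e}"
```

Steps 2 and 3 then export `DiskUsageTool` from `custom_tools/__init__.py` and add it to the `tools=[...]` list of whichever agent needs it.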


@ -0,0 +1,185 @@
# Multi-Agent Supervisor System for Sysadmin Tasks
This directory contains a sophisticated multi-agent system with a supervisor pattern for comprehensive system administration and troubleshooting.
## Overview
The multi-agent supervisor system uses multiple specialized agents coordinated by a supervisor to handle complex sysadmin tasks:
1. **Supervisor Agent**: Orchestrates and delegates tasks to specialized workers
2. **Specialized Workers**: Each agent is an expert in a specific domain
3. **Parallel Processing**: Multiple agents can work simultaneously
4. **Intelligent Routing**: Tasks are routed to the most appropriate specialist
## Architecture
```
User Input → Supervisor → Specialized Agents → Aggregated Response
┌─────────────────────────────────────────────────┐
│ system_info │ nginx │ mariadb │ network │ ... │
└─────────────────────────────────────────────────┘
```
## Specialized Agents
### Core System Agents
- **`system_info_worker`**: CPU, RAM, disk usage monitoring
- **`service_inventory_worker`**: Lists running services
### Service-Specific Agents
- **`mariadb_analyzer`**: MariaDB configuration and log analysis
- **`nginx_analyzer`**: Nginx configuration validation and log analysis
- **`phpfpm_analyzer`**: PHP-FPM performance and error analysis
### Network & Security Agents
- **`network_diag`**: Network connectivity and DNS diagnostics
- **`cert_checker`**: TLS certificate validation and expiry alerts
### Analysis & Action Agents
- **`risk_scorer`**: Aggregates findings and assigns severity levels
- **`remediation_worker`**: Proposes safe fixes for detected issues
- **`harmonizer_worker`**: Applies security hardening best practices
## Features
### Advanced Capabilities
- **Intelligent Delegation**: Supervisor routes tasks to appropriate specialists
- **Parallel Execution**: Multiple agents can work simultaneously
- **Severity Assessment**: Risk scoring with Critical/High/Medium/Low levels
- **Safe Remediation**: Proposes fixes with confirmation requests
- **Security Hardening**: Automated best-practice application
### Execution Modes
- **Invoke Mode**: Complete analysis with final result
- **Stream Mode**: Real-time step-by-step execution visibility
## Files
- `main-multi-agent.py`: Complete multi-agent supervisor implementation
- `loghub/`: Symbolic link to log files directory
## Usage
```bash
cd multi-agent-supervisor
python main-multi-agent.py
```
The script includes both execution modes:
### 1. Invoke Mode (Complete Analysis)
```python
result = supervisor.invoke(query)
print(result["messages"][-1].content)
```
### 2. Stream Mode (Step-by-Step)
```python
from utils import print_step_info

for step, chunk in enumerate(supervisor.stream(query), start=1):
    # Real-time, step-by-step agent execution monitoring
    print_step_info(step, chunk)
```
## Example Workflow
For the query: *"Nginx returns 502 Bad Gateway on my server. What can I do?"*
1. **Supervisor** analyzes the request
2. **system_info_worker** checks system resources
3. **service_inventory_worker** lists running services
4. **nginx_analyzer** validates Nginx configuration and checks logs
5. **phpfpm_analyzer** checks PHP-FPM status (common 502 cause)
6. **risk_scorer** assesses the severity
7. **remediation_worker** proposes specific fixes
## Pros and Cons
### ✅ Pros
- **Domain Expertise**: Each agent specializes in specific areas
- **Parallel Processing**: Multiple agents work simultaneously
- **Comprehensive Analysis**: Systematic approach to complex problems
- **Risk Assessment**: Built-in severity scoring
- **Intelligent Routing**: Tasks go to the right specialist
- **Scalable**: Easy to add new specialized agents
### ❌ Cons
- **Complexity**: More sophisticated setup and debugging
- **Resource Intensive**: Higher computational overhead
- **Coordination Overhead**: Supervisor management complexity
- **Potential Over-engineering**: May be overkill for simple tasks
## When to Use
Choose the multi-agent supervisor when:
- You need comprehensive system analysis
- Multiple services/components are involved
- You want parallel processing capabilities
- Risk assessment and severity scoring are important
- You're dealing with complex, multi-faceted problems
- You need specialized domain expertise
## Agent Interaction Flow
```mermaid
graph TD
A[User Query] --> B[Supervisor]
B --> C[system_info_worker]
B --> D[service_inventory_worker]
B --> E[Service Specialists]
E --> F[nginx_analyzer]
E --> G[mariadb_analyzer]
E --> H[phpfpm_analyzer]
C --> I[risk_scorer]
D --> I
F --> I
G --> I
H --> I
I --> J[remediation_worker]
J --> K[Final Response]
```
## Customization
### Adding New Agents
```python
new_agent = create_react_agent(
model="openai:gpt-4o-mini",
tools=[shell_tool, custom_tools],
prompt="Your specialized agent prompt...",
name="new_specialist"
)
# Add to supervisor
supervisor = create_supervisor(
agents=[...existing_agents, new_agent],
model=model,
prompt=updated_supervisor_prompt
)
```
### Custom Tools
```python
from langchain_core.tools import BaseTool

class CustomTool(BaseTool):
    name: str = "custom_tool"
    description: str = "Tool description"

    def _run(self, **kwargs) -> str:
        # Tool implementation
        return "result"
```
## Requirements
```bash
pip install langchain-openai langgraph langgraph-supervisor langchain-community
export OPENAI_API_KEY="your-api-key"
```
## Performance Considerations
- **Token Usage**: Higher due to multiple agent interactions
- **Execution Time**: May be longer due to coordination overhead
- **Memory**: Higher memory usage with multiple concurrent agents
- **Rate Limits**: Monitor API rate limits with parallel requests


@ -0,0 +1,143 @@
# Understanding Multi-Agent Transfers
## What "Successfully transferred..." means
When you see messages like:
- `Successfully transferred to system_info_worker`
- `Successfully transferred back to supervisor`
These are **tool execution results** from the LangGraph supervisor pattern. Here's what's happening:
## 🔄 The Transfer Flow
1. **Supervisor receives user query**: "Nginx returns 502 Bad Gateway on my server. What can I do?"
2. **Supervisor analyzes and delegates**: Based on the `SUPERVISOR_PROMPT` in `config.py`, it decides to start with `system_info_worker`
3. **Transfer tool execution**: Supervisor calls `transfer_to_system_info_worker` tool
- **Result**: "Successfully transferred to system_info_worker"
- **Meaning**: Control is now handed to the system_info_worker agent
4. **Agent executes**: The `system_info_worker` gets:
- Full conversation context (including the original user query)
- Its own specialized prompt from `agents/system_agents.py`
- Access to its tools (shell commands for system info)
5. **Agent completes and returns**: Agent calls `transfer_back_to_supervisor`
- **Result**: "Successfully transferred back to supervisor"
- **Meaning**: Agent finished its task and returned control
- **Important**: Agent's results are now part of the conversation history
6. **Supervisor decides next step**: Based on **accumulated results**, supervisor either:
- Delegates to another agent (e.g., `service_inventory_worker`)
- Provides final response to user
- **Key**: Supervisor can see ALL previous agent results when making decisions
## 🧠 How Prompts Work
### Supervisor Prompt (config.py)
```python
SUPERVISOR_PROMPT = """
You are the supervisor of a team of specialised sysadmin agents.
Decide which agent to delegate to based on the user's query **or** on results already collected.
Available agents:
- system_info_worker: gather system metrics
- service_inventory_worker: list running services
- mariadb_analyzer: analyse MariaDB
...
Always start with `system_info_worker` and `service_inventory_worker` before drilling into a specific service.
"""
```
### Agent Prompts (agents/*.py)
Each agent has its own specialized prompt, for example:
```python
# system_info_worker prompt
"""
You are a Linux sysadmin. Use shell commands like `lscpu`, `free -h`, and `df -h` to gather CPU, RAM, and disk usage.
Return a concise plaintext summary. Only run safe, read-only commands.
"""
```
## 🎯 What Each Agent Receives
When an agent is activated via transfer:
- **Full conversation history**: All previous messages between user, supervisor, and other agents
- **Specialized prompt**: Guides how the agent should interpret and act on the conversation
- **Tools**: Shell access, specific analyzers, etc.
- **Context**: Results from previous agents in the conversation
## 🔄 How Agent Results Flow Back to Supervisor
**This is the key mechanism that makes the multi-agent system intelligent:**
1. **Agent produces results**: Each agent generates an `AIMessage` with its findings/analysis
2. **Results become part of conversation**: The agent's response is added to the shared message history
3. **Supervisor sees everything**: When control returns to supervisor, it has access to:
- Original user query
- All previous agent responses
- Tool execution results
- Complete conversation context
4. **Supervisor strategy updates**: Based on accumulated knowledge, supervisor can:
- Decide which agent to call next
- Skip unnecessary agents if enough info is gathered
- Synthesize results from multiple agents
- Provide final comprehensive response
### Example Flow:
```
User: "Nginx 502 error, help!"
├── Supervisor → system_info_worker
│ └── Returns: "502 usually means upstream server issues, check logs..."
├── Supervisor (now knows about upstream issues) → service_inventory_worker
│ └── Returns: "Check PHP-FPM status, verify upstream config..."
└── Supervisor (has both perspectives) → Final synthesis
└── "Based on system analysis and service inventory, here's comprehensive solution..."
```
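The same accumulation can be inspected programmatically after a run by walking the shared message list (a sketch, assuming a supervisor compiled as in `supervisor.py` and a `query` dict as in `main-multi-agent.py`):

```python
result = supervisor.invoke(query)

# Every user prompt, supervisor decision, agent answer, and transfer
# confirmation is appended to the same shared history, in order.
for msg in result["messages"]:
    preview = str(msg.content)[:80].replace("\n", " ")
    print(f"{type(msg).__name__}: {preview}")
```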
## 🔍 Enhanced Debugging
The updated `utils.py` now shows:
- **Transfer explanations**: What each "Successfully transferred" means
- **Conversation context**: Last few messages to understand the flow
- **Tool call details**: What tools are being used and why
- **Agent delegation**: Which agent is being called and for what purpose
## 🔍 Observing Result Flow in Practice
To see how results flow back to the supervisor, run the enhanced debugging and watch for:
1. **Agent Results**: Look for `AIMessage` from agents (not just transfer confirmations)
2. **Conversation Context**: The expanding message history in each step
3. **Supervisor Decision Changes**: How supervisor's next choice is influenced by results
### Example Debug Output Analysis:
```
🔄 STEP 2: system_info_worker
💬 MESSAGE TYPE: AIMessage ← AGENT'S ACTUAL RESULT
📄 CONTENT: "502 typically indicates upstream server issues..."
🔄 STEP 4: service_inventory_worker
💬 MESSAGE TYPE: AIMessage ← AGENT'S ACTUAL RESULT
📄 CONTENT: "Check PHP-FPM status, verify upstream config..."
🔄 STEP 5: supervisor
💬 MESSAGE TYPE: AIMessage ← SUPERVISOR'S SYNTHESIS
📄 CONTENT: "Based on system analysis and service inventory..."
📚 CONVERSATION CONTEXT (12 messages) ← SUPERVISOR SEES ALL RESULTS
```
The supervisor's final response demonstrates it has processed and synthesized results from both agents!
## 📋 Key Takeaways
- **"Successfully transferred"** = Control handoff confirmation, not data transfer
- **Each agent** gets the full conversation context INCLUDING previous agent results
- **Agent prompts** determine how they process that context
- **Supervisor** orchestrates the workflow based on its prompt strategy
- **The conversation** builds up context as each agent contributes their expertise
- **Results accumulate**: Each agent can see and build upon previous agents' work
- **Supervisor learns**: Strategy updates based on what agents discover
- **Dynamic workflow**: Supervisor can skip agents or change direction based on results


@ -0,0 +1,33 @@
"""Agent definitions for the multi-agent sysadmin system."""
from .system_agents import (
create_system_info_worker,
create_service_inventory_worker,
)
from .service_agents import (
create_mariadb_worker,
create_nginx_worker,
create_phpfpm_worker,
)
from .network_agents import (
create_network_worker,
create_cert_worker,
)
from .analysis_agents import (
create_risk_worker,
create_remediation_worker,
create_harmonizer_worker,
)
__all__ = [
"create_system_info_worker",
"create_service_inventory_worker",
"create_mariadb_worker",
"create_nginx_worker",
"create_phpfpm_worker",
"create_network_worker",
"create_cert_worker",
"create_risk_worker",
"create_remediation_worker",
"create_harmonizer_worker",
]


@ -0,0 +1,42 @@
"""Analysis and remediation agents."""
from langgraph.prebuilt import create_react_agent
from custom_tools import get_shell_tool
def create_risk_worker():
"""Create risk assessment agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[], # pure LLM reasoning
prompt="""
Aggregate the findings from other agents and assign a severity: Critical, High, Medium, or Low.
Output a short report.
""",
name="risk_scorer"
)
def create_remediation_worker():
"""Create remediation agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
Propose safe bash commands or configuration edits to fix detected issues.
NEVER run destructive commands automatically; always request confirmation.
""",
name="remediation_worker"
)
def create_harmonizer_worker():
"""Create system hardening agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
Apply best-practice hardening (`ulimit`, `sysctl`, journald rotation) in dry-run mode unless severity is High.
""",
name="harmonizer_worker"
)


@ -0,0 +1,29 @@
"""Network and security monitoring agents."""
from langgraph.prebuilt import create_react_agent
from custom_tools import get_shell_tool
def create_network_worker():
"""Create network diagnostics agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
Diagnose network issues using `ping`, `traceroute`, and `dig`.
""",
name="network_diag"
)
def create_cert_worker():
"""Create certificate checking agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
Check TLS certificates on disk with `openssl x509 -noout -enddate -in <cert>`.
Raise an alert when a certificate expires in fewer than 30 days.
""",
name="cert_checker"
)


@ -0,0 +1,42 @@
"""Service-specific monitoring agents."""
from langgraph.prebuilt import create_react_agent
from custom_tools import get_shell_tool, LogTailTool
def create_mariadb_worker():
"""Create MariaDB analysis agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool(), LogTailTool()],
prompt="""
You are a MariaDB expert. Check config files in /etc/mysql and inspect `/var/log/mysql/*.log` for errors.
Use `mysqladmin status` and other read-only commands. Use the `tail_log` tool for logs.
""",
name="mariadb_analyzer"
)
def create_nginx_worker():
"""Create Nginx analysis agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool(), LogTailTool()],
prompt="""
You are an Nginx expert. Validate configuration with `nginx -t` and inspect access/error logs.
Use the `tail_log` tool for `/var/log/nginx/error.log`.
""",
name="nginx_analyzer"
)
def create_phpfpm_worker():
"""Create PHP-FPM analysis agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool(), LogTailTool()],
prompt="""
You are a PHP-FPM expert. Check `systemctl status php*-fpm` and look for memory leaks or timeouts in the logs.
""",
name="phpfpm_analyzer"
)


@ -0,0 +1,30 @@
"""System monitoring agents."""
from langgraph.prebuilt import create_react_agent
from custom_tools import get_shell_tool
def create_system_info_worker():
"""Create system information gathering agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
You are a Linux sysadmin. Use shell commands like `lscpu`, `free -h`, and `df -h` to gather CPU, RAM, and disk usage.
Return a concise plaintext summary. Only run safe, read-only commands.
""",
name="system_info_worker"
)
def create_service_inventory_worker():
"""Create service inventory agent."""
return create_react_agent(
model="openai:gpt-4o-mini",
tools=[get_shell_tool()],
prompt="""
List all running services using `systemctl list-units --type=service --state=running`.
Return a JSON array of service names.
""",
name="service_inventory_worker"
)


@ -0,0 +1,26 @@
"""Configuration settings for the multi-agent system."""
from langchain_openai import ChatOpenAI
def get_base_model():
"""Get the base LLM model configuration."""
return ChatOpenAI(model="gpt-4o-mini", temperature=0)
SUPERVISOR_PROMPT = """
You are the supervisor of a team of specialised sysadmin agents.
Decide which agent to delegate to based on the user's query **or** on results already collected.
Available agents:
- system_info_worker: gather system metrics
- service_inventory_worker: list running services
- mariadb_analyzer: analyse MariaDB
- nginx_analyzer: analyse Nginx
- phpfpm_analyzer: analyse PHP-FPM
- network_diag: diagnose network issues
- cert_checker: check TLS certificates
- risk_scorer: aggregate severity
- remediation_worker: propose fixes
- harmonizer_worker: apply hardening
Always start with `system_info_worker` and `service_inventory_worker` before drilling into a specific service.
"""


@ -0,0 +1,6 @@
"""Custom tools for the multi-agent sysadmin system."""
from .log_tail_tool import LogTailTool
from .shell_tool_wrapper import get_shell_tool
__all__ = ["LogTailTool", "get_shell_tool"]


@ -0,0 +1,24 @@
"""Log tail tool for reading log files."""
import subprocess
from langchain_core.tools import BaseTool
class LogTailTool(BaseTool):
"""Tail the last N lines from a log file."""
name: str = "tail_log"
description: str = "Tail the last N lines of a log file given its path and optional number of lines."
def _run(self, path: str, lines: int = 500): # type: ignore[override]
"""Run the tool to tail log files."""
try:
return subprocess.check_output(["tail", "-n", str(lines), path], text=True)
except subprocess.CalledProcessError as e:
return f"Error reading log file {path}: {e}"
except FileNotFoundError:
return f"Log file not found: {path}"
async def _arun(self, *args, **kwargs): # noqa: D401
"""Async version not implemented."""
raise NotImplementedError("Use the synchronous version of this tool.")
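# A hedged usage sketch (not part of the original module): the tool can be
# exercised directly via its Runnable interface; the log path is an assumption.
if __name__ == "__main__":
    tail = LogTailTool()
    print(tail.invoke({"path": "/var/log/nginx/error.log", "lines": 50}))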


@ -0,0 +1,8 @@
"""Shell tool wrapper for consistent access."""
from langchain_community.tools import ShellTool
def get_shell_tool() -> ShellTool:
"""Get a configured shell tool instance."""
return ShellTool()



@ -0,0 +1 @@
../loghub


@ -0,0 +1,68 @@
# Multi-agent sysadmin assistant using LangChain + LangGraph Supervisor
# Requires: `pip install langchain-openai langgraph langgraph-supervisor`
from __future__ import annotations
from supervisor import create_sysadmin_supervisor
from utils import print_step_info, explain_supervisor_pattern
if __name__ == "__main__":
# Create the supervisor
supervisor = create_sysadmin_supervisor()
# Example run - demonstrating both invoke and streaming with debug output
query = {
"messages": [
{
"role": "user",
"content": "Nginx returns 502 Bad Gateway on my server. What can I do?",
}
]
}
print("🚀 Starting multi-agent sysadmin analysis...")
print(f"📝 User Query: {query['messages'][0]['content']}")
print("=" * 80)
# Show explanation of the supervisor pattern
explain_supervisor_pattern()
print("\n=== Using invoke() method ===")
result = supervisor.invoke(query)
print("\n📊 FINAL RESULT:")
print("-" * 40)
print(result["messages"][-1].content)
print("-" * 40)
print(f"\n📈 Total messages exchanged: {len(result['messages'])}")
print("\n=== Using stream() method for detailed step-by-step analysis ===")
step_count = 0
max_steps = 20 # Prevent infinite loops
try:
chunks_processed = []
for chunk in supervisor.stream(query):
step_count += 1
chunks_processed.append(chunk)
print_step_info(step_count, chunk)
# Safety check to prevent infinite loops
if step_count >= max_steps:
print(f"\n⚠️ Reached maximum steps ({max_steps}), stopping stream...")
break
print(f"\n✅ Streaming completed successfully with {step_count} steps")
print(f"📊 Total chunks processed: {len(chunks_processed)}")
# Check if the last chunk contains a complete final response
if chunks_processed:
last_chunk = chunks_processed[-1]
print(f"🔍 Last chunk keys: {list(last_chunk.keys()) if isinstance(last_chunk, dict) else type(last_chunk)}")
except Exception as e:
print(f"\n❌ Streaming error after {step_count} steps: {e}")
print("💡 The invoke() method worked fine, so the supervisor itself is functional.")
import traceback
traceback.print_exc()


@ -0,0 +1,37 @@
"""Multi-agent supervisor for sysadmin tasks."""
from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
from agents.system_agents import create_system_info_worker, create_service_inventory_worker
from agents.service_agents import create_mariadb_worker, create_nginx_worker, create_phpfpm_worker
from agents.network_agents import create_network_worker, create_cert_worker
from agents.analysis_agents import create_risk_worker, create_remediation_worker, create_harmonizer_worker
from config import get_base_model, SUPERVISOR_PROMPT
def create_sysadmin_supervisor():
"""Create a supervisor that coordinates sysadmin agents."""
# Create all the specialized agents
agents = [
create_system_info_worker(),
create_service_inventory_worker(),
create_mariadb_worker(),
create_nginx_worker(),
create_phpfpm_worker(),
create_network_worker(),
create_cert_worker(),
create_risk_worker(),
create_remediation_worker(),
create_harmonizer_worker(),
]
# Create and return the supervisor
supervisor = create_supervisor(
agents=agents,
model=get_base_model(),
prompt=SUPERVISOR_PROMPT
)
return supervisor.compile()


@ -0,0 +1,142 @@
"""Utility functions for the multi-agent system."""
def explain_supervisor_pattern():
"""Explain how the LangGraph supervisor pattern works."""
print("🏗️ MULTI-AGENT SUPERVISOR PATTERN EXPLANATION:")
print("=" * 60)
print("1. 🎯 SUPERVISOR: Receives user query and decides which agent to delegate to")
print("2. 🔄 TRANSFER: Uses transfer tools (e.g., transfer_to_system_info_worker)")
print("3. 🤖 AGENT: Specialized agent executes its task with its own prompt/tools")
print("4. 🔙 RETURN: Agent uses transfer_back_to_supervisor when done")
print("5. 🧠 DECISION: Supervisor analyzes results and decides next agent or final response")
print()
print("📋 WHAT 'Successfully transferred' MEANS:")
print(" - It's the response from a transfer tool call")
print(" - Indicates control handoff between supervisor and agent")
print(" - Each agent gets the full conversation context")
print(" - Agent's prompt guides how it processes that context")
print()
print("🔍 SUPERVISOR PROMPT (from config.py):")
print(" - Defines available agents and their specialties")
print(" - Guides delegation strategy (start with system_info & service_inventory)")
print(" - Agent prompts are in agents/*.py files")
print("=" * 60)
print()
def print_step_info(step_count: int, chunk):
"""Print formatted step information during streaming."""
print(f"\n🔄 STEP {step_count}:")
print("-" * 30)
try:
# Extract agent information from chunk
if isinstance(chunk, dict):
# Look for agent names in the chunk keys
agent_names = [key for key in chunk.keys() if key in [
'system_info_worker', 'service_inventory_worker', 'mariadb_analyzer',
'nginx_analyzer', 'phpfpm_analyzer', 'network_diag', 'cert_checker',
'risk_scorer', 'remediation_worker', 'harmonizer_worker', 'supervisor'
]]
if agent_names:
current_agent = agent_names[0]
print(f"🤖 ACTIVE AGENT: {current_agent}")
# Show the messages from this agent
agent_data = chunk[current_agent]
if 'messages' in agent_data:
messages = agent_data['messages']
if messages:
last_message = messages[-1]
# Get message type from the class name
message_type = type(last_message).__name__
print(f"💬 MESSAGE TYPE: {message_type}")
# Show content preview if available
if hasattr(last_message, 'content') and last_message.content:
content = last_message.content
content_length = len(content)
print(f"📏 CONTENT LENGTH: {content_length} characters")
# Show full content for final AI responses, abbreviated for others
if message_type == 'AIMessage':
print(f"📄 FULL CONTENT:")
print(content)
print() # Extra line for readability
else:
# Truncate other message types for brevity
preview = content[:200] + "..." if len(content) > 200 else content
print(f"📄 CONTENT PREVIEW:")
print(preview)
print() # Extra line for readability
# Show tool calls if any
if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
tool_calls = last_message.tool_calls
print(f"🔧 TOOL CALLS: {len(tool_calls)} tool(s)")
for i, tool_call in enumerate(tool_calls):
tool_name = getattr(tool_call, 'name', 'unknown')
print(f" {i+1}. {tool_name}")
# Show transfer details for supervisor delegation
if tool_name.startswith('transfer_to_'):
target_agent = tool_name.replace('transfer_to_', '')
print(f" 🎯 DELEGATING to: {target_agent}")
# Show the arguments/context being passed
if hasattr(tool_call, 'args') and tool_call.args:
print(f" 📋 Context/Args: {tool_call.args}")
# Show additional info for ToolMessage
if message_type == 'ToolMessage':
if hasattr(last_message, 'name'):
tool_name = last_message.name
print(f"🔧 TOOL NAME: {tool_name}")
# Explain what "Successfully transferred" means
if "transfer" in tool_name and "Successfully transferred" in content:
if tool_name.startswith('transfer_to_'):
target_agent = tool_name.replace('transfer_to_', '')
print(f" EXPLANATION: Supervisor delegated control to {target_agent}")
print(f" The {target_agent} will now execute its specialized tasks")
elif tool_name == 'transfer_back_to_supervisor':
print(f" EXPLANATION: Agent completed its task and returned control to supervisor")
print(f" Supervisor will decide the next step based on results")
if hasattr(last_message, 'tool_call_id'):
print(f"🔧 TOOL CALL ID: {last_message.tool_call_id}")
# Show conversation context for better understanding
agent_data = chunk[current_agent]
if 'messages' in agent_data and len(agent_data['messages']) > 1:
print(f"\n📚 CONVERSATION CONTEXT ({len(agent_data['messages'])} messages):")
for i, msg in enumerate(agent_data['messages'][-3:], start=max(0, len(agent_data['messages'])-3)):
msg_type = type(msg).__name__
if hasattr(msg, 'content') and msg.content:
preview = msg.content[:100].replace('\n', ' ')
if len(msg.content) > 100:
preview += "..."
print(f" {i+1}. {msg_type}: {preview}")
elif hasattr(msg, 'tool_calls') and msg.tool_calls:
tool_names = [getattr(tc, 'name', 'unknown') for tc in msg.tool_calls]
print(f" {i+1}. {msg_type}: Tool calls: {tool_names}")
else:
print(f" {i+1}. {msg_type}: (no content)")
print() # Extra spacing for readability
else:
print("📋 CHUNK DATA:")
# Show first few keys for debugging
chunk_keys = list(chunk.keys())[:3]
print(f" Keys: {chunk_keys}")
else:
print(f"📦 CHUNK TYPE: {type(chunk)}")
print(f"📄 CONTENT: {str(chunk)[:100]}...")
except Exception as e:
print(f"❌ Error processing chunk: {e}")
print(f"📦 CHUNK TYPE: {type(chunk)}")
if hasattr(chunk, '__dict__'):
print(f"📄 CHUNK ATTRIBUTES: {list(chunk.__dict__.keys())}")
print("-" * 30)


@ -8,6 +8,7 @@ dependencies = [
"langchain>=0.3.26",
"langchain-openai>=0.3.25",
"langgraph>=0.4.9",
"langgraph-supervisor",
"langsmith>=0.4.2",
"langchain-community>=0.3.0",
"langchain-experimental>=0.3.0",


@ -1,299 +0,0 @@
# ReAct Agent vs Custom StateGraph: Architectural Decision Guide
This document explores the two main approaches for building LangGraph agents: using the prebuilt `create_react_agent` vs implementing a custom `StateGraph`.
## TL;DR Recommendation
**Use `create_react_agent` for most use cases**. Only migrate to custom `StateGraph` when you hit specific limitations of the ReAct pattern.
## Option 1: `create_react_agent` (Current Implementation)
### What it is
```python
# Simple 5-line agent creation
llm = init_chat_model("openai:gpt-4o-mini")
tools = [shell_tool, analyze_log_file]
agent = create_react_agent(llm, tools, prompt=system_prompt)
```
### Under the Hood
`create_react_agent` uses a predefined `StateGraph` with this structure:
```
START → agent → tools → agent → END
↑________________↓
```
- **`agent` node**: LLM reasoning (decides what to do)
- **`tools` node**: Tool execution (acting)
- **Conditional loop**: Continues until final response
### Advantages ✅
**Simplicity & Speed**
- Minimal code to get started
- Battle-tested ReAct pattern
- Automatic reasoning/acting cycles
**Maintenance**
- Automatic updates with LangGraph improvements
- Less code to debug and maintain
- Well-documented pattern
**Perfect for Standard Use Cases**
- Tool-based interactions
- Conversational interfaces
- Analysis workflows
- System administration tasks
### Limitations ⚠️
- Fixed ReAct pattern only
- Limited state management
- No custom routing logic
- No parallel tool execution
- No complex workflow orchestration
## Option 2: Custom StateGraph Implementation
### What it looks like
```python
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain.chat_models import init_chat_model
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
current_task: str # "log_analysis", "shell_command", "general"
log_context: dict # Remember previous analyses
safety_mode: bool # Control dangerous commands
def classify_request(state: AgentState) -> AgentState:
"""Classify user request type"""
last_message = state["messages"][-1].content.lower()
if any(word in last_message for word in ["log", "analyze", "error", "pattern"]):
state["current_task"] = "log_analysis"
elif any(word in last_message for word in ["command", "shell", "run", "execute"]):
state["current_task"] = "shell_command"
else:
state["current_task"] = "general"
return state
def route_request(state: AgentState) -> Literal["log_analyzer", "shell_executor", "general_chat"]:
"""Route to appropriate node based on request type"""
return {
"log_analysis": "log_analyzer",
"shell_command": "shell_executor",
"general": "general_chat"
}[state["current_task"]]
def analyze_logs_node(state: AgentState) -> AgentState:
"""Specialized node for log analysis"""
llm = init_chat_model("openai:gpt-4o-mini")
# Custom logic for log analysis
# - Parallel file processing
# - Context from previous analyses
# - Specialized prompting
prompt = f"""You are a log analysis expert.
Previous context: {state.get("log_context", {})}
Use analyze_log_file tool for the requested analysis.
"""
response = llm.invoke([HumanMessage(content=prompt)] + state["messages"][-3:])
state["messages"].append(response)
# Update context for future analyses
state["log_context"]["last_analysis"] = "completed"
return state
def execute_shell_node(state: AgentState) -> AgentState:
"""Specialized node for shell commands with safety checks"""
llm = init_chat_model("openai:gpt-4o-mini")
# Safety validation before execution
dangerous_commands = ["rm -rf", "sudo rm", "format", "dd if="]
last_message = state["messages"][-1].content.lower()
if any(cmd in last_message for cmd in dangerous_commands):
state["messages"].append(
AIMessage(content="⚠️ Potentially dangerous command detected. Please confirm.")
)
state["safety_mode"] = True
return state
# Normal execution with ShellTool
# Custom logic for command validation and execution
return state
def general_chat_node(state: AgentState) -> AgentState:
"""Handle general conversation"""
llm = init_chat_model("openai:gpt-4o-mini")
prompt = """You are a helpful system administration assistant.
Provide guidance and suggestions for system debugging tasks.
"""
response = llm.invoke([HumanMessage(content=prompt)] + state["messages"][-5:])
state["messages"].append(response)
return state
def create_advanced_agent():
"""Create custom agent with StateGraph"""
# Define workflow
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("classifier", classify_request)
workflow.add_node("log_analyzer", analyze_logs_node)
workflow.add_node("shell_executor", execute_shell_node)
workflow.add_node("general_chat", general_chat_node)
# Define edges
workflow.add_edge(START, "classifier")
workflow.add_conditional_edges(
"classifier",
route_request,
{
"log_analyzer": "log_analyzer",
"shell_executor": "shell_executor",
"general_chat": "general_chat"
}
)
# All terminal nodes lead to END
workflow.add_edge("log_analyzer", END)
workflow.add_edge("shell_executor", END)
workflow.add_edge("general_chat", END)
return workflow.compile()
```
### Advantages ✅
**Complete Control**
- Custom business logic
- Complex state management
- Advanced routing and validation
- Parallel processing capabilities
**Specialized Workflows**
- Different handling per task type
- Memory between interactions
- Safety checks and validation
- Custom error handling
**Performance Optimization**
- Optimized tool selection
- Reduced unnecessary LLM calls
- Parallel execution where possible
### Disadvantages ❌
**Complexity**
- 50+ lines vs 5 lines
- More potential bugs
- Custom maintenance required
**Development Time**
- Slower initial development
- More testing needed
- Complex debugging
## Comparison Matrix
| Aspect | `create_react_agent` | Custom `StateGraph` |
|--------|---------------------|-------------------|
| **Lines of Code** | ~5 | ~50+ |
| **Development Time** | Minutes | Hours/Days |
| **Flexibility** | ReAct pattern only | Complete freedom |
| **Maintenance** | Automatic | Manual |
| **Performance** | Good, optimized | Depends on implementation |
| **Debugging** | Limited visibility | Full control |
| **State Management** | Basic messages | Rich custom state |
| **Routing Logic** | Tool-based only | Custom conditional |
| **Parallel Execution** | No | Yes |
| **Safety Checks** | Tool-level only | Custom validation |
| **Use Cases Coverage** | 80% | 100% |
## When to Use Each Approach
### Stick with `create_react_agent` when:
✅ **Tool-based interactions** (your current use case)
✅ **Standard conversational AI**
✅ **Rapid prototyping**
✅ **Simple reasoning/acting cycles**
✅ **Maintenance is a priority**
✅ **Team has limited LangGraph experience**
### Migrate to Custom `StateGraph` when:
🔄 **Complex business logic** required
🔄 **Multi-step workflows** with different paths
🔄 **Advanced state management** needed
🔄 **Parallel processing** requirements
🔄 **Custom validation/safety** logic
🔄 **Performance optimization** critical
🔄 **Specialized routing** based on context
## Migration Strategy
If you decide to eventually migrate to custom StateGraph:
### Phase 1: Enhance Current Implementation
```python
# Add more sophisticated tools to your current setup
def create_enhanced_react_agent():
tools = [
shell_tool,
analyze_log_file,
safety_validator_tool, # New: safety checks
parallel_log_analyzer, # New: batch processing
context_manager_tool # New: conversation context
]
return create_react_agent(llm, tools, enhanced_prompt)
```
### Phase 2: Hybrid Approach
```python
# Use create_react_agent for some tasks, custom StateGraph for others
def create_hybrid_agent():
# Route complex workflows to custom graph
# Keep simple interactions with ReAct agent
pass
```
### Phase 3: Full Custom Implementation
- Implement complete StateGraph when requirements demand it
## Recommendation for Your Project
**Keep `create_react_agent` for now** because:
1. ✅ Your use case (log analysis + shell commands) fits perfectly
2. ✅ Current implementation is clean and working
3. ✅ Maintenance overhead is minimal
4. ✅ Team can focus on improving tools rather than framework
**Consider custom StateGraph later** if you need:
- Advanced workflow orchestration
- Complex state management between analyses
- Parallel processing of multiple log files
- Sophisticated safety validation
- Performance optimization for large-scale deployments
## Conclusion
Your current `create_react_agent` implementation is excellent for an MVP and likely covers 80% of system administration use cases. The ReAct pattern provides a solid foundation for tool-based AI interactions.
Only migrate to custom StateGraph when you have specific requirements that the ReAct pattern cannot handle efficiently. Focus on enhancing your tools (`log_analyzer.py`, additional custom tools) rather than changing the underlying agent framework.
**The best architecture is the one that solves your current problems without overengineering for hypothetical future needs.**

16
uv.lock generated

@ -489,6 +489,7 @@ dependencies = [
{ name = "langchain-experimental" },
{ name = "langchain-openai" },
{ name = "langgraph" },
{ name = "langgraph-supervisor" },
{ name = "langsmith" },
]
@ -499,6 +500,7 @@ requires-dist = [
{ name = "langchain-experimental", specifier = ">=0.3.0" },
{ name = "langchain-openai", specifier = ">=0.3.25" },
{ name = "langgraph", specifier = ">=0.4.9" },
{ name = "langgraph-supervisor" },
{ name = "langsmith", specifier = ">=0.4.2" },
]
@ -528,6 +530,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/8c/77/b0930ca5d54ef91e2bdb37e0f7dbeda1923e1e0b5b71ab3af35c103c2e39/langgraph_sdk-0.1.70-py3-none-any.whl", hash = "sha256:47f2b04a964f40a610c1636b387ea52f961ce7a233afc21d3103e5faac8ca1e5", size = 49986, upload_time = "2025-05-21T22:23:21.377Z" },
]
[[package]]
name = "langgraph-supervisor"
version = "0.0.27"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "langchain-core" },
{ name = "langgraph" },
{ name = "langgraph-prebuilt" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c4/96/46a6bfa2df4a9f120438e1e6dc343f3804485e188f26e4428185c864699a/langgraph_supervisor-0.0.27.tar.gz", hash = "sha256:1d07b722f54ab446e4ce8ad45f26cde7a593a77b1d1641684d91cb8fe6ac725a", size = 20769, upload_time = "2025-05-29T14:45:46.155Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/74/0e/48d0d29739e969450cd4aa5d83b68cb9cd3d1ba663cb3e02f43c445cbaf5/langgraph_supervisor-0.0.27-py3-none-any.whl", hash = "sha256:f3b200acf04fd7a0476b4688136fee49b0ed1505e6cec7058367e62fec2e8121", size = 15760, upload_time = "2025-05-29T14:45:44.76Z" },
]
[[package]]
name = "langsmith"
version = "0.4.2"