Gaetan Hurel d33cddef1e
wip
2025-06-26 18:02:43 +02:00

166 lines
10 KiB
Python

"""Utility functions for the multi-agent system."""
def explain_supervisor_pattern():
    """Print a fixed, human-readable overview of the supervisor workflow.

    The text covers the delegation cycle (supervisor -> transfer -> agent ->
    return -> decision), what a 'Successfully transferred' tool response
    means, and where the supervisor and agent prompts live. Nothing is
    returned; every line is written to standard output.
    """
    rule = "=" * 60
    # An empty entry produces a blank line, exactly like a bare print().
    banner_lines = (
        "🏗️ MULTI-AGENT SUPERVISOR PATTERN EXPLANATION:",
        rule,
        "1. 🎯 SUPERVISOR: Receives user query and decides which agent to delegate to",
        "2. 🔄 TRANSFER: Uses transfer tools (e.g., transfer_to_system_info_worker)",
        "3. 🤖 AGENT: Specialized agent executes its task with its own prompt/tools",
        "4. 🔙 RETURN: Agent uses transfer_back_to_supervisor when done",
        "5. 🧠 DECISION: Supervisor analyzes results and decides next agent or final response",
        "",
        "📋 WHAT 'Successfully transferred' MEANS:",
        " - It's the response from a transfer tool call",
        " - Indicates control handoff between supervisor and agent",
        " - Each agent gets the full conversation context",
        " - Agent's prompt guides how it processes that context",
        "",
        "🔍 SUPERVISOR PROMPT (from config.py):",
        " - Defines available agents and their specialties",
        " - Guides delegation strategy (start with system_info & service_inventory)",
        " - Agent prompts are in agents/*.py files",
        rule,
        "",
    )
    for text in banner_lines:
        print(text)
# Node names whose stream updates are rendered as agent activity.
_KNOWN_AGENTS = (
    'system_info_worker', 'service_inventory_worker', 'mariadb_analyzer',
    'nginx_analyzer', 'phpfpm_analyzer', 'network_diag', 'cert_checker',
    'risk_scorer', 'remediation_worker', 'harmonizer_worker', 'supervisor',
)
_TRANSFER_PREFIX = 'transfer_to_'
_TRANSFER_BACK = 'transfer_back_to_supervisor'


def _preview(text, limit):
    """Return *text* cut to *limit* characters, with '...' appended if cut."""
    return text[:limit] + "..." if len(text) > limit else text


def _content_text(message, default=''):
    """Return the message's ``content`` attribute as a plain string.

    String content is returned unchanged. List/tuple content is flattened by
    concatenating string items and the ``'text'`` value of dict items; other
    items are ignored. Missing or ``None`` content yields *default*.
    """
    content = getattr(message, 'content', default)
    if content is None:
        return default
    if isinstance(content, str):
        return content
    if isinstance(content, (list, tuple)):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and isinstance(block.get('text'), str):
                parts.append(block['text'])
        return ''.join(parts)
    return str(content)


def _tool_call_field(tool_call, field, default):
    """Read *field* from a tool call given either as a dict or as an object."""
    if isinstance(tool_call, dict):
        value = tool_call.get(field, default)
    else:
        value = getattr(tool_call, field, default)
    return default if value is None else value


def _report_human_message(current_agent, message):
    """Print the (truncated) query or instruction an agent received."""
    shown = _preview(_content_text(message), 100)
    if current_agent == 'SUPERVISOR':
        print(f"[ SUPERVISOR ] received user query: {shown}")
    else:
        print(f"[ {current_agent} ] received prompt from supervisor: {shown}")


def _report_returned_analysis(current_agent, messages):
    """Print the most recent substantive AIMessage preceding a transfer back."""
    if len(messages) <= 1:
        print(f"[ WARNING ] {current_agent} has no message history to analyze")
        return

    print(f"[ DEBUG ] {current_agent} has {len(messages)} messages")
    found_result = False
    # Walk backwards, skipping the ToolMessage that triggered this report.
    for i, msg in enumerate(reversed(messages[:-1])):
        msg_type = type(msg).__name__
        print(f"[ DEBUG ] Message {i}: {msg_type}, has_content: {hasattr(msg, 'content')}")
        if msg_type != 'AIMessage' or not getattr(msg, 'content', None):
            continue
        result_content = _content_text(msg).strip()
        # Heuristic: hand-off chatter ("I'll ...", anything mentioning
        # "transfer") is not treated as the agent's analysis.
        is_summary = (
            result_content
            and not result_content.startswith("I'll")
            and "transfer" not in result_content.lower()
        )
        if not is_summary:
            print(f"[ DEBUG ] Skipping AIMessage: '{result_content[:100]}...'")
            continue
        found_result = True
        if len(result_content) > 300:
            preview = result_content[:300] + "..."
            print(f"[ {current_agent} ] 📊 ANALYSIS SUMMARY (preview): {preview}")
            print(f"[ {current_agent} ] (full result length: {len(result_content)} characters)")
        else:
            print(f"[ {current_agent} ] 📊 ANALYSIS SUMMARY: {result_content}")
        break

    if not found_result:
        print(f"[ WARNING ] {current_agent} transferred back without providing analysis summary!")
        print("[ WARNING ] This agent may need prompt improvements")


def _report_tool_message(current_agent, message, messages):
    """Print the outcome of a tool execution (control transfer or result)."""
    # A tool message may carry name=None; normalise so str methods are safe.
    tool_name = str(getattr(message, 'name', None) or 'unknown')
    content = _content_text(message)

    if "Successfully transferred" not in content:
        # Ordinary tool execution result.
        if len(content) > 200:
            print(f"[ {current_agent} ] tool result preview: {content[:200] + '...'}")
            print(f"[ {current_agent} ] (full result length: {len(content)} characters)")
        else:
            print(f"[ {current_agent} ] tool result: {content}")
        return

    if tool_name.startswith(_TRANSFER_PREFIX):
        target_agent = tool_name[len(_TRANSFER_PREFIX):].upper()
        print(f"[ SUPERVISOR ] successfully transferred control to {target_agent}")
        print(f"[ SUPERVISOR ] {target_agent} will now analyze the situation and execute necessary commands")
        print(f"[ SUPERVISOR ] (Any shell command output below belongs to {target_agent})")
    elif tool_name == _TRANSFER_BACK:
        print(f"[ {current_agent} ] completed analysis and transferred control back to supervisor")
        print(f"[ {current_agent} ] (Any shell command output above was from {current_agent})")
        _report_returned_analysis(current_agent, messages)


def _report_ai_message(current_agent, message):
    """Print the tool calls an agent is making, or else its text response."""
    tool_calls = getattr(message, 'tool_calls', None) or []
    if tool_calls:
        for tool_call in tool_calls:
            # Tool calls may be plain dicts (as LangChain's ToolCall entries
            # are) or attribute-style objects; accept both.
            tool_name = str(_tool_call_field(tool_call, 'name', 'unknown'))
            args = _tool_call_field(tool_call, 'args', {})
            if tool_name.startswith(_TRANSFER_PREFIX):
                target_agent = tool_name[len(_TRANSFER_PREFIX):].upper()
                context = _preview(str(args), 150)
                print(f"[ SUPERVISOR ] calling {target_agent} with context: {context}")
            elif tool_name == _TRANSFER_BACK:
                print(f"[ {current_agent} ] completed task, transferring back to supervisor")
            else:
                print(f"[ {current_agent} ] using tool: {tool_name}")
                if args:
                    print(f"[ {current_agent} ] tool arguments: {_preview(str(args), 100)}")
        return

    content = _content_text(message)
    if content:
        # Final response from agent.
        if len(content) > 200:
            print(f"[ {current_agent} ] response preview: {content[:200] + '...'}")
            print(f"[ {current_agent} ] (full response length: {len(content)} characters)")
        else:
            print(f"[ {current_agent} ] response: {content}")


def _describe_chunk(chunk):
    """Print a description of the last message of the first known agent."""
    if not isinstance(chunk, dict):
        print(f"[ SYSTEM ] received {type(chunk).__name__}: {_preview(str(chunk), 100)}")
        return

    agent_names = [key for key in chunk if key in _KNOWN_AGENTS]
    if not agent_names:
        print("[ SYSTEM ] processing chunk with keys:", list(chunk.keys())[:3])
        return

    # Only the first recognised agent in the chunk is reported.
    node_name = agent_names[0]
    current_agent = node_name.upper()
    try:
        messages = chunk[node_name]['messages']
    except (KeyError, TypeError):
        # The node's update is None, not subscriptable, or has no 'messages'.
        messages = None
    if not messages:
        print(f"[ {current_agent} ] no message data available")
        return

    last_message = messages[-1]
    # Dispatch on the class name so no message classes need importing here.
    message_type = type(last_message).__name__
    if message_type == 'HumanMessage':
        _report_human_message(current_agent, last_message)
    elif message_type == 'ToolMessage':
        _report_tool_message(current_agent, last_message, messages)
    elif message_type == 'AIMessage':
        _report_ai_message(current_agent, last_message)
    else:
        shown = _content_text(last_message, 'No content')[:100]
        print(f"[ {current_agent} ] {message_type}: {shown}")


def print_step_info(step_count: int, chunk):
    """Print formatted step information during streaming with clear agent actions.

    Args:
        step_count: Step number shown in the banner.
        chunk: One streamed update. When it is a dict, the first key matching
            a known agent name is selected and the last entry of that value's
            ``'messages'`` is described according to its class name
            (``HumanMessage``, ``ToolMessage``, ``AIMessage``, or other).
            Any other value is reported with its type and a truncated ``str()``.

    Returns:
        None. All output goes to standard output.

    Rendering is best-effort: any exception raised while describing the chunk
    is caught and reported as an ``[ ERROR ]`` line instead of propagating.
    """
    print(f"\n{'='*60}")
    print(f"STEP {step_count}")
    print(f"{'='*60}")

    try:
        _describe_chunk(chunk)
    except Exception as e:  # display boundary: never break the caller's loop
        print(f"[ ERROR ] processing step {step_count}: {e}")
        print(f"[ DEBUG ] chunk type: {type(chunk)}")
        if hasattr(chunk, '__dict__'):
            print(f"[ DEBUG ] chunk attributes: {list(chunk.__dict__.keys())}")

    print(f"{'='*60}")
    print("NOTE: Shell command output may appear below before the next step")