"""Utility functions for the multi-agent system.""" def explain_supervisor_pattern(): """Explain how the LangGraph supervisor pattern works.""" print("🏗️ MULTI-AGENT SUPERVISOR PATTERN EXPLANATION:") print("=" * 60) print("1. 🎯 SUPERVISOR: Receives user query and decides which agent to delegate to") print("2. 🔄 TRANSFER: Uses transfer tools (e.g., transfer_to_system_info_worker)") print("3. 🤖 AGENT: Specialized agent executes its task with its own prompt/tools") print("4. 🔙 RETURN: Agent uses transfer_back_to_supervisor when done") print("5. 🧠 DECISION: Supervisor analyzes results and decides next agent or final response") print() print("📋 WHAT 'Successfully transferred' MEANS:") print(" - It's the response from a transfer tool call") print(" - Indicates control handoff between supervisor and agent") print(" - Each agent gets the full conversation context") print(" - Agent's prompt guides how it processes that context") print() print("🔍 SUPERVISOR PROMPT (from config.py):") print(" - Defines available agents and their specialties") print(" - Guides delegation strategy (start with system_info & service_inventory)") print(" - Agent prompts are in agents/*.py files") print("=" * 60) print() def print_step_info(step_count: int, chunk): """Print formatted step information during streaming with clear agent actions.""" print(f"\n{'='*60}") print(f"STEP {step_count}") print(f"{'='*60}") try: if isinstance(chunk, dict): # Look for agent names in the chunk keys agent_names = [key for key in chunk.keys() if key in [ 'system_info_worker', 'service_inventory_worker', 'mariadb_analyzer', 'nginx_analyzer', 'phpfpm_analyzer', 'network_diag', 'cert_checker', 'risk_scorer', 'remediation_worker', 'harmonizer_worker', 'supervisor' ]] if agent_names: current_agent = agent_names[0].upper() agent_data = chunk[agent_names[0]] if 'messages' in agent_data and agent_data['messages']: last_message = agent_data['messages'][-1] message_type = type(last_message).__name__ # Handle different message types with clear formatting if message_type == 'HumanMessage': # This is typically the user query or supervisor instruction content = getattr(last_message, 'content', '') if current_agent == 'SUPERVISOR': print(f"[ SUPERVISOR ] received user query: {content[:100]}{'...' if len(content) > 100 else ''}") else: print(f"[ {current_agent} ] received prompt from supervisor: {content[:100]}{'...' if len(content) > 100 else ''}") elif message_type == 'ToolMessage': # Result of tool execution tool_name = getattr(last_message, 'name', 'unknown') content = getattr(last_message, 'content', '') if "Successfully transferred" in content: if tool_name.startswith('transfer_to_'): target_agent = tool_name.replace('transfer_to_', '').upper() print(f"[ SUPERVISOR ] successfully transferred control to {target_agent}") print(f"[ SUPERVISOR ] {target_agent} will now analyze the situation and execute necessary commands") print(f"[ SUPERVISOR ] (Any shell command output below belongs to {target_agent})") elif tool_name == 'transfer_back_to_supervisor': print(f"[ {current_agent} ] completed analysis and transferred control back to supervisor") print(f"[ {current_agent} ] (Any shell command output above was from {current_agent})") # Show the result being sent back to supervisor # Look for the last AIMessage before this transfer to get the result if 'messages' in agent_data and len(agent_data['messages']) > 1: print(f"[ DEBUG ] {current_agent} has {len(agent_data['messages'])} messages") # Look for the most recent AIMessage with content found_result = False for i, msg in enumerate(reversed(agent_data['messages'][:-1])): # Exclude current ToolMessage msg_type = type(msg).__name__ print(f"[ DEBUG ] Message {i}: {msg_type}, has_content: {hasattr(msg, 'content')}") if msg_type == 'AIMessage' and hasattr(msg, 'content') and msg.content: result_content = msg.content.strip() if result_content and not result_content.startswith("I'll") and "transfer" not in result_content.lower(): found_result = True if len(result_content) > 300: preview = result_content[:300] + "..." print(f"[ {current_agent} ] 📊 ANALYSIS SUMMARY (preview): {preview}") print(f"[ {current_agent} ] (full result length: {len(result_content)} characters)") else: print(f"[ {current_agent} ] 📊 ANALYSIS SUMMARY: {result_content}") break else: print(f"[ DEBUG ] Skipping AIMessage: '{result_content[:100]}...'") if not found_result: print(f"[ WARNING ] {current_agent} transferred back without providing analysis summary!") print(f"[ WARNING ] This agent may need prompt improvements") else: print(f"[ WARNING ] {current_agent} has no message history to analyze") else: # Other tool execution result if len(content) > 200: preview = content[:200] + "..." print(f"[ {current_agent} ] tool result preview: {preview}") print(f"[ {current_agent} ] (full result length: {len(content)} characters)") else: print(f"[ {current_agent} ] tool result: {content}") elif message_type == 'AIMessage': # Agent is responding or making tool calls content = getattr(last_message, 'content', '') tool_calls = getattr(last_message, 'tool_calls', []) if tool_calls: for tool_call in tool_calls: tool_name = getattr(tool_call, 'name', 'unknown') if tool_name.startswith('transfer_to_'): target_agent = tool_name.replace('transfer_to_', '').upper() args = getattr(tool_call, 'args', {}) context = str(args)[:150] + "..." if len(str(args)) > 150 else str(args) print(f"[ SUPERVISOR ] calling {target_agent} with context: {context}") elif tool_name == 'transfer_back_to_supervisor': print(f"[ {current_agent} ] completed task, transferring back to supervisor") else: print(f"[ {current_agent} ] using tool: {tool_name}") args = getattr(tool_call, 'args', {}) if args: args_preview = str(args)[:100] + "..." if len(str(args)) > 100 else str(args) print(f"[ {current_agent} ] tool arguments: {args_preview}") elif content: # Final response from agent if len(content) > 200: preview = content[:200] + "..." print(f"[ {current_agent} ] response preview: {preview}") print(f"[ {current_agent} ] (full response length: {len(content)} characters)") else: print(f"[ {current_agent} ] response: {content}") else: print(f"[ {current_agent} ] {message_type}: {getattr(last_message, 'content', 'No content')[:100]}") else: print(f"[ {current_agent} ] no message data available") else: print("[ SYSTEM ] processing chunk with keys:", list(chunk.keys())[:3]) else: print(f"[ SYSTEM ] received {type(chunk).__name__}: {str(chunk)[:100]}{'...' if len(str(chunk)) > 100 else ''}") except Exception as e: print(f"[ ERROR ] processing step {step_count}: {e}") print(f"[ DEBUG ] chunk type: {type(chunk)}") if hasattr(chunk, '__dict__'): print(f"[ DEBUG ] chunk attributes: {list(chunk.__dict__.keys())}") print(f"{'='*60}") print(f"NOTE: Shell command output may appear below before the next step")