import os from langchain.chat_models import init_chat_model from langchain_community.tools.shell.tool import ShellTool from langgraph.prebuilt import create_react_agent from langchain_core.messages import HumanMessage from log_analyzer import analyze_log_file def create_agent(): """Create and return a ReAct agent with shell and log analysis capabilities.""" # Initialize the chat model (using OpenAI GPT-4) # Make sure you have set your OPENAI_API_KEY environment variable llm = init_chat_model("openai:gpt-4o-mini") # Define the tools available to the agent shell_tool = ShellTool() tools = [shell_tool, analyze_log_file] # Create a ReAct agent with system prompt system_prompt = """You are a helpful assistant with access to shell commands and log analysis capabilities. You can: 1. Execute shell commands using the shell tool to interact with the system 2. Analyze log files using the analyze_log_file tool to help with debugging and system administration tasks The log analyzer can process files in the loghub directory with different analysis types: - "error_patterns": Find and categorize error messages - "frequency": Analyze frequency of different log patterns - "timeline": Show chronological patterns of events - "summary": Provide an overall summary of the log file When helping users: - Be thorough in your analysis - Explain what you're doing and why - Use appropriate tools based on the user's request - If analyzing logs, suggest which analysis type might be most helpful - Always be cautious with shell commands and explain what they do Available log files are in the loghub directory with subdirectories for different systems like: Android, Apache, BGL, Hadoop, HDFS, HealthApp, HPC, Linux, Mac, OpenSSH, OpenStack, Proxifier, Spark, Thunderbird, Windows, Zookeeper """ # Create the ReAct agent agent = create_react_agent( llm, tools, prompt=system_prompt ) return agent def stream_agent_updates(agent, user_input: str, conversation_history: list): """Stream agent updates for a user input with conversation history.""" # Create a human message message = HumanMessage(content=user_input) # Add the new message to conversation history conversation_history.append(message) print("\nAgent: ", end="", flush=True) # Use the agent's stream method to get real-time updates with full conversation final_response = "" tool_calls_made = False for event in agent.stream({"messages": conversation_history}, stream_mode="updates"): for node_name, node_output in event.items(): if node_name == "agent" and "messages" in node_output: last_message = node_output["messages"][-1] # Check if this is a tool call if hasattr(last_message, 'tool_calls') and last_message.tool_calls: tool_calls_made = True for tool_call in last_message.tool_calls: print(f"\nšŸ”§ Using tool: {tool_call['name']}") if tool_call.get('args'): print(f" Args: {tool_call['args']}") # Check if this is the final response (no tool calls) elif hasattr(last_message, 'content') and last_message.content and not getattr(last_message, 'tool_calls', None): final_response = last_message.content elif node_name == "tools" and "messages" in node_output: # Show tool results for msg in node_output["messages"]: if hasattr(msg, 'content'): print(f"\nšŸ“‹ Tool result: {msg.content[:200]}{'...' if len(msg.content) > 200 else ''}") # Print the final response if final_response: if tool_calls_made: print(f"\n\n{final_response}") else: print(final_response) # Add the agent's response to conversation history from langchain_core.messages import AIMessage conversation_history.append(AIMessage(content=final_response)) else: print("No response generated.") print() # Add newline def visualize_agent(agent): """Display the agent's graph structure.""" try: print("\nšŸ“Š Agent Graph Structure:") print("=" * 40) # Get the graph and display its structure graph = agent.get_graph() # Print nodes print("Nodes:") for node_id in graph.nodes: print(f" - {node_id}") # Print edges print("\nEdges:") for edge in graph.edges: print(f" - {edge}") print("=" * 40) print("This agent follows the ReAct (Reasoning and Acting) pattern:") print("1. Receives user input") print("2. Reasons about what tools to use") print("3. Executes tools when needed") print("4. Provides final response") print("=" * 40) except Exception as e: print(f"Could not visualize agent: {e}") def main(): # Check if required API keys are set if not os.getenv("OPENAI_API_KEY"): print("Please set your OPENAI_API_KEY environment variable.") print("You can set it by running: export OPENAI_API_KEY='your-api-key-here'") return print("šŸ¤– LangGraph Log Analysis Agent") print("Type 'quit', 'exit', or 'q' to exit the chat.") print("Type 'help' or 'h' for help and examples.") print("Type 'graph' to see the agent structure.") print("Type 'clear' or 'reset' to clear conversation history.") print("āš ļø WARNING: This agent has shell access - use with caution!") print("šŸ“Š Available log analysis capabilities:") print(" - Analyze log files in the loghub directory") print(" - Execute shell commands for system administration") print(" - Help with debugging and troubleshooting") print("-" * 60) # Create the agent try: agent = create_agent() print("āœ… Log Analysis Agent initialized successfully!") print("šŸ’” Try asking: 'Analyze the Apache logs for error patterns'") print("šŸ’” Or: 'List the available log files in the loghub directory'") # Show agent structure visualize_agent(agent) except Exception as e: print(f"āŒ Error initializing agent: {e}") return # Start the chat loop conversation_history = [] # Initialize conversation history while True: try: user_input = input("\nUser: ") if user_input.lower() in ["quit", "exit", "q"]: print("šŸ‘‹ Goodbye!") break elif user_input.lower() in ["help", "h"]: print("\nšŸ†˜ Help:") print("Commands:") print(" - quit/exit/q: Exit the agent") print(" - help/h: Show this help") print(" - graph: Show agent structure") print("\nExample queries:") print(" - 'Analyze the Apache logs for error patterns'") print(" - 'Show me a summary of the HDFS logs'") print(" - 'List all available log files'") print(" - 'Find error patterns in Linux logs'") print(" - 'Check disk usage on the system'") print(" - 'clear': Clear conversation history") continue elif user_input.lower() in ["graph", "structure"]: visualize_agent(agent) continue elif user_input.lower() in ["clear", "reset"]: conversation_history = [] print("šŸ—‘ļø Conversation history cleared!") continue if user_input.strip(): stream_agent_updates(agent, user_input, conversation_history) else: print("Please enter a message.") except KeyboardInterrupt: print("\nšŸ‘‹ Goodbye!") break except Exception as e: print(f"āŒ Error: {e}") if __name__ == "__main__": main()