From 2f9beb96cb6fac28fb27802e8992a2bb176eb8f9 Mon Sep 17 00:00:00 2001
From: Gaetan Hurel
Date: Wed, 25 Jun 2025 15:27:47 +0200
Subject: [PATCH] add custom log analysis tools

---
 log_analyzer.py | 142 ++++++++++++++++++++++++++++++++++++++++++++++++
 main.py         |   3 +-
 2 files changed, 144 insertions(+), 1 deletion(-)
 create mode 100644 log_analyzer.py

diff --git a/log_analyzer.py b/log_analyzer.py
new file mode 100644
index 0000000..ad7149d
--- /dev/null
+++ b/log_analyzer.py
@@ -0,0 +1,142 @@
+import os
+import re
+from collections import Counter
+from typing import List, Dict, Any
+from langchain_core.tools import tool
+
+
+@tool
+def analyze_log_file(file_path: str, analysis_type: str = "error_patterns") -> Dict[str, Any]:
+    """
+    Analyze log files for common sysadmin debugging patterns.
+
+    Args:
+        file_path: Path to the log file (relative to loghub directory)
+        analysis_type: Type of analysis - "error_patterns", "frequency", "timeline", or "summary"
+
+    Returns:
+        Dictionary with analysis results
+    """
+    try:
+        # Construct full path
+        if not file_path.startswith('/'):
+            full_path = f"loghub/{file_path}"
+        else:
+            full_path = file_path
+
+        if not os.path.exists(full_path):
+            return {"error": f"File not found: {full_path}"}
+
+        with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
+            lines = f.readlines()
+
+        if analysis_type == "error_patterns":
+            return _analyze_error_patterns(lines, file_path)
+        elif analysis_type == "frequency":
+            return _analyze_frequency(lines, file_path)
+        elif analysis_type == "timeline":
+            return _analyze_timeline(lines, file_path)
+        elif analysis_type == "summary":
+            return _analyze_summary(lines, file_path)
+        else:
+            return {"error": f"Unknown analysis type: {analysis_type}"}
+
+    except Exception as e:
+        return {"error": f"Error analyzing file: {str(e)}"}
+
+
+def _analyze_error_patterns(lines: List[str], file_path: str) -> Dict[str, Any]:
+    """Analyze error patterns in log lines."""
+    error_keywords = ['error', 'fail', 'exception', 'critical', 'fatal', 'denied', 'refused', 'timeout']
+
+    error_lines = []
+    error_counts = Counter()
+
+    for i, line in enumerate(lines, 1):
+        line_lower = line.lower()
+        for keyword in error_keywords:
+            if keyword in line_lower:
+                error_lines.append(f"Line {i}: {line.strip()}")
+                error_counts[keyword] += 1
+                break
+
+    return {
+        "file": file_path,
+        "analysis_type": "error_patterns",
+        "total_lines": len(lines),
+        "error_lines_count": len(error_lines),
+        "error_keywords_frequency": dict(error_counts.most_common()),
+        "sample_errors": error_lines[:10],  # First 10 error lines
+        "summary": f"Found {len(error_lines)} error-related lines out of {len(lines)} total lines"
+    }
+
+
+def _analyze_frequency(lines: List[str], file_path: str) -> Dict[str, Any]:
+    """Analyze frequency patterns in logs."""
+    # Extract common patterns (simplified)
+    patterns = Counter()
+
+    for line in lines:
+        # Mask IPs and UUIDs before the generic digit pass so they are not clobbered by NUM
+        cleaned = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', 'IP', line)
+        cleaned = re.sub(r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', 'UUID', cleaned)
+        cleaned = re.sub(r'\d+', 'NUM', cleaned)
+        patterns[cleaned.strip()] += 1
+
+    return {
+        "file": file_path,
+        "analysis_type": "frequency",
+        "total_lines": len(lines),
+        "unique_patterns": len(patterns),
+        "most_common_patterns": [{"pattern": p, "count": c} for p, c in patterns.most_common(10)],
+        "summary": f"Found {len(patterns)} unique patterns in {len(lines)} lines"
+    }
+
+
+def _analyze_timeline(lines: List[str], file_path: str) -> Dict[str, Any]:
+    """Analyze timeline patterns in logs."""
+    timestamps = []
+
+    # Try to extract timestamps (simplified for demo)
+    timestamp_patterns = [
+        r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})',  # Jun 14 15:16:01
+        r'(\[\w{3}\s+\w{3}\s+\d{2}\s+\d{2}:\d{2}:\d{2}\s+\d{4}\])',  # [Sun Dec 04 04:47:44 2005]
+    ]
+
+    for line in lines[:100]:  # Sample first 100 lines for demo
+        for pattern in timestamp_patterns:
+            match = re.search(pattern, line)
+            if match:
+                timestamps.append(match.group(1))
+                break
+
+    return {
+        "file": file_path,
+        "analysis_type": "timeline",
+        "total_lines": len(lines),
+        "timestamps_found": len(timestamps),
+        "sample_timestamps": timestamps[:10],
+        "summary": f"Extracted {len(timestamps)} timestamps from first 100 lines"
+    }
+
+
+def _analyze_summary(lines: List[str], file_path: str) -> Dict[str, Any]:
+    """Provide a general summary of the log file."""
+    total_lines = len(lines)
+
+    # Basic statistics
+    avg_line_length = sum(len(line) for line in lines) / total_lines if total_lines > 0 else 0
+    empty_lines = sum(1 for line in lines if not line.strip())
+
+    # Sample content
+    sample_lines = [line.strip() for line in lines[:5] if line.strip()]
+
+    return {
+        "file": file_path,
+        "analysis_type": "summary",
+        "total_lines": total_lines,
+        "empty_lines": empty_lines,
+        "average_line_length": round(avg_line_length, 2),
+        "sample_content": sample_lines,
+        "summary": f"Log file with {total_lines} lines, average length {avg_line_length:.1f} characters"
+    }
diff --git a/main.py b/main.py
index ed55744..851b396 100644
--- a/main.py
+++ b/main.py
@@ -6,6 +6,7 @@ from langchain_community.tools.shell.tool import ShellTool
 from langgraph.graph import StateGraph, START, END
 from langgraph.graph.message import add_messages
 from langgraph.prebuilt import ToolNode, tools_condition
+from log_analyzer import analyze_log_file
 
 
 class State(TypedDict):
@@ -27,7 +28,7 @@ def create_chatbot():
 
     # Define the tools
     shell_tool = ShellTool()
-    tools = [shell_tool]
+    tools = [shell_tool, analyze_log_file]
 
     # Bind tools to the LLM so it knows how to use them
     llm_with_tools = llm.bind_tools(tools)
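A quick way to exercise the new tool outside the LangGraph agent is to invoke it directly. This is a minimal sketch, not part of the patch: the file name try_log_analyzer.py and the loghub sub-path "Linux/Linux_2k.log" are hypothetical examples, and it assumes the standard LangChain behaviour that a @tool-decorated function becomes a StructuredTool accepting a dict of arguments via .invoke().

    # try_log_analyzer.py -- illustrative usage only, not part of this patch
    from log_analyzer import analyze_log_file

    # Pass the tool's arguments as a dict; "Linux/Linux_2k.log" is an assumed
    # example path under loghub/ -- substitute a log file you actually have.
    result = analyze_log_file.invoke({
        "file_path": "Linux/Linux_2k.log",
        "analysis_type": "error_patterns",
    })
    print(result.get("summary", result))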