import os
import re
from collections import Counter
from typing import List, Dict, Any

from langchain_core.tools import tool


@tool
def analyze_log_file(file_path: str, analysis_type: str = "error_patterns") -> Dict[str, Any]:
    """
    Analyze log files for common sysadmin debugging patterns.

    Args:
        file_path: Path to the log file (relative to loghub directory)
        analysis_type: Type of analysis - "error_patterns", "frequency",
            "timeline", or "summary"

    Returns:
        Dictionary with analysis results
    """
    try:
        # Construct full path
        if not file_path.startswith('/'):
            full_path = f"loghub/{file_path}"
        else:
            full_path = file_path

        if not os.path.exists(full_path):
            return {"error": f"File not found: {full_path}"}

        with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
            lines = f.readlines()

        if analysis_type == "error_patterns":
            return _analyze_error_patterns(lines, file_path)
        elif analysis_type == "frequency":
            return _analyze_frequency(lines, file_path)
        elif analysis_type == "timeline":
            return _analyze_timeline(lines, file_path)
        elif analysis_type == "summary":
            return _analyze_summary(lines, file_path)
        else:
            return {"error": f"Unknown analysis type: {analysis_type}"}

    except Exception as e:
        return {"error": f"Error analyzing file: {str(e)}"}


def _analyze_error_patterns(lines: List[str], file_path: str) -> Dict[str, Any]:
    """Analyze error patterns in log lines."""
    error_keywords = ['error', 'fail', 'exception', 'critical', 'fatal',
                      'denied', 'refused', 'timeout']
    error_lines = []
    error_counts = Counter()

    for i, line in enumerate(lines, 1):
        line_lower = line.lower()
        for keyword in error_keywords:
            if keyword in line_lower:
                error_lines.append(f"Line {i}: {line.strip()}")
                error_counts[keyword] += 1
                break

    return {
        "file": file_path,
        "analysis_type": "error_patterns",
        "total_lines": len(lines),
        "error_lines_count": len(error_lines),
        "error_keywords_frequency": dict(error_counts.most_common()),
        "sample_errors": error_lines[:10],  # First 10 error lines
        "summary": f"Found {len(error_lines)} error-related lines out of {len(lines)} total lines"
    }


def _analyze_frequency(lines: List[str], file_path: str) -> Dict[str, Any]:
    """Analyze frequency patterns in logs."""
    # Extract common patterns (simplified)
    patterns = Counter()

    for line in lines:
        # Mask variable values so structurally identical lines collapse into
        # one pattern. Order matters: IPs and UUIDs contain digits, so they
        # must be masked before the generic digit substitution runs, or the
        # IP/UUID regexes will never match.
        cleaned = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', 'IP', line)
        cleaned = re.sub(r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', 'UUID', cleaned)
        cleaned = re.sub(r'\d+', 'NUM', cleaned)
        patterns[cleaned.strip()] += 1

    return {
        "file": file_path,
        "analysis_type": "frequency",
        "total_lines": len(lines),
        "unique_patterns": len(patterns),
        "most_common_patterns": [{"pattern": p, "count": c} for p, c in patterns.most_common(10)],
        "summary": f"Found {len(patterns)} unique patterns in {len(lines)} lines"
    }


def _analyze_timeline(lines: List[str], file_path: str) -> Dict[str, Any]:
    """Analyze timeline patterns in logs."""
    timestamps = []

    # Try to extract timestamps (simplified for demo)
    timestamp_patterns = [
        r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})',  # Jun 14 15:16:01
        r'(\[\w{3}\s+\w{3}\s+\d{2}\s+\d{2}:\d{2}:\d{2}\s+\d{4}\])',  # [Sun Dec 04 04:47:44 2005]
    ]

    for line in lines[:100]:  # Sample first 100 lines for demo
        for pattern in timestamp_patterns:
            match = re.search(pattern, line)
            if match:
                timestamps.append(match.group(1))
                break

    return {
        "file": file_path,
        "analysis_type": "timeline",
        "total_lines": len(lines),
        "timestamps_found": len(timestamps),
        "sample_timestamps": timestamps[:10],
        "summary": f"Extracted {len(timestamps)} timestamps from first 100 lines"
    }


def _analyze_summary(lines: List[str], file_path: str) -> Dict[str, Any]:
    """Provide a general summary of the log file."""
    total_lines = len(lines)

    # Basic statistics
    avg_line_length = sum(len(line) for line in lines) / total_lines if total_lines > 0 else 0
    empty_lines = sum(1 for line in lines if not line.strip())

    # Sample content
    sample_lines = [line.strip() for line in lines[:5] if line.strip()]

    return {
        "file": file_path,
        "analysis_type": "summary",
        "total_lines": total_lines,
        "empty_lines": empty_lines,
        "average_line_length": round(avg_line_length, 2),
        "sample_content": sample_lines,
        "summary": f"Log file with {total_lines} lines, average length {avg_line_length:.1f} characters"
    }
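

# ---------------------------------------------------------------------------
# Usage sketch (not part of the tool itself). Because `analyze_log_file` is
# wrapped with @tool, it is a LangChain StructuredTool and is called via
# .invoke() with a dict of arguments rather than as a plain function. The
# file path below is an assumption for illustration -- substitute any log
# file present under your local loghub/ checkout.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Hypothetical sample file; loghub ships per-system sample logs.
    result = analyze_log_file.invoke(
        {"file_path": "Linux/Linux_2k.log", "analysis_type": "error_patterns"}
    )
    print(result.get("summary", result))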