agent-pard0x/log_analyzer.py
2025-06-25 15:27:47 +02:00

143 lines
5.1 KiB
Python

import os
import re
from collections import Counter
from typing import List, Dict, Any
from langchain_core.tools import tool
@tool
def analyze_log_file(file_path: str, analysis_type: str = "error_patterns") -> Dict[str, Any]:
    """
    Analyze log files for common sysadmin debugging patterns.

    Args:
        file_path: Path to the log file (relative to loghub directory)
        analysis_type: Type of analysis - "error_patterns", "frequency", "timeline", or "summary"

    Returns:
        Dictionary with analysis results
    """
    # NOTE(review): the @tool decorator presumably exposes the docstring above
    # as the tool description, so its wording is intentionally left unchanged.
    try:
        # Paths starting with '/' are used verbatim; everything else is looked
        # up under the loghub/ directory.
        # NOTE(review): file_path is not normalised, so '..' segments can reach
        # outside loghub/ -- confirm whether callers are trusted.
        full_path = file_path if file_path.startswith('/') else f"loghub/{file_path}"

        if not os.path.exists(full_path):
            return {"error": f"File not found: {full_path}"}

        # Undecodable bytes are dropped rather than raising, so binary noise in
        # a log does not abort the analysis.
        with open(full_path, 'r', encoding='utf-8', errors='ignore') as log_handle:
            lines = log_handle.readlines()

        # Each analysis type maps to a helper taking (lines, file_path).
        analyzers = {
            "error_patterns": _analyze_error_patterns,
            "frequency": _analyze_frequency,
            "timeline": _analyze_timeline,
            "summary": _analyze_summary,
        }
        analyzer = analyzers.get(analysis_type)
        if analyzer is None:
            return {"error": f"Unknown analysis type: {analysis_type}"}
        return analyzer(lines, file_path)
    except Exception as exc:
        # Failures are reported in the result dict instead of propagating, so
        # the caller always receives a dictionary.
        return {"error": f"Error analyzing file: {str(exc)}"}
def _analyze_error_patterns(lines: List[str], file_path: str) -> Dict[str, Any]:
"""Analyze error patterns in log lines."""
error_keywords = ['error', 'fail', 'exception', 'critical', 'fatal', 'denied', 'refused', 'timeout']
error_lines = []
error_counts = Counter()
for i, line in enumerate(lines, 1):
line_lower = line.lower()
for keyword in error_keywords:
if keyword in line_lower:
error_lines.append(f"Line {i}: {line.strip()}")
error_counts[keyword] += 1
break
return {
"file": file_path,
"analysis_type": "error_patterns",
"total_lines": len(lines),
"error_lines_count": len(error_lines),
"error_keywords_frequency": dict(error_counts.most_common()),
"sample_errors": error_lines[:10], # First 10 error lines
"summary": f"Found {len(error_lines)} error-related lines out of {len(lines)} total lines"
}
def _analyze_frequency(lines: List[str], file_path: str) -> Dict[str, Any]:
"""Analyze frequency patterns in logs."""
# Extract common patterns (simplified)
patterns = Counter()
for line in lines:
# Remove timestamps and specific values for pattern matching
cleaned = re.sub(r'\d+', 'NUM', line)
cleaned = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', 'IP', cleaned)
cleaned = re.sub(r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', 'UUID', cleaned)
patterns[cleaned.strip()] += 1
return {
"file": file_path,
"analysis_type": "frequency",
"total_lines": len(lines),
"unique_patterns": len(patterns),
"most_common_patterns": [{"pattern": p, "count": c} for p, c in patterns.most_common(10)],
"summary": f"Found {len(patterns)} unique patterns in {len(lines)} lines"
}
def _analyze_timeline(lines: List[str], file_path: str) -> Dict[str, Any]:
"""Analyze timeline patterns in logs."""
timestamps = []
# Try to extract timestamps (simplified for demo)
timestamp_patterns = [
r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})', # Jun 14 15:16:01
r'(\[\w{3}\s+\w{3}\s+\d{2}\s+\d{2}:\d{2}:\d{2}\s+\d{4}\])', # [Sun Dec 04 04:47:44 2005]
]
for line in lines[:100]: # Sample first 100 lines for demo
for pattern in timestamp_patterns:
match = re.search(pattern, line)
if match:
timestamps.append(match.group(1))
break
return {
"file": file_path,
"analysis_type": "timeline",
"total_lines": len(lines),
"timestamps_found": len(timestamps),
"sample_timestamps": timestamps[:10],
"summary": f"Extracted {len(timestamps)} timestamps from first 100 lines"
}
def _analyze_summary(lines: List[str], file_path: str) -> Dict[str, Any]:
"""Provide a general summary of the log file."""
total_lines = len(lines)
# Basic statistics
avg_line_length = sum(len(line) for line in lines) / total_lines if total_lines > 0 else 0
empty_lines = sum(1 for line in lines if not line.strip())
# Sample content
sample_lines = [line.strip() for line in lines[:5] if line.strip()]
return {
"file": file_path,
"analysis_type": "summary",
"total_lines": total_lines,
"empty_lines": empty_lines,
"average_line_length": round(avg_line_length, 2),
"sample_content": sample_lines,
"summary": f"Log file with {total_lines} lines, average length {avg_line_length:.1f} characters"
}