143 lines
5.1 KiB
Python
143 lines
5.1 KiB
Python
import os
|
|
import re
|
|
from collections import Counter
|
|
from typing import List, Dict, Any
|
|
from langchain_core.tools import tool
|
|
|
|
|
|
@tool
def analyze_log_file(file_path: str, analysis_type: str = "error_patterns") -> Dict[str, Any]:
    """
    Analyze log files for common sysadmin debugging patterns.

    Args:
        file_path: Path to the log file (relative to loghub directory)
        analysis_type: Type of analysis - "error_patterns", "frequency", "timeline", or "summary"

    Returns:
        Dictionary with analysis results, or a dictionary with a single "error" key
        when the file is missing, the analysis type is unknown, or analysis fails
    """
    try:
        # Relative paths are resolved against the loghub directory. The explicit
        # '/' test keeps the original behaviour everywhere; os.path.isabs also
        # recognises platform-specific absolute forms (e.g. drive-letter paths)
        # that must not be prefixed with "loghub/".
        if file_path.startswith('/') or os.path.isabs(file_path):
            full_path = file_path
        else:
            full_path = f"loghub/{file_path}"

        if not os.path.exists(full_path):
            return {"error": f"File not found: {full_path}"}

        # Resolve the analyzer BEFORE reading, so an unknown analysis type does
        # not cost a full read of a potentially large log file.
        analyzers = {
            "error_patterns": _analyze_error_patterns,
            "frequency": _analyze_frequency,
            "timeline": _analyze_timeline,
            "summary": _analyze_summary,
        }
        analyzer = analyzers.get(analysis_type)
        if analyzer is None:
            return {"error": f"Unknown analysis type: {analysis_type}"}

        # errors='ignore' drops undecodable bytes instead of failing on logs
        # that contain binary noise.
        with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
            lines = f.readlines()

        # Analyzers receive the caller-supplied path (not the resolved one) so
        # the "file" field in the result echoes what was asked for.
        return analyzer(lines, file_path)

    except Exception as e:
        # Tool boundary: report failures as data rather than raising.
        return {"error": f"Error analyzing file: {str(e)}"}
|
|
|
|
|
|
def _analyze_error_patterns(lines: List[str], file_path: str) -> Dict[str, Any]:
|
|
"""Analyze error patterns in log lines."""
|
|
error_keywords = ['error', 'fail', 'exception', 'critical', 'fatal', 'denied', 'refused', 'timeout']
|
|
|
|
error_lines = []
|
|
error_counts = Counter()
|
|
|
|
for i, line in enumerate(lines, 1):
|
|
line_lower = line.lower()
|
|
for keyword in error_keywords:
|
|
if keyword in line_lower:
|
|
error_lines.append(f"Line {i}: {line.strip()}")
|
|
error_counts[keyword] += 1
|
|
break
|
|
|
|
return {
|
|
"file": file_path,
|
|
"analysis_type": "error_patterns",
|
|
"total_lines": len(lines),
|
|
"error_lines_count": len(error_lines),
|
|
"error_keywords_frequency": dict(error_counts.most_common()),
|
|
"sample_errors": error_lines[:10], # First 10 error lines
|
|
"summary": f"Found {len(error_lines)} error-related lines out of {len(lines)} total lines"
|
|
}
|
|
|
|
|
|
def _analyze_frequency(lines: List[str], file_path: str) -> Dict[str, Any]:
|
|
"""Analyze frequency patterns in logs."""
|
|
# Extract common patterns (simplified)
|
|
patterns = Counter()
|
|
|
|
for line in lines:
|
|
# Remove timestamps and specific values for pattern matching
|
|
cleaned = re.sub(r'\d+', 'NUM', line)
|
|
cleaned = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', 'IP', cleaned)
|
|
cleaned = re.sub(r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', 'UUID', cleaned)
|
|
patterns[cleaned.strip()] += 1
|
|
|
|
return {
|
|
"file": file_path,
|
|
"analysis_type": "frequency",
|
|
"total_lines": len(lines),
|
|
"unique_patterns": len(patterns),
|
|
"most_common_patterns": [{"pattern": p, "count": c} for p, c in patterns.most_common(10)],
|
|
"summary": f"Found {len(patterns)} unique patterns in {len(lines)} lines"
|
|
}
|
|
|
|
|
|
def _analyze_timeline(lines: List[str], file_path: str) -> Dict[str, Any]:
|
|
"""Analyze timeline patterns in logs."""
|
|
timestamps = []
|
|
|
|
# Try to extract timestamps (simplified for demo)
|
|
timestamp_patterns = [
|
|
r'(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})', # Jun 14 15:16:01
|
|
r'(\[\w{3}\s+\w{3}\s+\d{2}\s+\d{2}:\d{2}:\d{2}\s+\d{4}\])', # [Sun Dec 04 04:47:44 2005]
|
|
]
|
|
|
|
for line in lines[:100]: # Sample first 100 lines for demo
|
|
for pattern in timestamp_patterns:
|
|
match = re.search(pattern, line)
|
|
if match:
|
|
timestamps.append(match.group(1))
|
|
break
|
|
|
|
return {
|
|
"file": file_path,
|
|
"analysis_type": "timeline",
|
|
"total_lines": len(lines),
|
|
"timestamps_found": len(timestamps),
|
|
"sample_timestamps": timestamps[:10],
|
|
"summary": f"Extracted {len(timestamps)} timestamps from first 100 lines"
|
|
}
|
|
|
|
|
|
def _analyze_summary(lines: List[str], file_path: str) -> Dict[str, Any]:
|
|
"""Provide a general summary of the log file."""
|
|
total_lines = len(lines)
|
|
|
|
# Basic statistics
|
|
avg_line_length = sum(len(line) for line in lines) / total_lines if total_lines > 0 else 0
|
|
empty_lines = sum(1 for line in lines if not line.strip())
|
|
|
|
# Sample content
|
|
sample_lines = [line.strip() for line in lines[:5] if line.strip()]
|
|
|
|
return {
|
|
"file": file_path,
|
|
"analysis_type": "summary",
|
|
"total_lines": total_lines,
|
|
"empty_lines": empty_lines,
|
|
"average_line_length": round(avg_line_length, 2),
|
|
"sample_content": sample_lines,
|
|
"summary": f"Log file with {total_lines} lines, average length {avg_line_length:.1f} characters"
|
|
}
|