Monitoring & Logging
Overview
JobHive implements comprehensive monitoring and logging across all system components to ensure reliability, performance, and security. The monitoring stack includes application performance monitoring (APM), infrastructure monitoring, log aggregation, and alerting systems.
Monitoring Architecture
Monitoring Stack Overview
┌─────────────────────────────────────────────────────────────────┐
│                        DataDog Dashboard                        │
│                 (Unified Monitoring Interface)                  │
└──────┬────────────────┬────────────────┬────────────────┬──────┘
       │                │                │                │
       ▼                ▼                ▼                ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Application  │ │Infrastructure│ │    Logs &    │ │   Business   │
│   Metrics    │ │   Metrics    │ │    Events    │ │   Metrics    │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘
       │                │                │                │
       ▼                ▼                ▼                ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│     APM      │ │    System    │ │     Log      │ │    Custom    │
│    Traces    │ │   Metrics    │ │    Stream    │ │  Analytics   │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
Application Performance Monitoring (APM)
DataDog APM Integration
# settings/production.py
import ddtrace
from ddtrace import patch

# Patch Django and other libraries for automatic tracing
patch(django=True, psycopg=True, redis=True, celery=True)

# DataDog configuration
DATADOG_TRACE = {
    'DEFAULT_SERVICE': 'jobhive-api',
    'TAGS': {
        'env': 'production',
        'version': '1.0.0',
        'component': 'backend',
    },
}

# Custom APM middleware
class DataDogTracingMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Add custom tags to the active trace
        span = ddtrace.tracer.current_span()
        if span:
            span.set_tag('user.id', getattr(request.user, 'id', None))
            span.set_tag('user.role', getattr(request.user, 'role', None))
            span.set_tag('http.url', request.build_absolute_uri())

        response = self.get_response(request)

        # Add response tags
        if span:
            span.set_tag('http.status_code', response.status_code)
        return response
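For the middleware to run at all, it must be registered in Django's MIDDLEWARE setting. A minimal sketch, assuming the class lives in a jobhive.middleware module (the module path is illustrative):
# settings/production.py (continued)
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    # ... other middleware ...
    # Placed after AuthenticationMiddleware so request.user is populated
    'jobhive.middleware.DataDogTracingMiddleware',
]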
Custom Performance Metrics
from datadog.dogstatsd import DogStatsd  # the client class is DogStatsd
import time
import functools

# Initialize the DogStatsD client
statsd = DogStatsd(host='localhost', port=8125, namespace='jobhive')

class PerformanceMetrics:
    @staticmethod
    def track_interview_session_performance(func):
        """Decorator to track interview session performance."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                # Track success
                statsd.increment('interview.session.success',
                                 tags=['method:' + func.__name__])
                # Track execution time
                execution_time = time.time() - start_time
                statsd.timing('interview.session.duration',
                              execution_time * 1000,  # convert to milliseconds
                              tags=['method:' + func.__name__])
                return result
            except Exception as e:
                # Track errors
                statsd.increment('interview.session.error',
                                 tags=['method:' + func.__name__,
                                       'error_type:' + type(e).__name__])
                raise
        return wrapper

    @staticmethod
    def track_ai_processing_performance(component):
        """Track AI component processing performance."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                    # Track processing time
                    processing_time = time.time() - start_time
                    statsd.timing(f'ai.{component}.processing_time',
                                  processing_time * 1000,
                                  tags=[f'component:{component}'])
                    # Track success rate
                    statsd.increment(f'ai.{component}.success',
                                     tags=[f'component:{component}'])
                    return result
                except Exception as e:
                    # Track AI errors
                    statsd.increment(f'ai.{component}.error',
                                     tags=[f'component:{component}',
                                           'error_type:' + type(e).__name__])
                    raise
            return wrapper
        return decorator

# Usage in views and services
class InterviewSessionViewSet(ModelViewSet):
    @PerformanceMetrics.track_interview_session_performance
    def create(self, request):
        # Implementation
        pass

    @PerformanceMetrics.track_interview_session_performance
    def update(self, request, pk=None):
        # Implementation
        pass

class SentimentAnalysisAgent:
    @PerformanceMetrics.track_ai_processing_performance('sentiment_analysis')
    def analyze_sentiment(self, text):
        # AI processing implementation
        pass
Business Metrics Tracking
from django.core.cache import cache
from django.utils import timezone
from datadog.dogstatsd import DogStatsd

class BusinessMetricsCollector:
    def __init__(self):
        self.statsd = DogStatsd(host='localhost', port=8125,
                                namespace='jobhive.business')

    def track_user_engagement(self, user, action):
        """Track user engagement metrics."""
        tags = [
            f'user_role:{user.role}',
            f'user_plan:{user.subscription.plan.name}',
            f'action:{action}',
        ]
        self.statsd.increment('user.engagement', tags=tags)

        # Track daily active users (at most one increment per user per day)
        cache_key = f'dau:{user.id}:{timezone.now().date()}'
        if not cache.get(cache_key):
            cache.set(cache_key, True, timeout=86400)  # 24 hours
            self.statsd.increment('user.daily_active', tags=[f'role:{user.role}'])

    def track_interview_completion(self, interview_session):
        """Track interview completion metrics."""
        tags = [
            f'interview_type:{"practice" if interview_session.is_practice else "real"}',
            f'completion_rate:{int(interview_session.completion_percentage)}',
            f'user_role:{interview_session.user.role}',
        ]
        self.statsd.increment('interview.completed', tags=tags)

        # Track completion quality
        if interview_session.completion_percentage >= 80:
            self.statsd.increment('interview.high_quality_completion', tags=tags)

    def track_subscription_metrics(self, subscription, event_type):
        """Track subscription-related metrics."""
        tags = [
            f'plan:{subscription.plan.name}',
            f'plan_type:{subscription.plan.plan_type}',
            f'event:{event_type}',
        ]
        self.statsd.increment('subscription.event', tags=tags)

        if event_type == 'created':
            # Track monthly recurring revenue impact
            monthly_value = float(subscription.plan.price)
            if subscription.plan.interval == 'year':
                monthly_value = monthly_value / 12
            self.statsd.gauge('subscription.mrr', monthly_value, tags=tags)

    def track_ai_accuracy(self, component, predicted_score, actual_score):
        """Track AI prediction accuracy."""
        denominator = max(predicted_score, actual_score)
        if denominator == 0:
            return  # avoid division by zero when both scores are 0
        accuracy = 1 - abs(predicted_score - actual_score) / denominator
        self.statsd.gauge(f'ai.{component}.accuracy', accuracy,
                          tags=[f'component:{component}'])
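A sketch of how the collector might be wired into the application, using Django's built-in user_logged_in signal (the receiver itself is illustrative, not part of the collector):
from django.contrib.auth.signals import user_logged_in
from django.dispatch import receiver

metrics = BusinessMetricsCollector()

@receiver(user_logged_in)
def on_user_login(sender, request, user, **kwargs):
    # Counts the login and, at most once per day, the DAU metric
    metrics.track_user_engagement(user, action='login')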
Infrastructure Monitoring
System Metrics Collection
# DataDog Agent configuration (condensed; the checks under conf.d normally
# live in separate files -- see the layout note below)
api_key: ${DATADOG_API_KEY}
site: datadoghq.com

# Enable infrastructure monitoring
process_agent:
  enabled: true

# Enable log collection
logs_enabled: true

# Enable network monitoring
network_monitoring:
  enabled: true

# Custom checks
conf.d:
  postgres.yaml:
    init_config:
    instances:
      - host: ${DATABASE_HOST}
        port: 5432
        username: datadog
        password: ${DATADOG_DB_PASSWORD}
        tags:
          - service:jobhive-db
          - env:production

  redis.yaml:
    init_config:
    instances:
      - host: ${REDIS_HOST}
        port: 6379
        password: ${REDIS_PASSWORD}
        tags:
          - service:jobhive-cache
          - env:production

  http_check.yaml:
    init_config:
    instances:
      - name: jobhive-api-health
        url: https://api.jobhive.com/health/
        timeout: 10
        tags:
          - service:jobhive-api
          - env:production
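On a stock Agent install, the configuration above is split across several files rather than a single document. A typical layout on Linux (note that the Redis integration directory is named redisdb):
/etc/datadog-agent/datadog.yaml                  # api_key, site, logs_enabled, ...
/etc/datadog-agent/conf.d/postgres.d/conf.yaml
/etc/datadog-agent/conf.d/redisdb.d/conf.yaml
/etc/datadog-agent/conf.d/http_check.d/conf.yaml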
Custom Infrastructure Checks
import time
import logging
import redis
from django.conf import settings
from django.db import connection
from datadog.dogstatsd import DogStatsd

logger = logging.getLogger('jobhive')

class InfrastructureHealthChecker:
    def __init__(self):
        self.statsd = DogStatsd(host='localhost', port=8125,
                                namespace='jobhive.infrastructure')

    def check_database_health(self):
        """Check database connection and performance."""
        try:
            start_time = time.time()
            # Test connection
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                cursor.fetchone()

            # Measure connection time
            connection_time = (time.time() - start_time) * 1000
            self.statsd.timing('database.connection_time', connection_time)

            # Check active connections
            with connection.cursor() as cursor:
                cursor.execute("""
                    SELECT count(*)
                    FROM pg_stat_activity
                    WHERE state = 'active'
                """)
                active_connections = cursor.fetchone()[0]
                self.statsd.gauge('database.active_connections', active_connections)

            # Database health status (statuses are DogStatsd.OK/WARNING/CRITICAL)
            self.statsd.service_check('database.health',
                                      DogStatsd.OK,
                                      tags=['database:postgresql'])
        except Exception as e:
            logger.error(f"Database health check failed: {e}")
            self.statsd.service_check('database.health',
                                      DogStatsd.CRITICAL,
                                      tags=['database:postgresql'])

    def check_redis_health(self):
        """Check Redis connection and performance."""
        try:
            redis_client = redis.Redis.from_url(settings.REDIS_URL)
            start_time = time.time()
            redis_client.ping()
            response_time = (time.time() - start_time) * 1000
            self.statsd.timing('redis.response_time', response_time)

            # Check memory usage
            info = redis_client.info('memory')
            memory_usage = info['used_memory']
            max_memory = info.get('maxmemory', 0)
            self.statsd.gauge('redis.memory_used', memory_usage)
            if max_memory > 0:
                memory_percentage = (memory_usage / max_memory) * 100
                self.statsd.gauge('redis.memory_percentage', memory_percentage)

            self.statsd.service_check('redis.health',
                                      DogStatsd.OK,
                                      tags=['cache:redis'])
        except Exception as e:
            logger.error(f"Redis health check failed: {e}")
            self.statsd.service_check('redis.health',
                                      DogStatsd.CRITICAL,
                                      tags=['cache:redis'])

    def check_celery_health(self):
        """Check Celery worker health."""
        try:
            from celery import current_app

            # Check active workers
            inspect = current_app.control.inspect()
            stats = inspect.stats()
            if stats:
                worker_count = len(stats)
                self.statsd.gauge('celery.active_workers', worker_count)

                # Check queue lengths
                active_queues = inspect.active_queues() or {}
                for worker, queues in active_queues.items():
                    for queue_info in queues:
                        queue_name = queue_info['name']
                        # Measuring queue depth requires broker-specific calls
                        # (e.g. LLEN on Redis, queue_declare on RabbitMQ)

                self.statsd.service_check('celery.health',
                                          DogStatsd.OK,
                                          tags=['service:celery'])
            else:
                self.statsd.service_check('celery.health',
                                          DogStatsd.CRITICAL,
                                          tags=['service:celery'])
        except Exception as e:
            logger.error(f"Celery health check failed: {e}")
            self.statsd.service_check('celery.health',
                                      DogStatsd.CRITICAL,
                                      tags=['service:celery'])
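These checks only produce data if something invokes them on a schedule. A minimal sketch using Celery beat, assuming the checker is importable from a jobhive.tasks module (the task path and interval are illustrative):
from celery import shared_task

@shared_task
def run_infrastructure_health_checks():
    # Run all three checks; each emits its own metrics and service checks
    checker = InfrastructureHealthChecker()
    checker.check_database_health()
    checker.check_redis_health()
    checker.check_celery_health()

# celery.py (app is the Celery application instance)
app.conf.beat_schedule = {
    'infrastructure-health-checks': {
        'task': 'jobhive.tasks.run_infrastructure_health_checks',
        'schedule': 60.0,  # seconds
    },
}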
Comprehensive Logging Strategy
Structured Logging Configuration
# settings/logging.py
import sys
from pythonjsonlogger import jsonlogger

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            '()': jsonlogger.JsonFormatter,
            'format': '%(asctime)s %(name)s %(levelname)s %(pathname)s %(funcName)s %(lineno)d %(message)s'
        },
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'json',
            'stream': sys.stdout,
        },
        'file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/jobhive/application.log',
            'maxBytes': 50 * 1024 * 1024,  # 50MB
            'backupCount': 10,
            'formatter': 'json',
        },
        'security': {
            'level': 'WARNING',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/jobhive/security.log',
            'maxBytes': 50 * 1024 * 1024,  # 50MB
            'backupCount': 20,
            'formatter': 'json',
        },
        'datadog': {
            'level': 'INFO',
            # Assumed in-house handler: the datadog package does not ship a
            # logging handler. In practice the Agent usually tails the JSON
            # files above instead (logs_enabled: true).
            'class': 'jobhive.logging.DataDogLogHandler',
            'formatter': 'json',
        }
    },
    'loggers': {
        'django': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
            'propagate': False,
        },
        'jobhive': {
            'handlers': ['console', 'file', 'datadog'],
            'level': 'INFO',
            'propagate': False,
        },
        'jobhive.security': {
            'handlers': ['console', 'security', 'datadog'],
            'level': 'WARNING',
            'propagate': False,
        },
        'jobhive.ai': {
            'handlers': ['console', 'file', 'datadog'],
            'level': 'INFO',
            'propagate': False,
        },
        'celery': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
            'propagate': False,
        },
    },
    'root': {
        'handlers': ['console'],
        'level': 'WARNING',
    }
}
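With the json formatter, every record is emitted as a single-line JSON object, which is what both the Agent's log tailer and the log analysis pipeline below expect. An illustrative (hypothetical) record:
{"asctime": "2024-03-14 10:22:31,512", "name": "jobhive", "levelname": "INFO", "pathname": "/app/interviews/views.py", "funcName": "create", "lineno": 42, "message": "Interview event", "event_type": "interview_event"}
Note the comma before the milliseconds in asctime; the LogAnalyzer below accounts for it when parsing timestamps.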
Application Event Logging
import logging
from datetime import datetime

logger = logging.getLogger('jobhive')
security_logger = logging.getLogger('jobhive.security')
ai_logger = logging.getLogger('jobhive.ai')

class EventLogger:
    @staticmethod
    def log_user_action(user, action, resource=None, details=None):
        """Log user actions for the audit trail."""
        log_data = {
            'event_type': 'user_action',
            'user_id': user.id if user else None,
            'user_email': user.email if user else None,
            'user_role': user.role if user else None,
            'action': action,
            'resource': resource,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat(),
            'ip_address': getattr(user, 'current_ip', None),
        }
        logger.info('User action', extra=log_data)

    @staticmethod
    def log_interview_event(interview_session, event_type, details=None):
        """Log interview-related events."""
        log_data = {
            'event_type': 'interview_event',
            'interview_session_id': str(interview_session.session_id),
            'user_id': interview_session.user.id,
            'job_id': interview_session.job.id if interview_session.job else None,
            'interview_type': 'practice' if interview_session.is_practice else 'real',
            'event': event_type,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat(),
        }
        logger.info('Interview event', extra=log_data)

    @staticmethod
    def log_ai_processing(component, processing_time, accuracy=None, error=None):
        """Log AI processing events."""
        log_data = {
            'event_type': 'ai_processing',
            'component': component,
            'processing_time_ms': processing_time * 1000,
            'accuracy': accuracy,
            'error': str(error) if error else None,
            'timestamp': datetime.utcnow().isoformat(),
        }
        if error:
            ai_logger.error('AI processing error', extra=log_data)
        else:
            ai_logger.info('AI processing completed', extra=log_data)

    @staticmethod
    def log_security_event(event_type, severity, details=None, user=None):
        """Log security-related events."""
        log_data = {
            'event_type': 'security_event',
            'security_event_type': event_type,
            'severity': severity,
            'user_id': user.id if user else None,
            'user_email': user.email if user else None,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat(),
        }
        if severity in ['high', 'critical']:
            security_logger.critical('Security event', extra=log_data)
        elif severity == 'medium':
            security_logger.warning('Security event', extra=log_data)
        else:
            security_logger.info('Security event', extra=log_data)

# Usage examples in views and services
class InterviewSessionViewSet(ModelViewSet):
    def create(self, request):
        try:
            # Abbreviated: serializer validation omitted for brevity
            interview_session = self.perform_create(serializer)
            # Log successful creation
            EventLogger.log_user_action(
                user=request.user,
                action='create_interview_session',
                resource=f'interview_session:{interview_session.session_id}',
                details={'is_practice': interview_session.is_practice},
            )
            EventLogger.log_interview_event(
                interview_session=interview_session,
                event_type='session_created',
            )
        except Exception as e:
            # Log error
            EventLogger.log_user_action(
                user=request.user,
                action='create_interview_session_failed',
                details={'error': str(e)},
            )
            raise
Error Tracking and Alerting
import sentry_sdk
from django.conf import settings
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.redis import RedisIntegration

def before_send_filter(event, hint):
    """Filter sensitive data before sending to Sentry."""
    sensitive_keys = ['password', 'token', 'secret', 'key', 'authorization']

    # Remove sensitive keys from extra data
    if 'extra' in event:
        for key in list(event['extra'].keys()):
            if any(sensitive in key.lower() for sensitive in sensitive_keys):
                event['extra'][key] = '[Filtered]'

    # Filter request data
    if 'request' in event and 'data' in event['request']:
        data = event['request']['data']
        if isinstance(data, dict):
            for key in list(data.keys()):
                if any(sensitive in key.lower() for sensitive in sensitive_keys):
                    data[key] = '[Filtered]'
    return event

# Sentry configuration for error tracking
# (before_send_filter must be defined before init() references it)
sentry_sdk.init(
    dsn=settings.SENTRY_DSN,
    integrations=[
        DjangoIntegration(
            transaction_style='url',
            middleware_spans=True,
            signals_spans=True,
            cache_spans=True,
        ),
        CeleryIntegration(
            monitor_beat_tasks=True,
            propagate_traces=True,
        ),
        RedisIntegration(),
    ],
    traces_sample_rate=0.1,   # capture 10% of transactions for performance monitoring
    send_default_pii=False,   # don't send personally identifiable information
    environment=settings.ENVIRONMENT,
    release=settings.VERSION,
    before_send=before_send_filter,
)

class ErrorTracker:
    @staticmethod
    def track_error(error, context=None, user=None):
        """Track application errors with context."""
        with sentry_sdk.push_scope() as scope:
            # Add user context
            if user:
                scope.set_user({
                    'id': user.id,
                    'email': user.email,
                    'role': user.role,
                })
            # Add custom context
            if context:
                for key, value in context.items():
                    scope.set_context(key, value)
            # Capture exception
            sentry_sdk.capture_exception(error)

    @staticmethod
    def track_performance_issue(transaction_name, duration, context=None):
        """Track performance issues."""
        with sentry_sdk.start_transaction(
            op="performance_issue",
            name=transaction_name,
        ) as transaction:
            transaction.set_tag("duration_ms", duration * 1000)
            if context:
                for key, value in context.items():
                    transaction.set_tag(key, value)

            # Flag as a performance issue if duration exceeds the threshold
            if duration > 5.0:  # 5 seconds
                sentry_sdk.capture_message(
                    f"Performance issue: {transaction_name} took {duration:.2f}s",
                    level="warning",
                )
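Usage is a thin wrapper around an ordinary try/except. A sketch (generate_interview_report is a hypothetical service function; the context keys are arbitrary):
try:
    report = generate_interview_report(session)  # hypothetical service call
except Exception as exc:
    ErrorTracker.track_error(
        exc,
        context={'interview': {'session_id': str(session.session_id)}},
        user=request.user,
    )
    raise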
Real-Time Monitoring Dashboards
Custom DataDog Dashboards
from django.conf import settings
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v1.api.dashboards_api import DashboardsApi
from datadog_api_client.v1.model.dashboard import Dashboard

class DashboardManager:
    def __init__(self):
        configuration = Configuration()
        # datadog-api-client uses the 'apiKeyAuth'/'appKeyAuth' key names
        configuration.api_key['apiKeyAuth'] = settings.DATADOG_API_KEY
        configuration.api_key['appKeyAuth'] = settings.DATADOG_APP_KEY
        self.api_client = ApiClient(configuration)
        self.dashboards_api = DashboardsApi(self.api_client)

    def create_business_dashboard(self):
        """Create the business metrics dashboard."""
        # The dict mirrors the JSON dashboard definition; the typed client
        # validates models, so depending on the client version the nested
        # Widget objects may need to be constructed explicitly.
        dashboard_definition = {
            "title": "JobHive Business Metrics",
            "description": "Key business metrics and KPIs",
            "widgets": [
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "sum:jobhive.business.user.daily_active{*}",
                                "display_type": "line",
                            }
                        ],
                        "title": "Daily Active Users",
                    }
                },
                {
                    "definition": {
                        "type": "query_value",
                        "requests": [
                            {
                                "q": "sum:jobhive.business.subscription.mrr{*}",
                                "aggregator": "last",
                            }
                        ],
                        "title": "Monthly Recurring Revenue",
                    }
                },
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "sum:jobhive.business.interview.completed{*} by {interview_type}",
                                "display_type": "bars",
                            }
                        ],
                        "title": "Interview Completions",
                    }
                },
            ],
            "layout_type": "ordered",
        }
        dashboard = Dashboard(**dashboard_definition)
        return self.dashboards_api.create_dashboard(dashboard)

    def create_technical_dashboard(self):
        """Create the technical monitoring dashboard."""
        dashboard_definition = {
            "title": "JobHive Technical Metrics",
            "description": "System performance and health metrics",
            "widgets": [
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "avg:trace.django.request.duration{service:jobhive-api}",
                                "display_type": "line",
                            }
                        ],
                        "title": "API Response Time",
                    }
                },
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "sum:trace.django.request.hits{service:jobhive-api} by {http.status_code}",
                                "display_type": "bars",
                            }
                        ],
                        "title": "Request Volume by Status Code",
                    }
                },
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "avg:postgresql.connections{service:jobhive-db}",
                                "display_type": "line",
                            }
                        ],
                        "title": "Database Connections",
                    }
                },
            ],
            "layout_type": "ordered",
        }
        dashboard = Dashboard(**dashboard_definition)
        return self.dashboards_api.create_dashboard(dashboard)
Alert Configuration
from django.conf import settings
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v1.api.monitors_api import MonitorsApi
from datadog_api_client.v1.model.monitor import Monitor

class AlertingManager:
    def __init__(self):
        configuration = Configuration()
        configuration.api_key['apiKeyAuth'] = settings.DATADOG_API_KEY
        configuration.api_key['appKeyAuth'] = settings.DATADOG_APP_KEY
        self.api_client = ApiClient(configuration)
        self.monitors_api = MonitorsApi(self.api_client)

    def create_performance_alerts(self):
        """Create performance-related alerts."""
        alerts = [
            {
                "name": "High API Response Time",
                "query": "avg(last_5m):avg:trace.django.request.duration{service:jobhive-api} > 2",
                "message": "API response time is above 2 seconds. @pagerduty-jobhive",
                "type": "metric alert",
                "options": {
                    "thresholds": {
                        "critical": 2.0,
                        "warning": 1.5,
                    },
                    "notify_audit": False,
                    "require_full_window": True,
                    "notify_no_data": True,
                    "no_data_timeframe": 10,
                },
            },
            {
                "name": "High Error Rate",
                "query": "sum(last_5m):sum:trace.django.request.errors{service:jobhive-api}.as_rate() > 0.05",
                "message": "Error rate is above 5%. @slack-alerts @pagerduty-jobhive",
                "type": "metric alert",
                "options": {
                    "thresholds": {
                        "critical": 0.05,
                        "warning": 0.02,
                    },
                },
            },
            {
                "name": "Database Connection Pool Exhaustion",
                "query": "avg(last_2m):avg:postgresql.connections{service:jobhive-db} > 180",
                "message": "Database connection pool is nearly exhausted. @pagerduty-jobhive",
                "type": "metric alert",
                "options": {
                    "thresholds": {
                        "critical": 180,
                        "warning": 150,
                    },
                },
            },
        ]

        created_monitors = []
        for alert_config in alerts:
            monitor = Monitor(**alert_config)
            result = self.monitors_api.create_monitor(monitor)
            created_monitors.append(result)
        return created_monitors
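One convenient way to apply these monitors is a one-off Django management command; note that create_monitor is not idempotent, so re-running the command creates duplicates. A sketch (the command module path and AlertingManager import are illustrative):
# management/commands/create_datadog_monitors.py
from django.core.management.base import BaseCommand
from jobhive.monitoring import AlertingManager  # assumed module path

class Command(BaseCommand):
    help = 'Create DataDog monitors for JobHive'

    def handle(self, *args, **options):
        monitors = AlertingManager().create_performance_alerts()
        self.stdout.write(self.style.SUCCESS(f'Created {len(monitors)} monitors'))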
Log Analysis and Insights
Log Processing Pipeline
import re
import json
from collections import defaultdict, Counter
from datetime import datetime, timedelta

class LogAnalyzer:
    def __init__(self):
        self.log_patterns = {
            'api_request': re.compile(r'HTTP_(\d{3}).*?"(\w+)\s([^"]+)"'),
            'user_action': re.compile(r'event_type.*?user_action.*?user_id.*?(\d+).*?action.*?"([^"]+)"'),
            'error': re.compile(r'ERROR.*?(\w+Error).*?([^\n]+)'),
            'performance': re.compile(r'processing_time_ms.*?(\d+(?:\.\d+)?)'),
        }

    def analyze_request_patterns(self, log_file_path, hours=24):
        """Analyze API request patterns."""
        request_stats = defaultdict(lambda: defaultdict(int))
        error_patterns = Counter()
        performance_stats = []
        cutoff_time = datetime.now() - timedelta(hours=hours)

        with open(log_file_path, 'r') as log_file:
            for line in log_file:
                try:
                    log_entry = json.loads(line)
                    # asctime uses a comma before the milliseconds
                    # ('2024-03-14 10:22:31,512'), which fromisoformat rejects
                    timestamp = datetime.fromisoformat(
                        log_entry.get('asctime', '').replace(',', '.'))
                    if timestamp < cutoff_time:
                        continue

                    # Analyze API requests
                    if 'HTTP' in line and 'django.request' in line:
                        method = log_entry.get('method', 'UNKNOWN')
                        status_code = log_entry.get('status_code', 0)
                        path = log_entry.get('pathname', 'unknown')
                        request_stats[f"{method} {path}"][status_code] += 1

                    # Analyze errors
                    if log_entry.get('levelname') == 'ERROR':
                        exc_info = log_entry.get('exc_info')
                        error_type = (exc_info.get('type', 'UnknownError')
                                      if isinstance(exc_info, dict)
                                      else 'UnknownError')
                        error_patterns[error_type] += 1

                    # Analyze performance
                    if 'processing_time_ms' in line:
                        processing_time = log_entry.get('processing_time_ms', 0)
                        component = log_entry.get('component', 'unknown')
                        performance_stats.append({
                            'component': component,
                            'processing_time': processing_time,
                            'timestamp': timestamp,
                        })
                except (json.JSONDecodeError, ValueError):
                    continue

        return {
            'request_patterns': dict(request_stats),
            'error_patterns': dict(error_patterns),
            'performance_stats': performance_stats,
        }

    def generate_performance_report(self, performance_stats):
        """Generate a performance analysis report."""
        component_stats = defaultdict(list)
        for stat in performance_stats:
            component_stats[stat['component']].append(stat['processing_time'])

        performance_report = {}
        for component, times in component_stats.items():
            if times:
                performance_report[component] = {
                    'avg_time': sum(times) / len(times),
                    'max_time': max(times),
                    'min_time': min(times),
                    'count': len(times),
                    'p95': sorted(times)[int(len(times) * 0.95)] if len(times) >= 20 else max(times),
                }
        return performance_report

    def detect_anomalies(self, metrics, window_hours=24):
        """Detect anomalies in metrics."""
        anomalies = []

        # Define thresholds
        thresholds = {
            'high_error_rate': 0.05,     # 5% error rate
            'slow_response_time': 2000,  # 2 seconds (in ms)
            'high_failure_rate': 0.1,    # 10% failure rate
        }

        # Check error rates
        total_requests = sum(
            sum(status_codes.values())
            for status_codes in metrics['request_patterns'].values()
        )
        error_requests = sum(
            count for endpoint_stats in metrics['request_patterns'].values()
            for status_code, count in endpoint_stats.items()
            if status_code >= 400
        )
        if total_requests > 0:
            error_rate = error_requests / total_requests
            if error_rate > thresholds['high_error_rate']:
                anomalies.append({
                    'type': 'high_error_rate',
                    'value': error_rate,
                    'threshold': thresholds['high_error_rate'],
                    'severity': 'high' if error_rate > 0.1 else 'medium',
                })

        # Check performance anomalies
        performance_report = self.generate_performance_report(metrics['performance_stats'])
        for component, stats in performance_report.items():
            if stats['p95'] > thresholds['slow_response_time']:
                anomalies.append({
                    'type': 'slow_response_time',
                    'component': component,
                    'value': stats['p95'],
                    'threshold': thresholds['slow_response_time'],
                    'severity': 'medium',
                })

        return anomalies
Automated Log Processing
from collections import defaultdict, Counter
from celery import shared_task
from django.utils import timezone

@shared_task
def process_application_logs():
    """Process and analyze application logs periodically."""
    log_analyzer = LogAnalyzer()

    # Analyze logs from multiple sources
    log_files = [
        '/var/log/jobhive/application.log',
        '/var/log/jobhive/security.log',
    ]
    combined_analysis = {
        'request_patterns': defaultdict(lambda: defaultdict(int)),
        'error_patterns': Counter(),
        'performance_stats': [],
    }
    for log_file in log_files:
        try:
            analysis = log_analyzer.analyze_request_patterns(log_file)
            # Combine results
            for endpoint, stats in analysis['request_patterns'].items():
                for status_code, count in stats.items():
                    combined_analysis['request_patterns'][endpoint][status_code] += count
            combined_analysis['error_patterns'].update(analysis['error_patterns'])
            combined_analysis['performance_stats'].extend(analysis['performance_stats'])
        except FileNotFoundError:
            continue

    # Detect anomalies
    anomalies = log_analyzer.detect_anomalies(combined_analysis)

    # Store analysis results (LogAnalysisResult is an application model)
    LogAnalysisResult.objects.create(
        analysis_date=timezone.now().date(),
        request_patterns=dict(combined_analysis['request_patterns']),
        error_patterns=dict(combined_analysis['error_patterns']),
        performance_summary=log_analyzer.generate_performance_report(
            combined_analysis['performance_stats']
        ),
        anomalies=anomalies,
    )

    # Send alerts for critical anomalies
    critical_anomalies = [a for a in anomalies if a.get('severity') == 'high']
    if critical_anomalies:
        send_anomaly_alerts.delay(critical_anomalies)

    return len(anomalies)

@shared_task
def send_anomaly_alerts(anomalies):
    """Send alerts for detected anomalies."""
    # Format the alert message
    alert_message = "Critical anomalies detected:\n\n"
    for anomaly in anomalies:
        alert_message += (
            f"- {anomaly['type']}: {anomaly['value']:.3f} "
            f"(threshold: {anomaly['threshold']})\n"
        )
    # Send to Slack/email/PagerDuty
    # Implementation depends on notification preferences
    pass
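As one possible implementation of that final notification step, a minimal sketch assuming a Slack incoming-webhook URL is stored in a SLACK_ALERT_WEBHOOK_URL setting (an assumed setting, not part of the configuration above):
import requests
from django.conf import settings

def notify_slack(message):
    """Post an alert message to a Slack incoming webhook."""
    requests.post(
        settings.SLACK_ALERT_WEBHOOK_URL,  # assumed setting
        json={'text': message},
        timeout=5,
    )
send_anomaly_alerts would then end with notify_slack(alert_message) instead of pass; email or PagerDuty delivery would slot in the same way.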
