Monitoring & Logging

Overview

JobHive implements comprehensive monitoring and logging across all system components to ensure reliability, performance, and security. The monitoring stack includes application performance monitoring (APM), infrastructure monitoring, log aggregation, and alerting systems.

Monitoring Architecture

Monitoring Stack Overview

┌────────────────────────────────────────────────────────────────────────────┐
│                              DataDog Dashboard                             │
│                       (Unified Monitoring Interface)                       │
└────────────────────────────────────────────────────────────────────────────┘
         │                   │                   │                   │
         ▼                   ▼                   ▼                   ▼
┌────────────────┐  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐
│  Application   │  │ Infrastructure │  │     Logs &     │  │    Business    │
│    Metrics     │  │    Metrics     │  │     Events     │  │    Metrics     │
└────────────────┘  └────────────────┘  └────────────────┘  └────────────────┘
         │                   │                   │                   │
         ▼                   ▼                   ▼                   ▼
┌────────────────┐  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐
│   APM Traces   │  │ System Metrics │  │   Log Stream   │  │Custom Analytics│
└────────────────┘  └────────────────┘  └────────────────┘  └────────────────┘

Application Performance Monitoring (APM)

DataDog APM Integration

# settings/production.py
import ddtrace
from ddtrace import patch

# Patch Django and other libraries
patch(django=True, psycopg=True, redis=True, celery=True)

# DataDog configuration
DATADOG_TRACE = {
    'DEFAULT_SERVICE': 'jobhive-api',
    'TAGS': {
        'env': 'production',
        'version': '1.0.0',
        'component': 'backend'
    }
}

# Custom APM middleware
class DataDogTracingMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Add custom tags to trace
        span = ddtrace.tracer.current_span()
        if span:
            span.set_tag('user.id', getattr(request.user, 'id', None))
            span.set_tag('user.role', getattr(request.user, 'role', None))
            span.set_tag('http.url', request.build_absolute_uri())
            
        response = self.get_response(request)
        
        # Add response tags
        if span:
            span.set_tag('http.status_code', response.status_code)
            
        return response
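
For the middleware to see request.user, it should sit after Django's authentication middleware; a minimal wiring sketch, assuming the class above lives in jobhive/middleware.py (the module path is an assumption). Recent ddtrace releases can also pick up service, environment, and version from the DD_SERVICE, DD_ENV, and DD_VERSION environment variables, complementing the DATADOG_TRACE block above.

# settings/production.py (continued) -- assumed module path for the middleware above
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    # ... remaining project middleware ...
    'jobhive.middleware.DataDogTracingMiddleware',
]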

Custom Performance Metrics

import functools
import time

from datadog.dogstatsd import DogStatsd

# Initialize the DogStatsd client (sends metrics to the local DataDog Agent over UDP 8125)
statsd = DogStatsd(host='localhost', port=8125, namespace='jobhive')

class PerformanceMetrics:
    @staticmethod
    def track_interview_session_performance(func):
        """Decorator to track interview session performance."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            
            try:
                result = func(*args, **kwargs)
                
                # Track success
                statsd.increment('interview.session.success', 
                               tags=['method:' + func.__name__])
                
                # Track execution time
                execution_time = time.time() - start_time
                statsd.timing('interview.session.duration', 
                            execution_time * 1000,  # Convert to milliseconds
                            tags=['method:' + func.__name__])
                
                return result
                
            except Exception as e:
                # Track errors
                statsd.increment('interview.session.error', 
                               tags=['method:' + func.__name__, 
                                   'error_type:' + type(e).__name__])
                raise
                
        return wrapper
    
    @staticmethod
    def track_ai_processing_performance(component):
        """Track AI component processing performance."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                
                try:
                    result = func(*args, **kwargs)
                    
                    # Track processing time
                    processing_time = time.time() - start_time
                    statsd.timing(f'ai.{component}.processing_time', 
                                processing_time * 1000,
                                tags=[f'component:{component}'])
                    
                    # Track success rate
                    statsd.increment(f'ai.{component}.success',
                                   tags=[f'component:{component}'])
                    
                    return result
                    
                except Exception as e:
                    # Track AI errors
                    statsd.increment(f'ai.{component}.error',
                                   tags=[f'component:{component}', 
                                       'error_type:' + type(e).__name__])
                    raise
                    
            return wrapper
        return decorator

# Usage in views and services
from rest_framework.viewsets import ModelViewSet

class InterviewSessionViewSet(ModelViewSet):
    @PerformanceMetrics.track_interview_session_performance
    def create(self, request):
        # Implementation
        pass
    
    @PerformanceMetrics.track_interview_session_performance
    def update(self, request, pk=None):
        # Implementation
        pass

class SentimentAnalysisAgent:
    @PerformanceMetrics.track_ai_processing_performance('sentiment_analysis')
    def analyze_sentiment(self, text):
        # AI processing implementation
        pass

Business Metrics Tracking

from datadog.dogstatsd import DogStatsd
from django.core.cache import cache
from django.utils import timezone

class BusinessMetricsCollector:
    def __init__(self):
        self.statsd = DogStatsd(host='localhost', port=8125, namespace='jobhive.business')
    
    def track_user_engagement(self, user, action):
        """Track user engagement metrics."""
        tags = [
            f'user_role:{user.role}',
            f'user_plan:{user.subscription.plan.name}',
            f'action:{action}'
        ]
        
        self.statsd.increment('user.engagement', tags=tags)
        
        # Track daily active users
        cache_key = f'dau:{user.id}:{timezone.now().date()}'
        if not cache.get(cache_key):
            cache.set(cache_key, True, timeout=86400)  # 24 hours
            self.statsd.increment('user.daily_active', tags=[f'role:{user.role}'])
    
    def track_interview_completion(self, interview_session):
        """Track interview completion metrics."""
        tags = [
            f'interview_type:{"practice" if interview_session.is_practice else "real"}',
            f'completion_rate:{int(interview_session.completion_percentage)}',
            f'user_role:{interview_session.user.role}'
        ]
        
        self.statsd.increment('interview.completed', tags=tags)
        
        # Track completion quality
        if interview_session.completion_percentage >= 80:
            self.statsd.increment('interview.high_quality_completion', tags=tags)
    
    def track_subscription_metrics(self, subscription, event_type):
        """Track subscription-related metrics."""
        tags = [
            f'plan:{subscription.plan.name}',
            f'plan_type:{subscription.plan.plan_type}',
            f'event:{event_type}'
        ]
        
        self.statsd.increment('subscription.event', tags=tags)
        
        if event_type == 'created':
            # Track monthly recurring revenue impact
            monthly_value = float(subscription.plan.price)
            if subscription.plan.interval == 'year':
                monthly_value = monthly_value / 12
                
            self.statsd.gauge('subscription.mrr', monthly_value, tags=tags)
    
    def track_ai_accuracy(self, component, predicted_score, actual_score):
        """Track AI prediction accuracy."""
        denominator = max(predicted_score, actual_score)
        if denominator == 0:
            return  # Both scores are zero; avoid division by zero
        accuracy = 1 - abs(predicted_score - actual_score) / denominator

        self.statsd.gauge(f'ai.{component}.accuracy', accuracy,
                          tags=[f'component:{component}'])
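
The collector is intended to be called from views, signal handlers, and webhook processors as business events occur; a brief usage sketch (complete_interview is a hypothetical service function, not part of the code above):

# Hypothetical call sites for BusinessMetricsCollector
metrics = BusinessMetricsCollector()

def complete_interview(interview_session):
    # ... persist results and mark the session complete ...
    metrics.track_interview_completion(interview_session)
    metrics.track_user_engagement(interview_session.user, action='interview_completed')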

Infrastructure Monitoring

System Metrics Collection

# DataDog Agent configuration (values-style sketch used by the deployment tooling)
datadog_agent_config:
  api_key: ${DATADOG_API_KEY}
  site: datadoghq.com
  
  # Enable infrastructure monitoring
  process_agent:
    enabled: true
  
  # Enable log collection
  logs_enabled: true
  
  # Enable network monitoring
  network_monitoring:
    enabled: true
  
  # Custom checks
  conf.d:
    postgres.yaml:
      init_config:
      instances:
        - host: ${DATABASE_HOST}
          port: 5432
          username: datadog
          password: ${DATADOG_DB_PASSWORD}
          tags:
            - service:jobhive-db
            - env:production
    
    redis.yaml:
      init_config:
      instances:
        - host: ${REDIS_HOST}
          port: 6379
          password: ${REDIS_PASSWORD}
          tags:
            - service:jobhive-cache
            - env:production
    
    http_check.yaml:
      init_config:
      instances:
        - name: jobhive-api-health
          url: https://api.jobhive.com/health/
          timeout: 10
          tags:
            - service:jobhive-api
            - env:production
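
The http_check above assumes the API exposes a lightweight /health/ endpoint; a minimal sketch of such a view, with the exact checks and URL wiring left as assumptions:

# Hypothetical view backing https://api.jobhive.com/health/ (wired in urls.py, not shown)
from django.db import connection
from django.http import JsonResponse

def health_check(request):
    status = {'database': 'ok'}
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
    except Exception:
        status['database'] = 'error'
        return JsonResponse(status, status=503)
    return JsonResponse(status)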

Custom Infrastructure Checks

import logging
import time

import redis
from datadog.dogstatsd import DogStatsd
from django.conf import settings
from django.db import connection

logger = logging.getLogger('jobhive')

class InfrastructureHealthChecker:
    def __init__(self):
        self.statsd = DogStatsd(host='localhost', port=8125, namespace='jobhive.infrastructure')
    
    def check_database_health(self):
        """Check database connection and performance."""
        try:
            start_time = time.time()
            
            # Test connection
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                result = cursor.fetchone()
            
            # Measure connection time
            connection_time = (time.time() - start_time) * 1000
            self.statsd.timing('database.connection_time', connection_time)
            
            # Check active connections
            with connection.cursor() as cursor:
                cursor.execute("""
                    SELECT count(*) 
                    FROM pg_stat_activity 
                    WHERE state = 'active'
                """)
                active_connections = cursor.fetchone()[0]
            
            self.statsd.gauge('database.active_connections', active_connections)
            
            # Database health status
            self.statsd.service_check('database.health',
                                      DogStatsd.OK,
                                      tags=['database:postgresql'])
            
        except Exception as e:
            logger.error(f"Database health check failed: {e}")
            self.statsd.service_check('database.health',
                                      DogStatsd.CRITICAL,
                                      tags=['database:postgresql'])
    
    def check_redis_health(self):
        """Check Redis connection and performance."""
        try:
            redis_client = redis.Redis.from_url(settings.REDIS_URL)
            
            start_time = time.time()
            redis_client.ping()
            response_time = (time.time() - start_time) * 1000
            
            self.statsd.timing('redis.response_time', response_time)
            
            # Check memory usage
            info = redis_client.info('memory')
            memory_usage = info['used_memory']
            max_memory = info.get('maxmemory', 0)
            
            self.statsd.gauge('redis.memory_used', memory_usage)
            
            if max_memory > 0:
                memory_percentage = (memory_usage / max_memory) * 100
                self.statsd.gauge('redis.memory_percentage', memory_percentage)
            
            self.statsd.service_check('redis.health',
                                      DogStatsd.OK,
                                      tags=['cache:redis'])
            
        except Exception as e:
            logger.error(f"Redis health check failed: {e}")
            self.statsd.service_check('redis.health',
                                      DogStatsd.CRITICAL,
                                      tags=['cache:redis'])
    
    def check_celery_health(self):
        """Check Celery worker health."""
        try:
            from celery import current_app
            
            # Check active workers
            inspect = current_app.control.inspect()
            stats = inspect.stats()
            
            if stats:
                worker_count = len(stats)
                self.statsd.gauge('celery.active_workers', worker_count)
                
                # Check which queues each worker consumes from
                active_queues = inspect.active_queues() or {}
                for worker, queues in active_queues.items():
                    for queue_info in queues:
                        queue_name = queue_info['name']
                        # Queue length checking would go here; the implementation
                        # depends on the broker (Redis/RabbitMQ)

                self.statsd.service_check('celery.health',
                                          DogStatsd.OK,
                                          tags=['service:celery'])
            else:
                self.statsd.service_check('celery.health',
                                          DogStatsd.CRITICAL,
                                          tags=['service:celery'])
                
        except Exception as e:
            logger.error(f"Celery health check failed: {e}")
            self.statsd.service_check('celery.health',
                                      DogStatsd.CRITICAL,
                                      tags=['service:celery'])
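
These checks only add value when they run continuously; one option is to wrap them in a Celery task and schedule it with Celery beat. The task path and one-minute interval below are assumptions, not part of the checker above.

# Hypothetical periodic wrapper for the health checks above
from celery import shared_task

@shared_task
def run_infrastructure_health_checks():
    checker = InfrastructureHealthChecker()
    checker.check_database_health()
    checker.check_redis_health()
    checker.check_celery_health()

# settings/production.py -- assumed beat schedule
CELERY_BEAT_SCHEDULE = {
    'infrastructure-health-checks': {
        'task': 'jobhive.monitoring.tasks.run_infrastructure_health_checks',  # assumed task path
        'schedule': 60.0,  # seconds
    },
}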

Comprehensive Logging Strategy

Structured Logging Configuration

# settings/logging.py
import sys

from pythonjsonlogger import jsonlogger

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            '()': jsonlogger.JsonFormatter,
            'format': '%(asctime)s %(name)s %(levelname)s %(pathname)s %(funcName)s %(lineno)d %(message)s'
        },
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'json',
            'stream': sys.stdout,
        },
        'file': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/jobhive/application.log',
            'maxBytes': 50 * 1024 * 1024,  # 50MB
            'backupCount': 10,
            'formatter': 'json',
        },
        'security': {
            'level': 'WARNING',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/jobhive/security.log',
            'maxBytes': 50 * 1024 * 1024,  # 50MB
            'backupCount': 20,
            'formatter': 'json',
        },
        'datadog': {
            'level': 'INFO',
            # Project-specific handler that forwards records to the DataDog Agent
            # (the datadog package itself does not ship a logging handler; a
            # minimal sketch of this class follows the configuration below).
            'class': 'jobhive.logging_handlers.DogStatsdLogHandler',
            'formatter': 'json',
        }
    },
    'loggers': {
        'django': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
            'propagate': False,
        },
        'jobhive': {
            'handlers': ['console', 'file', 'datadog'],
            'level': 'INFO',
            'propagate': False,
        },
        'jobhive.security': {
            'handlers': ['console', 'security', 'datadog'],
            'level': 'WARNING',
            'propagate': False,
        },
        'jobhive.ai': {
            'handlers': ['console', 'file', 'datadog'],
            'level': 'INFO',
            'propagate': False,
        },
        'celery': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
            'propagate': False,
        },
    },
    'root': {
        'handlers': ['console'],
        'level': 'WARNING',
    }
}
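
The 'datadog' handler above points at a project-specific class rather than anything shipped in the datadog package. A minimal sketch of what that handler could look like, assuming the jobhive/logging_handlers.py module path used in the configuration; the metric and event names are illustrative:

# Minimal sketch of the project-specific DogStatsd log handler referenced above
import logging

from datadog.dogstatsd import DogStatsd

class DogStatsdLogHandler(logging.Handler):
    """Forward log records to the local DataDog Agent as metrics/events."""

    def __init__(self, host='localhost', port=8125, namespace='jobhive.logs'):
        super().__init__()
        self.statsd = DogStatsd(host=host, port=port, namespace=namespace)

    def emit(self, record):
        try:
            # Count every record by logger name and level
            self.statsd.increment(
                'records',
                tags=[f'logger:{record.name}', f'level:{record.levelname}']
            )
            # Surface errors and above as DataDog events for easier correlation
            if record.levelno >= logging.ERROR:
                self.statsd.event(
                    f'{record.name} {record.levelname}',
                    self.format(record),
                    alert_type='error',
                )
        except Exception:
            self.handleError(record)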

Application Event Logging

import logging
from datetime import datetime

logger = logging.getLogger('jobhive')
security_logger = logging.getLogger('jobhive.security')
ai_logger = logging.getLogger('jobhive.ai')

class EventLogger:
    @staticmethod
    def log_user_action(user, action, resource=None, details=None):
        """Log user actions for audit trail."""
        log_data = {
            'event_type': 'user_action',
            'user_id': user.id if user else None,
            'user_email': user.email if user else None,
            'user_role': user.role if user else None,
            'action': action,
            'resource': resource,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat(),
            'ip_address': getattr(user, 'current_ip', None)
        }
        
        logger.info('User action', extra=log_data)
    
    @staticmethod
    def log_interview_event(interview_session, event_type, details=None):
        """Log interview-related events."""
        log_data = {
            'event_type': 'interview_event',
            'interview_session_id': str(interview_session.session_id),
            'user_id': interview_session.user.id,
            'job_id': interview_session.job.id if interview_session.job else None,
            'interview_type': 'practice' if interview_session.is_practice else 'real',
            'event': event_type,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat()
        }
        
        logger.info('Interview event', extra=log_data)
    
    @staticmethod
    def log_ai_processing(component, processing_time, accuracy=None, error=None):
        """Log AI processing events."""
        log_data = {
            'event_type': 'ai_processing',
            'component': component,
            'processing_time_ms': processing_time * 1000,
            'accuracy': accuracy,
            'error': str(error) if error else None,
            'timestamp': datetime.utcnow().isoformat()
        }
        
        if error:
            ai_logger.error('AI processing error', extra=log_data)
        else:
            ai_logger.info('AI processing completed', extra=log_data)
    
    @staticmethod
    def log_security_event(event_type, severity, details=None, user=None):
        """Log security-related events."""
        log_data = {
            'event_type': 'security_event',
            'security_event_type': event_type,
            'severity': severity,
            'user_id': user.id if user else None,
            'user_email': user.email if user else None,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat()
        }
        
        if severity in ['high', 'critical']:
            security_logger.critical('Security event', extra=log_data)
        elif severity == 'medium':
            security_logger.warning('Security event', extra=log_data)
        else:
            security_logger.info('Security event', extra=log_data)

# Usage examples in views and services
from rest_framework import status
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet

class InterviewSessionViewSet(ModelViewSet):
    def create(self, request):
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        try:
            interview_session = serializer.save()
            
            # Log successful creation
            EventLogger.log_user_action(
                user=request.user,
                action='create_interview_session',
                resource=f'interview_session:{interview_session.session_id}',
                details={'is_practice': interview_session.is_practice}
            )
            
            EventLogger.log_interview_event(
                interview_session=interview_session,
                event_type='session_created'
            )

            return Response(serializer.data, status=status.HTTP_201_CREATED)

        except Exception as e:
            # Log error
            EventLogger.log_user_action(
                user=request.user,
                action='create_interview_session_failed',
                details={'error': str(e)}
            )
            raise
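
log_user_action reads an optional current_ip attribute off the user object; one way to populate it is a small middleware that runs after authentication. The header handling below is a common pattern and an assumption, not necessarily JobHive's.

# Hypothetical middleware that attaches the client IP to the authenticated user
class CurrentIPMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
        ip_address = forwarded.split(',')[0].strip() if forwarded else request.META.get('REMOTE_ADDR')
        if request.user.is_authenticated:
            request.user.current_ip = ip_address
        return self.get_response(request)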

Error Tracking and Alerting

import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from django.conf import settings

def before_send_filter(event, hint):
    """Filter sensitive data before sending events to Sentry."""

    # Remove sensitive keys from extra data
    sensitive_keys = ['password', 'token', 'secret', 'key', 'authorization']

    if 'extra' in event:
        for key in list(event['extra'].keys()):
            if any(sensitive in key.lower() for sensitive in sensitive_keys):
                event['extra'][key] = '[Filtered]'

    # Filter request data
    if 'request' in event and 'data' in event['request']:
        data = event['request']['data']
        if isinstance(data, dict):
            for key in list(data.keys()):
                if any(sensitive in key.lower() for sensitive in sensitive_keys):
                    data[key] = '[Filtered]'

    return event

# Sentry configuration for error tracking
# (before_send_filter must be defined before sentry_sdk.init() references it)
sentry_sdk.init(
    dsn=settings.SENTRY_DSN,
    integrations=[
        DjangoIntegration(
            transaction_style='url',
            middleware_spans=True,
            signals_spans=True,
            cache_spans=True,
        ),
        CeleryIntegration(
            monitor_beat_tasks=True,
            propagate_traces=True,
        ),
        RedisIntegration(),
    ],
    traces_sample_rate=0.1,  # Capture 10% of transactions for performance monitoring
    send_default_pii=False,  # Don't send personally identifiable information
    environment=settings.ENVIRONMENT,
    release=settings.VERSION,
    before_send=before_send_filter,
)

class ErrorTracker:
    @staticmethod
    def track_error(error, context=None, user=None):
        """Track application errors with context."""
        
        with sentry_sdk.push_scope() as scope:
            # Add user context
            if user:
                scope.user = {
                    'id': user.id,
                    'email': user.email,
                    'role': user.role
                }
            
            # Add custom context
            if context:
                for key, value in context.items():
                    scope.set_context(key, value)
            
            # Capture exception
            sentry_sdk.capture_exception(error)
    
    @staticmethod
    def track_performance_issue(transaction_name, duration, context=None):
        """Track performance issues."""
        
        with sentry_sdk.start_transaction(
            op="performance_issue",
            name=transaction_name
        ) as transaction:
            
            transaction.set_tag("duration_ms", duration * 1000)
            
            if context:
                for key, value in context.items():
                    transaction.set_tag(key, value)
            
            # Mark as performance issue if duration exceeds threshold
            if duration > 5.0:  # 5 seconds
                sentry_sdk.capture_message(
                    f"Performance issue: {transaction_name} took {duration:.2f}s",
                    level="warning"
                )
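
ErrorTracker is meant to be called wherever exceptions are caught and enriched before being re-raised or swallowed; a brief usage sketch (build_report is a hypothetical helper, not defined above):

# Hypothetical usage of ErrorTracker inside a service function
def generate_interview_report(interview_session):
    try:
        return build_report(interview_session)  # assumed report builder
    except Exception as exc:
        ErrorTracker.track_error(
            exc,
            context={'interview': {'session_id': str(interview_session.session_id)}},
            user=interview_session.user,
        )
        raise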

Real-Time Monitoring Dashboards

Custom DataDog Dashboards

from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v1.api.dashboards_api import DashboardsApi
from datadog_api_client.v1.model.dashboard import Dashboard
from django.conf import settings

class DashboardManager:
    def __init__(self):
        configuration = Configuration()
        # The typed client expects the 'apiKeyAuth' / 'appKeyAuth' key names
        configuration.api_key['apiKeyAuth'] = settings.DATADOG_API_KEY
        configuration.api_key['appKeyAuth'] = settings.DATADOG_APP_KEY

        self.api_client = ApiClient(configuration)
        self.dashboards_api = DashboardsApi(self.api_client)
    
    def create_business_dashboard(self):
        """Create business metrics dashboard."""
        
        dashboard_definition = {
            "title": "JobHive Business Metrics",
            "description": "Key business metrics and KPIs",
            "widgets": [
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "sum:jobhive.business.user.daily_active{*}",
                                "display_type": "line"
                            }
                        ],
                        "title": "Daily Active Users"
                    }
                },
                {
                    "definition": {
                        "type": "query_value",
                        "requests": [
                            {
                                "q": "sum:jobhive.business.subscription.mrr{*}",
                                "aggregator": "last"
                            }
                        ],
                        "title": "Monthly Recurring Revenue"
                    }
                },
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "sum:jobhive.business.interview.completed{*} by {interview_type}",
                                "display_type": "bars"
                            }
                        ],
                        "title": "Interview Completions"
                    }
                }
            ],
            "layout_type": "ordered"
        }
        
        # Note: the typed datadog_api_client validates payloads, so depending on the
        # client version the widget definitions may need to be built from the generated
        # model classes rather than raw dicts.
        dashboard = Dashboard(**dashboard_definition)
        result = self.dashboards_api.create_dashboard(dashboard)
        
        return result
    
    def create_technical_dashboard(self):
        """Create technical monitoring dashboard."""
        
        dashboard_definition = {
            "title": "JobHive Technical Metrics",
            "description": "System performance and health metrics",
            "widgets": [
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "avg:trace.django.request.duration{service:jobhive-api}",
                                "display_type": "line"
                            }
                        ],
                        "title": "API Response Time"
                    }
                },
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "sum:trace.django.request.hits{service:jobhive-api} by {http.status_code}",
                                "display_type": "bars"
                            }
                        ],
                        "title": "Request Volume by Status Code"
                    }
                },
                {
                    "definition": {
                        "type": "timeseries",
                        "requests": [
                            {
                                "q": "avg:postgresql.connections{service:jobhive-db}",
                                "display_type": "line"
                            }
                        ],
                        "title": "Database Connections"
                    }
                }
            ],
            "layout_type": "ordered"
        }
        
        dashboard = Dashboard(**dashboard_definition)
        result = self.dashboards_api.create_dashboard(dashboard)
        
        return result

Alert Configuration

from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v1.api.monitors_api import MonitorsApi
from datadog_api_client.v1.model.monitor import Monitor
from django.conf import settings

class AlertingManager:
    def __init__(self):
        configuration = Configuration()
        configuration.api_key['apiKeyAuth'] = settings.DATADOG_API_KEY
        configuration.api_key['appKeyAuth'] = settings.DATADOG_APP_KEY

        self.api_client = ApiClient(configuration)
        self.monitors_api = MonitorsApi(self.api_client)
    
    def create_performance_alerts(self):
        """Create performance-related alerts."""
        
        alerts = [
            {
                "name": "High API Response Time",
                "query": "avg(last_5m):avg:trace.django.request.duration{service:jobhive-api} > 2",
                "message": "API response time is above 2 seconds. @pagerduty-jobhive",
                "type": "metric alert",
                "options": {
                    "thresholds": {
                        "critical": 2.0,
                        "warning": 1.5
                    },
                    "notify_audit": False,
                    "require_full_window": True,
                    "notify_no_data": True,
                    "no_data_timeframe": 10
                }
            },
            {
                "name": "High Error Rate",
                "query": "sum(last_5m):sum:trace.django.request.errors{service:jobhive-api}.as_rate() > 0.05",
                "message": "Error rate is above 5%. @slack-alerts @pagerduty-jobhive",
                "type": "metric alert",
                "options": {
                    "thresholds": {
                        "critical": 0.05,
                        "warning": 0.02
                    }
                }
            },
            {
                "name": "Database Connection Pool Exhaustion",
                "query": "avg(last_2m):avg:postgresql.connections{service:jobhive-db} > 180",
                "message": "Database connection pool is nearly exhausted. @pagerduty-jobhive",
                "type": "metric alert",
                "options": {
                    "thresholds": {
                        "critical": 180,
                        "warning": 150
                    }
                }
            }
        ]
        
        created_monitors = []
        for alert_config in alerts:
            monitor = Monitor(**alert_config)
            result = self.monitors_api.create_monitor(monitor)
            created_monitors.append(result)
        
        return created_monitors
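
Dashboards and monitors are usually provisioned once per environment, for example from a deployment script or a Django management command; a minimal sketch, with the command name assumed and the imports of the two manager classes omitted:

# Hypothetical management command: python manage.py provision_monitoring
from django.core.management.base import BaseCommand

class Command(BaseCommand):
    help = 'Create JobHive DataDog dashboards and monitors'

    def handle(self, *args, **options):
        DashboardManager().create_business_dashboard()
        DashboardManager().create_technical_dashboard()
        AlertingManager().create_performance_alerts()
        self.stdout.write(self.style.SUCCESS('Monitoring assets provisioned'))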

Log Analysis and Insights

Log Processing Pipeline

import re
import json
from collections import defaultdict, Counter
from datetime import datetime, timedelta

class LogAnalyzer:
    def __init__(self):
        self.log_patterns = {
            'api_request': re.compile(r'HTTP_(\d{3}).*?"(\w+)\s([^"]+)"'),
            'user_action': re.compile(r'event_type.*?user_action.*?user_id.*?(\d+).*?action.*?"([^"]+)"'),
            'error': re.compile(r'ERROR.*?(\w+Error).*?([^\n]+)'),
            'performance': re.compile(r'processing_time_ms.*?(\d+(?:\.\d+)?)')
        }
    
    def analyze_request_patterns(self, log_file_path, hours=24):
        """Analyze API request patterns."""
        
        request_stats = defaultdict(lambda: defaultdict(int))
        error_patterns = Counter()
        performance_stats = []
        
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        with open(log_file_path, 'r') as log_file:
            for line in log_file:
                try:
                    log_entry = json.loads(line)
                    # asctime from the logging module looks like "2024-01-01 12:00:00,123";
                    # swap the comma for a dot so fromisoformat() can parse it
                    timestamp = datetime.fromisoformat(
                        log_entry.get('asctime', '').replace(',', '.')
                    )

                    if timestamp < cutoff_time:
                        continue
                    
                    # Analyze API requests
                    if 'HTTP' in line and 'django.request' in line:
                        method = log_entry.get('method', 'UNKNOWN')
                        status_code = log_entry.get('status_code', 0)
                        path = log_entry.get('pathname', 'unknown')
                        
                        request_stats[f"{method} {path}"][status_code] += 1
                    
                    # Analyze errors
                    if log_entry.get('levelname') == 'ERROR':
                        error_type = log_entry.get('exc_info', {}).get('type', 'UnknownError')
                        error_patterns[error_type] += 1
                    
                    # Analyze performance
                    if 'processing_time_ms' in line:
                        processing_time = log_entry.get('processing_time_ms', 0)
                        component = log_entry.get('component', 'unknown')
                        performance_stats.append({
                            'component': component,
                            'processing_time': processing_time,
                            'timestamp': timestamp
                        })
                
                except (json.JSONDecodeError, ValueError):
                    continue
        
        return {
            'request_patterns': dict(request_stats),
            'error_patterns': dict(error_patterns),
            'performance_stats': performance_stats
        }
    
    def generate_performance_report(self, performance_stats):
        """Generate performance analysis report."""
        
        component_stats = defaultdict(list)
        for stat in performance_stats:
            component_stats[stat['component']].append(stat['processing_time'])
        
        performance_report = {}
        for component, times in component_stats.items():
            if times:
                performance_report[component] = {
                    'avg_time': sum(times) / len(times),
                    'max_time': max(times),
                    'min_time': min(times),
                    'count': len(times),
                    'p95': sorted(times)[int(len(times) * 0.95)] if len(times) >= 20 else max(times)
                }
        
        return performance_report
    
    def detect_anomalies(self, metrics, window_hours=24):
        """Detect anomalies in metrics."""
        
        anomalies = []
        
        # Define thresholds
        thresholds = {
            'high_error_rate': 0.05,  # 5% error rate
            'slow_response_time': 2000,  # 2 seconds
            'high_failure_rate': 0.1  # 10% failure rate
        }
        
        # Check error rates
        total_requests = sum(sum(status_codes.values()) for status_codes in metrics['request_patterns'].values())
        error_requests = sum(
            count for endpoint_stats in metrics['request_patterns'].values()
            for status_code, count in endpoint_stats.items()
            if status_code >= 400
        )
        
        if total_requests > 0:
            error_rate = error_requests / total_requests
            if error_rate > thresholds['high_error_rate']:
                anomalies.append({
                    'type': 'high_error_rate',
                    'value': error_rate,
                    'threshold': thresholds['high_error_rate'],
                    'severity': 'high' if error_rate > 0.1 else 'medium'
                })
        
        # Check performance anomalies
        performance_report = self.generate_performance_report(metrics['performance_stats'])
        for component, stats in performance_report.items():
            if stats['p95'] > thresholds['slow_response_time']:
                anomalies.append({
                    'type': 'slow_response_time',
                    'component': component,
                    'value': stats['p95'],
                    'threshold': thresholds['slow_response_time'],
                    'severity': 'medium'
                })
        
        return anomalies

Automated Log Processing

from collections import defaultdict, Counter

from celery import shared_task
from django.utils import timezone

@shared_task
def process_application_logs():
    """Process and analyze application logs periodically."""
    
    log_analyzer = LogAnalyzer()
    
    # Analyze logs from multiple sources
    log_files = [
        '/var/log/jobhive/application.log',
        '/var/log/jobhive/security.log'
    ]
    
    combined_analysis = {
        'request_patterns': defaultdict(lambda: defaultdict(int)),
        'error_patterns': Counter(),
        'performance_stats': []
    }
    
    for log_file in log_files:
        try:
            analysis = log_analyzer.analyze_request_patterns(log_file)
            
            # Combine results
            for endpoint, stats in analysis['request_patterns'].items():
                for status_code, count in stats.items():
                    combined_analysis['request_patterns'][endpoint][status_code] += count
            
            combined_analysis['error_patterns'].update(analysis['error_patterns'])
            combined_analysis['performance_stats'].extend(analysis['performance_stats'])
            
        except FileNotFoundError:
            continue
    
    # Detect anomalies
    anomalies = log_analyzer.detect_anomalies(combined_analysis)
    
    # Store analysis results
    LogAnalysisResult.objects.create(
        analysis_date=timezone.now().date(),
        request_patterns=dict(combined_analysis['request_patterns']),
        error_patterns=dict(combined_analysis['error_patterns']),
        performance_summary=log_analyzer.generate_performance_report(
            combined_analysis['performance_stats']
        ),
        anomalies=anomalies
    )
    
    # Send alerts for critical anomalies
    critical_anomalies = [a for a in anomalies if a.get('severity') == 'high']
    if critical_anomalies:
        send_anomaly_alerts.delay(critical_anomalies)
    
    return len(anomalies)

@shared_task
def send_anomaly_alerts(anomalies):
    """Send alerts for detected anomalies."""
    
    # Format alert message
    alert_message = "Critical anomalies detected:\n\n"
    
    for anomaly in anomalies:
        alert_message += f"- {anomaly['type']}: {anomaly['value']:.3f} (threshold: {anomaly['threshold']})\n"
    
    # Send to Slack/email/PagerDuty
    # Implementation depends on notification preferences
    pass
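
As one option for the notification step above, a Slack incoming webhook only needs an HTTP POST with a JSON text payload; a minimal sketch, with SLACK_ALERTS_WEBHOOK_URL as an assumed settings value:

# Hypothetical Slack delivery helper for send_anomaly_alerts
import requests
from django.conf import settings

def post_to_slack(message):
    requests.post(
        settings.SLACK_ALERTS_WEBHOOK_URL,  # assumed settings value
        json={'text': message},
        timeout=5,
    )
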
This monitoring and logging stack provides end-to-end visibility into JobHive's operations, enabling proactive issue detection, performance optimization, and reliable system maintenance.