Skip to main content

Analytics & Metrics

Overview

JobHive provides comprehensive analytics and metrics across business operations, user engagement, AI performance, and technical infrastructure. The analytics system supports data-driven decision making for both platform optimization and customer success.

Business Analytics Dashboard

Key Performance Indicators (KPIs)

User Engagement Metrics

# Core engagement metrics tracked
# Catalogue mapping internal metric keys to human-readable descriptions.
# Values here are dashboard labels only; the actual calculations live in
# get_user_engagement_metrics() and related analytics services.
USER_ENGAGEMENT_METRICS = {
    'daily_active_users': 'DAU',
    'monthly_active_users': 'MAU', 
    'user_retention_rate': 'Percentage of users returning after 30 days',
    'session_duration': 'Average time spent per session',
    'interview_completion_rate': 'Percentage of started interviews completed',
    'feature_adoption_rate': 'Usage of key features by active users'
}

# Example analytics query
def get_user_engagement_metrics(start_date, end_date):
    """Return core engagement metrics for the [start_date, end_date] window.

    Keys mirror USER_ENGAGEMENT_METRICS:
      dau  -- distinct users whose last_login falls inside the window.
              NOTE(review): this is "active during range", not a true daily
              count; confirm callers pass a one-day window for real DAU.
      mau  -- distinct users active from 30 days before start_date to end_date.
      avg_session_duration -- mean duration of completed sessions started in
              the window (None when there are no completed sessions).
      completion_rate -- percentage of sessions started in the window that
              reached status='completed'.
    """
    # Shared base queryset for the session-derived metrics below.
    sessions = InterviewSession.objects.filter(
        start_time__range=[start_date, end_date]
    )
    total_sessions = sessions.count()
    completed = sessions.filter(status='completed')

    return {
        'dau': User.objects.filter(
            last_login__range=[start_date, end_date]
        ).distinct().count(),

        'mau': User.objects.filter(
            last_login__range=[start_date - timedelta(days=30), end_date]
        ).distinct().count(),

        'avg_session_duration': completed.aggregate(
            avg_duration=Avg('duration')
        )['avg_duration'],

        # Fixed: the original averaged completion_percentage (mean progress),
        # which is not "percentage of started interviews completed" as the
        # USER_ENGAGEMENT_METRICS catalogue describes this metric.
        'completion_rate': (
            completed.count() / total_sessions * 100
            if total_sessions > 0 else 0
        ),
    }

Revenue Metrics

# Catalogue of revenue KPIs: internal key -> human-readable description.
# Descriptions are dashboard labels; the calculations themselves live in
# get_revenue_metrics().
REVENUE_METRICS = {
    'monthly_recurring_revenue': 'MRR from active subscriptions',
    'annual_recurring_revenue': 'ARR projection',
    'average_revenue_per_user': 'ARPU across all paying customers',
    'customer_lifetime_value': 'CLV calculation',
    'churn_rate': 'Monthly subscription cancellation rate',
    'conversion_rate': 'Free to paid conversion percentage'
}

def get_revenue_metrics(month, year):
    """Calculate key revenue metrics for a given calendar month.

    Args:
        month: 1-12 calendar month.
        year: 4-digit year.

    Returns:
        dict with mrr, arr (mrr * 12), arpu, churn_rate (percent), and the
        active subscription count for the month.
    """
    start_date = datetime(year, month, 1)
    end_date = start_date + relativedelta(months=1)

    # Subscriptions whose current billing period overlaps the month.
    active_subs = CustomerSubscription.objects.filter(
        status='active',
        current_period_start__lt=end_date,
        current_period_end__gte=start_date
    )
    # Hoisted: the original issued this identical COUNT query three times.
    active_count = active_subs.count()

    # MRR: annual plans contribute 1/12 of their price per month.
    mrr = active_subs.aggregate(
        total=Sum(
            Case(
                When(plan__interval='year', then=F('plan__price') / 12),
                default=F('plan__price'),
                output_field=DecimalField()
            )
        )
    )['total'] or 0

    # Average revenue per (active) user; 0 when there are no active subs.
    arpu = mrr / active_count if active_count > 0 else 0

    # Churn: cancellations recorded this month vs. subscriptions that existed
    # before the month began.
    # NOTE(review): updated_at is a generic last-modified timestamp, not a
    # cancellation time — confirm it only changes on status transitions.
    churned_subs = CustomerSubscription.objects.filter(
        status='canceled',
        updated_at__range=[start_date, end_date]
    ).count()

    total_subs_start = CustomerSubscription.objects.filter(
        created_at__lt=start_date,
        status__in=['active', 'canceled']
    ).count()

    churn_rate = (churned_subs / total_subs_start * 100) if total_subs_start > 0 else 0

    return {
        'mrr': float(mrr),
        'arr': float(mrr * 12),
        'arpu': float(arpu),
        'churn_rate': churn_rate,
        'active_subscriptions': active_count
    }

Interview Analytics

Interview Performance Metrics

class InterviewAnalyticsService:
    """Interview-level analytics: volumes, outcomes, and derived rates."""

    def get_interview_metrics(self, company_id=None, date_range=None):
        """Get comprehensive interview performance metrics.

        Args:
            company_id: optional filter to one company (via the session's job).
            date_range: optional [start, end] pair filtering on start_time.

        Returns:
            dict with basic aggregates, success rates, interviewer metrics,
            and pipeline metrics.
        """
        queryset = InterviewSession.objects.all()

        if company_id:
            queryset = queryset.filter(job__company_id=company_id)

        if date_range:
            queryset = queryset.filter(start_time__range=date_range)

        # Single aggregate round-trip for the headline numbers.
        basic_metrics = queryset.aggregate(
            total_interviews=Count('id'),
            completed_interviews=Count('id', filter=Q(status='completed')),
            avg_duration=Avg('duration'),
            avg_completion_rate=Avg('completion_percentage'),
            avg_technical_score=Avg('technical_accuracy'),
            avg_behavioral_score=Avg('behavioral_score')
        )

        return {
            'basic_metrics': basic_metrics,
            'success_metrics': self.calculate_success_rates(queryset),
            'interviewer_metrics': self.get_interviewer_performance(queryset),
            'pipeline_metrics': self.get_pipeline_metrics(queryset)
        }

    def calculate_success_rates(self, queryset):
        """Calculate interview success rates and outcomes.

        "High performer" = completed interview with completion_percentage >= 80
        and technical_accuracy >= 75. Averages are over completed interviews.
        """
        completed = queryset.filter(status='completed')

        # Hoisted: the original re-issued every COUNT query inside each
        # conditional expression (up to two extra DB hits per rate).
        total_count = queryset.count()
        completed_count = completed.count()
        high_performer_count = completed.filter(
            completion_percentage__gte=80,
            technical_accuracy__gte=75
        ).count()

        # One aggregate call instead of three separate ones.
        averages = completed.aggregate(
            technical=Avg('technical_accuracy'),
            behavioral=Avg('behavioral_score'),
            overall=Avg('completion_percentage')
        )

        return {
            'completion_rate': (
                completed_count / total_count * 100
                if total_count > 0 else 0
            ),
            'high_performer_rate': (
                high_performer_count / completed_count * 100
                if completed_count > 0 else 0
            ),
            'average_scores': {
                'technical': averages['technical'],
                'behavioral': averages['behavioral'],
                'overall': averages['overall']
            }
        }

Candidate Performance Analytics

class CandidateAnalyticsService:
    """Per-candidate performance history and improvement analytics."""

    def get_candidate_performance_trends(self, user_id):
        """Track candidate improvement over time.

        Returns the per-interview score history (chronological), derived
        improvement metrics, and identified strengths / weak areas.
        """
        interviews = InterviewSession.objects.filter(
            user_id=user_id,
            status='completed'
        ).order_by('start_time')

        # NOTE(review): one SentimentSession and one SkillAssessment query per
        # interview (N+1); consider prefetching if histories grow long.
        performance_trend = []
        for interview in interviews:
            sentiment = SentimentSession.objects.filter(
                interview_session=interview
            ).first()

            skill_scores = SkillAssessment.objects.filter(
                interview_session=interview
            ).aggregate(
                avg_score=Avg('accumulated_score')
            )

            performance_trend.append({
                'date': interview.start_time.date(),
                'interview_id': interview.id,
                'overall_score': interview.completion_percentage,
                'technical_score': interview.technical_accuracy,
                'behavioral_score': interview.behavioral_score,
                # None when no sentiment analysis exists for the session.
                'sentiment_score': sentiment.positive_score if sentiment else None,
                # None when the session has no skill assessments (Avg of empty).
                'skill_score': skill_scores['avg_score']
            })

        improvement_analysis = self.calculate_improvement_metrics(performance_trend)

        return {
            'performance_history': performance_trend,
            'improvement_analysis': improvement_analysis,
            'strengths': self.identify_strengths(user_id),
            'areas_for_improvement': self.identify_improvement_areas(user_id)
        }

    @staticmethod
    def _score_delta(latest, first):
        """Difference between two scores, or None when either is missing."""
        if latest is None or first is None:
            return None
        return latest - first

    def calculate_improvement_metrics(self, performance_trend):
        """Calculate candidate improvement between first and latest interview.

        Returns {'insufficient_data': True} when fewer than two interviews
        exist. Score deltas are None when either endpoint score is missing
        (fixes a TypeError the original raised on None scores).
        """
        if len(performance_trend) < 2:
            return {'insufficient_data': True}

        first_interview = performance_trend[0]
        latest_interview = performance_trend[-1]

        return {
            'overall_improvement': self._score_delta(
                latest_interview['overall_score'], first_interview['overall_score']
            ),
            'technical_improvement': self._score_delta(
                latest_interview['technical_score'], first_interview['technical_score']
            ),
            'behavioral_improvement': self._score_delta(
                latest_interview['behavioral_score'], first_interview['behavioral_score']
            ),
            'total_interviews': len(performance_trend),
            'improvement_rate': self.calculate_improvement_rate(performance_trend)
        }

AI Performance Analytics

AI Model Performance Tracking

class AIPerformanceAnalytics:
    """Tracks accuracy and effectiveness of the platform's AI components."""

    def __init__(self):
        # Shared collector used to emit metrics downstream.
        self.metrics_collector = MetricsCollector()

    def track_sentiment_analysis_performance(self):
        """Track sentiment analysis accuracy and performance (last 30 days)."""
        window_start = timezone.now() - timedelta(days=30)
        analyses = SentimentSession.objects.filter(created_at__gte=window_start)

        return {
            'total_analyses': analyses.count(),
            'avg_processing_time': self.calculate_avg_processing_time(analyses),
            'confidence_distribution': self.get_confidence_distribution(analyses),
            'accuracy_metrics': self.calculate_accuracy_metrics(analyses),
        }

    def get_skill_assessment_accuracy(self):
        """Measure skill-assessment prediction accuracy against human review."""
        window_start = timezone.now() - timedelta(days=30)

        # Only assessments a human has validated can be scored for accuracy.
        reviewed = SkillAssessment.objects.filter(
            human_validated=True,
            created_at__gte=window_start
        )

        return {
            'prediction_accuracy': self.calculate_prediction_accuracy(reviewed),
            'false_positive_rate': self.calculate_false_positive_rate(reviewed),
            'false_negative_rate': self.calculate_false_negative_rate(reviewed),
            'skill_category_accuracy': self.get_skill_category_accuracy(reviewed),
        }

    def track_recommendation_effectiveness(self):
        """Measure effectiveness of AI-generated recommendations (90 days)."""
        window_start = timezone.now() - timedelta(days=90)
        recs = Recommendation.objects.filter(created_at__gte=window_start)

        return {
            'recommendation_acceptance_rate': self.calculate_acceptance_rate(recs),
            'improvement_correlation': self.calculate_improvement_correlation(recs),
            'user_satisfaction': self.get_recommendation_satisfaction(recs),
        }

Real-Time AI Monitoring

class RealTimeAIMonitoring:
    """Publishes per-component AI metrics to Redis sorted sets for a live dashboard."""

    def __init__(self):
        self.redis_client = redis.Redis()

    def track_live_processing_metrics(self, component, processing_time, accuracy):
        """Record one processing-time and one accuracy sample for `component`.

        Samples live in two sorted sets keyed by component. The member encodes
        "<timestamp>:<value>" and the *score* is the timestamp, so range
        queries and trimming operate on time.

        Fixed: the original stored the metric value as the score, which made
        the 24h trim below delete entries by metric *value* rather than by
        age, and made zrange order samples by value rather than recency.
        """
        timestamp = int(time.time())

        self.redis_client.zadd(
            f'ai_processing_times:{component}',
            {f'{timestamp}:{processing_time}': timestamp}
        )
        self.redis_client.zadd(
            f'ai_accuracy:{component}',
            {f'{timestamp}:{accuracy}': timestamp}
        )

        # Keep only the last 24 hours of data (scores are timestamps).
        cutoff = timestamp - 86400
        self.redis_client.zremrangebyscore(
            f'ai_processing_times:{component}', '-inf', cutoff
        )
        self.redis_client.zremrangebyscore(
            f'ai_accuracy:{component}', '-inf', cutoff
        )

    @staticmethod
    def _sample_values(samples):
        """Decode metric values from zrange(..., withscores=True) results.

        Each item is (member, score); the member is "<timestamp>:<value>",
        possibly as bytes (default redis-py client returns bytes).
        """
        values = []
        for member, _score in samples:
            if isinstance(member, bytes):
                member = member.decode()
            values.append(float(member.split(':', 1)[1]))
        return values

    def get_real_time_metrics(self, component):
        """Return averages and trends of the 100 most recent samples.

        Returns {'error': 'Insufficient data'} when either series is empty.
        """
        # zrange by rank: with score=timestamp, the tail is the most recent.
        recent_times = self.redis_client.zrange(
            f'ai_processing_times:{component}', -100, -1, withscores=True
        )
        recent_accuracy = self.redis_client.zrange(
            f'ai_accuracy:{component}', -100, -1, withscores=True
        )

        if not recent_times or not recent_accuracy:
            return {'error': 'Insufficient data'}

        time_values = self._sample_values(recent_times)
        accuracy_values = self._sample_values(recent_accuracy)

        return {
            'avg_processing_time': sum(time_values) / len(time_values),
            'avg_accuracy': sum(accuracy_values) / len(accuracy_values),
            # calculate_trend now receives (timestamp, value) pairs in time
            # order. NOTE(review): confirm its implementation matches.
            'processing_trend': self.calculate_trend(
                [(score, v) for (_m, score), v in zip(recent_times, time_values)]
            ),
            'accuracy_trend': self.calculate_trend(
                [(score, v) for (_m, score), v in zip(recent_accuracy, accuracy_values)]
            )
        }

User Behavior Analytics

Feature Usage Analytics

class FeatureUsageAnalytics:
    """Feature adoption and usage-pattern analytics."""

    def get_feature_adoption_metrics(self, time_period='30d'):
        """Analyze feature adoption and usage patterns.

        Args:
            time_period: lookback window string parsed by parse_time_period
                (e.g. '30d').
        """
        cutoff_date = timezone.now() - self.parse_time_period(time_period)

        # One sub-report per core feature.
        return {
            'interview_practice': self.get_practice_usage(cutoff_date),
            'ai_feedback': self.get_ai_feedback_usage(cutoff_date),
            'learning_paths': self.get_learning_path_usage(cutoff_date),
            'analytics_dashboard': self.get_dashboard_usage(cutoff_date),
            'skill_assessments': self.get_skill_assessment_usage(cutoff_date)
        }

    def get_practice_usage(self, cutoff_date):
        """Analyze interview practice feature usage since `cutoff_date`.

        Adoption is measured against users who logged in during the window.
        """
        practice_sessions = InterviewSession.objects.filter(
            is_practice=True,
            created_at__gte=cutoff_date
        )

        # Hoisted: the original re-ran count()/distinct().count() up to six
        # times while building the result dict (one DB query each).
        total_sessions = practice_sessions.count()
        unique_users = practice_sessions.values('user_id').distinct().count()
        completed_sessions = practice_sessions.filter(status='completed').count()

        total_users = User.objects.filter(
            last_login__gte=cutoff_date
        ).count()

        return {
            'total_practice_sessions': total_sessions,
            'unique_users': unique_users,
            'adoption_rate': (
                unique_users / total_users * 100 if total_users > 0 else 0
            ),
            'avg_sessions_per_user': (
                total_sessions / unique_users if unique_users > 0 else 0
            ),
            'completion_rate': (
                completed_sessions / total_sessions * 100
                if total_sessions > 0 else 0
            )
        }

User Journey Analytics

class UserJourneyAnalytics:
    """Funnel and cohort-retention analytics over the User model."""

    def analyze_user_funnel(self):
        """Analyze user conversion funnel.

        Returns raw stage counts, stage-to-stage conversion percentages, and
        the overall registration -> subscription rate. Counts are all-time
        snapshots, not bounded to a date window.
        """
        
        # Define funnel stages
        # Stage order matters: conversion rates below pair each stage with
        # the previous one via dict insertion order.
        funnel_stages = {
            'registration': User.objects.all().count(),
            'email_verification': User.objects.filter(is_verified=True).count(),
            'first_interview': User.objects.filter(
                interviewsession__isnull=False
            ).distinct().count(),
            'completed_interview': User.objects.filter(
                interviewsession__status='completed'
            ).distinct().count(),
            'subscription': User.objects.filter(
                subscription__status='active'
            ).distinct().count()
        }
        
        # Calculate conversion rates
        conversion_rates = {}
        stages = list(funnel_stages.keys())
        
        for i in range(1, len(stages)):
            current_stage = stages[i]
            previous_stage = stages[i-1]
            
            # Guard against empty previous stage to avoid ZeroDivisionError.
            conversion_rates[f'{previous_stage}_to_{current_stage}'] = (
                funnel_stages[current_stage] / funnel_stages[previous_stage] * 100
                if funnel_stages[previous_stage] > 0 else 0
            )
        
        return {
            'funnel_stages': funnel_stages,
            'conversion_rates': conversion_rates,
            'overall_conversion': (
                funnel_stages['subscription'] / funnel_stages['registration'] * 100
                if funnel_stages['registration'] > 0 else 0
            )
        }
    
    def get_user_retention_cohorts(self):
        """Analyze user retention by registration cohorts.

        Returns a list of cohort dicts (one per registration month), each
        carrying 12 monthly retention entries.
        NOTE(review): issues one queryset per cohort plus one COUNT per month
        (N+1 pattern) — confirm acceptable at current cohort counts.
        """
        
        # Group users by registration month
        # NOTE(review): QuerySet.extra() is deprecated in modern Django and
        # the raw DATE_TRUNC is PostgreSQL-specific; annotate(TruncMonth(...))
        # is the supported equivalent — confirm before porting databases.
        cohorts = User.objects.extra(
            select={'cohort_month': "DATE_TRUNC('month', date_joined)"}
        ).values('cohort_month').annotate(
            total_users=Count('id')
        ).order_by('cohort_month')
        
        retention_data = []
        
        for cohort in cohorts:
            # Re-select this cohort's users by (month, year) of date_joined.
            cohort_users = User.objects.filter(
                date_joined__month=cohort['cohort_month'].month,
                date_joined__year=cohort['cohort_month'].year
            )
            
            # Calculate retention for each month after registration
            monthly_retention = []
            for month_offset in range(1, 13):  # 12 months
                retention_date = cohort['cohort_month'] + relativedelta(months=month_offset)
                
                # "Active" = last_login landed inside this one-month window.
                # NOTE(review): because only last_login is checked, a user
                # active in months 2 and 5 counts only in month 5 — confirm
                # this is the intended retention definition.
                active_users = cohort_users.filter(
                    last_login__gte=retention_date,
                    last_login__lt=retention_date + relativedelta(months=1)
                ).count()
                
                retention_rate = (
                    active_users / cohort['total_users'] * 100
                    if cohort['total_users'] > 0 else 0
                )
                
                monthly_retention.append({
                    'month': month_offset,
                    'active_users': active_users,
                    'retention_rate': retention_rate
                })
            
            retention_data.append({
                'cohort_month': cohort['cohort_month'],
                'total_users': cohort['total_users'],
                'retention_by_month': monthly_retention
            })
        
        return retention_data

Technical Performance Metrics

System Performance Analytics

class SystemPerformanceAnalytics:
    """Application and database performance analytics backed by DataDog."""

    def __init__(self):
        # Client for DataDog timeseries queries.
        self.datadog_client = DatadogAPIClient()

    def get_application_performance_metrics(self, time_range='1h'):
        """Get application performance metrics from DataDog.

        Args:
            time_range: DataDog-style lookback window (e.g. '1h', '7d').
        """
        # Dispatch table: one collector per metric family.
        collectors = {
            'response_time': self.get_response_time_metrics,
            'throughput': self.get_throughput_metrics,
            'error_rate': self.get_error_rate_metrics,
            'database_performance': self.get_database_metrics,
            'cache_performance': self.get_cache_metrics,
        }
        return {name: collect(time_range) for name, collect in collectors.items()}

    def get_response_time_metrics(self, time_range):
        """Get API response time metrics (average, p95, p99, slow endpoints)."""
        query = f"""
        avg:trace.django.request.duration{{service:jobhive-api}} by {{resource_name}}
        """

        series = self.datadog_client.query_timeseries(
            query=query,
            from_time=time_range
        )

        return {
            'average_response_time': self.calculate_average(series),
            'p95_response_time': self.calculate_percentile(series, 95),
            'p99_response_time': self.calculate_percentile(series, 99),
            'slowest_endpoints': self.identify_slowest_endpoints(series),
        }

    def get_database_metrics(self, time_range):
        """Get database performance metrics (connections, queries, CPU, memory)."""
        # One DataDog timeseries query per database-level metric.
        metric_queries = {
            'connection_count': 'avg:postgresql.connections{host:jobhive-db}',
            'query_duration': 'avg:postgresql.query_duration{host:jobhive-db}',
            'cpu_usage': 'avg:system.cpu.user{host:jobhive-db}',
            'memory_usage': 'avg:system.mem.used{host:jobhive-db}',
        }
        return {
            name: self.datadog_client.query_timeseries(query, time_range)
            for name, query in metric_queries.items()
        }

Infrastructure Cost Analytics

class CostAnalytics:
    """AWS cost reporting via the Cost Explorer API."""

    def __init__(self):
        self.aws_client = boto3.client('ce')  # Cost Explorer

    def get_cost_breakdown(self, time_period='MONTHLY'):
        """Get AWS cost breakdown by service for the trailing month.

        Args:
            time_period: Cost Explorer granularity ('MONTHLY', 'DAILY', ...).

        Returns:
            dict with total cost, per-service and per-feature splits, trends,
            and optimization recommendations.
        """
        end_date = datetime.now().date()
        start_date = end_date - relativedelta(months=1)

        response = self.aws_client.get_cost_and_usage(
            TimePeriod={
                'Start': start_date.strftime('%Y-%m-%d'),
                'End': end_date.strftime('%Y-%m-%d')
            },
            Granularity=time_period,
            Metrics=['BlendedCost'],
            GroupBy=[
                {
                    'Type': 'DIMENSION',
                    'Key': 'SERVICE'
                }
            ]
        )

        cost_breakdown = {}
        total_cost = 0.0

        for result in response['ResultsByTime']:
            for group in result['Groups']:
                service = group['Keys'][0]
                cost = float(group['Metrics']['BlendedCost']['Amount'])
                # Fixed: accumulate across time buckets. The original
                # overwrote the entry, keeping only the last bucket whenever
                # the granularity yields multiple periods (e.g. DAILY).
                cost_breakdown[service] = cost_breakdown.get(service, 0.0) + cost
                total_cost += cost

        # Allocate raw service costs to product features.
        feature_costs = self.allocate_costs_to_features(cost_breakdown)

        return {
            'total_cost': total_cost,
            'cost_by_service': cost_breakdown,
            'cost_by_feature': feature_costs,
            'cost_trends': self.get_cost_trends(),
            'optimization_recommendations': self.get_cost_optimization_recommendations()
        }

    def calculate_cost_per_user(self):
        """Calculate trailing-month AWS cost per monthly active user."""
        monthly_cost = self.get_cost_breakdown()['total_cost']
        monthly_active_users = User.objects.filter(
            last_login__gte=timezone.now() - timedelta(days=30)
        ).count()

        return {
            'cost_per_user': (
                monthly_cost / monthly_active_users
                if monthly_active_users > 0 else 0
            ),
            'monthly_cost': monthly_cost,
            'monthly_active_users': monthly_active_users
        }

Analytics Dashboard APIs

Real-Time Dashboard Endpoints

class AnalyticsDashboardAPI:
    """HTTP endpoints serving dashboard metrics as JSON.

    NOTE(review): these views are declared inside a class without `self` or
    @staticmethod, so they only work when referenced as plain class
    attributes (e.g. AnalyticsDashboardAPI.get_real_time_metrics in a
    URLconf), never on an instance — confirm this is intentional.
    """

    @require_http_methods(['GET'])
    def get_real_time_metrics(request):
        """Get real-time platform metrics for dashboard."""
        
        # Current active sessions
        # Only counts in-progress sessions started within the last hour.
        active_sessions = InterviewSession.objects.filter(
            status='in_progress',
            start_time__gte=timezone.now() - timedelta(hours=1)
        ).count()
        
        # Recent completions (last hour)
        recent_completions = InterviewSession.objects.filter(
            status='completed',
            end_time__gte=timezone.now() - timedelta(hours=1)
        ).count()
        
        # Average scores (last 24 hours)
        # Each Avg is None when no sessions completed in the window.
        recent_scores = InterviewSession.objects.filter(
            status='completed',
            end_time__gte=timezone.now() - timedelta(days=1)
        ).aggregate(
            avg_technical=Avg('technical_accuracy'),
            avg_behavioral=Avg('behavioral_score'),
            avg_completion=Avg('completion_percentage')
        )
        
        # AI processing metrics
        # May be {'error': 'Insufficient data'} when Redis has no samples.
        ai_metrics = RealTimeAIMonitoring().get_real_time_metrics('sentiment')
        
        return JsonResponse({
            'active_interviews': active_sessions,
            'recent_completions': recent_completions,
            'average_scores': recent_scores,
            'ai_performance': ai_metrics,
            'timestamp': timezone.now().isoformat()
        })
    
    @require_http_methods(['GET'])
    def get_business_metrics(request):
        """Get key business metrics (revenue, engagement, feature adoption)."""
        
        # Revenue metrics
        # Uses the current calendar month, so early-month values are partial.
        revenue_metrics = get_revenue_metrics(
            month=timezone.now().month,
            year=timezone.now().year
        )
        
        # User engagement
        engagement_metrics = get_user_engagement_metrics(
            start_date=timezone.now() - timedelta(days=30),
            end_date=timezone.now()
        )
        
        # Feature adoption
        feature_analytics = FeatureUsageAnalytics()
        feature_metrics = feature_analytics.get_feature_adoption_metrics()
        
        return JsonResponse({
            'revenue': revenue_metrics,
            'engagement': engagement_metrics,
            'features': feature_metrics,
            'generated_at': timezone.now().isoformat()
        })

Analytics Export and Reporting

Automated Reporting

class AnalyticsReportGenerator:
    """Builds and distributes the weekly analytics report."""

    def generate_weekly_report(self):
        """Generate comprehensive weekly analytics report.

        Collects metrics from every analytics service for the trailing seven
        days, renders a PDF, emails it to stakeholders, and returns the raw
        report data.
        """
        window_end = timezone.now()
        window_start = window_end - timedelta(days=7)

        report_data = {
            'period': f"{window_start.strftime('%Y-%m-%d')} to {window_end.strftime('%Y-%m-%d')}",
            'user_metrics': get_user_engagement_metrics(window_start, window_end),
            'interview_metrics': InterviewAnalyticsService().get_interview_metrics(
                date_range=[window_start, window_end]
            ),
            'ai_performance': AIPerformanceAnalytics().track_sentiment_analysis_performance(),
            'revenue_metrics': get_revenue_metrics(window_end.month, window_end.year),
            'system_performance': SystemPerformanceAnalytics().get_application_performance_metrics('7d')
        }

        # Render and distribute, then hand the raw data back to the caller.
        report_pdf = self.create_pdf_report(report_data)
        self.send_report_email(report_pdf, report_data)

        return report_data

    def create_pdf_report(self, data):
        """Create PDF report from analytics data."""
        # Implementation for PDF generation
        pass

    def send_report_email(self, pdf_attachment, data):
        """Send analytics report via email."""
        # Implementation for email sending
        pass
This comprehensive analytics system provides JobHive with deep insights into business performance, user behavior, AI effectiveness, and technical metrics, enabling data-driven optimization and growth strategies.