Analytics & Metrics
Overview
JobHive provides comprehensive analytics and metrics across business operations, user engagement, AI performance, and technical infrastructure. The analytics system supports data-driven decision making for both platform optimization and customer success.
Business Analytics Dashboard
Key Performance Indicators (KPIs)
User Engagement Metrics
```python
from datetime import timedelta

from django.db.models import Avg

# User, InterviewSession, etc. are JobHive models; their import paths
# are omitted in these snippets.

# Core engagement metrics tracked
USER_ENGAGEMENT_METRICS = {
    'daily_active_users': 'DAU',
    'monthly_active_users': 'MAU',
    'user_retention_rate': 'Percentage of users returning after 30 days',
    'session_duration': 'Average time spent per session',
    'interview_completion_rate': 'Percentage of started interviews completed',
    'feature_adoption_rate': 'Usage of key features by active users'
}

# Example analytics query
def get_user_engagement_metrics(start_date, end_date):
    return {
        'dau': User.objects.filter(
            last_login__range=[start_date, end_date]
        ).distinct().count(),
        'mau': User.objects.filter(
            last_login__range=[start_date - timedelta(days=30), end_date]
        ).distinct().count(),
        'avg_session_duration': InterviewSession.objects.filter(
            start_time__range=[start_date, end_date],
            status='completed'
        ).aggregate(
            avg_duration=Avg('duration')
        )['avg_duration'],
        'completion_rate': InterviewSession.objects.filter(
            start_time__range=[start_date, end_date]
        ).aggregate(
            completion_rate=Avg('completion_percentage')
        )['completion_rate']
    }
```
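A quick way to sanity-check engagement is the DAU/MAU "stickiness" ratio derived from these numbers. A minimal usage sketch (the date values are illustrative):

```python
from datetime import datetime, timedelta

today = datetime(2025, 1, 31)
metrics = get_user_engagement_metrics(today - timedelta(days=1), today)

# DAU/MAU "stickiness": the share of monthly users active on a given day
stickiness = metrics['dau'] / metrics['mau'] * 100 if metrics['mau'] else 0
print(f"Stickiness: {stickiness:.1f}%")
```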
Revenue Metrics
```python
from datetime import datetime

from dateutil.relativedelta import relativedelta
from django.db.models import Case, DecimalField, F, Sum, When

REVENUE_METRICS = {
    'monthly_recurring_revenue': 'MRR from active subscriptions',
    'annual_recurring_revenue': 'ARR projection',
    'average_revenue_per_user': 'ARPU across all paying customers',
    'customer_lifetime_value': 'CLV calculation',
    'churn_rate': 'Monthly subscription cancellation rate',
    'conversion_rate': 'Free to paid conversion percentage'
}

def get_revenue_metrics(month, year):
    """Calculate key revenue metrics for a given month."""
    start_date = datetime(year, month, 1)
    end_date = start_date + relativedelta(months=1)

    # Subscriptions active at any point during the month
    active_subs = CustomerSubscription.objects.filter(
        status='active',
        current_period_start__lt=end_date,
        current_period_end__gte=start_date
    )

    # MRR calculation: normalize annual plans to a monthly amount
    mrr = active_subs.aggregate(
        total=Sum(
            Case(
                When(plan__interval='year', then=F('plan__price') / 12),
                default=F('plan__price'),
                output_field=DecimalField()
            )
        )
    )['total'] or 0

    # ARPU calculation
    arpu = mrr / active_subs.count() if active_subs.count() > 0 else 0

    # Churn rate: cancellations this month over subscriptions existing at month start
    churned_subs = CustomerSubscription.objects.filter(
        status='canceled',
        updated_at__range=[start_date, end_date]
    ).count()
    total_subs_start = CustomerSubscription.objects.filter(
        created_at__lt=start_date,
        status__in=['active', 'canceled']
    ).count()
    churn_rate = (churned_subs / total_subs_start * 100) if total_subs_start > 0 else 0

    return {
        'mrr': float(mrr),
        'arr': float(mrr * 12),
        'arpu': float(arpu),
        'churn_rate': churn_rate,
        'active_subscriptions': active_subs.count()
    }
```
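customer_lifetime_value is listed above but not computed. A common first approximation is ARPU divided by the monthly churn rate; the helper below is a sketch of that model, not JobHive's actual CLV calculation:

```python
def estimate_customer_lifetime_value(month, year):
    """Approximate CLV as ARPU / monthly churn rate.

    This is the simple 'average revenue over average customer lifetime'
    model; it assumes churn stays roughly constant month to month.
    """
    metrics = get_revenue_metrics(month, year)
    churn = metrics['churn_rate'] / 100  # percentage -> fraction
    if churn == 0:
        return None  # no observed churn; lifetime is unbounded in this model
    return metrics['arpu'] / churn
```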
Interview Analytics
Interview Performance Metrics
```python
from django.db.models import Avg, Count, Q

class InterviewAnalyticsService:
    def get_interview_metrics(self, company_id=None, date_range=None):
        """Get comprehensive interview performance metrics."""
        queryset = InterviewSession.objects.all()
        if company_id:
            queryset = queryset.filter(job__company_id=company_id)
        if date_range:
            queryset = queryset.filter(start_time__range=date_range)

        # Basic interview metrics
        basic_metrics = queryset.aggregate(
            total_interviews=Count('id'),
            completed_interviews=Count('id', filter=Q(status='completed')),
            avg_duration=Avg('duration'),
            avg_completion_rate=Avg('completion_percentage'),
            avg_technical_score=Avg('technical_accuracy'),
            avg_behavioral_score=Avg('behavioral_score')
        )

        # Success rate by interview outcome
        success_metrics = self.calculate_success_rates(queryset)

        # Interviewer performance
        interviewer_metrics = self.get_interviewer_performance(queryset)

        # Candidate pipeline metrics
        pipeline_metrics = self.get_pipeline_metrics(queryset)

        return {
            'basic_metrics': basic_metrics,
            'success_metrics': success_metrics,
            'interviewer_metrics': interviewer_metrics,
            'pipeline_metrics': pipeline_metrics
        }

    def calculate_success_rates(self, queryset):
        """Calculate interview success rates and outcomes."""
        completed_interviews = queryset.filter(status='completed')

        # Define success criteria
        high_performers = completed_interviews.filter(
            completion_percentage__gte=80,
            technical_accuracy__gte=75
        )

        # Calculate conversion rates
        return {
            'completion_rate': (
                completed_interviews.count() / queryset.count() * 100
                if queryset.count() > 0 else 0
            ),
            'high_performer_rate': (
                high_performers.count() / completed_interviews.count() * 100
                if completed_interviews.count() > 0 else 0
            ),
            'average_scores': {
                'technical': completed_interviews.aggregate(
                    avg=Avg('technical_accuracy')
                )['avg'],
                'behavioral': completed_interviews.aggregate(
                    avg=Avg('behavioral_score')
                )['avg'],
                'overall': completed_interviews.aggregate(
                    avg=Avg('completion_percentage')
                )['avg']
            }
        }
```
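get_interviewer_performance and get_pipeline_metrics are called above but not shown. As an illustration, the former could group sessions per interviewer, assuming InterviewSession has an interviewer foreign key (that field name is an assumption, not confirmed by the model):

```python
from django.db.models import Avg, Count

def get_interviewer_performance(self, queryset):
    """Per-interviewer averages; assumes an `interviewer` FK on the session."""
    return list(
        queryset.filter(interviewer__isnull=False)
        .values('interviewer_id')
        .annotate(
            interviews=Count('id'),
            avg_technical=Avg('technical_accuracy'),
            avg_behavioral=Avg('behavioral_score')
        )
        .order_by('-interviews')
    )
```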
Candidate Performance Analytics
```python
from django.db.models import Avg

class CandidateAnalyticsService:
    def get_candidate_performance_trends(self, user_id):
        """Track candidate improvement over time."""
        interviews = InterviewSession.objects.filter(
            user_id=user_id,
            status='completed'
        ).order_by('start_time')

        performance_trend = []
        for interview in interviews:
            # Get associated scores
            sentiment = SentimentSession.objects.filter(
                interview_session=interview
            ).first()
            skill_scores = SkillAssessment.objects.filter(
                interview_session=interview
            ).aggregate(
                avg_score=Avg('accumulated_score')
            )
            performance_trend.append({
                'date': interview.start_time.date(),
                'interview_id': interview.id,
                'overall_score': interview.completion_percentage,
                'technical_score': interview.technical_accuracy,
                'behavioral_score': interview.behavioral_score,
                'sentiment_score': sentiment.positive_score if sentiment else None,
                'skill_score': skill_scores['avg_score']
            })

        # Calculate improvement metrics
        improvement_analysis = self.calculate_improvement_metrics(performance_trend)

        return {
            'performance_history': performance_trend,
            'improvement_analysis': improvement_analysis,
            'strengths': self.identify_strengths(user_id),
            'areas_for_improvement': self.identify_improvement_areas(user_id)
        }

    def calculate_improvement_metrics(self, performance_trend):
        """Calculate candidate improvement over time."""
        if len(performance_trend) < 2:
            return {'insufficient_data': True}

        first_interview = performance_trend[0]
        latest_interview = performance_trend[-1]

        return {
            'overall_improvement': (
                latest_interview['overall_score'] - first_interview['overall_score']
            ),
            'technical_improvement': (
                latest_interview['technical_score'] - first_interview['technical_score']
            ),
            'behavioral_improvement': (
                latest_interview['behavioral_score'] - first_interview['behavioral_score']
            ),
            'total_interviews': len(performance_trend),
            'improvement_rate': self.calculate_improvement_rate(performance_trend)
        }
```
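calculate_improvement_rate is referenced but not defined here. One reasonable reading is the average score gain per interview, i.e. the least-squares slope over the overall scores; a sketch under that assumption:

```python
def calculate_improvement_rate(self, performance_trend):
    """Least-squares slope of overall_score per interview (points/interview)."""
    scores = [p['overall_score'] for p in performance_trend
              if p['overall_score'] is not None]
    n = len(scores)
    if n < 2:
        return 0
    mean_x = (n - 1) / 2  # mean of indices 0..n-1
    mean_y = sum(scores) / n
    num = sum((i - mean_x) * (y - mean_y) for i, y in enumerate(scores))
    den = sum((i - mean_x) ** 2 for i in range(n))
    return num / den if den else 0
```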
AI Performance Analytics
AI Model Performance Tracking
```python
from datetime import timedelta

from django.utils import timezone

class AIPerformanceAnalytics:
    def __init__(self):
        self.metrics_collector = MetricsCollector()

    def track_sentiment_analysis_performance(self):
        """Track sentiment analysis accuracy and performance."""
        # Get recent sentiment analyses
        recent_analyses = SentimentSession.objects.filter(
            created_at__gte=timezone.now() - timedelta(days=30)
        )
        performance_metrics = {
            'total_analyses': recent_analyses.count(),
            'avg_processing_time': self.calculate_avg_processing_time(recent_analyses),
            'confidence_distribution': self.get_confidence_distribution(recent_analyses),
            'accuracy_metrics': self.calculate_accuracy_metrics(recent_analyses)
        }
        return performance_metrics

    def get_skill_assessment_accuracy(self):
        """Measure skill assessment prediction accuracy."""
        # Compare AI assessments with human validation
        validated_assessments = SkillAssessment.objects.filter(
            human_validated=True,
            created_at__gte=timezone.now() - timedelta(days=30)
        )
        accuracy_metrics = {
            'prediction_accuracy': self.calculate_prediction_accuracy(validated_assessments),
            'false_positive_rate': self.calculate_false_positive_rate(validated_assessments),
            'false_negative_rate': self.calculate_false_negative_rate(validated_assessments),
            'skill_category_accuracy': self.get_skill_category_accuracy(validated_assessments)
        }
        return accuracy_metrics

    def track_recommendation_effectiveness(self):
        """Measure effectiveness of AI-generated recommendations."""
        # Track user engagement with recommendations
        recommendations = Recommendation.objects.filter(
            created_at__gte=timezone.now() - timedelta(days=90)
        )
        effectiveness_metrics = {
            'recommendation_acceptance_rate': self.calculate_acceptance_rate(recommendations),
            'improvement_correlation': self.calculate_improvement_correlation(recommendations),
            'user_satisfaction': self.get_recommendation_satisfaction(recommendations)
        }
        return effectiveness_metrics
```
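The accuracy helpers (calculate_prediction_accuracy and friends) are not shown. As an illustration, prediction accuracy could be the share of validated assessments where the AI score lands within a tolerance of the human score; the ai_score and human_score field names are assumptions:

```python
def calculate_prediction_accuracy(self, assessments, tolerance=10):
    """Percent of assessments with |ai_score - human_score| <= tolerance.

    Field names are assumed for illustration; adjust to the real model.
    """
    total = assessments.count()
    if total == 0:
        return None
    within = sum(
        1 for a in assessments
        if abs(a.ai_score - a.human_score) <= tolerance
    )
    return within / total * 100
```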
Real-Time AI Monitoring
```python
import time

import redis

class RealTimeAIMonitoring:
    def __init__(self):
        # decode_responses=True so members come back as str, not bytes
        self.redis_client = redis.Redis(decode_responses=True)

    def track_live_processing_metrics(self, component, processing_time, accuracy):
        """Track real-time AI processing metrics."""
        # Store metrics in Redis sorted sets for the real-time dashboard.
        # Entries are scored by timestamp so they can be trimmed by age;
        # the measured value is encoded in the member itself.
        timestamp = int(time.time())

        # Processing time tracking
        self.redis_client.zadd(
            f'ai_processing_times:{component}',
            {f'{timestamp}:{processing_time}': timestamp}
        )

        # Accuracy tracking
        self.redis_client.zadd(
            f'ai_accuracy:{component}',
            {f'{timestamp}:{accuracy}': timestamp}
        )

        # Keep only last 24 hours of data
        cutoff = timestamp - 86400
        self.redis_client.zremrangebyscore(
            f'ai_processing_times:{component}', '-inf', cutoff
        )
        self.redis_client.zremrangebyscore(
            f'ai_accuracy:{component}', '-inf', cutoff
        )

    def get_real_time_metrics(self, component):
        """Get current AI performance metrics."""
        # Get the 100 most recent samples (oldest to newest)
        recent_times = self.redis_client.zrange(
            f'ai_processing_times:{component}', -100, -1
        )
        recent_accuracy = self.redis_client.zrange(
            f'ai_accuracy:{component}', -100, -1
        )
        if not recent_times or not recent_accuracy:
            return {'error': 'Insufficient data'}

        # Members are '<timestamp>:<value>'; parse the value back out
        times = [float(m.split(':', 1)[1]) for m in recent_times]
        accuracy = [float(m.split(':', 1)[1]) for m in recent_accuracy]

        return {
            'avg_processing_time': sum(times) / len(times),
            'avg_accuracy': sum(accuracy) / len(accuracy),
            'processing_trend': self.calculate_trend(times),
            'accuracy_trend': self.calculate_trend(accuracy)
        }
```
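calculate_trend is left undefined above. A deliberately simple version compares the mean of the newer half of the samples to the older half:

```python
def calculate_trend(self, values, threshold=0.05):
    """Return 'up', 'down', or 'flat' from an oldest-to-newest sample list."""
    if len(values) < 4:
        return 'flat'
    mid = len(values) // 2
    older = sum(values[:mid]) / mid
    newer = sum(values[mid:]) / (len(values) - mid)
    if older == 0:
        return 'flat'
    change = (newer - older) / older
    if change > threshold:
        return 'up'
    if change < -threshold:
        return 'down'
    return 'flat'
```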
User Behavior Analytics
Feature Usage Analytics
```python
from django.utils import timezone

class FeatureUsageAnalytics:
    def get_feature_adoption_metrics(self, time_period='30d'):
        """Analyze feature adoption and usage patterns."""
        cutoff_date = timezone.now() - self.parse_time_period(time_period)

        # Core feature usage
        feature_metrics = {
            'interview_practice': self.get_practice_usage(cutoff_date),
            'ai_feedback': self.get_ai_feedback_usage(cutoff_date),
            'learning_paths': self.get_learning_path_usage(cutoff_date),
            'analytics_dashboard': self.get_dashboard_usage(cutoff_date),
            'skill_assessments': self.get_skill_assessment_usage(cutoff_date)
        }
        return feature_metrics

    def get_practice_usage(self, cutoff_date):
        """Analyze interview practice feature usage."""
        practice_sessions = InterviewSession.objects.filter(
            is_practice=True,
            created_at__gte=cutoff_date
        )
        total_users = User.objects.filter(
            last_login__gte=cutoff_date
        ).count()

        # Evaluate each count once instead of re-querying
        session_count = practice_sessions.count()
        unique_users = practice_sessions.values('user_id').distinct().count()
        completed_count = practice_sessions.filter(status='completed').count()

        return {
            'total_practice_sessions': session_count,
            'unique_users': unique_users,
            'adoption_rate': (
                unique_users / total_users * 100 if total_users > 0 else 0
            ),
            'avg_sessions_per_user': (
                session_count / unique_users if unique_users > 0 else 0
            ),
            'completion_rate': (
                completed_count / session_count * 100 if session_count > 0 else 0
            )
        }
```
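parse_time_period is assumed above to turn strings like '30d' into a timedelta; since it isn't shown, here is a minimal sketch supporting hour, day, and week suffixes:

```python
from datetime import timedelta

def parse_time_period(self, period):
    """Convert '12h', '30d', or '4w' into a timedelta."""
    units = {'h': 'hours', 'd': 'days', 'w': 'weeks'}
    value, unit = int(period[:-1]), period[-1]
    if unit not in units:
        raise ValueError(f'Unsupported time period: {period}')
    return timedelta(**{units[unit]: value})
```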
User Journey Analytics
```python
from dateutil.relativedelta import relativedelta
from django.db.models import Count
from django.db.models.functions import TruncMonth

class UserJourneyAnalytics:
    def analyze_user_funnel(self):
        """Analyze user conversion funnel."""
        # Define funnel stages
        funnel_stages = {
            'registration': User.objects.all().count(),
            'email_verification': User.objects.filter(is_verified=True).count(),
            'first_interview': User.objects.filter(
                interviewsession__isnull=False
            ).distinct().count(),
            'completed_interview': User.objects.filter(
                interviewsession__status='completed'
            ).distinct().count(),
            'subscription': User.objects.filter(
                subscription__status='active'
            ).distinct().count()
        }

        # Calculate stage-to-stage conversion rates
        conversion_rates = {}
        stages = list(funnel_stages.keys())
        for i in range(1, len(stages)):
            current_stage = stages[i]
            previous_stage = stages[i - 1]
            conversion_rates[f'{previous_stage}_to_{current_stage}'] = (
                funnel_stages[current_stage] / funnel_stages[previous_stage] * 100
                if funnel_stages[previous_stage] > 0 else 0
            )

        return {
            'funnel_stages': funnel_stages,
            'conversion_rates': conversion_rates,
            'overall_conversion': (
                funnel_stages['subscription'] / funnel_stages['registration'] * 100
                if funnel_stages['registration'] > 0 else 0
            )
        }

    def get_user_retention_cohorts(self):
        """Analyze user retention by registration cohorts."""
        # Group users by registration month (TruncMonth instead of raw SQL)
        cohorts = User.objects.annotate(
            cohort_month=TruncMonth('date_joined')
        ).values('cohort_month').annotate(
            total_users=Count('id')
        ).order_by('cohort_month')

        retention_data = []
        for cohort in cohorts:
            cohort_users = User.objects.filter(
                date_joined__month=cohort['cohort_month'].month,
                date_joined__year=cohort['cohort_month'].year
            )

            # Calculate retention for each of the 12 months after registration
            monthly_retention = []
            for month_offset in range(1, 13):
                retention_date = cohort['cohort_month'] + relativedelta(months=month_offset)
                active_users = cohort_users.filter(
                    last_login__gte=retention_date,
                    last_login__lt=retention_date + relativedelta(months=1)
                ).count()
                retention_rate = (
                    active_users / cohort['total_users'] * 100
                    if cohort['total_users'] > 0 else 0
                )
                monthly_retention.append({
                    'month': month_offset,
                    'active_users': active_users,
                    'retention_rate': retention_rate
                })

            retention_data.append({
                'cohort_month': cohort['cohort_month'],
                'total_users': cohort['total_users'],
                'retention_by_month': monthly_retention
            })

        return retention_data
```
Technical Performance Metrics
System Performance Analytics
```python
class SystemPerformanceAnalytics:
    def __init__(self):
        self.datadog_client = DatadogAPIClient()

    def get_application_performance_metrics(self, time_range='1h'):
        """Get application performance metrics from DataDog."""
        return {
            'response_time': self.get_response_time_metrics(time_range),
            'throughput': self.get_throughput_metrics(time_range),
            'error_rate': self.get_error_rate_metrics(time_range),
            'database_performance': self.get_database_metrics(time_range),
            'cache_performance': self.get_cache_metrics(time_range)
        }

    def get_response_time_metrics(self, time_range):
        """Get API response time metrics."""
        query = 'avg:trace.django.request.duration{service:jobhive-api} by {resource_name}'
        response_times = self.datadog_client.query_timeseries(
            query=query,
            from_time=time_range
        )
        return {
            'average_response_time': self.calculate_average(response_times),
            'p95_response_time': self.calculate_percentile(response_times, 95),
            'p99_response_time': self.calculate_percentile(response_times, 99),
            'slowest_endpoints': self.identify_slowest_endpoints(response_times)
        }

    def get_database_metrics(self, time_range):
        """Get database performance metrics."""
        return {
            'connection_count': self.datadog_client.query_timeseries(
                'avg:postgresql.connections{host:jobhive-db}', time_range
            ),
            'query_duration': self.datadog_client.query_timeseries(
                'avg:postgresql.query_duration{host:jobhive-db}', time_range
            ),
            'cpu_usage': self.datadog_client.query_timeseries(
                'avg:system.cpu.user{host:jobhive-db}', time_range
            ),
            'memory_usage': self.datadog_client.query_timeseries(
                'avg:system.mem.used{host:jobhive-db}', time_range
            )
        }
```
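calculate_average, calculate_percentile, and identify_slowest_endpoints depend on the exact shape the DataDog client returns. Assuming the series has already been flattened to a plain list of numbers, a nearest-rank percentile helper looks like this:

```python
import math

def calculate_percentile(self, values, percentile):
    """Nearest-rank percentile over a flat list of samples (assumed shape)."""
    if not values:
        return None
    ordered = sorted(values)
    rank = max(1, math.ceil(percentile / 100 * len(ordered)))
    return ordered[rank - 1]
```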
Infrastructure Cost Analytics
```python
from datetime import datetime, timedelta

import boto3
from dateutil.relativedelta import relativedelta
from django.utils import timezone

class CostAnalytics:
    def __init__(self):
        self.aws_client = boto3.client('ce')  # Cost Explorer

    def get_cost_breakdown(self, time_period='MONTHLY'):
        """Get AWS cost breakdown by service."""
        end_date = datetime.now().date()
        start_date = end_date - relativedelta(months=1)

        response = self.aws_client.get_cost_and_usage(
            TimePeriod={
                'Start': start_date.strftime('%Y-%m-%d'),
                'End': end_date.strftime('%Y-%m-%d')
            },
            Granularity=time_period,
            Metrics=['BlendedCost'],
            GroupBy=[
                {'Type': 'DIMENSION', 'Key': 'SERVICE'}
            ]
        )

        cost_breakdown = {}
        total_cost = 0
        for result in response['ResultsByTime']:
            for group in result['Groups']:
                service = group['Keys'][0]
                cost = float(group['Metrics']['BlendedCost']['Amount'])
                # Accumulate in case the granularity returns several time buckets
                cost_breakdown[service] = cost_breakdown.get(service, 0) + cost
                total_cost += cost

        # Calculate cost per feature/metric
        feature_costs = self.allocate_costs_to_features(cost_breakdown)

        return {
            'total_cost': total_cost,
            'cost_by_service': cost_breakdown,
            'cost_by_feature': feature_costs,
            'cost_trends': self.get_cost_trends(),
            'optimization_recommendations': self.get_cost_optimization_recommendations()
        }

    def calculate_cost_per_user(self):
        """Calculate cost per active user."""
        monthly_cost = self.get_cost_breakdown()['total_cost']
        monthly_active_users = User.objects.filter(
            last_login__gte=timezone.now() - timedelta(days=30)
        ).count()
        return {
            'cost_per_user': (
                monthly_cost / monthly_active_users if monthly_active_users > 0 else 0
            ),
            'monthly_cost': monthly_cost,
            'monthly_active_users': monthly_active_users
        }
```
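allocate_costs_to_features is inherently organization-specific. One simple approach maps each AWS service to a feature bucket and sums; the mapping below is purely illustrative:

```python
# Illustrative mapping only; real allocation needs resource tagging or usage data.
SERVICE_TO_FEATURE = {
    'Amazon Elastic Compute Cloud - Compute': 'interviews',
    'Amazon Relational Database Service': 'core_platform',
    'Amazon Simple Storage Service': 'recordings',
}

def allocate_costs_to_features(self, cost_breakdown):
    """Sum per-service costs into coarse feature buckets."""
    feature_costs = {}
    for service, cost in cost_breakdown.items():
        feature = SERVICE_TO_FEATURE.get(service, 'unallocated')
        feature_costs[feature] = feature_costs.get(feature, 0) + cost
    return feature_costs
```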
Analytics Dashboard APIs
Real-Time Dashboard Endpoints
```python
from datetime import timedelta

from django.db.models import Avg
from django.http import JsonResponse
from django.utils import timezone
from django.views.decorators.http import require_http_methods

@require_http_methods(['GET'])
def get_real_time_metrics(request):
    """Get real-time platform metrics for dashboard."""
    # Current active sessions
    active_sessions = InterviewSession.objects.filter(
        status='in_progress',
        start_time__gte=timezone.now() - timedelta(hours=1)
    ).count()

    # Recent completions (last hour)
    recent_completions = InterviewSession.objects.filter(
        status='completed',
        end_time__gte=timezone.now() - timedelta(hours=1)
    ).count()

    # Average scores (last 24 hours)
    recent_scores = InterviewSession.objects.filter(
        status='completed',
        end_time__gte=timezone.now() - timedelta(days=1)
    ).aggregate(
        avg_technical=Avg('technical_accuracy'),
        avg_behavioral=Avg('behavioral_score'),
        avg_completion=Avg('completion_percentage')
    )

    # AI processing metrics
    ai_metrics = RealTimeAIMonitoring().get_real_time_metrics('sentiment')

    return JsonResponse({
        'active_interviews': active_sessions,
        'recent_completions': recent_completions,
        'average_scores': recent_scores,
        'ai_performance': ai_metrics,
        'timestamp': timezone.now().isoformat()
    })

@require_http_methods(['GET'])
def get_business_metrics(request):
    """Get key business metrics."""
    now = timezone.now()

    # Revenue metrics
    revenue_metrics = get_revenue_metrics(month=now.month, year=now.year)

    # User engagement
    engagement_metrics = get_user_engagement_metrics(
        start_date=now - timedelta(days=30),
        end_date=now
    )

    # Feature adoption
    feature_metrics = FeatureUsageAnalytics().get_feature_adoption_metrics()

    return JsonResponse({
        'revenue': revenue_metrics,
        'engagement': engagement_metrics,
        'features': feature_metrics,
        'generated_at': now.isoformat()
    })
```
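Wiring these views into the URL conf might look like the following; the module name and paths are illustrative, not JobHive's actual routes:

```python
# urls.py (module name and paths are illustrative)
from django.urls import path

from . import analytics_views  # wherever the views above live

urlpatterns = [
    path('api/analytics/real-time/', analytics_views.get_real_time_metrics),
    path('api/analytics/business/', analytics_views.get_business_metrics),
]
```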
Analytics Export and Reporting
Automated Reporting
```python
class AnalyticsReportGenerator:
    def generate_weekly_report(self):
        """Generate comprehensive weekly analytics report."""
        end_date = timezone.now()
        start_date = end_date - timedelta(days=7)

        report_data = {
            'period': f"{start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}",
            'user_metrics': get_user_engagement_metrics(start_date, end_date),
            'interview_metrics': InterviewAnalyticsService().get_interview_metrics(
                date_range=[start_date, end_date]
            ),
            'ai_performance': AIPerformanceAnalytics().track_sentiment_analysis_performance(),
            'revenue_metrics': get_revenue_metrics(end_date.month, end_date.year),
            'system_performance': SystemPerformanceAnalytics().get_application_performance_metrics('7d')
        }

        # Generate PDF report
        report_pdf = self.create_pdf_report(report_data)

        # Send to stakeholders
        self.send_report_email(report_pdf, report_data)

        return report_data

    def create_pdf_report(self, data):
        """Create PDF report from analytics data."""
        # Implementation for PDF generation
        pass

    def send_report_email(self, pdf_attachment, data):
        """Send analytics report via email."""
        # Implementation for email sending
        pass
```
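Something has to trigger the weekly run. Assuming the project uses Celery (an assumption, not confirmed here), a task plus a beat schedule entry would do it:

```python
# tasks.py (assumes Celery is in use; names are illustrative)
from celery import shared_task

@shared_task
def generate_weekly_report():
    AnalyticsReportGenerator().generate_weekly_report()

# celery.py beat schedule: run every Monday at 06:00
# (`app` is the project's Celery() instance defined in celery.py)
from celery.schedules import crontab

app.conf.beat_schedule = {
    'weekly-analytics-report': {
        'task': 'analytics.tasks.generate_weekly_report',
        'schedule': crontab(hour=6, minute=0, day_of_week='monday'),
    },
}
```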
