Billing & Monetization
Overview
JobHive’s billing system is built on Stripe integration with a flexible subscription model that supports both candidate and employer user types. The system includes freemium tiers, usage-based pricing, promotional codes, and comprehensive billing analytics.Revenue Model
Subscription Tiers
Candidate Plans
Copy
CANDIDATE_SUBSCRIPTION_PLANS = {
'free_candidate': {
'price': 0.00,
'interval': 'monthly',
'features': {
'interview_limit': 2,
'basic_feedback': True,
'performance_tracking': 'basic',
'learning_resources': 'limited'
}
},
'basic_candidate': {
'price': 29.99,
'interval': 'monthly',
'features': {
'interview_limit': 0, # unlimited
'detailed_feedback': True,
'sentiment_analysis': True,
'personalized_learning_paths': True,
'performance_benchmarking': True,
'priority_support': False
}
},
'premium_candidate': {
'price': 49.99,
'interval': 'monthly',
'features': {
'interview_limit': 0, # unlimited
'detailed_feedback': True,
'sentiment_analysis': True,
'personalized_learning_paths': True,
'performance_benchmarking': True,
'priority_support': True,
'career_coaching_integration': True,
'custom_interview_scenarios': True
}
}
}
Employer Plans
Copy
EMPLOYER_SUBSCRIPTION_PLANS = {
'free_employer': {
'price': 0.00,
'interval': 'monthly',
'features': {
'job_posting_limit': 1,
'interview_limit': 5,
'application_limit': 10,
'basic_analytics': True,
'team_members': 1
}
},
'starter_employer': {
'price': 99.99,
'interval': 'monthly',
'features': {
'job_posting_limit': 5,
'interview_limit': 25,
'application_limit': 100,
'advanced_analytics': True,
'team_collaboration': True,
'team_members': 5,
'email_support': True
}
},
'business_employer': {
'price': 249.99,
'interval': 'monthly',
'features': {
'job_posting_limit': 20,
'interview_limit': 100,
'application_limit': 500,
'advanced_analytics': True,
'team_collaboration': True,
'api_access': True,
'team_members': 20,
'priority_support': True,
'custom_integrations': True
}
},
'enterprise_employer': {
'price': 'custom',
'interval': 'monthly',
'features': {
'job_posting_limit': 0, # unlimited
'interview_limit': 0, # unlimited
'application_limit': 0, # unlimited
'white_label': True,
'dedicated_support': True,
'custom_ai_training': True,
'sla_guarantee': True,
'team_members': 0 # unlimited
}
}
}
Stripe Integration
Subscription Management
Copy
class StripeSubscriptionManager:
def __init__(self):
stripe.api_key = settings.STRIPE_SECRET_KEY
def create_customer(self, user):
"""Create Stripe customer for user."""
try:
customer = stripe.Customer.create(
email=user.email,
name=f"{user.first_name} {user.last_name}",
metadata={
'user_id': user.id,
'user_role': user.role
}
)
# Update user subscription record
subscription, created = CustomerSubscription.objects.get_or_create(
user=user,
defaults={
'stripe_customer_id': customer.id,
'plan': self.get_free_plan(user.role),
'status': 'active'
}
)
if not created:
subscription.stripe_customer_id = customer.id
subscription.save()
return customer
except stripe.error.StripeError as e:
logger.error(f"Stripe customer creation failed: {e}")
raise
def create_subscription(self, user, plan_id, payment_method_id=None, promo_code=None):
"""Create new subscription for user."""
plan = SubscriptionPlan.objects.get(id=plan_id)
# Get or create Stripe customer
customer_subscription = user.subscription
if not customer_subscription.stripe_customer_id:
customer = self.create_customer(user)
else:
customer = stripe.Customer.retrieve(customer_subscription.stripe_customer_id)
# Attach payment method if provided
if payment_method_id:
stripe.PaymentMethod.attach(
payment_method_id,
customer=customer.id
)
# Set as default payment method
stripe.Customer.modify(
customer.id,
invoice_settings={'default_payment_method': payment_method_id}
)
# Create subscription parameters
subscription_params = {
'customer': customer.id,
'items': [{'price': plan.stripe_price_id}],
'metadata': {
'user_id': user.id,
'plan_id': plan.id
},
'expand': ['latest_invoice.payment_intent']
}
# Apply promo code if provided
if promo_code:
promo = PromoCode.objects.get(code=promo_code)
if promo.is_valid:
coupon = self.create_stripe_coupon(promo)
subscription_params['coupon'] = coupon.id
try:
# Create Stripe subscription
stripe_subscription = stripe.Subscription.create(**subscription_params)
# Update local subscription record
customer_subscription.plan = plan
customer_subscription.stripe_subscription_id = stripe_subscription.id
customer_subscription.status = stripe_subscription.status
customer_subscription.current_period_start = datetime.fromtimestamp(
stripe_subscription.current_period_start, tz=timezone.utc
)
customer_subscription.current_period_end = datetime.fromtimestamp(
stripe_subscription.current_period_end, tz=timezone.utc
)
customer_subscription.save()
# Record promo code usage
if promo_code:
PromoCodeUsage.objects.create(
promo_code=promo,
user=user,
subscription=customer_subscription,
original_amount=plan.price,
discounted_amount=promo.calculate_discounted_price(plan.price)
)
return stripe_subscription
except stripe.error.StripeError as e:
logger.error(f"Stripe subscription creation failed: {e}")
raise
Payment Processing
Copy
class StripePaymentProcessor:
def process_payment(self, user, amount, currency='usd', description=None):
"""Process one-time payment."""
try:
# Create payment intent
payment_intent = stripe.PaymentIntent.create(
amount=int(amount * 100), # Convert to cents
currency=currency,
customer=user.subscription.stripe_customer_id,
description=description,
metadata={
'user_id': user.id,
'payment_type': 'one_time'
}
)
return payment_intent
except stripe.error.StripeError as e:
logger.error(f"Payment processing failed: {e}")
raise
def handle_failed_payment(self, subscription_id, invoice_id):
"""Handle failed payment for subscription."""
try:
subscription = CustomerSubscription.objects.get(
stripe_subscription_id=subscription_id
)
# Update subscription status
subscription.status = 'past_due'
subscription.save()
# Create notification
BillingNotification.create(
user=subscription.user,
notification_type='payment_failed',
title='Payment Failed',
message=f'Your payment for {subscription.plan.name} has failed. Please update your payment method.',
subscription=subscription,
send_email=True
)
# Schedule retry attempts
self.schedule_payment_retry(subscription)
except CustomerSubscription.DoesNotExist:
logger.error(f"Subscription not found for Stripe ID: {subscription_id}")
Usage-Based Billing
Usage Tracking
Copy
class UsageTrackingService:
def record_usage(self, user, usage_type, quantity=1, resource_id=None, metadata=None):
"""Record usage for billing purposes."""
subscription = user.subscription
# Check if usage is within plan limits
if not self.is_usage_allowed(subscription, usage_type):
raise UsageLimitExceeded(f"Usage limit exceeded for {usage_type}")
# Record usage
usage_record = UsageRecord.objects.create(
user=user,
subscription=subscription,
usage_type=usage_type,
quantity=quantity,
resource_id=resource_id,
metadata=metadata or {}
)
# Update usage metrics
self.update_usage_metrics(subscription, usage_type, quantity)
# Check if approaching limits
self.check_usage_warnings(subscription, usage_type)
return usage_record
def is_usage_allowed(self, subscription, usage_type):
"""Check if usage is within plan limits."""
plan = subscription.plan
current_period_start = subscription.current_period_start
current_period_end = subscription.current_period_end
# Get current period usage
current_usage = UsageRecord.objects.filter(
subscription=subscription,
usage_type=usage_type,
created_at__range=[current_period_start, current_period_end]
).aggregate(
total=Sum('quantity')
)['total'] or 0
# Check limits based on usage type
limits = {
'interview': plan.interview_limit,
'job_posting': plan.job_posting_limit,
'application': plan.application_limit
}
plan_limit = limits.get(usage_type, 0)
# 0 means unlimited
if plan_limit == 0:
return True
return current_usage < plan_limit
def check_usage_warnings(self, subscription, usage_type):
"""Send warnings when approaching usage limits."""
plan = subscription.plan
current_usage = self.get_current_period_usage(subscription, usage_type)
limits = {
'interview': plan.interview_limit,
'job_posting': plan.job_posting_limit,
'application': plan.application_limit
}
limit = limits.get(usage_type, 0)
if limit > 0: # Skip unlimited plans
usage_percentage = (current_usage / limit) * 100
# Send warning at 80% usage
if usage_percentage >= 80 and usage_percentage < 100:
BillingNotification.create(
user=subscription.user,
notification_type='usage_limit',
title=f'{usage_type.title()} Usage Warning',
message=f'You have used {usage_percentage:.0f}% of your {usage_type} limit.',
subscription=subscription,
metadata={'usage_type': usage_type, 'percentage': usage_percentage}
)
Metered Billing Integration
Copy
class MeteredBillingService:
def __init__(self):
self.stripe_manager = StripeSubscriptionManager()
def report_usage_to_stripe(self, subscription, usage_records):
"""Report usage to Stripe for metered billing."""
if not subscription.stripe_subscription_item_id:
logger.warning(f"No subscription item ID for subscription {subscription.id}")
return
for usage_record in usage_records:
try:
# Report usage to Stripe
usage_record_response = stripe.UsageRecord.create(
subscription_item=subscription.stripe_subscription_item_id,
quantity=usage_record.quantity,
timestamp=int(usage_record.created_at.timestamp()),
action='increment'
)
# Update local record with Stripe ID
usage_record.stripe_usage_record_id = usage_record_response.id
usage_record.save()
except stripe.error.StripeError as e:
logger.error(f"Failed to report usage to Stripe: {e}")
Promotional System
Promo Code Management
Copy
class PromoCodeManager:
def create_promo_code(self, code=None, discount_type='percentage',
discount_value=20, max_uses=100, valid_until=None,
applicable_plans=None, first_time_only=False):
"""Create promotional code."""
if not code:
code = generate_promo_code()
promo_code = PromoCode.objects.create(
code=code,
discount_type=discount_type,
discount_value=discount_value,
max_uses=max_uses,
valid_until=valid_until,
first_time_only=first_time_only
)
if applicable_plans:
promo_code.applicable_plans.set(applicable_plans)
return promo_code
def validate_promo_code(self, code, user, plan):
"""Validate promo code for user and plan."""
try:
promo = PromoCode.objects.get(code=code.upper())
except PromoCode.DoesNotExist:
return {'valid': False, 'error': 'Invalid promo code'}
# Check if promo code is valid
if not promo.is_valid:
return {'valid': False, 'error': 'Promo code has expired or reached usage limit'}
# Check if first-time only restriction applies
if promo.first_time_only:
if CustomerSubscription.objects.filter(
user=user,
status__in=['active', 'canceled']
).exists():
return {'valid': False, 'error': 'This promo code is only valid for first-time subscribers'}
# Check if plan is applicable
if promo.applicable_plans.exists() and plan not in promo.applicable_plans.all():
return {'valid': False, 'error': 'This promo code is not valid for the selected plan'}
# Check if user has already used this promo code
if PromoCodeUsage.objects.filter(promo_code=promo, user=user).exists():
return {'valid': False, 'error': 'You have already used this promo code'}
# Calculate discounted price
discounted_price = promo.calculate_discounted_price(plan.price)
return {
'valid': True,
'promo_code': promo,
'original_price': float(plan.price),
'discounted_price': float(discounted_price),
'discount_amount': float(plan.price - discounted_price),
'discount_display': promo.get_discount_display()
}
Beta Program Integration
Beta User Management
Copy
class BetaUserManager:
def enroll_user_in_beta(self, user, duration_days=None):
"""Enroll user in beta program."""
beta_settings = BetaSettings.get_instance()
if not beta_settings.is_active:
raise BetaProgramNotActive("Beta program is not currently active")
# Check beta capacity
if beta_settings.max_beta_users > 0:
active_beta_users = BetaUser.objects.filter(is_active=True).count()
if active_beta_users >= beta_settings.max_beta_users:
raise BetaProgramFull("Beta program has reached maximum capacity")
# Set duration
if not duration_days:
duration_days = beta_settings.default_beta_days
end_date = timezone.now() + timedelta(days=duration_days)
# Create or update beta access
beta_user, created = BetaUser.objects.get_or_create(
user=user,
defaults={
'end_date': end_date,
'is_active': True
}
)
if not created:
beta_user.end_date = end_date
beta_user.is_active = True
beta_user.save()
# Grant premium access during beta
self.grant_beta_access(user)
# Send welcome notification
BillingNotification.create(
user=user,
notification_type='beta_access_granted',
title='Welcome to JobHive Beta!',
message=f'You now have access to all premium features until {end_date.strftime("%B %d, %Y")}.',
metadata={'beta_end_date': end_date.isoformat()}
)
return beta_user
def handle_beta_expiration(self, beta_user):
"""Handle beta program expiration for user."""
beta_settings = BetaSettings.get_instance()
user = beta_user.user
if beta_settings.beta_expiration_behavior == 'downgrade':
# Downgrade to free plan
free_plan = SubscriptionPlan.objects.get(
price=0,
plan_type=user.role
)
subscription = user.subscription
subscription.plan = free_plan
subscription.save()
BillingNotification.create(
user=user,
notification_type='beta_expiring',
title='Beta Access Expired',
message='Your beta access has expired. You have been moved to our free plan.',
subscription=subscription
)
elif beta_settings.beta_expiration_behavior == 'prompt':
# Prompt user to subscribe
self.send_conversion_offer(user, beta_settings)
# Deactivate beta access
beta_user.is_active = False
beta_user.save()
def send_conversion_offer(self, user, beta_settings):
"""Send special offer to convert beta user to paid plan."""
if beta_settings.conversion_offer_enabled:
discount_percent = beta_settings.conversion_discount_percent
# Create special promo code for beta user
promo_code = PromoCodeManager().create_promo_code(
discount_type='percentage',
discount_value=discount_percent,
max_uses=1,
valid_until=timezone.now() + timedelta(days=7),
first_time_only=False
)
BillingNotification.create(
user=user,
notification_type='beta_conversion_offer',
title='Special Offer: Continue with Premium',
message=f'Your beta access has expired. Continue with {discount_percent}% off any plan using code {promo_code.code}',
metadata={'promo_code': promo_code.code, 'discount_percent': discount_percent}
)
Billing Analytics
Revenue Analytics
Copy
class BillingAnalyticsService:
def get_revenue_metrics(self, start_date, end_date):
"""Get comprehensive revenue metrics."""
# Monthly Recurring Revenue (MRR)
active_subscriptions = CustomerSubscription.objects.filter(
status='active',
current_period_start__lte=end_date,
current_period_end__gte=start_date
)
mrr = 0
for subscription in active_subscriptions:
if subscription.plan.interval == 'year':
mrr += subscription.plan.price / 12
else:
mrr += subscription.plan.price
# New subscriptions in period
new_subscriptions = CustomerSubscription.objects.filter(
created_at__range=[start_date, end_date],
status='active'
)
# Churned subscriptions
churned_subscriptions = CustomerSubscription.objects.filter(
status='canceled',
updated_at__range=[start_date, end_date]
)
# Calculate churn rate
total_subscriptions_start = CustomerSubscription.objects.filter(
created_at__lt=start_date,
status__in=['active', 'canceled']
).count()
churn_rate = (
churned_subscriptions.count() / total_subscriptions_start * 100
if total_subscriptions_start > 0 else 0
)
# Customer Lifetime Value (CLV)
avg_monthly_revenue = mrr / active_subscriptions.count() if active_subscriptions.count() > 0 else 0
avg_customer_lifespan = 1 / (churn_rate / 100) if churn_rate > 0 else float('inf')
clv = avg_monthly_revenue * avg_customer_lifespan
return {
'mrr': float(mrr),
'arr': float(mrr * 12),
'new_subscriptions': new_subscriptions.count(),
'churned_subscriptions': churned_subscriptions.count(),
'churn_rate': churn_rate,
'active_subscriptions': active_subscriptions.count(),
'arpu': float(avg_monthly_revenue),
'clv': float(clv) if clv != float('inf') else None
}
def get_plan_performance(self):
"""Analyze performance of different subscription plans."""
plan_metrics = []
for plan in SubscriptionPlan.objects.filter(is_active=True):
active_subs = CustomerSubscription.objects.filter(
plan=plan,
status='active'
)
# Calculate conversion rate
trial_users = CustomerSubscription.objects.filter(
plan=plan,
status='trialing'
).count()
conversion_rate = (
active_subs.count() / (active_subs.count() + trial_users) * 100
if (active_subs.count() + trial_users) > 0 else 0
)
# Revenue contribution
if plan.interval == 'year':
monthly_revenue = (plan.price / 12) * active_subs.count()
else:
monthly_revenue = plan.price * active_subs.count()
plan_metrics.append({
'plan_name': plan.name,
'plan_type': plan.plan_type,
'price': float(plan.price),
'active_subscriptions': active_subs.count(),
'conversion_rate': conversion_rate,
'monthly_revenue': float(monthly_revenue),
'revenue_share': 0 # Will be calculated after all plans
})
# Calculate revenue share
total_revenue = sum(metric['monthly_revenue'] for metric in plan_metrics)
for metric in plan_metrics:
metric['revenue_share'] = (
metric['monthly_revenue'] / total_revenue * 100
if total_revenue > 0 else 0
)
return plan_metrics
Usage Analytics
Copy
class UsageAnalyticsService:
def get_usage_trends(self, days=30):
"""Analyze usage trends across the platform."""
end_date = timezone.now()
start_date = end_date - timedelta(days=days)
usage_data = UsageRecord.objects.filter(
created_at__range=[start_date, end_date]
).values('usage_type', 'created_at__date').annotate(
total_usage=Sum('quantity'),
unique_users=Count('user_id', distinct=True)
).order_by('created_at__date', 'usage_type')
# Organize data by usage type
usage_trends = {}
for record in usage_data:
usage_type = record['usage_type']
date = record['created_at__date']
if usage_type not in usage_trends:
usage_trends[usage_type] = []
usage_trends[usage_type].append({
'date': date,
'total_usage': record['total_usage'],
'unique_users': record['unique_users']
})
# Calculate growth rates
growth_analysis = {}
for usage_type, trends in usage_trends.items():
if len(trends) >= 2:
recent_usage = trends[-1]['total_usage']
previous_usage = trends[-2]['total_usage']
growth_rate = (
(recent_usage - previous_usage) / previous_usage * 100
if previous_usage > 0 else 0
)
growth_analysis[usage_type] = {
'growth_rate': growth_rate,
'recent_usage': recent_usage,
'trend': 'increasing' if growth_rate > 0 else 'decreasing'
}
return {
'usage_trends': usage_trends,
'growth_analysis': growth_analysis
}
def identify_usage_patterns(self):
"""Identify patterns in user usage behavior."""
# Analyze usage by plan type
usage_by_plan = UsageRecord.objects.values(
'subscription__plan__name',
'usage_type'
).annotate(
total_usage=Sum('quantity'),
avg_usage_per_user=Avg('quantity'),
unique_users=Count('user_id', distinct=True)
)
# Heavy users analysis
heavy_users = UsageRecord.objects.values('user_id').annotate(
total_usage=Sum('quantity')
).filter(
total_usage__gte=100 # Define heavy user threshold
).order_by('-total_usage')[:10]
# Usage patterns by time of day
hourly_usage = UsageRecord.objects.extra(
select={'hour': 'EXTRACT(hour FROM created_at)'}
).values('hour').annotate(
total_usage=Sum('quantity')
).order_by('hour')
return {
'usage_by_plan': list(usage_by_plan),
'heavy_users': list(heavy_users),
'hourly_patterns': list(hourly_usage)
}
Webhook Handling
Stripe Webhook Processing
Copy
class StripeWebhookHandler:
def __init__(self):
self.webhook_secret = settings.STRIPE_WEBHOOK_SECRET
def handle_webhook(self, request):
"""Process Stripe webhook events."""
payload = request.body
sig_header = request.META.get('HTTP_STRIPE_SIGNATURE')
try:
event = stripe.Webhook.construct_event(
payload, sig_header, self.webhook_secret
)
except ValueError:
return HttpResponse(status=400)
except stripe.error.SignatureVerificationError:
return HttpResponse(status=400)
# Handle the event
if event['type'] == 'customer.subscription.updated':
self.handle_subscription_updated(event['data']['object'])
elif event['type'] == 'customer.subscription.deleted':
self.handle_subscription_deleted(event['data']['object'])
elif event['type'] == 'invoice.payment_succeeded':
self.handle_payment_succeeded(event['data']['object'])
elif event['type'] == 'invoice.payment_failed':
self.handle_payment_failed(event['data']['object'])
else:
logger.info(f'Unhandled webhook event type: {event["type"]}')
return HttpResponse(status=200)
def handle_subscription_updated(self, subscription):
"""Handle subscription update webhook."""
try:
customer_subscription = CustomerSubscription.objects.get(
stripe_subscription_id=subscription['id']
)
# Update subscription details
customer_subscription.status = subscription['status']
customer_subscription.current_period_start = datetime.fromtimestamp(
subscription['current_period_start'], tz=timezone.utc
)
customer_subscription.current_period_end = datetime.fromtimestamp(
subscription['current_period_end'], tz=timezone.utc
)
customer_subscription.cancel_at_period_end = subscription['cancel_at_period_end']
customer_subscription.save()
# Send notification if subscription was canceled
if subscription['cancel_at_period_end']:
BillingNotification.create(
user=customer_subscription.user,
notification_type='subscription_canceled',
title='Subscription Canceled',
message='Your subscription has been canceled and will end at the end of the current billing period.',
subscription=customer_subscription
)
except CustomerSubscription.DoesNotExist:
logger.error(f"Subscription not found for Stripe ID: {subscription['id']}")
def handle_payment_succeeded(self, invoice):
"""Handle successful payment webhook."""
try:
subscription_id = invoice['subscription']
customer_subscription = CustomerSubscription.objects.get(
stripe_subscription_id=subscription_id
)
# Create invoice record
Invoice.objects.create(
user=customer_subscription.user,
subscription=customer_subscription,
stripe_invoice_id=invoice['id'],
amount=invoice['total'] / 100, # Convert from cents
status='paid',
invoice_date=datetime.fromtimestamp(invoice['created'], tz=timezone.utc),
paid_date=timezone.now()
)
# Send confirmation notification
BillingNotification.create(
user=customer_subscription.user,
notification_type='payment_success',
title='Payment Confirmed',
message=f'Your payment of ${invoice["total"] / 100:.2f} has been processed successfully.',
subscription=customer_subscription
)
except CustomerSubscription.DoesNotExist:
logger.error(f"Subscription not found for invoice: {invoice['id']}")
