Searching users based on multiple weighted factors is a common challenge in social platforms and marketplaces. This article explores how to build a sophisticated user search system using Elasticsearch with weighted scoring algorithms for factors like location proximity, network connections, past interactions, and financial transactions.
Problem Statement
At Spice, we needed to search users based on multiple relevancy factors:
- Location proximity - How close users are geographically
- Network connections - Direct and indirect connections
- Past interactions - Likes, comments, messages
- Financial transactions - Money transfers between users
- Activity level - Recent engagement and activity
Each factor carries a different weight, and we need to combine them into a single relevancy score that supports fast, accurate search results.
System Architecture
graph TB
    A[Django Models] --> B[Celery Task]
    B --> C[Data Aggregation]
    C --> D[Elasticsearch Index]
    E[Search Request] --> F[Weighted Scoring]
    F --> D
    D --> G[Ranked Results]

    subgraph "Data Sources"
        H[User Profile]
        I[Network Connections]
        J[Interactions]
        K[Transactions]
    end

    H --> C
    I --> C
    J --> C
    K --> C
Data Models
Django Models Structure
# Core models for user data
from django.contrib.auth.models import AbstractUser
from django.db import models

class User(AbstractUser):
    bio = models.TextField(blank=True)
    location = models.CharField(max_length=255, blank=True)
    latitude = models.DecimalField(max_digits=9, decimal_places=6, null=True)
    longitude = models.DecimalField(max_digits=9, decimal_places=6, null=True)
    profile_completeness = models.IntegerField(default=0)
    last_active = models.DateTimeField(null=True)

class UserConnection(models.Model):
    from_user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='connections_from')
    to_user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='connections_to')
    connection_type = models.CharField(max_length=20)  # direct, mutual, pending
    strength = models.FloatField()  # 0-1 connection strength

class UserInteraction(models.Model):
    from_user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='interactions_from')
    to_user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='interactions_to')
    interaction_type = models.CharField(max_length=20)  # like, comment, message
    created_at = models.DateTimeField(auto_now_add=True)

class Transaction(models.Model):
    from_user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='transactions_from')
    to_user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='transactions_to')
    amount = models.DecimalField(max_digits=12, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)
Elasticsearch Document Structure
# Elasticsearch document for aggregated user data
from django_elasticsearch_dsl import Document, fields

class UserDocument(Document):
    # Basic fields
    username = fields.TextField(analyzer='text_analyzer')
    first_name = fields.TextField()
    last_name = fields.TextField()
    bio = fields.TextField()
    location = fields.TextField()
    location_coords = fields.GeoPointField()

    # Aggregated metrics
    connection_count = fields.IntegerField()
    mutual_connections = fields.IntegerField()
    interaction_count = fields.IntegerField()
    transaction_count = fields.IntegerField()
    total_transaction_amount = fields.FloatField()

    # Network and activity scores
    network_proximity = fields.FloatField()
    connection_strength = fields.FloatField()
    profile_completeness = fields.IntegerField()
    last_active_days = fields.IntegerField()
    activity_score = fields.FloatField()

    # Pre-calculated relevancy factors
    location_score = fields.FloatField()
    network_score = fields.FloatField()
    interaction_score = fields.FloatField()
    transaction_score = fields.FloatField()
    activity_score_calc = fields.FloatField()
Weighted Scoring Algorithm
The core algorithm combines multiple factors with configurable weights:
class SearchWeights:
    location_weight = 0.25     # Geographic proximity
    network_weight = 0.30      # Connection strength
    interaction_weight = 0.20  # Past interactions
    transaction_weight = 0.15  # Financial transactions
    activity_weight = 0.10     # Recent activity

def calculate_total_score(scores, weights=SearchWeights):
    # Reference the weight constants so tuning happens in one place
    return (
        scores['location'] * weights.location_weight +
        scores['network'] * weights.network_weight +
        scores['interaction'] * weights.interaction_weight +
        scores['transaction'] * weights.transaction_weight +
        scores['activity'] * weights.activity_weight
    )
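For instance, plugging the factor scores from the sample results later in the article into this function:

scores = {'location': 0.95, 'network': 0.78, 'interaction': 0.82,
          'transaction': 0.65, 'activity': 0.88}
calculate_total_score(scores)  # -> 0.821, reported as 0.82 in the results below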
Scoring Factors
- Location Score: exponential decay based on distance
  score = exp(-distance_km / 50.0)
- Network Score: connection count, mutual connections, and connection strength
  network_score = (connection_score * 0.4) + (mutual_score * 0.4) + (strength_score * 0.2)
- Interaction Score: logarithmic scale for total interactions plus a recent-activity bonus
  interaction_score = log10(interaction_count + 1) / 3.0 + (recent_interactions / 50.0)
- Transaction Score: count- and amount-based scoring
  transaction_score = (count_score * 0.6) + (amount_score * 0.4)
- Activity Score: profile completeness, recency, and engagement
  activity_score = (profile_score * 0.3) + (recency_score * 0.3) + (engagement_score * 0.4)
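To make the formulas concrete, here is a minimal sketch of the location and interaction calculations; the cap at 1.0 is an assumption to keep each factor in the [0, 1] range the weights expect.

import math

def location_score(distance_km, decay_km=50.0):
    # Exponential decay: 1.0 at zero distance, ~0.37 at the decay constant (50 km)
    return math.exp(-distance_km / decay_km)

def interaction_score(interaction_count, recent_interactions):
    # Log scale dampens heavy interactors; the recency bonus rewards fresh activity
    base = math.log10(interaction_count + 1) / 3.0
    bonus = recent_interactions / 50.0
    return min(base + bonus, 1.0)  # cap assumed so no single factor dominates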
Periodic Data Aggregation
Celery Task for Bulk Indexing
from celery import shared_task

@shared_task
def bulk_index_users():
    """Periodically aggregate and index user data."""
    users = User.objects.prefetch_related(
        'connections_from', 'connections_to',
        'interactions_from', 'interactions_to',
        'transactions_from', 'transactions_to',
    )
    for user in users:
        # Calculate aggregated metrics; len() reads the prefetch cache,
        # avoiding one COUNT query per relation per user
        connection_count = len(user.connections_from.all()) + len(user.connections_to.all())
        mutual_connections = calculate_mutual_connections(user)
        interaction_count = len(user.interactions_from.all()) + len(user.interactions_to.all())
        transaction_amount = calculate_total_transactions(user)

        # Calculate relevancy scores
        scores = calculate_relevancy_scores(user)

        # Push the refreshed document to Elasticsearch
        UserDocument().update(user)
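Since the task is periodic, it needs a schedule. One option is a Celery beat entry; in the sketch below the module path and hourly interval are illustrative, not taken from the original setup.

# settings.py (module path and interval are illustrative)
CELERY_BEAT_SCHEDULE = {
    'bulk-index-users': {
        'task': 'search.tasks.bulk_index_users',
        'schedule': 3600.0,  # run hourly
    },
}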
Data Aggregation Process
flowchart TD
    A[Start Aggregation] --> B[Fetch Users with Relations]
    B --> C[Calculate Connection Metrics]
    C --> D[Calculate Interaction Metrics]
    D --> E[Calculate Transaction Metrics]
    E --> F[Calculate Activity Metrics]
    F --> G[Compute Relevancy Scores]
    G --> H[Update Elasticsearch Document]
    H --> I{More Users?}
    I -->|Yes| B
    I -->|No| J[Complete]
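The helper functions referenced in the task are straightforward aggregations. Possible implementations, assuming the related names from the models section; "mutual" here is read from the model's connection_type field, and calculate_relevancy_scores would combine the factor functions sketched earlier.

from django.db.models import Q, Sum

def calculate_mutual_connections(user):
    # Connections flagged as mutual, in either direction
    return UserConnection.objects.filter(
        Q(from_user=user) | Q(to_user=user),
        connection_type='mutual',
    ).count()

def calculate_total_transactions(user):
    # Total amount sent plus received
    sent = user.transactions_from.aggregate(total=Sum('amount'))['total'] or 0
    received = user.transactions_to.aggregate(total=Sum('amount'))['total'] or 0
    return sent + received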
Elasticsearch Index Creation
# Index settings for optimal search performance
index_settings = {
    'number_of_shards': 3,
    'number_of_replicas': 1,
    'refresh_interval': '30s',
    'analysis': {
        'analyzer': {
            'text_analyzer': {
                'type': 'custom',
                'tokenizer': 'standard',
                'filter': ['lowercase', 'stop', 'snowball'],
            }
        }
    }
}

# Create index with mappings
UserDocument._index.create()
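With django_elasticsearch_dsl, the usual place to attach these settings is the document's Index metadata, so that _index.create() picks them up; a sketch extending the UserDocument class from earlier:

class UserDocument(Document):
    # ... fields as defined earlier ...

    class Index:
        name = 'users'
        settings = index_settings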
Search Implementation
Django Elasticsearch DSL Query
from elasticsearch_dsl import Search

def search_users(query=None, location=None, radius_km=50, weights=None):
    search = Search(index='users')

    # Text search across name and bio fields, boosting username matches
    if query:
        search = search.query(
            'multi_match',
            query=query,
            fields=['username^3', 'first_name^2', 'last_name^2', 'bio'],
        )

    # Location constraint; geo_distance belongs in filter context (no text scoring)
    if location:
        search = search.filter(
            'geo_distance',
            location_coords=location,
            distance=f'{radius_km}km',
        )

    # Execute, then re-rank the hits with the weighted factor scores
    response = search.execute()
    for hit in response:
        scores = calculate_factor_scores(hit.to_dict(), location)
        hit.total_score = calculate_total_score(scores)

    return sorted(response, key=lambda x: x.total_score, reverse=True)
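A call for the sample query shown later in the article might look like this; the lat/lon dict is one of the point formats Elasticsearch accepts for geo queries.

sf = {'lat': 37.7749, 'lon': -122.4194}  # San Francisco
results = search_users(query='developer', location=sf, radius_km=50)
for hit in results:
    print(hit.username, round(hit.total_score, 2))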
Sample Data and Results
Input Data Example
{
  "users": [
    {
      "id": 1,
      "username": "john_developer",
      "location": "San Francisco, CA",
      "latitude": 37.7749,
      "longitude": -122.4194,
      "connection_count": 45,
      "mutual_connections": 12,
      "interaction_count": 156,
      "transaction_count": 8,
      "total_transaction_amount": 1250.00,
      "profile_completeness": 85,
      "last_active_days": 2
    },
    {
      "id": 2,
      "username": "jane_designer",
      "location": "Oakland, CA",
      "latitude": 37.8044,
      "longitude": -122.2711,
      "connection_count": 32,
      "mutual_connections": 8,
      "interaction_count": 89,
      "transaction_count": 3,
      "total_transaction_amount": 450.00,
      "profile_completeness": 92,
      "last_active_days": 1
    }
  ]
}
Search Results
Query: “developer” near San Francisco
{
  "results": [
    {
      "id": 1,
      "username": "john_developer",
      "relevancy_scores": {
        "location": 0.95,
        "network": 0.78,
        "interaction": 0.82,
        "transaction": 0.65,
        "activity": 0.88
      },
      "total_relevancy_score": 0.82,
      "rank": 1
    },
    {
      "id": 2,
      "username": "jane_designer",
      "relevancy_scores": {
        "location": 0.85,
        "network": 0.62,
        "interaction": 0.58,
        "transaction": 0.42,
        "activity": 0.92
      },
      "total_relevancy_score": 0.67,
      "rank": 2
    }
  ]
}
Performance Optimization
Caching Strategy
import hashlib, json
from django.core.cache import cache

def cached_search(search_params):
    # Built-in hash() is not stable across processes; use a deterministic digest
    key = hashlib.md5(json.dumps(search_params, sort_keys=True).encode()).hexdigest()
    cache_key = f"search:{key}"
    cached_results = cache.get(cache_key)
    if cached_results is not None:
        return cached_results
    results = perform_search(search_params)
    cache.set(cache_key, results, 300)  # cache search results for 5 minutes
    return results
Query Optimization
# Optimized queryset with annotations
from django.db.models import Count

users = User.objects.annotate(
    # distinct=True avoids inflated counts when multiple relations are joined
    connection_count=Count('connections_from', distinct=True) + Count('connections_to', distinct=True),
    interaction_count=Count('interactions_from', distinct=True) + Count('interactions_to', distinct=True),
    transaction_count=Count('transactions_from', distinct=True) + Count('transactions_to', distinct=True),
).prefetch_related(
    'connections_from', 'connections_to',
    'interactions_from', 'interactions_to',
)
Monitoring and Analytics
# Search performance metrics
metrics = {
    'avg_response_time_ms': 45,
    'cache_hit_rate': 0.78,
    'success_rate': 0.995,
    'top_search_terms': [
        {'term': 'developer', 'count': 1250},
        {'term': 'designer', 'count': 980},
    ],
}
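How these numbers are collected depends on the metrics backend. A minimal sketch that times each search call; record_metric is a hypothetical stand-in for whatever client (statsd, Prometheus, etc.) is in use.

import time

def timed_search(search_params):
    start = time.perf_counter()
    results = cached_search(search_params)
    elapsed_ms = (time.perf_counter() - start) * 1000
    record_metric('search.response_time_ms', elapsed_ms)  # hypothetical helper
    return results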
Conclusion
This implementation provides a robust user search system with:
- Weighted scoring for multiple relevancy factors
- Periodic bulk indexing for performance
- Geographic search with distance-based scoring
- Network-aware results based on connections
- Activity-based relevancy scoring
- Caching for sub-second response times
The system can handle thousands of users efficiently while providing personalized, relevant search results based on multiple weighted factors.
Key Benefits
- Scalable: Elasticsearch handles large datasets
- Flexible: Easy to adjust weights and add factors
- Fast: Caching and optimized queries
- Accurate: Multi-factor scoring provides relevant results
- Maintainable: Clean separation of concerns
This solution addresses real-world challenges in user discovery and provides a solid foundation for building sophisticated search features in social platforms and marketplaces.