| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- from __future__ import annotations
- from collections import Counter
- from dataclasses import dataclass
- from typing import Iterable, List, Sequence, Tuple
- from django.contrib.auth.models import User
- from django.db.models import Q
- from django.utils import timezone
- from api.models import IntroductionRequest, Opportunity, Profile
def _now():
    """Return the current time from ``django.utils.timezone.now``.

    ``time_decay`` obtains "now" through this helper rather than calling
    ``timezone.now`` directly.
    """
    current = timezone.now()
    return current
def tokenize_interests_text(text: str | None) -> set[str]:
    """Break a free-form interests string into a set of lowercase tokens.

    The text is cut on commas first and then on runs of whitespace, so a
    multi-word entry such as ``"machine learning"`` contributes two separate
    tokens.  ``None`` or an empty string yields an empty set.
    """
    tokens: set[str] = set()
    if not text:
        return tokens
    for chunk in text.split(","):
        # str.split() with no argument already discards surrounding and
        # repeated whitespace, so every word here is non-empty.
        for word in chunk.split():
            tokens.add(word.lower())
    return tokens
def profile_interests(profile: Profile) -> set[str]:
    """Return the lowercase interest keywords for ``profile``.

    Tag names read from ``profile.tags`` take precedence whenever there is at
    least one; otherwise the ``profile.interests`` string is tokenized with
    ``tokenize_interests_text``.
    """
    try:
        from_tags = {name.lower() for name in profile.tags.names()}
    except Exception:
        # Best effort: any failure while reading tags is treated as "no tags"
        # so that the text fallback below is used.
        from_tags = set()
    return from_tags or tokenize_interests_text(profile.interests)
def tokenize_text(text: str | None) -> List[str]:
    """Split arbitrary text into a list of lowercase word tokens.

    Every character that is neither alphanumeric nor whitespace is treated as
    a separator.  Order and duplicates are preserved.  ``None`` or an empty
    string yields an empty list.
    """
    if not text:
        return []
    pieces = []
    for ch in text:
        keep = ch.isalnum() or ch.isspace()
        # Anything that is not kept becomes a space, so it splits words apart.
        pieces.append(ch.lower() if keep else " ")
    return "".join(pieces).split()
def jaccard(a: Iterable[str], b: Iterable[str]) -> float:
    """Return the Jaccard similarity of two collections.

    Both inputs are reduced to sets; the result is the size of their
    intersection divided by the size of their union, or ``0.0`` when both
    are empty.
    """
    left = set(a)
    right = set(b)
    combined = left | right
    if not combined:
        # The union is empty exactly when both inputs are empty.
        return 0.0
    shared = left & right
    return len(shared) / len(combined)
def cosine(a: Sequence[str], b: Sequence[str]) -> float:
    """Return the cosine similarity of two token sequences.

    Each sequence is turned into a term-frequency vector with ``Counter``.
    The result is ``0.0`` when either sequence is empty or when the two
    share no tokens.
    """
    if not (a and b):
        return 0.0
    freq_a = Counter(a)
    freq_b = Counter(b)
    # Dot product over the terms of ``a``; terms missing from ``b`` add 0.
    dot = 0
    for term, count in freq_a.items():
        dot += count * freq_b.get(term, 0)
    if dot == 0:
        return 0.0
    import math

    norm_a = math.sqrt(sum(c * c for c in freq_a.values()))
    norm_b = math.sqrt(sum(c * c for c in freq_b.values()))
    denom = norm_a * norm_b
    if not denom:
        return 0.0
    return dot / denom
def time_decay(ts, half_life_days: float = 7.0) -> float:
    """Return an exponential decay weight for the timestamp ``ts``.

    The weight is ``0.5 ** (elapsed / half_life)``, where ``elapsed`` is the
    time between ``ts`` and ``_now()``.

    Args:
        ts: Timestamp to weigh; a falsy value (e.g. ``None``) gives ``0.0``.
        half_life_days: Number of days after which the weight halves.

    Returns:
        ``0.0`` for a missing timestamp, ``1.0`` when ``ts`` is not in the
        past, otherwise the decayed weight.
    """
    if not ts:
        return 0.0
    import math

    elapsed_seconds = (_now() - ts).total_seconds()
    half_life_seconds = half_life_days * 24 * 3600.0
    if elapsed_seconds <= 0:
        # Timestamps at or after "now" get full weight.
        return 1.0
    exponent = elapsed_seconds / half_life_seconds
    return math.pow(0.5, exponent)
def neighbor_set(u: User) -> set[int]:
    """Return the ids of users linked to ``u`` by an introduction request.

    Every ``IntroductionRequest`` where ``u`` is the sender or the recipient
    is considered.  The other party's id is collected; ``u``'s own id and any
    falsy id are left out.
    """
    involving_u = Q(from_user=u) | Q(to_user=u)
    id_pairs = IntroductionRequest.objects.filter(involving_u).values_list(
        "from_user_id", "to_user_id"
    )
    neighbors: set[int] = set()
    for pair in set(id_pairs):
        for uid in pair:
            # Keep only real ids that belong to somebody other than ``u``.
            if uid and uid != u.id:
                neighbors.add(uid)
    return neighbors
def graph_similarity(u: User, v: User) -> float:
    """Return the Jaccard similarity of the two users' neighbor sets.

    The neighbor sets come from ``neighbor_set``, i.e. the users each of them
    shares an introduction request with.
    """
    return jaccard(neighbor_set(u), neighbor_set(v))
def has_existing_request(u: User, v: User) -> bool:
    """Tell whether an introduction request exists between ``u`` and ``v``.

    The direction does not matter: a request from ``u`` to ``v`` or from
    ``v`` to ``u`` both count.
    """
    u_to_v = Q(from_user=u, to_user=v)
    v_to_u = Q(from_user=v, to_user=u)
    matching = IntroductionRequest.objects.filter(u_to_v | v_to_u)
    return matching.exists()
@dataclass
class RankedProfile:
    """A candidate profile paired with its ranking score.

    Instances are what ``rank_users_for`` returns, ordered best first.
    """

    # The candidate's profile.
    profile: Profile
    # Combined ranking score; higher means a better match.
    score: float
def rank_users_for(user: User, k: int = 20) -> List[RankedProfile]:
    """Rank other users' profiles as introduction suggestions for ``user``.

    Candidates are all profiles except ``user``'s own, minus anyone who
    already shares an introduction request with ``user`` in either direction.
    Each remaining candidate is scored as a weighted sum:

    * 0.45 - Jaccard similarity of interest keywords
    * 0.15 - exact, non-empty industry match
    * 0.20 - Jaccard similarity of introduction-request neighbor sets
    * 0.10 - candidate is verified
    * 0.10 - recency of the candidate's latest accepted introduction
      (``time_decay`` with its default half-life)

    Candidates whose score is not positive are dropped.

    Args:
        user: The user to produce suggestions for.
        k: Maximum number of results to return.

    Returns:
        Up to ``k`` ``RankedProfile`` items, highest score first.
    """
    try:
        me_profile = Profile.objects.select_related("user").get(user=user)
    except Profile.DoesNotExist:
        me_profile = Profile(user=user)  # empty defaults

    my_interests = profile_interests(me_profile)
    # Loop-invariant: fetch the requester's neighbor ids once instead of
    # re-querying them for every candidate.
    my_neighbors = neighbor_set(user)

    ranked: List[Tuple[float, Profile]] = []
    candidates = Profile.objects.select_related("user").exclude(user=user)
    for p in candidates.iterator():
        # ``my_neighbors`` holds the other party of every request involving
        # ``user``, so membership means a request already exists between the
        # two users. No extra query per candidate is needed.
        if p.user_id in my_neighbors:
            continue

        jac = jaccard(my_interests, profile_interests(p))
        # Industry match: exact string match and non-empty
        industry_match = (
            1.0 if (me_profile.industry and me_profile.industry == p.industry) else 0.0
        )
        ver = 1.0 if p.is_verified else 0.0
        gsim = jaccard(my_neighbors, neighbor_set(p.user))

        # Candidate activity recency: last accepted intro involving candidate
        last_acc = (
            IntroductionRequest.objects.filter(
                Q(from_user=p.user) | Q(to_user=p.user), is_accepted=True
            )
            .order_by("-created_at")
            .values_list("created_at", flat=True)
            .first()
        )
        rec = time_decay(last_acc) if last_acc else 0.0

        score = 0.45 * jac + 0.15 * industry_match + 0.20 * gsim + 0.10 * ver + 0.10 * rec
        if score > 0:
            ranked.append((score, p))

    # Stable sort: candidates with equal scores keep their query order.
    ranked.sort(key=lambda t: t[0], reverse=True)
    top = ranked[:k]
    return [RankedProfile(profile=p, score=s) for s, p in top]
@dataclass
class RankedOpportunity:
    """An opportunity paired with its ranking score.

    Instances are what ``rank_opportunities_for`` returns, ordered best first.
    """

    # The opportunity being suggested.
    opportunity: Opportunity
    # Combined ranking score; higher means a better match.
    score: float
def rank_opportunities_for(user: User, k: int = 20) -> List[RankedOpportunity]:
    """Rank all opportunities for ``user`` by text relevance and freshness.

    The query text is the user's profile bio, interests and industry joined
    by spaces; each opportunity's document text is its title plus its
    description.  The score is ``0.6 * cosine similarity`` of the two token
    lists plus ``0.4 * time_decay`` of the opportunity's ``created_at`` with
    a 10-day half-life.  Opportunities whose score is not positive are
    dropped.

    Args:
        user: The user to produce suggestions for.
        k: Maximum number of results to return.

    Returns:
        Up to ``k`` ``RankedOpportunity`` items, highest score first.
    """
    try:
        me_profile = Profile.objects.select_related("user").get(user=user)
    except Profile.DoesNotExist:
        me_profile = Profile(user=user)  # empty defaults

    profile_text = " ".join(
        [me_profile.bio or "", me_profile.interests or "", me_profile.industry or ""]
    )
    query_tokens = tokenize_text(profile_text)

    scored: List[Tuple[float, Opportunity]] = []
    for opp in Opportunity.objects.all().iterator():
        doc_tokens = tokenize_text(opp.title + " " + (opp.description or ""))
        relevance = cosine(query_tokens, doc_tokens)
        freshness = time_decay(opp.created_at, half_life_days=10.0)
        total = 0.6 * relevance + 0.4 * freshness
        if total > 0:
            scored.append((total, opp))

    # Stable sort: opportunities with equal scores keep their query order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [
        RankedOpportunity(opportunity=opp, score=total) for total, opp in scored[:k]
    ]
|