"""Lightweight ranking helpers for people and opportunity recommendations.

Scores blend interest overlap (Jaccard), bag-of-words similarity (cosine),
shared-connection overlap on the introduction-request graph, verification
status, and exponential time decay of recent activity.
"""

from __future__ import annotations

import math
from collections import Counter
from dataclasses import dataclass
from typing import Iterable, List, Sequence, Tuple

from django.contrib.auth.models import User
from django.db.models import Q
from django.utils import timezone

from api.models import IntroductionRequest, Opportunity, Profile


def _now():
    return timezone.now()


def tokenize_interests_text(text: str | None) -> set[str]:
    """Split a comma- and whitespace-separated interests string into lowercase tokens."""
    if not text:
        return set()
    raw = [t.strip().lower() for part in text.split(",") for t in part.split()]  # type: ignore
    return {t for t in raw if t}


def profile_interests(profile: Profile) -> set[str]:
    # Prefer taggit tags when available; fall back to the comma-separated string field.
    try:
        tag_names = {t.lower() for t in profile.tags.names()}
    except Exception:
        tag_names = set()
    if tag_names:
        return tag_names
    return tokenize_interests_text(profile.interests)


def tokenize_text(text: str | None) -> List[str]:
    """Lowercase, strip punctuation, and split free text into word tokens."""
    if not text:
        return []
    clean = "".join(ch.lower() if ch.isalnum() or ch.isspace() else " " for ch in text)
    return [t for t in clean.split() if t]


def jaccard(a: Iterable[str], b: Iterable[str]) -> float:
    """Jaccard similarity: size of the intersection divided by size of the union."""
    sa, sb = set(a), set(b)
    if not sa and not sb:
        return 0.0
    inter = len(sa & sb)
    union = len(sa | sb)
    return inter / union if union else 0.0


def cosine(a: Sequence[str], b: Sequence[str]) -> float:
    """Cosine similarity of two token lists using raw term counts."""
    if not a or not b:
        return 0.0
    ca, cb = Counter(a), Counter(b)
    # Dot product over the terms of the first vector.
    dot = sum(ca[t] * cb.get(t, 0) for t in ca)
    if dot == 0:
        return 0.0
    na = math.sqrt(sum(v * v for v in ca.values()))
    nb = math.sqrt(sum(v * v for v in cb.values()))
    denom = na * nb
    return (dot / denom) if denom else 0.0


def time_decay(ts, half_life_days: float = 7.0) -> float:
    """Exponential decay in [0, 1]: 1.0 for now/future timestamps, halving every half_life_days."""
    if not ts:
        return 0.0
    dt = (_now() - ts).total_seconds()
    half_life = half_life_days * 24 * 3600.0
    if dt <= 0:
        return 1.0
    return math.pow(0.5, dt / half_life)


def neighbor_set(u: User) -> set[int]:
    # User ids this user has interacted with via introduction requests.
    pairs = set(
        IntroductionRequest.objects.filter(Q(from_user=u) | Q(to_user=u))
        .values_list("from_user_id", "to_user_id")
    )
    # Flatten the (from_id, to_id) pairs and drop the user's own id.
    return {i for pair in pairs for i in pair if i and i != u.id}


def graph_similarity(u: User, v: User) -> float:
    """Jaccard overlap of the two users' introduction-request neighborhoods."""
    nu, nv = neighbor_set(u), neighbor_set(v)
    return jaccard(nu, nv)


def has_existing_request(u: User, v: User) -> bool:
    return IntroductionRequest.objects.filter(
        Q(from_user=u, to_user=v) | Q(from_user=v, to_user=u)
    ).exists()


@dataclass
class RankedProfile:
    profile: Profile
    score: float


def rank_users_for(user: User, k: int = 20) -> List[RankedProfile]:
    """Rank other profiles for `user`, skipping pairs that already have an introduction request."""
    try:
        me_profile = Profile.objects.select_related("user").get(user=user)
    except Profile.DoesNotExist:
        me_profile = Profile(user=user)  # unsaved profile with empty defaults
    my_interests = profile_interests(me_profile)

    ranked: List[Tuple[float, Profile]] = []
    qs = Profile.objects.select_related("user").exclude(user=user)
    for p in qs.iterator():
        if has_existing_request(user, p.user):
            continue
        jac = jaccard(my_interests, profile_interests(p))
        # Industry match: exact, non-empty string match.
        industry_match = 1.0 if (me_profile.industry and me_profile.industry == p.industry) else 0.0
        ver = 1.0 if p.is_verified else 0.0
        gsim = graph_similarity(user, p.user)
        # Candidate activity recency: the most recent accepted intro involving the candidate.
        last_acc = (
            IntroductionRequest.objects.filter(
                Q(from_user=p.user) | Q(to_user=p.user), is_accepted=True
            )
            .order_by("-created_at")
            .values_list("created_at", flat=True)
            .first()
        )
        rec = time_decay(last_acc) if last_acc else 0.0
        # Weighted blend: interests 0.45, industry 0.15, graph 0.20, verified 0.10, recency 0.10.
        score = 0.45 * jac + 0.15 * industry_match + 0.20 * gsim + 0.10 * ver + 0.10 * rec
        if score > 0:
            ranked.append((score, p))

    ranked.sort(key=lambda t: t[0], reverse=True)
    top = ranked[:k]
    return [RankedProfile(profile=p, score=s) for s, p in top]


@dataclass
class RankedOpportunity:
    opportunity: Opportunity
    score: float


def rank_opportunities_for(user: User, k: int = 20) -> List[RankedOpportunity]:
    """Rank opportunities for `user` by text similarity to their profile plus freshness."""
    try:
        me_profile = Profile.objects.select_related("user").get(user=user)
    except Profile.DoesNotExist:
        me_profile = Profile(user=user)  # unsaved profile with empty defaults

    query_tokens = tokenize_text(
        (me_profile.bio or "")
        + " "
        + (me_profile.interests or "")
        + " "
        + (me_profile.industry or "")
    )

    ranked: List[Tuple[float, Opportunity]] = []
    for o in Opportunity.objects.all().iterator():
        doc_tokens = tokenize_text(o.title + " " + (o.description or ""))
        sim = cosine(query_tokens, doc_tokens)
        freshness = time_decay(o.created_at, half_life_days=10.0)
        # Weighted blend: text similarity 0.6, freshness 0.4.
        score = 0.6 * sim + 0.4 * freshness
        if score > 0:
            ranked.append((score, o))

    ranked.sort(key=lambda t: t[0], reverse=True)
    top = ranked[:k]
    return [RankedOpportunity(opportunity=o, score=s) for s, o in top]
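

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): how the rankers might be called from a
# Django shell or view. The module path `api.matching` and the username
# "alice" are assumptions, not defined in this file; adapt them to wherever
# this module actually lives and to real data.
#
#     from django.contrib.auth.models import User
#     from api.matching import rank_users_for, rank_opportunities_for  # assumed path
#
#     user = User.objects.get(username="alice")  # hypothetical user
#     for match in rank_users_for(user, k=5):
#         print(match.profile.user_id, round(match.score, 3))
#     for rec in rank_opportunities_for(user, k=5):
#         print(rec.opportunity.title, round(rec.score, 3))
# ---------------------------------------------------------------------------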