from __future__ import annotations from datetime import date from decimal import Decimal from typing import Iterable from django.db import transaction from django.db.models import Q from django.utils import timezone from ..models import ( Organization, Material, WeighTicket, WeighLine, MaterialEmissionFactor, CarbonEvent, CarbonBalance, ) LB_TO_KG = Decimal("0.45359237") def _convert_qty(qty: Decimal, from_unit: str, to_unit: str) -> Decimal: if from_unit == to_unit: return qty # Minimal support: lb <-> kg; pcs assumed 1:1 (no conversion) if from_unit == Material.UNIT_LB and to_unit == Material.UNIT_KG: return (qty * LB_TO_KG).quantize(Decimal("0.000")) if from_unit == Material.UNIT_KG and to_unit == Material.UNIT_LB: return (qty / LB_TO_KG).quantize(Decimal("0.000")) return qty def get_applicable_factor(org: Organization, material: Material, unit: str, as_of: date | None = None) -> MaterialEmissionFactor | None: """Return the most recent factor effective for the given org/material/unit. If none matches the unit, try the material's default unit as a fallback. """ as_of = as_of or timezone.now().date() qs = ( MaterialEmissionFactor.objects.filter( organization=org, material=material, unit=unit, ) .filter(Q(effective_from__isnull=True) | Q(effective_from__lte=as_of)) .order_by("-effective_from", "-id") ) fac = qs.first() if fac: return fac # fallback to default unit if unit != material.default_unit: qs2 = ( MaterialEmissionFactor.objects.filter( organization=org, material=material, unit=material.default_unit, ) .filter(Q(effective_from__isnull=True) | Q(effective_from__lte=as_of)) .order_by("-effective_from", "-id") ) return qs2.first() return None @transaction.atomic def create_event_for_line(line: WeighLine) -> CarbonEvent | None: """Create or update a pending CarbonEvent for a given weigh line. Returns the event or None if no factor available. """ ticket = line.ticket org = ticket.pickup.organization fac = get_applicable_factor(org, line.material, line.unit) if not fac: # Try convert to factor unit fac = get_applicable_factor(org, line.material, fac_unit := line.material.default_unit) if not fac: return None qty = _convert_qty(Decimal(line.quantity), line.unit, fac.unit) else: qty = Decimal(line.quantity) kgco2e = (qty * Decimal(fac.kgco2e_per_unit)).quantize(Decimal("0.000001")) ev, _ = CarbonEvent.objects.update_or_create( weigh_line=line, defaults={ "organization": org, "weigh_ticket": ticket, "material": line.material, "quantity": qty, "unit": fac.unit, "kgco2e": kgco2e, "event_date": timezone.now(), "status": CarbonEvent.STATUS_PENDING, }, ) return ev def calculate_events_for_ticket(ticket: WeighTicket) -> list[CarbonEvent]: events: list[CarbonEvent] = [] for ln in ticket.lines.select_related("material"): ev = create_event_for_line(ln) if ev: events.append(ev) return events def approve_events(org: Organization, events: Iterable[CarbonEvent], *, approver) -> int: """Approve events and update monthly CarbonBalance aggregates. Returns number approved. """ approved = 0 months: set[tuple[int, int]] = set() now = timezone.now() for ev in events: if ev.organization_id != org.id: continue if ev.status == CarbonEvent.STATUS_APPROVED: continue ev.status = CarbonEvent.STATUS_APPROVED ev.approved_by = approver ev.approved_at = now ev.save(update_fields=["status", "approved_by", "approved_at", "updated_at"]) months.add((ev.event_date.year, ev.event_date.month)) approved += 1 # recompute balances for affected months for (y, m) in months: recompute_balance_for_month(org, y, m) return approved def recompute_balance_for_month(org: Organization, year: int, month: int) -> CarbonBalance: qs = CarbonEvent.objects.filter( organization=org, status=CarbonEvent.STATUS_APPROVED, event_date__year=year, event_date__month=month, ) # Avoid circular import for Sum; simple loop to sum total_val = Decimal("0") cnt = 0 for ev in qs.only("kgco2e"): total_val += Decimal(ev.kgco2e) cnt += 1 bal, _ = CarbonBalance.objects.get_or_create(organization=org, year=year, month=month) bal.events_count = cnt bal.approved_kgco2e = total_val.quantize(Decimal("0.000001")) bal.save(update_fields=["events_count", "approved_kgco2e", "updated_at"]) return bal def get_totals_for_org(org: Organization) -> dict[str, Decimal]: """Convenience totals for dashboards.""" from django.utils.timezone import now today = now().date() y = today.year m = today.month bal = CarbonBalance.objects.filter(organization=org, year=y, month=m).first() pending = CarbonEvent.objects.filter(organization=org, status=CarbonEvent.STATUS_PENDING).count() return { "pending_events": pending, "approved_mtd": (bal.approved_kgco2e if bal else Decimal("0")), }