| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- from __future__ import annotations
- from datetime import date
- from decimal import Decimal
- from typing import Iterable
- from django.db import transaction
- from django.db.models import Q
- from django.utils import timezone
- from ..models import (
- Organization,
- Material,
- WeighTicket,
- WeighLine,
- MaterialEmissionFactor,
- CarbonEvent,
- CarbonBalance,
- )
# Kilograms per pound: a pound quantity multiplied by this gives kilograms.
LB_TO_KG = Decimal("0.45359237")


def _convert_qty(qty: Decimal, from_unit: str, to_unit: str) -> Decimal:
    """Convert ``qty`` from ``from_unit`` to ``to_unit``.

    Only the lb <-> kg pair is actually converted; the converted value is
    quantized to three decimal places. Identical units, and every other
    pairing (e.g. pieces), return ``qty`` unchanged and unquantized.
    """
    if from_unit == to_unit:
        return qty

    three_places = Decimal("0.000")
    route = (from_unit, to_unit)
    if route == (Material.UNIT_LB, Material.UNIT_KG):
        converted = qty * LB_TO_KG
        return converted.quantize(three_places)
    if route == (Material.UNIT_KG, Material.UNIT_LB):
        converted = qty / LB_TO_KG
        return converted.quantize(three_places)

    # Unsupported pairing: treated as 1:1, no conversion applied.
    return qty
def get_applicable_factor(
    org: Organization,
    material: Material,
    unit: str,
    as_of: date | None = None,
) -> MaterialEmissionFactor | None:
    """Return the most recent factor effective for the given org/material/unit.

    Factors are looked up for ``unit`` first; if none matches and ``unit``
    differs from ``material.default_unit``, the material's default unit is
    tried as a fallback. Callers must therefore check ``factor.unit`` before
    applying the factor to a quantity expressed in ``unit``.

    A factor qualifies when its ``effective_from`` is NULL or not later than
    ``as_of`` (defaults to today's date from ``timezone.now()``). Among
    qualifying rows the latest ``effective_from`` wins, ties broken by the
    highest ``id``. Rows with a NULL ``effective_from`` are explicitly ordered
    last so that a dated factor always takes precedence over an undated one,
    independent of the database backend's default NULL ordering.

    Returns ``None`` when no factor qualifies for either unit.
    """
    # Function-scope import: the module header only pulls in ``Q``.
    from django.db.models import F

    as_of = as_of or timezone.now().date()

    candidate_units = [unit]
    if unit != material.default_unit:
        candidate_units.append(material.default_unit)

    for candidate in candidate_units:
        factor = (
            MaterialEmissionFactor.objects.filter(
                organization=org,
                material=material,
                unit=candidate,
            )
            .filter(Q(effective_from__isnull=True) | Q(effective_from__lte=as_of))
            # nulls_last: a plain "-effective_from" puts NULLs first on
            # PostgreSQL, which would make an undated factor shadow every
            # dated one.
            .order_by(F("effective_from").desc(nulls_last=True), "-id")
            .first()
        )
        if factor is not None:
            return factor
    return None
@transaction.atomic
def create_event_for_line(line: WeighLine) -> CarbonEvent | None:
    """Create or update a pending CarbonEvent for a given weigh line.

    The emission factor is resolved with ``get_applicable_factor``, which may
    return a factor expressed in the material's default unit rather than the
    line's own unit. The line quantity is therefore always passed through
    ``_convert_qty`` so that the stored quantity, unit and kgCO2e all refer to
    the factor's unit.

    The event is keyed on ``weigh_line``; when one already exists its fields
    (including ``status`` and ``event_date``) are overwritten with the freshly
    computed values.

    Returns the event, or ``None`` if no factor is available.
    """
    ticket = line.ticket
    org = ticket.pickup.organization
    material = line.material

    fac = get_applicable_factor(org, material, line.unit)
    if fac is None:
        return None

    # get_applicable_factor already falls back to the default unit, so the
    # returned factor's unit can differ from line.unit. _convert_qty is a
    # no-op when the units match.
    qty = _convert_qty(Decimal(line.quantity), line.unit, fac.unit)
    kgco2e = (qty * Decimal(fac.kgco2e_per_unit)).quantize(Decimal("0.000001"))

    ev, _ = CarbonEvent.objects.update_or_create(
        weigh_line=line,
        defaults={
            "organization": org,
            "weigh_ticket": ticket,
            "material": material,
            "quantity": qty,
            "unit": fac.unit,
            "kgco2e": kgco2e,
            "event_date": timezone.now(),
            "status": CarbonEvent.STATUS_PENDING,
        },
    )
    return ev
def calculate_events_for_ticket(ticket: WeighTicket) -> list[CarbonEvent]:
    """Build carbon events for every line of ``ticket``.

    Each line (fetched with its material) is handed to
    ``create_event_for_line``; lines that yield no event are left out of the
    returned list, which otherwise follows the line iteration order.
    """
    ticket_lines = ticket.lines.select_related("material")
    outcomes = (create_event_for_line(ticket_line) for ticket_line in ticket_lines)
    return [outcome for outcome in outcomes if outcome]
def approve_events(org: Organization, events: Iterable[CarbonEvent], *, approver) -> int:
    """Approve events and update monthly CarbonBalance aggregates.

    Events that belong to another organization or are already approved are
    skipped. Every remaining event is marked approved by ``approver`` with a
    single shared timestamp, and the balance of each affected (year, month)
    is recomputed afterwards.

    All writes run inside one ``transaction.atomic()`` block so that a failure
    part-way through does not leave approved events without matching balance
    rows.

    Returns number approved.
    """
    # Function-scope import: the module header only imports ``date``.
    from datetime import datetime

    approved = 0
    months: set[tuple[int, int]] = set()
    now = timezone.now()

    with transaction.atomic():
        for ev in events:
            if ev.organization_id != org.id:
                continue
            if ev.status == CarbonEvent.STATUS_APPROVED:
                continue
            ev.status = CarbonEvent.STATUS_APPROVED
            ev.approved_by = approver
            ev.approved_at = now
            ev.save(update_fields=["status", "approved_by", "approved_at", "updated_at"])

            # recompute_balance_for_month filters with __year/__month, which
            # Django evaluates in the current time zone for aware datetimes.
            # Bucket by the same local month so the right balance row is
            # refreshed. NOTE(review): assumes event_date is a DateTimeField;
            # plain dates pass through unchanged.
            bucket = ev.event_date
            if isinstance(bucket, datetime) and timezone.is_aware(bucket):
                bucket = timezone.localtime(bucket)
            months.add((bucket.year, bucket.month))
            approved += 1

        # recompute balances for affected months (sorted for a stable order)
        for year, month in sorted(months):
            recompute_balance_for_month(org, year, month)

    return approved
def recompute_balance_for_month(org: Organization, year: int, month: int) -> CarbonBalance:
    """Rebuild the CarbonBalance row for ``org`` in the given year/month.

    Sums ``kgco2e`` over the organization's approved events whose
    ``event_date`` falls in that month, counts them, and stores both figures
    on the balance row (created if it does not exist yet). The total is
    quantized to six decimal places. Returns the saved balance.
    """
    approved_events = CarbonEvent.objects.filter(
        organization=org,
        status=CarbonEvent.STATUS_APPROVED,
        event_date__year=year,
        event_date__month=month,
    ).only("kgco2e")

    # Summed in Python, one event at a time, rather than via a DB aggregate.
    amounts = [Decimal(event.kgco2e) for event in approved_events]
    month_total = sum(amounts, Decimal("0"))

    balance, _created = CarbonBalance.objects.get_or_create(
        organization=org,
        year=year,
        month=month,
    )
    balance.events_count = len(amounts)
    balance.approved_kgco2e = month_total.quantize(Decimal("0.000001"))
    balance.save(update_fields=["events_count", "approved_kgco2e", "updated_at"])
    return balance
def get_totals_for_org(org: Organization) -> dict[str, int | Decimal]:
    """Convenience totals for dashboards.

    Returns a dict with:
    - ``"pending_events"``: number (int) of the organization's events that
      are still pending;
    - ``"approved_mtd"``: ``approved_kgco2e`` of the organization's
      CarbonBalance row for the current year/month, or ``Decimal("0")`` when
      no such row exists.
    """
    current = timezone.now()
    # Use the local calendar month when datetimes are time-zone aware, so the
    # lookup does not jump to the next/previous month at the UTC boundary.
    # Naive datetimes (USE_TZ disabled) are used as-is.
    # NOTE(review): assumes balances are keyed by local month — confirm.
    if timezone.is_aware(current):
        current = timezone.localtime(current)

    bal = CarbonBalance.objects.filter(
        organization=org,
        year=current.year,
        month=current.month,
    ).first()
    pending = CarbonEvent.objects.filter(
        organization=org,
        status=CarbonEvent.STATUS_PENDING,
    ).count()
    return {
        "pending_events": pending,
        "approved_mtd": (bal.approved_kgco2e if bal else Decimal("0")),
    }
|