Няма описание

carbon.py 5.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. from __future__ import annotations
  2. from datetime import date
  3. from decimal import Decimal
  4. from typing import Iterable
  5. from django.db import transaction
  6. from django.db.models import Q
  7. from django.utils import timezone
  8. from ..models import (
  9. Organization,
  10. Material,
  11. WeighTicket,
  12. WeighLine,
  13. MaterialEmissionFactor,
  14. CarbonEvent,
  15. CarbonBalance,
  16. )
  17. LB_TO_KG = Decimal("0.45359237")
  18. def _convert_qty(qty: Decimal, from_unit: str, to_unit: str) -> Decimal:
  19. if from_unit == to_unit:
  20. return qty
  21. # Minimal support: lb <-> kg; pcs assumed 1:1 (no conversion)
  22. if from_unit == Material.UNIT_LB and to_unit == Material.UNIT_KG:
  23. return (qty * LB_TO_KG).quantize(Decimal("0.000"))
  24. if from_unit == Material.UNIT_KG and to_unit == Material.UNIT_LB:
  25. return (qty / LB_TO_KG).quantize(Decimal("0.000"))
  26. return qty
  27. def get_applicable_factor(org: Organization, material: Material, unit: str, as_of: date | None = None) -> MaterialEmissionFactor | None:
  28. """Return the most recent factor effective for the given org/material/unit.
  29. If none matches the unit, try the material's default unit as a fallback.
  30. """
  31. as_of = as_of or timezone.now().date()
  32. qs = (
  33. MaterialEmissionFactor.objects.filter(
  34. organization=org,
  35. material=material,
  36. unit=unit,
  37. )
  38. .filter(Q(effective_from__isnull=True) | Q(effective_from__lte=as_of))
  39. .order_by("-effective_from", "-id")
  40. )
  41. fac = qs.first()
  42. if fac:
  43. return fac
  44. # fallback to default unit
  45. if unit != material.default_unit:
  46. qs2 = (
  47. MaterialEmissionFactor.objects.filter(
  48. organization=org,
  49. material=material,
  50. unit=material.default_unit,
  51. )
  52. .filter(Q(effective_from__isnull=True) | Q(effective_from__lte=as_of))
  53. .order_by("-effective_from", "-id")
  54. )
  55. return qs2.first()
  56. return None
  57. @transaction.atomic
  58. def create_event_for_line(line: WeighLine) -> CarbonEvent | None:
  59. """Create or update a pending CarbonEvent for a given weigh line.
  60. Returns the event or None if no factor available.
  61. """
  62. ticket = line.ticket
  63. org = ticket.pickup.organization
  64. fac = get_applicable_factor(org, line.material, line.unit)
  65. if not fac:
  66. # Try convert to factor unit
  67. fac = get_applicable_factor(org, line.material, fac_unit := line.material.default_unit)
  68. if not fac:
  69. return None
  70. qty = _convert_qty(Decimal(line.quantity), line.unit, fac.unit)
  71. else:
  72. qty = Decimal(line.quantity)
  73. kgco2e = (qty * Decimal(fac.kgco2e_per_unit)).quantize(Decimal("0.000001"))
  74. ev, _ = CarbonEvent.objects.update_or_create(
  75. weigh_line=line,
  76. defaults={
  77. "organization": org,
  78. "weigh_ticket": ticket,
  79. "material": line.material,
  80. "quantity": qty,
  81. "unit": fac.unit,
  82. "kgco2e": kgco2e,
  83. "event_date": timezone.now(),
  84. "status": CarbonEvent.STATUS_PENDING,
  85. },
  86. )
  87. return ev
  88. def calculate_events_for_ticket(ticket: WeighTicket) -> list[CarbonEvent]:
  89. events: list[CarbonEvent] = []
  90. for ln in ticket.lines.select_related("material"):
  91. ev = create_event_for_line(ln)
  92. if ev:
  93. events.append(ev)
  94. return events
  95. def approve_events(org: Organization, events: Iterable[CarbonEvent], *, approver) -> int:
  96. """Approve events and update monthly CarbonBalance aggregates.
  97. Returns number approved.
  98. """
  99. approved = 0
  100. months: set[tuple[int, int]] = set()
  101. now = timezone.now()
  102. for ev in events:
  103. if ev.organization_id != org.id:
  104. continue
  105. if ev.status == CarbonEvent.STATUS_APPROVED:
  106. continue
  107. ev.status = CarbonEvent.STATUS_APPROVED
  108. ev.approved_by = approver
  109. ev.approved_at = now
  110. ev.save(update_fields=["status", "approved_by", "approved_at", "updated_at"])
  111. months.add((ev.event_date.year, ev.event_date.month))
  112. approved += 1
  113. # recompute balances for affected months
  114. for (y, m) in months:
  115. recompute_balance_for_month(org, y, m)
  116. return approved
  117. def recompute_balance_for_month(org: Organization, year: int, month: int) -> CarbonBalance:
  118. qs = CarbonEvent.objects.filter(
  119. organization=org,
  120. status=CarbonEvent.STATUS_APPROVED,
  121. event_date__year=year,
  122. event_date__month=month,
  123. )
  124. # Avoid circular import for Sum; simple loop to sum
  125. total_val = Decimal("0")
  126. cnt = 0
  127. for ev in qs.only("kgco2e"):
  128. total_val += Decimal(ev.kgco2e)
  129. cnt += 1
  130. bal, _ = CarbonBalance.objects.get_or_create(organization=org, year=year, month=month)
  131. bal.events_count = cnt
  132. bal.approved_kgco2e = total_val.quantize(Decimal("0.000001"))
  133. bal.save(update_fields=["events_count", "approved_kgco2e", "updated_at"])
  134. return bal
  135. def get_totals_for_org(org: Organization) -> dict[str, Decimal]:
  136. """Convenience totals for dashboards."""
  137. from django.utils.timezone import now
  138. today = now().date()
  139. y = today.year
  140. m = today.month
  141. bal = CarbonBalance.objects.filter(organization=org, year=y, month=m).first()
  142. pending = CarbonEvent.objects.filter(organization=org, status=CarbonEvent.STATUS_PENDING).count()
  143. return {
  144. "pending_events": pending,
  145. "approved_mtd": (bal.approved_kgco2e if bal else Decimal("0")),
  146. }