from __future__ import annotations

from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction

from orgs.models import Organization, UserProfile


class Command(BaseCommand):
    """Seed demo users (with profiles) for a single organization.

    For every role a configurable number of users named
    ``<prefix>-<role>-<index>`` is created if missing. Each user's
    ``UserProfile`` is created or re-pointed at the target organization and
    role. Re-running the command is safe: existing users keep their password
    and flags, only their profile is re-synchronised.
    """

    help = "Create demo users for an organization with roles and default passwords."

    def add_arguments(self, parser):
        """Register the command-line options on *parser*."""
        parser.add_argument("--org", required=True, help="Organization code, id, or name")
        parser.add_argument(
            "--owners", type=int, default=1,
            help="Number of owners to create (default: 1)",
        )
        parser.add_argument(
            "--managers", type=int, default=2,
            help="Number of managers to create (default: 2)",
        )
        parser.add_argument(
            "--drivers", type=int, default=2,
            help="Number of drivers to create (default: 2)",
        )
        parser.add_argument(
            "--customers", type=int, default=2,
            help="Number of customers to create (default: 2)",
        )
        parser.add_argument(
            "--auditors", type=int, default=0,
            help="Number of auditors to create (default: 0)",
        )
        parser.add_argument("--password", default="password123", help="Password to set for demo users")
        parser.add_argument("--prefix", default=None, help="Username prefix (defaults to org code)")
        parser.add_argument("--email-domain", default="example.com", help="Email domain for users")
        parser.add_argument(
            "--staff-owners", action="store_true",
            help="Mark owner users as staff for admin access",
        )

    @staticmethod
    def _resolve_organization(org_ident: str):
        """Return the organization matching *org_ident*, or ``None``.

        Lookup order: primary key (only when the identifier is a decimal
        number), then ``code``, then ``name``.
        """
        org = None
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as superscript digits that int() rejects with ValueError.
        if org_ident.isdecimal():
            org = Organization.objects.filter(pk=int(org_ident)).first()
        if org is None:
            org = (
                Organization.objects.filter(code=org_ident).first()
                or Organization.objects.filter(name=org_ident).first()
            )
        return org

    @staticmethod
    def _sync_profile(user, org, role) -> None:
        """Ensure *user* has a profile bound to *org* with the given *role*."""
        profile, _ = UserProfile.objects.get_or_create(
            user=user,
            defaults={"organization": org, "role": role},
        )
        # A pre-existing profile may point at a different organization or
        # role; bring it in line with what was requested.
        if profile.organization_id != org.id or profile.role != role:
            profile.organization = org
            profile.role = role
            profile.save(update_fields=["organization", "role"])

    def handle(self, *args, **options):
        """Create the requested demo users and print a short summary.

        Raises:
            CommandError: if no organization matches ``--org``.
        """
        org_ident: str = options["org"]
        counts = {
            UserProfile.ROLE_OWNER: int(options["owners"]),
            UserProfile.ROLE_MANAGER: int(options["managers"]),
            UserProfile.ROLE_DRIVER: int(options["drivers"]),
            UserProfile.ROLE_CUSTOMER: int(options["customers"]),
            UserProfile.ROLE_AUDITOR: int(options["auditors"]),
        }
        password: str = options["password"]
        email_domain: str = options["email_domain"]
        staff_owners: bool = bool(options.get("staff_owners", False))

        # Resolve organization
        org = self._resolve_organization(org_ident)
        if org is None:
            raise CommandError(f"Organization not found: {org_ident}")

        prefix = options.get("prefix") or org.code
        user_model = get_user_model()

        results = []  # (username, role) for users created by this run
        for role, n in counts.items():
            # Negative counts are treated as zero.
            for i in range(1, max(0, n) + 1):
                username = f"{prefix}-{role}-{i}"
                email = f"{username}@{email_domain}"
                # One transaction per user: a failure while setting the
                # password or creating the profile must not leave behind a
                # user row that later runs would treat as already set up.
                with transaction.atomic():
                    user, was_created = user_model.objects.get_or_create(
                        username=username,
                        defaults={"email": email},
                    )
                    if was_created:
                        user.set_password(password)
                        if staff_owners and role == UserProfile.ROLE_OWNER:
                            user.is_staff = True
                        user.save()
                    self._sync_profile(user, org, role)
                if was_created:
                    results.append((username, role))

        created = len(results)
        self.stdout.write(self.style.SUCCESS(
            f"Created {created} users for org {org.code}. Password='{password}'."
        ))
        if results:
            for uname, role in results[:10]:
                self.stdout.write(f"  - {uname} ({role})")
            if len(results) > 10:
                self.stdout.write(f"  ... and {len(results) - 10} more")