| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- from __future__ import annotations
- from django.core.management.base import BaseCommand, CommandError
- from django.contrib.auth import get_user_model
- from orgs.models import Organization, UserProfile
class Command(BaseCommand):
    """Management command that seeds demo users for one organization.

    For every role the command builds usernames of the form
    ``<prefix>-<role>-<n>``, creates any user that is missing, and makes sure
    each user's ``UserProfile`` points at the resolved organization with the
    matching role. Users that already exist keep their password and staff
    flag; only their profile is brought in line.
    """

    help = "Create demo users for an organization with roles and default passwords."

    def add_arguments(self, parser):
        """Register the command-line options on ``parser``."""
        parser.add_argument("--org", required=True, help="Organization code, id, or name")
        parser.add_argument("--owners", type=int, default=1, help="Number of owners to create (default: 1)")
        parser.add_argument("--managers", type=int, default=2, help="Number of managers to create (default: 2)")
        parser.add_argument("--drivers", type=int, default=2, help="Number of drivers to create (default: 2)")
        parser.add_argument("--customers", type=int, default=2, help="Number of customers to create (default: 2)")
        parser.add_argument("--auditors", type=int, default=0, help="Number of auditors to create (default: 0)")
        parser.add_argument("--password", default="password123", help="Password to set for demo users")
        parser.add_argument("--prefix", default=None, help="Username prefix (defaults to org code)")
        parser.add_argument("--email-domain", default="example.com", help="Email domain for users")
        parser.add_argument("--staff-owners", action="store_true", help="Mark owner users as staff for admin access")

    def handle(self, *args, **options):
        """Create the requested demo users and print a short summary.

        Raises:
            CommandError: if ``--org`` matches no organization by id, code,
                or name.
        """
        # Imported here so this block brings its own dependency into scope.
        from django.db import transaction

        org_ident: str = options["org"]
        counts = {
            UserProfile.ROLE_OWNER: int(options["owners"]),
            UserProfile.ROLE_MANAGER: int(options["managers"]),
            UserProfile.ROLE_DRIVER: int(options["drivers"]),
            UserProfile.ROLE_CUSTOMER: int(options["customers"]),
            UserProfile.ROLE_AUDITOR: int(options["auditors"]),
        }
        password: str = options["password"]
        email_domain: str = options["email_domain"]
        staff_owners: bool = bool(options.get("staff_owners", False))

        org = self._resolve_org(org_ident)
        prefix = options.get("prefix") or org.code
        user_model = get_user_model()

        # (username, role) for every user newly created by this run.
        results = []
        for role, n in counts.items():
            make_staff = staff_owners and role == UserProfile.ROLE_OWNER
            # max(0, n) makes a negative count behave like zero.
            for i in range(1, max(0, n) + 1):
                username = f"{prefix}-{role}-{i}"
                email = f"{username}@{email_domain}"
                # All-or-nothing per user: without this, a failure after the
                # initial insert would leave a password-less user that later
                # runs treat as "already exists" and never repair.
                with transaction.atomic():
                    user, was_created = user_model.objects.get_or_create(
                        username=username,
                        defaults={"email": email},
                    )
                    if was_created:
                        user.set_password(password)
                        if make_staff:
                            user.is_staff = True
                        user.save()
                    self._sync_profile(user, org, role)
                if was_created:
                    results.append((username, role))

        created = len(results)
        self.stdout.write(self.style.SUCCESS(
            f"Created {created} users for org {org.code}. Password='{password}'."
        ))
        # Only the first ten new users are listed; the rest are summarized.
        for uname, role in results[:10]:
            self.stdout.write(f" - {uname} ({role})")
        if len(results) > 10:
            self.stdout.write(f" ... and {len(results) - 10} more")

    @staticmethod
    def _resolve_org(org_ident):
        """Return the organization matching ``org_ident`` by id, code, or name.

        Lookups are tried in that order; the id lookup only happens when the
        identifier is made of decimal digits.

        Raises:
            CommandError: if no organization matches.
        """
        org = None
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as superscript digits that int() rejects with ValueError.
        if org_ident.isdecimal():
            org = Organization.objects.filter(pk=int(org_ident)).first()
        if org is None:
            org = (
                Organization.objects.filter(code=org_ident).first()
                or Organization.objects.filter(name=org_ident).first()
            )
        if org is None:
            raise CommandError(f"Organization not found: {org_ident}")
        return org

    @staticmethod
    def _sync_profile(user, org, role):
        """Ensure ``user`` has a profile attached to ``org`` with ``role``."""
        profile, _ = UserProfile.objects.get_or_create(
            user=user,
            defaults={"organization": org, "role": role},
        )
        # A profile that already existed may point at another org or role.
        if profile.organization_id != org.id or profile.role != role:
            profile.organization = org
            profile.role = role
            profile.save(update_fields=["organization", "role"])
|