import random  # kept for backward compatibility; password generation now uses `secrets`
import secrets
import string
import uuid
from datetime import datetime, timedelta

import bcrypt
from sqlalchemy.orm import Session
from redis.asyncio import Redis

from app.utils.redis_utils import redis_qpush
from app.core.auth import get_current_user
from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError
from app.models.user import VasUser
from app.models.session import VasSession
from app.models.email_verification import VasEmailVerification
from app.schemas.auth import AutoRegisterRequest, BindEmailRequest, LoginRequest

# Characters a generated password may contain.
_PASSWORD_ALPHABET = string.ascii_letters + string.digits + "!@#$%"

# Lifetime of a login session row.
_SESSION_TTL = timedelta(days=7)

# Lifetime of an email-verification token.
_EMAIL_TOKEN_TTL = timedelta(minutes=30)

# Queue name handed to redis_qpush for outgoing notifications.
_NOTIFICATION_QUEUE = 'vas_notification_queue'


def _random_password(length=16):
    """Return a random password of *length* characters.

    Characters are drawn from ASCII letters, digits and ``!@#$%`` using the
    ``secrets`` module, because the result is used as a real login credential
    and must not come from the predictable ``random`` generator.
    """
    return ''.join(secrets.choice(_PASSWORD_ALPHABET) for _ in range(length))


class AuthService:
    """Registration, email binding/verification and login operations."""

    @staticmethod
    def _open_session(db: Session, user_id, user_agent: str, ip) -> str:
        """Add a new VasSession for *user_id* to *db* and return its token.

        The session is only added to the SQLAlchemy session; the caller is
        responsible for committing.
        """
        token = f"tok_{uuid.uuid4().hex}"
        db.add(
            VasSession(
                id=token,
                user_id=user_id,
                user_agent=user_agent,
                ip=ip,
                expire_at=datetime.utcnow() + _SESSION_TTL,
            )
        )
        return token

    # -----------------------
    # Automatic registration
    # -----------------------
    @staticmethod
    def auto_register(db: Session, req: AutoRegisterRequest) -> dict:
        """Create an anonymous user plus a 7-day session for it.

        Args:
            db: SQLAlchemy session used for persistence.
            req: Request carrying ``register_ip`` and ``user_agent``.

        Returns:
            ``{"user": <VasUser>, "token": <session token>}``.
        """
        uid = str(uuid.uuid4())
        user = VasUser(
            id=uid,
            role="user",
            nickname="anonymous visitor",
            preferred_language="en",
            timezone="Asia/Shanghai",
            register_ip=req.register_ip,
        )
        db.add(user)
        # Emit the user INSERT first so the session row can reference it, but
        # commit only once so a failure cannot leave a user without a session.
        db.flush()

        token = AuthService._open_session(
            db,
            user_id=uid,
            user_agent=req.user_agent or "",
            ip=req.register_ip,
        )
        db.commit()

        return {
            "user": user,
            "token": token
        }

    # -----------------------
    # Bind email
    # -----------------------
    @staticmethod
    def bind_email(db: Session, req: BindEmailRequest, auth_user: VasUser, redis_client: Redis) -> bool:
        """Start binding ``req.email`` to *auth_user*.

        Stores a verification record valid for 30 minutes and hands a
        "verification email" notification payload to ``redis_qpush``.

        Args:
            db: SQLAlchemy session used for persistence.
            req: Request carrying the ``email`` to bind.
            auth_user: The authenticated user requesting the binding.
            redis_client: Redis client passed through to ``redis_qpush``.

        Returns:
            ``True`` once the record is stored and the payload handed off.

        Raises:
            BizLogicError: If a user already has this email.
        """
        if db.query(VasUser).filter_by(email=req.email).first():
            raise BizLogicError("Email has been bound")

        token = uuid.uuid4().hex
        record = VasEmailVerification(
            user_id=auth_user.id,
            email=req.email,
            token=token,
            expire_at=datetime.utcnow() + _EMAIL_TOKEN_TTL
        )
        db.add(record)
        db.commit()

        # NOTE(review): this writes the verification token to stdout; consider
        # removing it or routing it through a debug-only logger.
        print(f"📧 send verification email token={token}")
        notification_payload = {
            "notification_id": uuid.uuid4().hex,
            "type": "verification email",
            "user_id": auth_user.id,
            "channels": ["email"],
            "template_id": "verification_email",
            "payload": {
                "sendTo": req.email,
                "token": token
            }
        }
        # NOTE(review): redis_client is typed as redis.asyncio.Redis but this
        # call is not awaited — confirm redis_qpush is synchronous.
        redis_qpush(redis_client, _NOTIFICATION_QUEUE, notification_payload)

        return True

    # -----------------------
    # Email verification
    # -----------------------
    @staticmethod
    def verify_email(db: Session, token: str, redis_client: Redis) -> bool:
        """Complete an email binding identified by *token*.

        Sets the user's email, assigns a freshly generated password (stored
        as a bcrypt hash), marks the verification record as used, and hands a
        "login credentials" notification payload to ``redis_qpush``.

        Args:
            db: SQLAlchemy session used for persistence.
            token: Verification token issued by ``bind_email``.
            redis_client: Redis client passed through to ``redis_qpush``.

        Returns:
            ``True`` on success.

        Raises:
            BizLogicError: If the token is unknown/already used, has expired,
                or the email was bound to a different user in the meantime.
            NotFoundError: If the user the record refers to no longer exists.
        """
        record = (
            db.query(VasEmailVerification)
            .filter_by(token=token, used=0)
            .first()
        )
        if not record:
            raise BizLogicError("Token invalid")

        if record.expire_at < datetime.utcnow():
            raise BizLogicError("Token expired")

        user = db.query(VasUser).filter_by(id=record.user_id).first()
        if user is None:
            # The account was removed after the token was issued.
            raise NotFoundError("User not found")

        # bind_email only checks already-verified emails, so two users can
        # hold pending tokens for the same address; re-check ownership here.
        owner = db.query(VasUser).filter_by(email=record.email).first()
        if owner is not None and owner.id != user.id:
            raise BizLogicError("Email has been bound")

        # Update user.email
        user.email = record.email

        # Random password; only the bcrypt hash is persisted.
        plain = _random_password()
        user.password_hash = bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()

        record.used = 1

        db.commit()

        print("📧 send login email and password")
        notification_payload = {
            "notification_id": uuid.uuid4().hex,
            "type": "login credentials",
            "user_id": record.user_id,
            "channels": ["email"],
            "template_id": "login_credentials",
            "payload": {
                "sendTo": record.email,
                "username": record.email,
                "password": plain
            }
        }
        # NOTE(review): redis_client is typed as redis.asyncio.Redis but this
        # call is not awaited — confirm redis_qpush is synchronous.
        redis_qpush(redis_client, _NOTIFICATION_QUEUE, notification_payload)

        return True

    # -----------------------
    # User login
    # -----------------------
    @staticmethod
    def login(db: Session, req: LoginRequest) -> dict:
        """Authenticate by email/password and open a 7-day session.

        Args:
            db: SQLAlchemy session used for persistence.
            req: Request carrying ``email`` and ``password``.

        Returns:
            ``{"user": <VasUser>, "token": <session token>}``.

        Raises:
            NotFoundError: If no user has the given email.
            PermissionDeniedError: If the password does not match, or the
                user has no password hash stored.
        """
        user = db.query(VasUser).filter_by(email=req.email).first()
        if not user:
            raise NotFoundError("User not found")

        # Compare password; a user without a stored hash can never match.
        stored_hash = user.password_hash
        if not stored_hash or not bcrypt.checkpw(req.password.encode(), stored_hash.encode()):
            raise PermissionDeniedError("Password incorrect")

        # Create session
        token = AuthService._open_session(db, user_id=user.id, user_agent="", ip="")
        db.commit()

        return {
            "user": user,
            "token": token
        }