import uuid, bcrypt, random, string from sqlalchemy.orm import Session from datetime import datetime, timedelta from redis.asyncio import Redis from app.utils.redis_utils import redis_qpush from app.core.auth import get_current_user from app.core.biz_exception import NotFoundError, PermissionDeniedError, BizLogicError from app.models.user import VasUser from app.models.session import VasSession from app.models.email_verification import VasEmailVerification from app.schemas.auth import AutoRegisterRequest, SendBindCodeRequest, SendResetCodeRequest, BindEmailRequest, ResetPasswordRequest, LoginRequest from app.services.notification_service import NotificationService def _random_password(length=16): return ''.join(random.choices(string.ascii_letters + string.digits + "!@#$%", k=length)) class AuthService: # ----------------------- # 自动注册 # ----------------------- @staticmethod def auto_register(db: Session, req:AutoRegisterRequest): uid = f'usr-{uuid.uuid4().hex[:8]}' user = VasUser( id=uid, role="user", nickname="anonymous visitor", preferred_language="en", timezone="Asia/Shanghai", register_ip=req.register_ip, ) db.add(user) db.commit() # 创建 session token = f"tok_{uuid.uuid4().hex}" session = VasSession( id=token, user_id=uid, user_agent=req.user_agent or "", ip=req.register_ip, expire_at=datetime.utcnow() + timedelta(days=7) ) db.add(session) db.commit() return { "user": user, "token": token } def send_bind_code(db: Session, payload: SendBindCodeRequest, auth_user: VasUser, redis_client:Redis): token = uuid.uuid4().hex[0:6] record = VasEmailVerification( user_id=auth_user.id, email=payload.email, token=token, expire_at=datetime.utcnow() + timedelta(minutes=30) ) db.add(record) db.commit() print(f"📧 send verification email token={token}") NotificationService.create( redis_client=redis_client, ntype="email verification email", user_id=auth_user.id, channels=["email"], template_id="email_verification_for_bind", payload={ "token": token } ) def send_reset_code(db: Session, payload: SendResetCodeRequest, redis_client:Redis): user = db.query(VasUser).filter( VasUser.email == payload.email, VasUser.email_verified == 1 ).first() if not user: raise BizLogicError("User not exist") token = uuid.uuid4().hex[0:6] record = VasEmailVerification( user_id=user.id, email=payload.email, token=token, expire_at=datetime.utcnow() + timedelta(minutes=30) ) db.add(record) db.commit() print(f"📧 send verification email token={token}") NotificationService.create( redis_client=redis_client, ntype="email verification email", user_id=user.id, channels=["email"], template_id="email_verification_for_reset", payload={ "token": token } ) # ----------------------- # 绑定邮箱 # ----------------------- @staticmethod def bind_email(db: Session, payload: BindEmailRequest, auth_user: VasUser, redis_client:Redis): user = db.query(VasUser).filter( VasUser.email == payload.email, VasUser.email_verified == 1 ).first() if user: raise BizLogicError("Email has been bound") record = ( db.query(VasEmailVerification) .filter_by(token=payload.code, used=0) .first() ) if not record: raise BizLogicError("Token invalid") if record.expire_at < datetime.utcnow(): raise BizLogicError("Token expired") # 更新 user.email user = db.query(VasUser).filter_by(id=record.user_id).first() user.email = payload.email # 随机密码 plain = _random_password() hashed = bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode() user.password_hash = hashed user.email_verified = 1 record.used = 1 # 创建 session session_id = "tok_" + uuid.uuid4().hex session = VasSession( id=session_id, user_id=user.id, user_agent="", ip="", expire_at=datetime.utcnow() + timedelta(days=30) ) db.add(session) db.commit() db.refresh(user) print(f"📧 send login email and password") NotificationService.create( redis_client=redis_client, ntype="login credentials", user_id=user.id, channels=["email"], template_id="login_credentials", payload={ "username": payload.email, "password": plain } ) return { "user": user, "token": session_id } def reset_password(db: Session, payload: ResetPasswordRequest): user = db.query(VasUser).filter( VasUser.email == payload.email, VasUser.email_verified == 1 ).first() if not user: raise BizLogicError("User not exist") record = ( db.query(VasEmailVerification) .filter_by(token=payload.code, used=0) .first() ) if not record: raise BizLogicError("Token invalid") if record.expire_at < datetime.utcnow(): raise BizLogicError("Token expired") hashed = bcrypt.hashpw(payload.new_password.encode(), bcrypt.gensalt()).decode() user.password_hash = hashed record.used = 1 db.commit() return True # ----------------------- # 用户登录 # ----------------------- @staticmethod def login(db: Session, req:LoginRequest): user = db.query(VasUser).filter_by(email=req.email).first() if not user: raise NotFoundError("User not found") # 对比密码 if not bcrypt.checkpw(req.password.encode(), user.password_hash.encode()): raise PermissionDeniedError("Password incorrect") # 创建 session token = "tok_" + uuid.uuid4().hex session = VasSession( id=token, user_id=user.id, user_agent="", ip="", expire_at=datetime.utcnow() + timedelta(days=7) ) db.add(session) db.commit() return { "user": user, "token": token }