mirror of
https://github.com/wowlikon/LiB.git
synced 2026-02-04 04:31:09 +00:00
Расширение фронтэнда
This commit is contained in:
@@ -2,15 +2,15 @@
|
||||
from datetime import timedelta
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi import APIRouter, Body, Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from sqlmodel import Session, select
|
||||
|
||||
from library_service.models.db import Role, User
|
||||
from library_service.models.dto import Token, UserCreate, UserRead, UserUpdate, UserList, RoleRead, RoleList
|
||||
from library_service.settings import get_session
|
||||
from library_service.auth import (ACCESS_TOKEN_EXPIRE_MINUTES, RequireAdmin,
|
||||
RequireAuth, authenticate_user, get_password_hash,
|
||||
from library_service.auth import (ACCESS_TOKEN_EXPIRE_MINUTES, RequireAdmin, RequireAuth,
|
||||
authenticate_user, get_password_hash, decode_token,
|
||||
create_access_token, create_refresh_token)
|
||||
|
||||
router = APIRouter(prefix="/auth", tags=["authentication"])
|
||||
@@ -49,7 +49,7 @@ def register(user_data: UserCreate, session: Session = Depends(get_session)):
|
||||
hashed_password=get_password_hash(user_data.password)
|
||||
)
|
||||
|
||||
default_role = session.exec(select(Role).where(Role.name == "user")).first()
|
||||
default_role = session.exec(select(Role).where(Role.name == "member")).first()
|
||||
if default_role:
|
||||
db_user.roles.append(default_role)
|
||||
|
||||
@@ -93,13 +93,62 @@ def login(
|
||||
)
|
||||
|
||||
|
||||
@router.post(
|
||||
"/refresh",
|
||||
response_model=Token,
|
||||
summary="Обновление токена",
|
||||
description="Получение новой пары токенов (Access + Refresh) используя действующий Refresh токен",
|
||||
)
|
||||
def refresh_token(
|
||||
refresh_token: str = Body(..., embed=True),
|
||||
session: Session = Depends(get_session),
|
||||
):
|
||||
"""Эндпоинт для обновления токенов."""
|
||||
try:
|
||||
token_data = decode_token(refresh_token, expected_type="refresh")
|
||||
except HTTPException:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid refresh token",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
user = session.get(User, token_data.user_id)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="User not found",
|
||||
)
|
||||
|
||||
if not user.is_active:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="User is inactive",
|
||||
)
|
||||
|
||||
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
new_access_token = create_access_token(
|
||||
data={"sub": user.username, "user_id": user.id},
|
||||
expires_delta=access_token_expires,
|
||||
)
|
||||
new_refresh_token = create_refresh_token(
|
||||
data={"sub": user.username, "user_id": user.id}
|
||||
)
|
||||
|
||||
return Token(
|
||||
access_token=new_access_token,
|
||||
refresh_token=new_refresh_token,
|
||||
token_type="bearer",
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/me",
|
||||
response_model=UserRead,
|
||||
summary="Текущий пользователь",
|
||||
description="Получить информацию о текущем авторизованном пользователе",
|
||||
)
|
||||
def read_users_me(current_user: RequireAuth):
|
||||
def get_my_profile(current_user: RequireAuth):
|
||||
"""Эндпоинт получения информации о себе"""
|
||||
return UserRead(
|
||||
**current_user.model_dump(), roles=[role.name for role in current_user.roles]
|
||||
@@ -142,14 +191,17 @@ def update_user_me(
|
||||
)
|
||||
def read_users(
|
||||
admin: RequireAdmin,
|
||||
session: Session = Depends(get_session),
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
session: Session = Depends(get_session),
|
||||
):
|
||||
"""Эндпоинт получения списка всех пользователей"""
|
||||
users = session.exec(select(User).offset(skip).limit(limit)).all()
|
||||
return UserList(
|
||||
users=[UserRead(**user.model_dump()) for user in users],
|
||||
users=[
|
||||
UserRead(**user.model_dump(), roles=[r.name for r in user.roles])
|
||||
for user in users
|
||||
],
|
||||
total=len(users),
|
||||
)
|
||||
|
||||
@@ -243,11 +295,14 @@ def remove_role_from_user(
|
||||
description="Возвращает список ролей",
|
||||
)
|
||||
def get_roles(
|
||||
auth: RequireAuth,
|
||||
session: Session = Depends(get_session),
|
||||
):
|
||||
"""Эндпоинт получения списа ролей"""
|
||||
user_roles = [role.name for role in auth.roles]
|
||||
exclude = {"payroll"} if "admin" in user_roles else set()
|
||||
roles = session.exec(select(Role)).all()
|
||||
return RoleList(
|
||||
roles=[RoleRead(**role.model_dump()) for role in roles],
|
||||
roles=[RoleRead(**role.model_dump(exclude=exclude)) for role in roles],
|
||||
total=len(roles),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user