mirror of
https://github.com/wowlikon/LiB.git
synced 2026-03-21 23:53:38 +00:00
Исправление ошибок, добавление ИИ-ассистента
This commit is contained in:
@@ -8,6 +8,7 @@ import os
|
||||
|
||||
from argon2.low_level import hash_secret_raw, Type
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from fastapi import WebSocket, WebSocketException, Query
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from jose import jwt, JWTError, ExpiredSignatureError
|
||||
@@ -251,6 +252,31 @@ def get_current_user(
|
||||
return user
|
||||
|
||||
|
||||
async def get_current_user_ws(
|
||||
websocket: WebSocket,
|
||||
token: Annotated[str | None, Query()] = None,
|
||||
session: Session = Depends(get_session),
|
||||
) -> User:
|
||||
"""Аутентификация для WebSocket через Query параметр."""
|
||||
if token is None:
|
||||
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION, reason="Token required")
|
||||
|
||||
try:
|
||||
token_data = decode_token(token)
|
||||
|
||||
user = session.get(User, token_data.user_id)
|
||||
if user is None:
|
||||
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION, reason="User not found")
|
||||
|
||||
if not user.is_active:
|
||||
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION, reason="Inactive user")
|
||||
|
||||
return user
|
||||
|
||||
except HTTPException:
|
||||
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION, reason="Invalid token")
|
||||
|
||||
|
||||
def get_current_active_user(
|
||||
current_user: Annotated[User, Depends(get_current_user)],
|
||||
) -> User:
|
||||
@@ -315,14 +341,31 @@ def require_any_role(allowed_roles: list[str]):
|
||||
return role_checker
|
||||
|
||||
|
||||
def require_any_role_ws(allowed_roles: list[str]):
|
||||
"""Создает dependency для WebSocket проверки наличия хотя бы одной из ролей"""
|
||||
|
||||
def role_checker(current_user: User = Depends(get_current_user_ws)) -> User:
|
||||
user_roles = {role.name for role in current_user.roles}
|
||||
if not (user_roles & set(allowed_roles)):
|
||||
raise WebSocketException(
|
||||
code=status.WS_1008_POLICY_VIOLATION,
|
||||
reason=f"Requires one of roles: {allowed_roles}",
|
||||
)
|
||||
return current_user
|
||||
|
||||
return role_checker
|
||||
|
||||
|
||||
# Создание dependencies
|
||||
OptionalAuth = Annotated[User | None, Depends(get_optional_user)]
|
||||
RequireAuth = Annotated[User, Depends(get_current_active_user)]
|
||||
RequireAuthWS = Annotated[User, Depends(get_current_user_ws)]
|
||||
RequireAdmin = Annotated[User, Depends(require_role("admin"))]
|
||||
RequireMember = Annotated[User, Depends(require_role("member"))]
|
||||
RequireLibrarian = Annotated[User, Depends(require_role("librarian"))]
|
||||
RequirePartialAuth = Annotated[User, Depends(get_user_from_partial_token)]
|
||||
RequireStaff = Annotated[User, Depends(require_any_role(["admin", "librarian"]))]
|
||||
RequireStaffWS = Annotated[User, Depends(require_any_role_ws(["admin", "librarian"]))]
|
||||
|
||||
|
||||
def is_user_staff(user: User) -> bool:
|
||||
|
||||
Reference in New Issue
Block a user