"""Authentication API: user registration, login, and JWT verification.

Users are stored in a local SQLite database (``DB_PATH``). A successful
login issues an HS256-signed JWT that is returned in the response body and
also set as the ``auth_token`` cookie.
"""

import datetime
import os
import sqlite3
from contextlib import closing

import jwt
import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from passlib.context import CryptContext
from pydantic import BaseModel
from werkzeug.security import generate_password_hash, check_password_hash

app = FastAPI(
    title="Work BD Auth API",
    description="API для авторизации и регистрации",
    version="1.0",
)

app.add_middleware(
    CORSMiddleware,
    # Explicit list of origins allowed to call this API with credentials.
    allow_origins=["http://localhost:5173", "https://allowlgroup.ru"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

DB_PATH = 'users.db'

# Secret used to sign and verify JWTs. It can be overridden through the
# JWT_SECRET environment variable; the fallback keeps previously issued
# tokens valid.
# NOTE(review): the fallback value is committed to source control and should
# be treated as compromised -- set JWT_SECRET in production and rotate.
JWT_SECRET = os.environ.get("JWT_SECRET") or "95ad4fb1f2612c41ed299d5ca695945890c957fa"
JWT_ALGORITHM = "HS256"

# Lifetime shared by the JWT "exp" claim and both login cookies (30 days).
TOKEN_TTL = datetime.timedelta(days=30)
COOKIE_MAX_AGE = int(TOKEN_TTL.total_seconds())


def init_db() -> None:
    """Create the ``users`` table if it does not exist yet."""
    # sqlite3's own connection context manager only manages transactions;
    # closing() guarantees the connection is released even on error.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            '''CREATE TABLE IF NOT EXISTS users (
                   id INTEGER PRIMARY KEY AUTOINCREMENT,
                   username TEXT UNIQUE NOT NULL,
                   password TEXT NOT NULL)'''
        )
        conn.commit()


init_db()


class UserIn(BaseModel):
    """Request body carrying the credentials for /register and /login."""

    username: str
    password: str


def _require_credentials(user: UserIn) -> None:
    """Raise HTTP 400 when the username or the password is empty."""
    if not user.username or not user.password:
        raise HTTPException(status_code=400, detail="Username and password required")


@app.post('/register', status_code=201, tags=["User"])
async def register(user: UserIn):
    """Create a new user with a hashed password.

    Raises:
        HTTPException: 400 if a credential is empty or the username is
            already taken.
    """
    _require_credentials(user)

    hashed_password = generate_password_hash(user.password)
    try:
        # closing() releases the connection even when the INSERT violates
        # the UNIQUE constraint (the original leaked it on that path).
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.execute(
                'INSERT INTO users (username, password) VALUES (?, ?)',
                (user.username, hashed_password),
            )
            conn.commit()
    except sqlite3.IntegrityError as exc:
        raise HTTPException(status_code=400, detail="Username already exists") from exc

    return {"message": "User registered successfully"}


@app.post('/login', tags=["User"])
async def login(user: UserIn):
    """Check the credentials and issue a JWT.

    On success the token is returned in the JSON body and set as the
    HTTP-only ``auth_token`` cookie; a script-readable ``username`` cookie
    is set alongside it.

    Raises:
        HTTPException: 400 if a credential is empty, 401 if the username is
            unknown or the password does not match.
    """
    _require_credentials(user)

    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(
            'SELECT password FROM users WHERE username = ?',
            (user.username,),
        ).fetchone()

    if not row or not check_password_hash(row[0], user.password):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Generate the JWT. A timezone-aware "now" replaces the deprecated
    # datetime.utcnow() and produces the same "exp" timestamp.
    expires_at = datetime.datetime.now(datetime.timezone.utc) + TOKEN_TTL
    token = jwt.encode(
        {"username": user.username, "exp": expires_at},
        JWT_SECRET,
        algorithm=JWT_ALGORITHM,
    )

    response = JSONResponse(content={"message": "Login successful", "token": token})
    response.set_cookie(
        key="auth_token",
        value=token,
        max_age=COOKIE_MAX_AGE,  # 30 days
        httponly=True,  # keep the token out of reach of page scripts
        samesite="lax",
        path="/",
    )
    response.set_cookie(
        key="username",
        value=user.username,
        max_age=COOKIE_MAX_AGE,
        samesite="lax",
        path="/",
    )
    return response


@app.get('/users', tags=["User"])
async def get_users():
    """Return every row of the ``users`` table.

    NOTE(review): this endpoint is unauthenticated and ``SELECT *`` includes
    the password-hash column. The response shape is left unchanged here for
    compatibility; restrict the columns and require auth once callers are
    confirmed.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute('SELECT * FROM users').fetchall()
    return rows


@app.get('/verify', tags=["User"])
async def verify_token_endpoint(request: Request):
    """Validate the ``auth_token`` cookie and return the user it names.

    Raises:
        HTTPException: 401 if the cookie is missing, the token is expired,
            or the token is invalid (including a missing username claim).
    """
    token = request.cookies.get('auth_token')
    if not token:
        raise HTTPException(status_code=401, detail="No token provided")

    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

    username = payload.get("username")
    if username is None:
        # A correctly signed token without the claim is rejected as invalid
        # instead of surfacing as an unhandled KeyError (HTTP 500).
        raise HTTPException(status_code=401, detail="Invalid token")

    return {"user": {"username": username}}


# # Start the server for local testing
# if __name__ == "__main__":
#     uvicorn.run("main:app", port=8004, reload=True)