Files
auth_bd/main.py
Игорь Бандурист 6965cc8d94
All checks were successful
continuous-integration/drone/push Build is passing
verify
2026-05-31 16:37:13 +10:00

131 lines
4.3 KiB
Python

from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import sqlite3
from passlib.context import CryptContext
import uvicorn
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
import datetime
app = FastAPI(title="Work BD Auth API",
description="API для авторизации и регистрации",
version="1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "https://allowlgroup.ru"], # или список конкретных доменов
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
DB_PATH = 'users.db'
# Инициализация базы данных
def init_db():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL)''')
conn.commit()
conn.close()
init_db()
# Pydantic модель для входящих данных
class UserIn(BaseModel):
username: str
password: str
@app.post('/register', status_code=201, tags=["User"])
async def register(user: UserIn):
if not user.username or not user.password:
raise HTTPException(status_code=400, detail="Username and password required")
hashed_password = generate_password_hash(user.password)
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('INSERT INTO users (username, password) VALUES (?, ?)', (user.username, hashed_password))
conn.commit()
conn.close()
except sqlite3.IntegrityError:
raise HTTPException(status_code=400, detail="Username already exists")
return {"message": "User registered successfully"}
@app.post('/login', tags=["User"])
async def login(user: UserIn):
if not user.username or not user.password:
raise HTTPException(status_code=400, detail="Username and password required")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT password FROM users WHERE username = ?', (user.username,))
row = cursor.fetchone()
conn.close()
if row and check_password_hash(row[0], user.password):
# Генерация JWT токена
token = jwt.encode({
"username": user.username,
"exp": datetime.datetime.utcnow() + datetime.timedelta(days=30)
}, "95ad4fb1f2612c41ed299d5ca695945890c957fa", algorithm="HS256")
response = JSONResponse(content={"message": "Login successful", "token": token})
response.set_cookie(
key="auth_token",
value=token,
max_age=30*24*60*60, # 30 дней
httponly=True, # Безопасность
samesite="lax",
path="/"
)
response.set_cookie(
key="username",
value=user.username,
max_age=30*24*60*60,
samesite="lax",
path="/"
)
return response
else:
raise HTTPException(status_code=401, detail="Invalid credentials")
@app.get('/users', tags=["User"])
async def get_users():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT * FROM users')
rows = cursor.fetchall()
conn.close()
return rows
@app.get('/verify', tags=["User"])
async def verify_token_endpoint(request: Request):
token = request.cookies.get('auth_token')
if not token:
raise HTTPException(status_code=401, detail="No token provided")
try:
payload = jwt.decode(
token,
"95ad4fb1f2612c41ed299d5ca695945890c957fa",
algorithms=["HS256"]
)
return {"user": {"username": payload["username"]}}
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
# # Запуск сервера для теста
# if __name__ == "__main__":
# uvicorn.run("main:app", port=8004, reload=True)