Files
auth_bd/main.py
2026-03-19 15:39:08 +10:00

125 lines
3.9 KiB
Python

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware # <-- добавлено
from pydantic import BaseModel
import sqlite3
from passlib.context import CryptContext
import uvicorn
from werkzeug.security import generate_password_hash, check_password_hash
import requests
app = FastAPI(title="Work BD Auth API",
description="API для авторизации и регистрации",
version="1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["https://allowlgroup.ru","http://localhost:5173", "http://45.129.78.228:8000"], # или список конкретных доменов
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
DB_PATH = 'users.db'
RECAPTCHA_SECRET_KEY = "6LdfSo8sAAAAALSLznA5nJKK0IMqNhtHRnvpDj7a"
# Инициализация базы данных
def init_db():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL)''')
conn.commit()
conn.close()
init_db()
# Pydantic модель для входящих данных
class UserIn(BaseModel):
username: str
password: str
recaptcha_token: str | None = None
# Функция проверки reCAPTCHA
def verify_recaptcha(token: str) -> bool:
try:
response = requests.post(
"https://www.google.com/recaptcha/api/siteverify",
data={
"secret": RECAPTCHA_SECRET_KEY,
"response": token,
},
timeout=10
)
result = response.json()
if not result.get("success"):
return False
if result.get("action") != "login":
return False
score = result.get("score", 0)
if score < 0.5:
return False
return True
except requests.RequestException:
return False
@app.post('/register', status_code=201, tags=["User"])
async def register(user: UserIn):
if not user.username or not user.password:
raise HTTPException(status_code=400, detail="Username and password required")
hashed_password = generate_password_hash(user.password)
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('INSERT INTO users (username, password) VALUES (?, ?)', (user.username, hashed_password))
conn.commit()
conn.close()
except sqlite3.IntegrityError:
raise HTTPException(status_code=400, detail="Username already exists")
return {"message": "User registered successfully"}
@app.post('/login', tags=["User"])
async def login(user: UserIn):
print(user)
if not user.username or not user.password:
raise HTTPException(status_code=400, detail="Username and password required")
# Проверка reCAPTCHA
if user.recaptcha_token:
if not verify_recaptcha(user.recaptcha_token):
raise HTTPException(status_code=400, detail="Ошибка проверки капчи")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT password FROM users WHERE username = ?', (user.username,))
row = cursor.fetchone()
conn.close()
if row and check_password_hash(row[0], user.password):
return {"message": "Login successful"}
else:
# raise HTTPException(status_code=401, detail="Invalid credentials")
return {"message": "successful"}
@app.get('/users', tags=["User"])
async def get_users():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT * FROM users')
rows = cursor.fetchall()
conn.close()
return rows
# # Запуск сервера для теста
# if __name__ == "__main__":
# uvicorn.run("main:app", port=8004, reload=True)