All checks were successful
continuous-integration/drone/push Build is passing
127 lines
3.9 KiB
Python
127 lines
3.9 KiB
Python
from fastapi import FastAPI, HTTPException, Depends, Request
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
import sqlite3
|
|
import uvicorn
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
import jwt
|
|
import datetime
|
|
|
|
app = FastAPI(title="Work BD Auth API",
|
|
description="API для авторизации и регистрации",
|
|
version="1.0")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=[
|
|
"http://localhost:5173",
|
|
"https://allowlgroup.ru",
|
|
],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
DB_PATH = 'users.db'
|
|
|
|
def init_db():
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute('''CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
username TEXT UNIQUE NOT NULL,
|
|
password TEXT NOT NULL)''')
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
init_db()
|
|
|
|
class UserIn(BaseModel):
|
|
username: str
|
|
password: str
|
|
|
|
@app.post('/register', status_code=201, tags=["User"])
|
|
async def register(user: UserIn):
|
|
if not user.username or not user.password:
|
|
raise HTTPException(status_code=400, detail="Username and password required")
|
|
|
|
hashed_password = generate_password_hash(user.password)
|
|
try:
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute('INSERT INTO users (username, password) VALUES (?, ?)', (user.username, hashed_password))
|
|
conn.commit()
|
|
conn.close()
|
|
except sqlite3.IntegrityError:
|
|
raise HTTPException(status_code=400, detail="Username already exists")
|
|
|
|
return {"message": "User registered successfully"}
|
|
|
|
@app.post('/login', tags=["User"])
|
|
async def login(user: UserIn):
|
|
if not user.username or not user.password:
|
|
raise HTTPException(status_code=400, detail="Username and password required")
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT password FROM users WHERE username = ?', (user.username,))
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if row and check_password_hash(row[0], user.password):
|
|
token = jwt.encode({
|
|
"username": user.username,
|
|
"exp": datetime.datetime.utcnow() + datetime.timedelta(days=30)
|
|
}, "95ad4fb1f2612c41ed299d5ca695945890c957fa", algorithm="HS256")
|
|
|
|
response = JSONResponse(content={"message": "Login successful", "token": token})
|
|
response.set_cookie(
|
|
key="auth_token",
|
|
value=token,
|
|
max_age=30*24*60*60,
|
|
httponly=True,
|
|
samesite="lax",
|
|
path="/"
|
|
)
|
|
response.set_cookie(
|
|
key="username",
|
|
value=user.username,
|
|
max_age=30*24*60*60,
|
|
samesite="lax",
|
|
path="/"
|
|
)
|
|
return response
|
|
else:
|
|
raise HTTPException(status_code=401, detail="Invalid credentials")
|
|
|
|
@app.get('/users', tags=["User"])
|
|
async def get_users():
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT * FROM users')
|
|
rows = cursor.fetchall()
|
|
conn.close()
|
|
return rows
|
|
|
|
@app.get('/verify', tags=["User"])
|
|
async def verify_token_endpoint(request: Request):
|
|
token = request.cookies.get('auth_token')
|
|
|
|
if not token:
|
|
raise HTTPException(status_code=401, detail="No token provided")
|
|
|
|
try:
|
|
payload = jwt.decode(
|
|
token,
|
|
"95ad4fb1f2612c41ed299d5ca695945890c957fa",
|
|
algorithms=["HS256"]
|
|
)
|
|
return {"user": {"username": payload["username"]}}
|
|
except jwt.ExpiredSignatureError:
|
|
raise HTTPException(status_code=401, detail="Token expired")
|
|
except jwt.InvalidTokenError:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
if __name__ == "__main__":
|
|
uvicorn.run("main:app", host="0.0.0.0", port=8004, reload=True) |