Files
gptchat/main.py
2026-04-01 17:22:06 +10:00

564 lines
17 KiB
Python

import re
import argparse
import json
import os
import random
import string
import sqlite3
from uuid import uuid4
import threading
# GPT Library
import g4f
from g4f.api import run_api
# Server
from flask import Flask, redirect, render_template
from flask import request
import getpass
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
app = Flask(__name__)
UPLOAD_FOLDER = 'data/'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1000 * 1000 # 16 MB
# Settings file path
SETTINGS_FILE = "./data/settings.db"
PROXIES_FILE = "./data/proxies.json"
# Available providers
PROVIDERS = {
"Auto": "",
# "Acytoo": g4f.Provider.Acytoo,
# "Aichat": g4f.Provider.Aichat,
# "Ails": g4f.Provider.Ails,
# "BlackBox": g4f.Provider.Blackbox,
# "Chatgpt4o": g4f.Provider.Chatgpt4o,
# "ChatGpt": g4f.Provider.ChatGpt,
# "ChatGptt" : g4f.Provider.ChatGptt,
# "DeepInfraChat": g4f.Provider.DeepInfraChat,
# "Glider": g4f.Provider.Glider,
# "H2o": g4f.Provider.H2o,
# "HuggingChat": g4f.Provider.HuggingChat,
# "Opchatgpts": g4f.Provider.Opchatgpts,
# "OpenAssistant": g4f.Provider.OpenAssistant,
# "OpenaiChat": g4f.Provider.OpenaiChat,
# "Raycast": g4f.Provider.Raycast,
# "Theb": g4f.Provider.Theb,
# "Wewordle": g4f.Provider.Wewordle,
# "You": g4f.Provider.You,
# "Yqcloud": g4f.Provider.Yqcloud,
# "Pizzagpt": g4f.Provider.Pizzagpt,
# "HuggingChat": g4f.Provider.HuggingChat,
# "HuggingFace": g4f.Provider.HuggingFace
}
GENERIC_MODELS = ["gpt-3.5-turbo", "gpt-4", "gpt-4o","gpt-4o-mini"]
print(
"""
FreeGPT4 Web API - A Web API for GPT-4
Repo: github.com/aledipa/FreeGPT4-WEB-API
By: Alessandro Di Pasquale
GPT4Free Credits: github.com/xtekky/gpt4free
""",
)
parser = argparse.ArgumentParser()
parser.add_argument(
"--remove-sources",
action='store_true',
required=False,
help="Remove the sources from the response",
)
parser.add_argument(
"--enable-gui",
action='store_true',
required=False,
help="Use a graphical interface for settings",
)
parser.add_argument(
"--private-mode",
action='store_true',
required=False,
help="Use a private token to access the API",
)
parser.add_argument(
"--enable-proxies",
action='store_true',
required=False,
help="Use one or more proxies to avoid being blocked or banned",
)
parser.add_argument(
"--enable-history",
action='store_true',
required=False,
help="Enable the history of the messages",
)
parser.add_argument(
"--password",
action='store',
required=False,
help="Set or change the password for the settings page [mandatory in docker envirtonment]",
)
parser.add_argument(
"--cookie-file",
action='store',
required=False,
type=str,
help="Use a cookie file",
)
parser.add_argument(
"--file-input",
action='store_true',
required=False,
help="Add the file as input support",
)
parser.add_argument(
"--port",
action='store',
required=False,
type=str,
help="Change the port (default: 5500)",
)
parser.add_argument(
"--model",
action='store',
required=False,
type=str,
help="Change the model (default: gpt_4)",
)
parser.add_argument(
"--provider",
action='store',
required=False,
type=str,
help="Change the provider (default: Bing)",
)
parser.add_argument(
"--keyword",
action='store',
required=False,
type=str,
help="Add the keyword support",
)
parser.add_argument(
"--system-prompt",
action='store',
required=False,
type=str,
help="Use a system prompt to 'customize' the answers",
)
parser.add_argument(
"--enable-fast-api",
action='store_true',
required=False,
help="Use the fast API standard (PORT 1337 - compatible with OpenAI integrations)",
)
args, unknown = parser.parse_known_args()
# Get the absolute path of the script
script_path = os.path.abspath(__file__)
# Get the directory of the script
script_dir = os.path.dirname(script_path)
# Change the current working directory
os.chdir(script_dir)
# Loads the proxies from the file
if (args.enable_proxies and os.path.exists(PROXIES_FILE)):
proxies = json.load(open(PROXIES_FILE))
else:
proxies = None
# Crates the Fast API Thread
t = threading.Thread(target=run_api, name="fastapi")
# Loads the settings from the file
def load_settings(file=SETTINGS_FILE):
conn = sqlite3.connect(file)
c = conn.cursor()
c.execute("SELECT * FROM settings")
data = c.fetchone()
conn.close()
# Converts the data to a dictionary
data = {
"keyword": data[1],
"file_input": data[2],
"port": data[3],
"provider": data[4],
"model": data[5],
"cookie_file": data[6],
"token": data[7],
"remove_sources": data[8],
"system_prompt": data[9],
"message_history": data[10],
"proxies": data[11],
"password": data[12],
"fast_api": data[13]
}
print(data)
return data
def start_fast_api():
print("[!] FAST API PORT: 1336")
t.daemon = True
t.start()
# Updates the settings
data = load_settings()
if (args.keyword == None):
args.keyword = data["keyword"]
if (args.file_input == False):
args.file_input = data["file_input"]
if (args.port == None):
args.port = data["port"]
if (args.provider == None):
args.provider = data["provider"]
if (args.model == None):
args.model = data["model"]
if (args.cookie_file == None):
args.cookie_file = data["cookie_file"]
if (args.private_mode and data["token"] == ""):
token = str(uuid4())
data["token"] = token
elif (data["token"] != ""):
token = data["token"]
args.private_mode = True
if (args.remove_sources == False):
args.remove_sources = data["remove_sources"]
if (args.system_prompt == None):
args.system_prompt = data["system_prompt"]
if (args.enable_history == False):
args.enable_history = data["message_history"]
if (args.enable_proxies == False):
args.enable_proxies = data["proxies"]
if (args.enable_fast_api or data["fast_api"]):
start_fast_api()
# Initializes the message history
message_history = [{"role": "system", "content": args.system_prompt}]
if (args.enable_gui):
# Asks for password to set to lock the settings page
# Checks if settings.db contains a password
try:
set_password = True
if (args.password != None):
password = args.password
confirm_password = password
else:
if (data["password"] == ""):
password = getpass.getpass("Settings page password:\n > ")
confirm_password = getpass.getpass("Confirm password:\n > ")
else:
set_password = False
if (set_password):
if(password == "" or confirm_password == ""):
print("[X] Password cannot be empty")
exit()
if ((password != confirm_password) and (data["password"] == "")):
print("[X] Passwords don't match")
exit()
else:
password = generate_password_hash(password)
confirm_password = generate_password_hash(confirm_password)
print("[V] Password set.")
try:
conn = sqlite3.connect(SETTINGS_FILE)
c = conn.cursor()
c.execute("UPDATE settings SET password = ?", (password,))
conn.commit()
conn.close()
print("[V] Password saved.")
except Exception as error:
print("[X] Error:", error)
exit()
except Exception as error:
print("[X] Error:", error)
exit()
else:
print("[!] GUI disabled")
# Saves the settings to the file
def save_settings(request, file):
conn = sqlite3.connect(file)
c = conn.cursor()
c.execute("UPDATE settings SET file_input = ?", (bool(request.form["file_input"] == "true"),))
c.execute("UPDATE settings SET remove_sources = ?", (bool(request.form["remove_sources"] == "true"),))
c.execute("UPDATE settings SET port = ?", (request.form["port"],))
c.execute("UPDATE settings SET model = ?", (request.form["model"],))
c.execute("UPDATE settings SET keyword = ?", (request.form["keyword"],))
c.execute("UPDATE settings SET provider = ?", (request.form["provider"],))
c.execute("UPDATE settings SET system_prompt = ?", (request.form["system_prompt"],))
c.execute("UPDATE settings SET message_history = ?", (bool(request.form["message_history"] == "true"),))
c.execute("UPDATE settings SET proxies = ?", (bool(request.form["proxies"] == "true"),))
c.execute("UPDATE settings SET fast_api = ?", (bool(request.form["fast_api"] == "true"),))
if (len(request.form["new_password"]) > 0):
c.execute("UPDATE settings SET password = ?", (generate_password_hash(request.form["new_password"]),))
file = request.files["cookie_file"]
if (bool(request.form["private_mode"] == "true")):
c.execute("UPDATE settings SET token = ?", (request.form["token"],))
args.private_mode = True
else:
c.execute("UPDATE settings SET token = ''")
args.private_mode = False
#checks if the file is not empty
if file.filename != '':
#checks if the file is a json file
if file.filename.endswith('.json'):
#saves the file
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
#updates the cookie_file
c.execute("UPDATE settings SET cookie_file = ?", (os.path.join(app.config['UPLOAD_FOLDER'], filename),))
else:
print("The file is not a json file")
if (args.enable_proxies or bool(request.form["proxies"] == "true")):
# Extracts the proxies from the form
proxies = []
i = 1
while True:
if (("proxy_" + str(i)) in request.form):
proxy = request.form["proxy_" + str(i)]
if (proxy != ""):
# Checks if the proxy syntax is correct
if (proxy.count(":") == 3 and proxy.count("@") == 1):
proxy = {
"protocol": proxy.split("://")[0],
"username": proxy.split("://")[1].split(":")[0],
"password": proxy.split("://")[1].split(":")[1].split("@")[0],
"ip": proxy.split("://")[1].split(":")[1].split("@")[1],
"port": proxy.split("://")[1].split(":")[2]
}
proxies.append(proxy)
i += 1
else:
break
# Saves the proxies to the file proxies.json
with open(PROXIES_FILE, "w") as pf:
json.dump(proxies, pf)
pf.close()
if (bool(request.form["fast_api"] == "true") and not args.enable_fast_api):
start_fast_api()
conn.commit()
conn.close()
return
# Loads the settings from the file and updates the args
def apply_settings(file):
data = load_settings(file)
args.keyword = data["keyword"]
args.file_input = data["file_input"]
args.port = data["port"]
args.provider = data["provider"]
args.model = data["model"]
args.cookie_file = data["cookie_file"]
args.token = data["token"]
args.remove_sources = data["remove_sources"]
args.system_prompt = data["system_prompt"]
args.enable_history = data["message_history"]
args.enable_proxies = data["proxies"]
args.password = data["password"]
args.enable_fast_api = data["fast_api"]
return
@app.route("/", methods=["GET", "POST"])
async def index() -> str:
"""
Main function
"""
# Starts the bot and gets the input
print("Initializing...")
question = None
print("start")
if request.method == "GET":
question = request.args.get(args.keyword) #text
print(args.private_mode)
if (args.private_mode and request.args.get("token") != data["token"]):
return "<p id='response'>Invalid token</p>"
print("get")
else:
file = request.files["file"]
text = file.read().decode("utf-8")
question = text
print("Post reading the file", question)
print("ici")
if question is None:
return "<p id='response'>Please enter a question</p>"
# Gets the response from the bot
# print(PROVIDERS[args.provider].params) # supported args
print("\nCookies: " + str(len(args.cookie_file)))
print("\nInput: " + question)
if (len(args.cookie_file) != 0):
try:
cookies = json.load(open(args.cookie_file)) # Loads the cookies from the file
print("COOKIES: "+str(cookies))
if (len(cookies) == 0):
cookies = {"a": "b"} # Dummy cookies
except Exception as error:
print("[X] Error:", error)
exit()
else:
cookies = {"a": "b"} # Dummy cookies
if (args.enable_history):
print("History enabled")
message_history.append({"role": "user", "content": question})
else:
print("History disabled")
message_history.clear()
message_history.append({"role": "system", "content": args.system_prompt})
message_history.append({"role": "user", "content": question})
proxy = None
if (args.enable_proxies):
# Extracts a proxy from the list
proxy = random.choice(proxies)
# Formats the proxy like https://user:password@host:port
proxy = f"{proxy['protocol']}://{proxy['username']}:{proxy['password']}@{proxy['ip']}:{proxy['port']}"
print("Proxy: " + proxy)
if (args.provider == "Auto"):
response = (
await g4f.ChatCompletion.create_async(
model=args.model,
messages=message_history,
cookies=cookies,
proxy=proxy
)
)
else:
response = (
await g4f.ChatCompletion.create_async(
model=args.model,
provider=args.provider,
messages=message_history,
cookies=cookies,
proxy=proxy
)
)
print(response)
#Joins the response into a single string
resp_str = ""
for message in response:
resp_str += message
# Cleans the response from the resources links
if (args.remove_sources):
if re.search(r"\[\^[0-9]+\^\]\[[0-9]+\]", resp_str):
resp_str = resp_str.split("\n\n")
if len(resp_str) > 1:
resp_str.pop(0)
resp_str = re.sub(r"\[\^[0-9]+\^\]\[[0-9]+\]", "", str(resp_str[0]))
# Returns the response
return resp_str
# return "<p id='response'>" + resp + "</p>" # Uncomment if preferred
def auth():
# Reads the password from the data file
data = load_settings()
# Checks if the password is set
if (data["password"] != ""):
if (request.method == "POST"):
password = request.form["password"]
if (check_password_hash(data["password"], password)):
return True
else:
return False
else:
return False
else:
return True
@app.route("/login", methods=["GET", "POST"])
async def login():
if (args.enable_gui):
return render_template("login.html", **locals())
else:
return "The GUI is disabled. In order to enable it, use the --enable-gui argument."
@app.route("/settings", methods=["GET", "POST"])
async def settings():
if (request.method == "GET"):
return redirect("/login", code=302)
if (auth()):
try:
providers=PROVIDERS
generic_models=GENERIC_MODELS
data = load_settings()
proxies = json.loads(open(PROXIES_FILE).read())
return render_template("settings.html", **locals())
except Exception as error:
print("[X] Error:", error)
return "Error: " + str(error)
else:
return render_template("login.html", **locals())
@app.route("/save", methods=["POST"])
async def save():
if (auth()):
try:
if (request.method == "POST"):
save_settings(request, SETTINGS_FILE)
apply_settings(SETTINGS_FILE)
return "New settings saved and applied successfully!"
else:
return render_template("login.html", **locals())
except Exception as error:
print("[X] Error:", error)
return "Error: " + str(error)
else:
return render_template("login.html", **locals())
@app.route("/models", methods=["GET"])
async def get_models():
provider = request.args.get("provider")
if (provider == "Auto"):
return GENERIC_MODELS
try:
return PROVIDERS[provider].models
except:
return ["default"]
@app.route("/generatetoken", methods=["GET", "POST"])
async def get_token():
return str(uuid4())
# if __name__ == "__main__":
# app.run("0.0.0.0", port=args.port, debug=False)