mirror of
https://github.com/csd4ni3l/grass_touching_captcha.git
synced 2025-11-05 04:58:09 +01:00
457 lines
15 KiB
Python
457 lines
15 KiB
Python
from flask import Flask, redirect, url_for, render_template, request, Response, send_from_directory, g, flash
|
|
from dotenv import load_dotenv
|
|
from constants import RICKROLL_LINK, UPLOAD_DIR, MINIMUM_COSINE_SIMILARITY, MINIMUM_OCR_SIMILARITY, DATABASE_FILE, ACHIEVEMENTS
|
|
from PIL import Image
|
|
|
|
from jina import get_grass_touching_similarity
|
|
from ocr_check import generate_challenge, check_text_similarity
|
|
|
|
import os, flask_login, uuid, base64, sqlite3, bcrypt, secrets, hashlib, time, threading, html
|
|
|
|
if os.path.exists(".env"):
|
|
load_dotenv(".env")
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = os.environ["FLASK_SECRET_KEY"]
|
|
|
|
login_manager = flask_login.LoginManager()
|
|
login_manager.init_app(app)
|
|
|
|
global challenges
|
|
challenges = {}
|
|
|
|
os.makedirs("uploads", exist_ok=True)
|
|
|
|
def time_ago(seconds):
|
|
seconds = int(seconds)
|
|
if seconds < 60:
|
|
return f"{seconds} second{'s' if seconds != 1 else ''} ago"
|
|
elif seconds < 3600:
|
|
minutes = seconds // 60
|
|
return f"{minutes} minute{'s' if minutes != 1 else ''} ago"
|
|
elif seconds < 86400:
|
|
hours = seconds // 3600
|
|
return f"{hours} hour{'s' if hours != 1 else ''} ago"
|
|
else:
|
|
days = seconds // 86400
|
|
return f"{days} day{'s' if days != 1 else ''} ago"
|
|
|
|
app.jinja_env.filters['timeago'] = time_ago
|
|
|
|
def get_db():
|
|
db = getattr(g, '_database', None)
|
|
if db is None:
|
|
db = g._database = sqlite3.connect(DATABASE_FILE)
|
|
db.execute("""
|
|
CREATE TABLE IF NOT EXISTS Users (
|
|
username TEXT PRIMARY KEY,
|
|
last_grass_touch_time TEXT NOT NULL,
|
|
grass_touching_count INT NOT NULL,
|
|
banned BOOL NOT NULL,
|
|
password TEXT NOT NULL,
|
|
password_salt TEXT NOT NULL
|
|
)
|
|
""")
|
|
db.execute("""
|
|
CREATE TABLE IF NOT EXISTS Images (
|
|
image_hash TEXT PRIMARY KEY,
|
|
username TEXT NOT NULL,
|
|
filename TEXT NOT NULL
|
|
)
|
|
""")
|
|
db.commit()
|
|
return db
|
|
|
|
class User(flask_login.UserMixin):
|
|
pass
|
|
|
|
@login_manager.user_loader
|
|
def user_loader(user_id):
|
|
user = User()
|
|
user.id = user_id
|
|
return user
|
|
|
|
@login_manager.unauthorized_handler
|
|
def unauthorized_handler():
|
|
flash("Why are you trying to access content reserved for logged in users? Just go outside, touch some grass and register your own account.", "error")
|
|
return redirect(url_for("login"))
|
|
|
|
@app.before_request
|
|
def check_banned():
|
|
if not hasattr(flask_login.current_user, "id"):
|
|
return
|
|
|
|
username = flask_login.current_user.id
|
|
|
|
cur = get_db().cursor()
|
|
cur.execute("SELECT banned FROM Users WHERE username = ?", (username,))
|
|
row = cur.fetchone()
|
|
cur.close()
|
|
|
|
if row is None or row[0]:
|
|
flash("Imagine forgetting to touch grass so you get banned from my app. Such a discord moderator you are. You have no life. Just go outside.", "error")
|
|
flask_login.logout_user()
|
|
return redirect("/")
|
|
|
|
@app.teardown_appcontext
|
|
def close_connection(exception):
|
|
db = getattr(g, '_database', None)
|
|
if db is not None:
|
|
db.close()
|
|
|
|
def check_grass_touching_bans():
|
|
while True:
|
|
with app.app_context():
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("SELECT username, last_grass_touch_time FROM Users")
|
|
for user in cur.fetchall():
|
|
if time.time() - float(user[1]) >= (24 * 3600):
|
|
cur.execute("UPDATE users SET banned = ? WHERE username = ?", (True, user[0]))
|
|
|
|
get_db().commit()
|
|
cur.close()
|
|
|
|
time.sleep(60)
|
|
|
|
threading.Thread(target=check_grass_touching_bans, daemon=True).start()
|
|
|
|
def resize_image_file(path, max_side=256, fmt="JPEG"):
|
|
img = Image.open(path)
|
|
scale = max_side / max(img.size)
|
|
if scale < 1:
|
|
img = img.resize((int(img.width * scale), int(img.height * scale)), Image.LANCZOS)
|
|
img.save(path, format=fmt)
|
|
|
|
@app.route("/grass_touch_submit", methods=["POST"])
|
|
@flask_login.login_required
|
|
def submit_grass_touching():
|
|
username = flask_login.current_user.id
|
|
|
|
if not challenges.get(username):
|
|
return Response("Start and finish a challenge before submitting the grass touching.", 401)
|
|
|
|
if not challenges[username]["completed"]:
|
|
return Response("Finish a challenge before submitting the grass touching.", 401)
|
|
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("UPDATE Users SET grass_touching_count = grass_touching_count + 1 WHERE username = ?", (username,))
|
|
cur.execute("UPDATE Users SET last_grass_touch_time = ? WHERE username = ?", (time.time(), username))
|
|
|
|
get_db().commit()
|
|
|
|
cur.close()
|
|
|
|
return redirect("/")
|
|
|
|
@app.route("/submit_grasstouching")
|
|
@flask_login.login_required
|
|
def submit_grasstouching():
|
|
username = flask_login.current_user.id
|
|
return render_template("submit_grass_touching.jinja2", username=username)
|
|
|
|
@app.route("/generate_challenge", methods=["POST"])
|
|
def generate_challenge_route():
|
|
username = request.json["username"]
|
|
|
|
if not username in challenges:
|
|
challenges[username] = {"text": generate_challenge(username), "completed": False}
|
|
|
|
return challenges[username]["text"]
|
|
|
|
@app.route("/submit_challenge", methods=["POST"])
|
|
def submit_challenge():
|
|
try:
|
|
username, image_type, image_data = request.json["username"], request.json["image_type"], request.json["image_data"].encode("utf-8")
|
|
|
|
if image_type == "jpeg":
|
|
image_data = image_data[23:] # data:image/jpeg;base64,
|
|
else:
|
|
image_data = image_data[22:] # data:image/png;base64,
|
|
|
|
image_uuid = str(uuid.uuid4())
|
|
|
|
if image_type not in ["png", "jpeg"]:
|
|
return Response("Invalid file type.", 400)
|
|
|
|
if os.path.commonprefix((os.path.realpath(f"{UPLOAD_DIR}/{image_uuid}.{image_type}"), UPLOAD_DIR)) != UPLOAD_DIR:
|
|
return Response("Why are you trying path traversal :C", 400)
|
|
|
|
actual_image_data = base64.b64decode(image_data)
|
|
image_hash = hashlib.sha512(actual_image_data).hexdigest()
|
|
cur = get_db().cursor()
|
|
cur.execute("SELECT image_hash FROM Images WHERE image_hash = ?", (image_hash,))
|
|
if cur.fetchone():
|
|
return Response("You can touch grass multiple times. I believe in you. Dont submit the same images.", 400)
|
|
|
|
cur.execute("INSERT INTO Images (image_hash, username, filename) VALUES (?, ?, ?)", (image_hash, username, image_uuid))
|
|
get_db().commit()
|
|
|
|
with open(f"{UPLOAD_DIR}/{image_uuid}.{image_type}", "wb") as file:
|
|
file.write(actual_image_data)
|
|
|
|
resize_image_file(f"{UPLOAD_DIR}/{image_uuid}.{image_type}", fmt="JPEG" if image_type == "jpeg" else "png")
|
|
|
|
except:
|
|
import traceback; traceback.print_exc()
|
|
return Response("Unknown error", 400)
|
|
|
|
if not challenges.get(username):
|
|
return Response("You havent started a challenge yet.", 400)
|
|
|
|
detected_text, text_similarity = check_text_similarity(f"{UPLOAD_DIR}/{image_uuid}.{image_type}", challenges[username]["text"])
|
|
|
|
if not text_similarity >= MINIMUM_OCR_SIMILARITY:
|
|
return Response(f"The text is incorrect on the image. Similarity: {round(text_similarity * 100, 2)}% Detected Text: {detected_text}", 400)
|
|
|
|
grass_touching_similarity = get_grass_touching_similarity(request.url_root.rstrip('/').replace("http://", "https://") + url_for('uploads', filename=f"{image_uuid}.{image_type}"))
|
|
if not grass_touching_similarity >= MINIMUM_COSINE_SIMILARITY:
|
|
os.remove(f"{UPLOAD_DIR}/{image_uuid}.{image_type}")
|
|
return Response(f"Imagine not touching grass. Cosine similarity: {grass_touching_similarity}", 401)
|
|
|
|
challenges[username]['completed'] = True
|
|
|
|
return Response(f"/uploads/{image_uuid}.{image_type}", 200)
|
|
|
|
@app.route("/")
|
|
def application():
|
|
username = flask_login.current_user.id if hasattr(flask_login.current_user, "id") else ""
|
|
|
|
return render_template("home.jinja2", username=username)
|
|
|
|
@app.route("/leaderboard")
|
|
def leaderboard():
|
|
username = flask_login.current_user.id if hasattr(flask_login.current_user, "id") else ""
|
|
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("SELECT grass_touching_count, banned, username FROM USERS ORDER BY grass_touching_count DESC, username ASC LIMIT 25")
|
|
|
|
users = cur.fetchall()
|
|
if not users:
|
|
cur.close()
|
|
return Response("DB is not healthy.", 401)
|
|
|
|
cur.close()
|
|
|
|
return render_template("leaderboard.jinja2", users=users, current_username=username)
|
|
|
|
@app.route("/achievements")
|
|
@flask_login.login_required
|
|
def achievements():
|
|
username = flask_login.current_user.id
|
|
|
|
cur = get_db().cursor()
|
|
cur.execute("SELECT grass_touching_count FROM Users WHERE username = ?", (username,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
cur.close()
|
|
|
|
if not row:
|
|
return Response("DB is not healthy.")
|
|
|
|
return render_template("achievements.jinja2", achievements=ACHIEVEMENTS, grass_touching_count=row[0])
|
|
|
|
@app.route("/profile")
|
|
@flask_login.login_required
|
|
def profile():
|
|
username = flask_login.current_user.id
|
|
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("SELECT grass_touching_count, last_grass_touch_time FROM Users WHERE username = ?", (username,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
cur.close()
|
|
|
|
if not row:
|
|
return Response("DB is not healthy.")
|
|
|
|
grass_touching_count, last_grass_touch_time = row
|
|
|
|
return render_template("profile.jinja2", username=username, grass_touching_count=grass_touching_count, last_grass_touch_time=float(last_grass_touch_time), now=time.time(), achievements=ACHIEVEMENTS, your_account=True)
|
|
|
|
@app.route("/profile/<username>")
|
|
def public_profile(username):
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("SELECT grass_touching_count, last_grass_touch_time FROM Users WHERE username = ?", (username,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
cur.close()
|
|
|
|
if not row:
|
|
return Response("DB is not healthy.")
|
|
|
|
grass_touching_count, last_grass_touch_time = row
|
|
|
|
return render_template("profile.jinja2", username=username, grass_touching_count=grass_touching_count, last_grass_touch_time=float(last_grass_touch_time), now=time.time(), achievements=ACHIEVEMENTS, your_account=False)
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
if request.method == "GET":
|
|
return render_template("login.jinja2")
|
|
elif request.method == "POST":
|
|
username, password = request.form.get("username"), request.form.get("password")
|
|
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("SELECT password, password_salt FROM Users WHERE username = ?", (username,))
|
|
|
|
row = cur.fetchone()
|
|
if not row:
|
|
cur.close()
|
|
return Response("Unauthorized", 401)
|
|
|
|
hashed_password, salt = row
|
|
|
|
if secrets.compare_digest(bcrypt.hashpw(password.encode(), salt.encode()), hashed_password.encode()):
|
|
cur.close()
|
|
|
|
user = User()
|
|
user.id = username
|
|
flask_login.login_user(user, remember=True)
|
|
|
|
return redirect(url_for("application"))
|
|
else:
|
|
cur.close()
|
|
return Response("Unathorized access. Just go outside, touch grass and make your own account...", 401)
|
|
|
|
@app.route("/register", methods=["GET", "POST"])
|
|
def register():
|
|
if request.method == "GET":
|
|
return render_template("register.jinja2")
|
|
elif request.method == "POST":
|
|
username, password = request.form.get("username"), request.form.get("password")
|
|
|
|
if username != html.escape(username, quote=True):
|
|
return "No XSS please"
|
|
|
|
username = html.escape(username, quote=True)
|
|
|
|
if not challenges.get(username):
|
|
return Response("Start and finish a challenge before registering.", 401)
|
|
|
|
if not challenges[username]["completed"]:
|
|
return Response("Finish a challenge before registering.", 401)
|
|
|
|
challenges.pop(username)
|
|
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("SELECT username FROM Users WHERE username = ?", (username,))
|
|
|
|
if cur.fetchone():
|
|
return Response("An account with this username already exists", 400)
|
|
|
|
salt = bcrypt.gensalt()
|
|
hashed_password = bcrypt.hashpw(password.encode(), salt)
|
|
|
|
cur.execute("INSERT INTO Users (username, password, password_salt, last_grass_touch_time, grass_touching_count, banned) VALUES (?, ?, ?, ?, ?, ?)", (username, hashed_password.decode(), salt.decode(), time.time(), 1, False))
|
|
get_db().commit()
|
|
cur.close()
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
@app.route("/change_username", methods=["POST"])
|
|
@flask_login.login_required
|
|
def change_username():
|
|
username = flask_login.current_user.id
|
|
|
|
if request.form["new_username"] != html.escape(request.form["new_username"], quote=True):
|
|
return "No XSS please"
|
|
|
|
new_username = html.escape(request.form["new_username"], quote=True)
|
|
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("UPDATE Users SET username = ? WHERE username = ?", (new_username, username))
|
|
cur.execute("UPDATE Images SET username = ? WHERE username = ?", (new_username, username))
|
|
|
|
get_db().commit()
|
|
cur.close()
|
|
|
|
flask_login.logout_user()
|
|
return redirect(url_for("login"))
|
|
|
|
@app.route("/change_password", methods=["POST"])
|
|
@flask_login.login_required
|
|
def change_password():
|
|
username = flask_login.current_user.id
|
|
current_password, new_password, confirm_password = request.form["current_password"], request.form["new_password"], request.form["confirm_password"]
|
|
|
|
if not secrets.compare_digest(new_password, confirm_password):
|
|
return Response("Passwords do not match.", 400)
|
|
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("SELECT password, password_salt FROM Users WHERE username = ?", (username,))
|
|
|
|
row = cur.fetchone()
|
|
|
|
if not row:
|
|
return Response("DB is not healthy", 500)
|
|
|
|
hashed_password, salt = row
|
|
|
|
if not secrets.compare_digest(bcrypt.hashpw(current_password.encode(), salt.encode()), hashed_password.encode()):
|
|
return Response("Unathorized.", 401)
|
|
|
|
new_salt = bcrypt.gensalt()
|
|
hashed_password = bcrypt.hashpw(new_password.encode(), new_salt)
|
|
|
|
cur.execute("UPDATE Users SET password = ?, password_salt = ? WHERE username = ?", (hashed_password.decode(), new_salt.decode(), username))
|
|
|
|
get_db().commit()
|
|
cur.close()
|
|
|
|
flask_login.logout_user()
|
|
return redirect(url_for("login"))
|
|
|
|
@app.route("/delete_account", methods=["POST"])
|
|
@flask_login.login_required
|
|
def delete_data():
|
|
username = flask_login.current_user.id
|
|
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("DELETE FROM Users WHERE username = ?", (username,))
|
|
|
|
get_db().commit()
|
|
cur.close()
|
|
|
|
flask_login.logout_user()
|
|
return redirect(url_for("login"))
|
|
|
|
@app.route("/reset_data", methods=["POST"])
|
|
@flask_login.login_required
|
|
def reset_data():
|
|
username = flask_login.current_user.id
|
|
|
|
cur = get_db().cursor()
|
|
|
|
cur.execute("UPDATE Users SET last_grass_touch_time = ?, grass_touching_count = ? WHERE username = ?", (time.time(), 1, username))
|
|
|
|
get_db().commit()
|
|
cur.close()
|
|
|
|
return redirect("/")
|
|
|
|
@app.route("/uploads/<filename>")
|
|
def uploads(filename):
|
|
return send_from_directory("uploads", filename)
|
|
|
|
@app.route("/info")
|
|
def info():
|
|
return redirect(RICKROLL_LINK)
|
|
|
|
@app.route('/logout')
|
|
def logout():
|
|
flask_login.logout_user()
|
|
return redirect("/")
|
|
|
|
app.run(port=os.environ.get("PORT"), host=os.environ.get("HOST", "0.0.0.0")) |