Files
grass_touching_captcha/main.py

457 lines
15 KiB
Python

from flask import Flask, redirect, url_for, render_template, request, Response, send_from_directory, g, flash
from dotenv import load_dotenv
from constants import RICKROLL_LINK, UPLOAD_DIR, MINIMUM_COSINE_SIMILARITY, MINIMUM_OCR_SIMILARITY, DATABASE_FILE, ACHIEVEMENTS
from PIL import Image
from jina import get_grass_touching_similarity
from ocr_check import generate_challenge, check_text_similarity
import os, flask_login, uuid, base64, sqlite3, bcrypt, secrets, hashlib, time, threading, html
# Load settings from a local .env file when one is present; otherwise the
# variables must already be set in the process environment.
if os.path.exists(".env"):
    load_dotenv(".env")

app = Flask(__name__)
# Indexing (rather than .get) makes a missing FLASK_SECRET_KEY fail at
# startup with a KeyError instead of running with no secret key.
app.secret_key = os.environ["FLASK_SECRET_KEY"]

login_manager = flask_login.LoginManager()
login_manager.init_app(app)

# In-memory challenge state keyed by username. Each value is a dict of the
# form {"text": <challenge text>, "completed": <bool>}. Being a plain module
# level dict, it is lost on restart and not shared between processes.
challenges = {}

# Directory served by the /uploads/<filename> route.
os.makedirs("uploads", exist_ok=True)
def time_ago(seconds):
    """Render an elapsed duration as a short "<n> <unit>(s) ago" string.

    Args:
        seconds: Elapsed time in seconds; truncated to an integer.

    Returns:
        The duration expressed in the largest whole unit among days, hours,
        minutes and seconds, pluralised unless the count is exactly 1.
    """
    elapsed = int(seconds)
    # Largest unit first so the first match wins.
    for unit_length, unit_name in ((86400, "day"), (3600, "hour"), (60, "minute")):
        if elapsed >= unit_length:
            count = elapsed // unit_length
            break
    else:
        # Anything under a minute is reported in seconds as-is.
        count, unit_name = elapsed, "second"
    suffix = "" if count == 1 else "s"
    return f"{count} {unit_name}{suffix} ago"
# Register time_ago as the `timeago` filter so templates can format durations.
app.jinja_env.filters['timeago'] = time_ago
def get_db():
    """Return the SQLite connection cached on ``flask.g``, opening it if needed.

    When no connection is cached yet, one is opened against DATABASE_FILE,
    stored on ``g._database``, and the Users and Images tables are created
    if they do not already exist.

    Returns:
        The ``sqlite3.Connection`` for the current application context.
    """
    cached = getattr(g, '_database', None)
    if cached is not None:
        return cached

    connection = sqlite3.connect(DATABASE_FILE)
    g._database = connection

    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS Users (
            username TEXT PRIMARY KEY,
            last_grass_touch_time TEXT NOT NULL,
            grass_touching_count INT NOT NULL,
            banned BOOL NOT NULL,
            password TEXT NOT NULL,
            password_salt TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS Images (
            image_hash TEXT PRIMARY KEY,
            username TEXT NOT NULL,
            filename TEXT NOT NULL
        )
        """,
    )
    for statement in schema_statements:
        connection.execute(statement)
    connection.commit()
    return connection
class User(flask_login.UserMixin):
    """Session user object handed to Flask-Login.

    The class adds nothing to ``UserMixin``; code that creates an instance
    assigns the username to its ``id`` attribute.
    """
@login_manager.user_loader
def user_loader(user_id):
    """Build the User object for the id stored in the session.

    No database lookup happens here; the returned object simply carries
    ``user_id`` as its ``id``.
    """
    session_user = User()
    session_user.id = user_id
    return session_user
@login_manager.unauthorized_handler
def unauthorized_handler():
    """Flash an error and send the visitor to the login page."""
    message = "Why are you trying to access content reserved for logged in users? Just go outside, touch some grass and register your own account."
    flash(message, "error")
    login_url = url_for("login")
    return redirect(login_url)
@app.before_request
def check_banned():
    """Log out and redirect users whose account is banned or no longer exists.

    Runs before every request. Visitors whose ``current_user`` has no ``id``
    attribute are let through untouched.

    Returns:
        ``None`` to continue with the request, or a redirect to "/" after
        logging the user out.
    """
    visitor = flask_login.current_user
    if not hasattr(visitor, "id"):
        return None

    cursor = get_db().cursor()
    cursor.execute("SELECT banned FROM Users WHERE username = ?", (visitor.id,))
    record = cursor.fetchone()
    cursor.close()

    account_ok = record is not None and not record[0]
    if account_ok:
        return None

    flash("Imagine forgetting to touch grass so you get banned from my app. Such a discord moderator you are. You have no life. Just go outside.", "error")
    flask_login.logout_user()
    return redirect("/")
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached by get_db, if one was opened.

    Args:
        exception: Passed by Flask on teardown; not used here.
    """
    connection = getattr(g, '_database', None)
    if connection is None:
        return
    connection.close()
def check_grass_touching_bans():
    """Ban every user whose last grass touch is at least 24 hours old.

    Runs forever, one pass roughly every 60 seconds. Each pass uses its own
    application context so get_db() works outside a request and the
    connection is closed again by the teardown handler.

    A failing pass (for example a locked database or a malformed timestamp)
    is logged and retried on the next cycle; previously any exception ended
    the thread for good and bans silently stopped being applied.
    """
    while True:
        try:
            with app.app_context():
                db = get_db()
                cur = db.cursor()
                cur.execute("SELECT username, last_grass_touch_time FROM Users")
                now = time.time()
                expired = [
                    (True, username)
                    for username, last_touch in cur.fetchall()
                    if now - float(last_touch) >= (24 * 3600)
                ]
                # One batched statement and one commit per pass.
                cur.executemany("UPDATE users SET banned = ? WHERE username = ?", expired)
                db.commit()
                cur.close()
        except Exception:
            # Top-level boundary of the worker: report and keep the loop alive.
            import traceback
            traceback.print_exc()
        # Sleep outside the app context so no connection is held while idle.
        time.sleep(60)


# Started at import time; daemon=True so it never blocks interpreter exit.
threading.Thread(target=check_grass_touching_bans, daemon=True).start()
def resize_image_file(path, max_side=256, fmt="JPEG"):
    """Shrink the image at ``path`` in place so its longest side fits ``max_side``.

    Images already within the limit are not scaled but are still re-encoded
    and written back in ``fmt``.

    Args:
        path: File to read and overwrite.
        max_side: Maximum length in pixels of the longer side.
        fmt: Pillow format name used when saving.
    """
    # The context manager releases the file handle, which the previous
    # version left open.
    with Image.open(path) as source:
        scale = max_side / max(source.size)
        if scale < 1:
            # Clamp to 1px: a very elongated image would otherwise round its
            # short side down to 0 and fail to resize.
            new_size = (
                max(1, int(source.width * scale)),
                max(1, int(source.height * scale)),
            )
            result = source.resize(new_size, Image.LANCZOS)
        else:
            # Detach the pixel data from the file before it is closed.
            result = source.copy()
    result.save(path, format=fmt)
@app.route("/grass_touch_submit", methods=["POST"])
@flask_login.login_required
def submit_grass_touching():
    """Credit the logged-in user with one grass touch for a completed challenge.

    Returns:
        401 if the user has no challenge or has not completed it; otherwise
        a redirect to "/" after incrementing the counter and refreshing the
        last-touch timestamp.
    """
    username = flask_login.current_user.id
    challenge = challenges.get(username)
    if not challenge:
        return Response("Start and finish a challenge before submitting the grass touching.", 401)
    if not challenge["completed"]:
        return Response("Finish a challenge before submitting the grass touching.", 401)

    # Consume the challenge so a single verified photo cannot be replayed to
    # increment the counter repeatedly.
    challenges.pop(username, None)

    db = get_db()
    cur = db.cursor()
    cur.execute(
        "UPDATE Users SET grass_touching_count = grass_touching_count + 1, last_grass_touch_time = ? WHERE username = ?",
        (time.time(), username),
    )
    db.commit()
    cur.close()
    return redirect("/")
@app.route("/submit_grasstouching")
@flask_login.login_required
def submit_grasstouching():
    """Render the grass-touch submission page for the logged-in user."""
    current_name = flask_login.current_user.id
    return render_template("submit_grass_touching.jinja2", username=current_name)
@app.route("/generate_challenge", methods=["POST"])
def generate_challenge_route():
    """Return the challenge text for a username, creating it on first request.

    Expects a JSON body with a string ``username``. An existing challenge is
    returned unchanged, so repeated calls give the same text.

    Returns:
        The challenge text, or 400 when the body is not a JSON object with a
        string ``username`` (previously an unhandled KeyError/TypeError).
    """
    payload = request.get_json(silent=True)
    username = payload.get("username") if isinstance(payload, dict) else None
    if not isinstance(username, str):
        return Response("A username is required to generate a challenge.", 400)
    if username not in challenges:
        challenges[username] = {"text": generate_challenge(username), "completed": False}
    return challenges[username]["text"]
def _discard_upload(path):
    """Delete a stored upload, tolerating the case where it was never written."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


@app.route("/submit_challenge", methods=["POST"])
def submit_challenge():
    """Verify an uploaded photo against the user's challenge.

    Expects a JSON body with ``username``, ``image_type`` ("png" or "jpeg")
    and ``image_data`` (a base64 data URL). The image is rejected if its
    SHA-512 was already submitted; otherwise it is stored, resized, checked
    for the challenge text and then for grass-touching similarity.

    Returns:
        200 with the upload's URL path once the challenge is marked
        completed; 400 for malformed input, duplicates, a missing challenge
        or failed text check; 401 when the similarity is too low.
    """
    # Validate everything before touching the database or the disk.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return Response("Unknown error", 400)
    username = payload.get("username")
    image_type = payload.get("image_type")
    image_data = payload.get("image_data")
    if not isinstance(username, str) or not isinstance(image_data, str):
        return Response("Unknown error", 400)
    if image_type not in ("png", "jpeg"):
        return Response("Invalid file type.", 400)

    # Hold on to the challenge dict itself so a concurrent pop from
    # `challenges` cannot raise KeyError later in this request.
    challenge = challenges.get(username)
    if not challenge:
        return Response("You havent started a challenge yet.", 400)

    image_uuid = str(uuid.uuid4())
    stored_name = f"{image_uuid}.{image_type}"
    image_path = f"{UPLOAD_DIR}/{stored_name}"
    if os.path.commonprefix((os.path.realpath(image_path), UPLOAD_DIR)) != UPLOAD_DIR:
        return Response("Why are you trying path traversal :C", 400)

    try:
        # Everything after the last comma is the payload; this drops the
        # "data:image/...;base64," prefix without hard-coded offsets.
        encoded = image_data.rpartition(",")[2]
        actual_image_data = base64.b64decode(encoded)
        image_hash = hashlib.sha512(actual_image_data).hexdigest()

        db = get_db()
        cur = db.cursor()
        try:
            cur.execute("SELECT image_hash FROM Images WHERE image_hash = ?", (image_hash,))
            if cur.fetchone():
                return Response("You can touch grass multiple times. I believe in you. Dont submit the same images.", 400)
            cur.execute("INSERT INTO Images (image_hash, username, filename) VALUES (?, ?, ?)", (image_hash, username, image_uuid))
            db.commit()
        finally:
            cur.close()

        with open(image_path, "wb") as file:
            file.write(actual_image_data)
        resize_image_file(image_path, fmt="JPEG" if image_type == "jpeg" else "png")
    except Exception:
        import traceback
        traceback.print_exc()
        # Do not leave a partially written or unreadable file behind.
        _discard_upload(image_path)
        return Response("Unknown error", 400)

    detected_text, text_similarity = check_text_similarity(image_path, challenge["text"])
    if not text_similarity >= MINIMUM_OCR_SIMILARITY:
        # Rejected uploads are removed here too, matching the similarity
        # failure path below.
        _discard_upload(image_path)
        return Response(f"The text is incorrect on the image. Similarity: {round(text_similarity * 100, 2)}% Detected Text: {detected_text}", 400)

    # The similarity check is given a URL, so build the https address of the upload.
    image_url = request.url_root.rstrip('/').replace("http://", "https://") + url_for('uploads', filename=stored_name)
    grass_touching_similarity = get_grass_touching_similarity(image_url)
    if not grass_touching_similarity >= MINIMUM_COSINE_SIMILARITY:
        _discard_upload(image_path)
        return Response(f"Imagine not touching grass. Cosine similarity: {grass_touching_similarity}", 401)

    challenge['completed'] = True
    return Response(f"/uploads/{stored_name}", 200)
@app.route("/")
def application():
    """Render the home page, passing the username or "" for anonymous visitors."""
    visitor = flask_login.current_user
    if hasattr(visitor, "id"):
        shown_name = visitor.id
    else:
        shown_name = ""
    return render_template("home.jinja2", username=shown_name)
@app.route("/leaderboard")
def leaderboard():
    """Render the top 25 users by grass-touch count, ties broken by username.

    Returns:
        The rendered leaderboard, or a 401 "DB is not healthy." response
        when the query returns no rows.
    """
    visitor = flask_login.current_user
    viewer_name = visitor.id if hasattr(visitor, "id") else ""

    cursor = get_db().cursor()
    cursor.execute("SELECT grass_touching_count, banned, username FROM USERS ORDER BY grass_touching_count DESC, username ASC LIMIT 25")
    top_users = cursor.fetchall()
    cursor.close()

    if top_users:
        return render_template("leaderboard.jinja2", users=top_users, current_username=viewer_name)
    # NOTE(review): an empty Users table also lands here.
    return Response("DB is not healthy.", 401)
@app.route("/achievements")
@flask_login.login_required
def achievements():
    """Render the achievements page for the logged-in user.

    Returns:
        The rendered page, or a 500 response when the user's row is missing
        (this error used to be sent with status 200).
    """
    username = flask_login.current_user.id
    cur = get_db().cursor()
    cur.execute("SELECT grass_touching_count FROM Users WHERE username = ?", (username,))
    row = cur.fetchone()
    cur.close()
    if not row:
        # Same status as the equivalent check in change_password.
        return Response("DB is not healthy.", 500)
    return render_template("achievements.jinja2", achievements=ACHIEVEMENTS, grass_touching_count=row[0])
@app.route("/profile")
@flask_login.login_required
def profile():
    """Render the logged-in user's own profile page.

    Returns:
        The rendered profile with ``your_account=True``, or a 500 response
        when the user's row is missing (this error used to be sent with
        status 200).
    """
    username = flask_login.current_user.id
    cur = get_db().cursor()
    cur.execute("SELECT grass_touching_count, last_grass_touch_time FROM Users WHERE username = ?", (username,))
    row = cur.fetchone()
    cur.close()
    if not row:
        # Same status as the equivalent check in change_password.
        return Response("DB is not healthy.", 500)
    grass_touching_count, last_grass_touch_time = row
    return render_template(
        "profile.jinja2",
        username=username,
        grass_touching_count=grass_touching_count,
        # Stored as text in the database; the template receives a float.
        last_grass_touch_time=float(last_grass_touch_time),
        now=time.time(),
        achievements=ACHIEVEMENTS,
        your_account=True,
    )
@app.route("/profile/<username>")
def public_profile(username):
    """Render the public profile page of ``username``.

    Args:
        username: Account name taken from the URL.

    Returns:
        The rendered profile with ``your_account=False``, or 404 when no
        such user exists. An unknown name used to produce a 200 response
        claiming the database was unhealthy.
    """
    cur = get_db().cursor()
    cur.execute("SELECT grass_touching_count, last_grass_touch_time FROM Users WHERE username = ?", (username,))
    row = cur.fetchone()
    cur.close()
    if not row:
        return Response("User not found.", 404)
    grass_touching_count, last_grass_touch_time = row
    return render_template(
        "profile.jinja2",
        username=username,
        grass_touching_count=grass_touching_count,
        # Stored as text in the database; the template receives a float.
        last_grass_touch_time=float(last_grass_touch_time),
        now=time.time(),
        achievements=ACHIEVEMENTS,
        your_account=False,
    )
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    Returns:
        The login page for GET; for POST a redirect to the home page on
        success, or a 401 response when a field is missing, the user is
        unknown, or the password does not match.
    """
    if request.method == "GET":
        return render_template("login.jinja2")

    username, password = request.form.get("username"), request.form.get("password")
    # A missing form field used to crash on None.encode() further down.
    if username is None or password is None:
        return Response("Unauthorized", 401)

    cur = get_db().cursor()
    try:
        cur.execute("SELECT password, password_salt FROM Users WHERE username = ?", (username,))
        row = cur.fetchone()
    finally:
        cur.close()
    if not row:
        return Response("Unauthorized", 401)

    hashed_password, salt = row
    # Re-hash with the stored salt and compare in constant time.
    candidate = bcrypt.hashpw(password.encode(), salt.encode())
    if not secrets.compare_digest(candidate, hashed_password.encode()):
        return Response("Unathorized access. Just go outside, touch grass and make your own account...", 401)

    user = User()
    user.id = username
    flask_login.login_user(user, remember=True)
    return redirect(url_for("application"))
@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or create an account (POST).

    Registration requires a completed challenge for the requested username.
    New accounts start with a count of 1 and the current time as their last
    grass touch.

    Returns:
        The form for GET; for POST a redirect to the login page on success,
        400 for missing fields or a taken username, 401 when the challenge
        is missing or unfinished, or the plain "No XSS please" body when the
        username contains HTML-special characters.
    """
    if request.method == "GET":
        return render_template("register.jinja2")

    username, password = request.form.get("username"), request.form.get("password")
    # Missing fields used to crash inside html.escape / .encode().
    if not username or not password:
        return Response("Username and password are required.", 400)
    # Only usernames that escaping leaves unchanged are accepted, so the
    # value needs no further escaping afterwards.
    if username != html.escape(username, quote=True):
        return "No XSS please"

    challenge = challenges.get(username)
    if not challenge:
        return Response("Start and finish a challenge before registering.", 401)
    if not challenge["completed"]:
        return Response("Finish a challenge before registering.", 401)

    db = get_db()
    cur = db.cursor()
    try:
        cur.execute("SELECT username FROM Users WHERE username = ?", (username,))
        if cur.fetchone():
            # The challenge is deliberately left in place: it belongs to the
            # existing account holder, not to this request.
            return Response("An account with this username already exists", 400)

        challenges.pop(username, None)
        salt = bcrypt.gensalt()
        hashed_password = bcrypt.hashpw(password.encode(), salt)
        try:
            cur.execute(
                "INSERT INTO Users (username, password, password_salt, last_grass_touch_time, grass_touching_count, banned) VALUES (?, ?, ?, ?, ?, ?)",
                (username, hashed_password.decode(), salt.decode(), time.time(), 1, False),
            )
        except sqlite3.IntegrityError:
            # Another request created the same username between the check
            # above and this insert.
            db.rollback()
            return Response("An account with this username already exists", 400)
        db.commit()
    finally:
        cur.close()
    return redirect(url_for("login"))
@app.route("/change_username", methods=["POST"])
@flask_login.login_required
def change_username():
    """Rename the logged-in account and its image records, then log out.

    Returns:
        A redirect to the login page on success, 400 when the new name is
        empty or already taken, or the plain "No XSS please" body when the
        name contains HTML-special characters.
    """
    username = flask_login.current_user.id
    new_username = request.form["new_username"]
    if new_username != html.escape(new_username, quote=True):
        return "No XSS please"
    if not new_username:
        return Response("Username must not be empty.", 400)

    db = get_db()
    cur = db.cursor()
    try:
        cur.execute("UPDATE Users SET username = ? WHERE username = ?", (new_username, username))
        cur.execute("UPDATE Images SET username = ? WHERE username = ?", (new_username, username))
        db.commit()
    except sqlite3.IntegrityError:
        # username is the primary key, so renaming onto an existing account
        # fails here; this used to surface as an unhandled 500.
        db.rollback()
        return Response("An account with this username already exists", 400)
    finally:
        cur.close()

    # The session still holds the old name, so force a fresh login.
    flask_login.logout_user()
    return redirect(url_for("login"))
@app.route("/change_password", methods=["POST"])
@flask_login.login_required
def change_password():
    """Replace the logged-in user's password after checking the current one.

    Returns:
        A redirect to the login page (after logging out) on success, 400
        when the new password and its confirmation differ, 500 when the
        user's row is missing, or 401 when the current password is wrong.
    """
    account = flask_login.current_user.id
    form = request.form
    current_password = form["current_password"]
    new_password = form["new_password"]
    confirm_password = form["confirm_password"]

    passwords_match = secrets.compare_digest(new_password, confirm_password)
    if not passwords_match:
        return Response("Passwords do not match.", 400)

    cursor = get_db().cursor()
    cursor.execute("SELECT password, password_salt FROM Users WHERE username = ?", (account,))
    record = cursor.fetchone()
    if not record:
        return Response("DB is not healthy", 500)

    stored_hash, stored_salt = record
    # Re-hash the supplied password with the stored salt and compare.
    recomputed = bcrypt.hashpw(current_password.encode(), stored_salt.encode())
    if not secrets.compare_digest(recomputed, stored_hash.encode()):
        return Response("Unathorized.", 401)

    fresh_salt = bcrypt.gensalt()
    fresh_hash = bcrypt.hashpw(new_password.encode(), fresh_salt)
    cursor.execute(
        "UPDATE Users SET password = ?, password_salt = ? WHERE username = ?",
        (fresh_hash.decode(), fresh_salt.decode(), account),
    )
    get_db().commit()
    cursor.close()

    flask_login.logout_user()
    return redirect(url_for("login"))
@app.route("/delete_account", methods=["POST"])
@flask_login.login_required
def delete_data():
    """Delete the logged-in user's Users row, log out, and go to the login page.

    Only the Users table is touched; rows in Images are left as they are.
    """
    account = flask_login.current_user.id
    connection = get_db()
    cursor = connection.cursor()
    cursor.execute("DELETE FROM Users WHERE username = ?", (account,))
    connection.commit()
    cursor.close()
    flask_login.logout_user()
    return redirect(url_for("login"))
@app.route("/reset_data", methods=["POST"])
@flask_login.login_required
def reset_data():
    """Reset the logged-in user's count to 1 and last touch time to now."""
    account = flask_login.current_user.id
    connection = get_db()
    cursor = connection.cursor()
    reset_values = (time.time(), 1, account)
    cursor.execute("UPDATE Users SET last_grass_touch_time = ?, grass_touching_count = ? WHERE username = ?", reset_values)
    connection.commit()
    cursor.close()
    return redirect("/")
@app.route("/uploads/<filename>")
def uploads(filename):
    """Serve ``filename`` from the "uploads" directory."""
    upload_folder = "uploads"
    return send_from_directory(upload_folder, filename)
@app.route("/info")
def info():
    """Redirect the visitor to RICKROLL_LINK."""
    destination = RICKROLL_LINK
    return redirect(destination)
@app.route('/logout')
def logout():
    """End the current login session and return to the home page."""
    flask_login.logout_user()
    home_path = "/"
    return redirect(home_path)
# Start the development server only when this file is executed directly.
# Without the guard, importing the module (for example from a WSGI server)
# would block inside app.run(). PORT and HOST come from the environment;
# HOST defaults to all interfaces.
if __name__ == "__main__":
    app.run(port=os.environ.get("PORT"), host=os.environ.get("HOST", "0.0.0.0"))