From f1a78f83b7aabcbf7f3bb9997bd6faffa0963350 Mon Sep 17 00:00:00 2001 From: spitkov Date: Thu, 19 Sep 2024 22:34:49 +0200 Subject: [PATCH] Upload files to "/" --- app.py | 417 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 417 insertions(+) create mode 100644 app.py diff --git a/app.py b/app.py new file mode 100644 index 0000000..8271937 --- /dev/null +++ b/app.py @@ -0,0 +1,417 @@ +from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, abort +from flask_sqlalchemy import SQLAlchemy +from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user +from werkzeug.security import generate_password_hash, check_password_hash +from werkzeug.utils import secure_filename +import os +from datetime import datetime +from flask import request, session +import uuid +from PIL import Image +import io +from flask import jsonify +import base64 +from functools import wraps +import os +from flask import send_file +import json +import markdown +import bleach + +app = Flask(__name__) +app.config['SECRET_KEY'] = 'your_secret_key' +app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///techtalks.db' +app.config['UPLOAD_FOLDER'] = 'static/uploads' +db = SQLAlchemy(app) +login_manager = LoginManager(app) +login_manager.login_view = 'login' + +# Model definitions +class User(UserMixin, db.Model): + id = db.Column(db.Integer, primary_key=True) + username = db.Column(db.String(80), unique=True, nullable=False) + password_hash = db.Column(db.String(120), nullable=False) + bio = db.Column(db.Text) + profile_picture = db.Column(db.String(200)) + posts = db.relationship('Post', backref='author', lazy=True) + comments = db.relationship('Comment', backref='author', lazy=True) + +class Post(db.Model): + id = db.Column(db.Integer, primary_key=True) + content = db.Column(db.Text, nullable=False) + image_url = db.Column(db.String(200)) + timestamp = db.Column(db.DateTime, default=datetime.utcnow) + user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) + anonymous_username = db.Column(db.String(80)) + likes = db.relationship('Like', backref='post', lazy=True, cascade='all, delete-orphan') + comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan') + +class Like(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.String(120), nullable=False) + post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False) + +class Comment(db.Model): + id = db.Column(db.Integer, primary_key=True) + content = db.Column(db.Text, nullable=False) + timestamp = db.Column(db.DateTime, default=datetime.utcnow) + user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) + post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False) + likes = db.relationship('CommentLike', backref='comment', lazy=True, cascade='all, delete-orphan') + +class CommentLike(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.String(120), nullable=False) + comment_id = db.Column(db.Integer, db.ForeignKey('comment.id'), nullable=False) + +@login_manager.user_loader +def load_user(user_id): + return User.query.get(int(user_id)) + +@app.route('/') +def home(): + posts = Post.query.order_by(Post.timestamp.desc()).all() + metadata = { + 'title': 'TechTalks - Twitter but for techtokers', + 'type': 'website', + 'url': url_for('home', _external=True, _scheme='https'), + 'description': 'TechTalks is a platform for tech enthusiasts to share and discuss ideas.', + 'image': url_for('static', filename='techtalks_logo.png', _external=True, _scheme='https'), # Add a logo image + } + return render_template('index.html', posts=posts, metadata=metadata) + +@app.route('/register', methods=['GET', 'POST']) +def register(): + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + + user = User.query.filter_by(username=username).first() + if user: + flash('Username already exists') + return redirect(url_for('register')) + + new_user = User(username=username, password_hash=generate_password_hash(password, method='pbkdf2:sha256')) + db.session.add(new_user) + db.session.commit() + + login_user(new_user) # Log in the user immediately after registration + flash('Registration successful') + return redirect(url_for('home')) + + return render_template('register.html') + +@app.route('/login', methods=['GET', 'POST']) +def login(): + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + + user = User.query.filter_by(username=username).first() + if user and check_password_hash(user.password_hash, password): + login_user(user) + return redirect(url_for('home')) + else: + flash('Invalid username or password') + + return render_template('login.html') + +@app.route('/logout') +@login_required +def logout(): + logout_user() + return redirect(url_for('home')) + +@app.route('/post', methods=['GET', 'POST']) +def post(): + if request.method == 'POST': + content = request.form.get('content') + image = request.files.get('image') + username = request.form.get('username') + + # Process Markdown and sanitize HTML + html_content = markdown.markdown(content) + allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul', 'p', 'br'] + allowed_attributes = {'a': ['href', 'title']} + sanitized_content = bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attributes, strip=True) + + if current_user.is_authenticated: + user = current_user + new_post = Post(content=sanitized_content, author=user) + else: + new_post = Post(content=sanitized_content, anonymous_username=username) + + if image: + create_upload_directory() + filename = secure_filename(image.filename) + image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename) + image.save(image_path) + new_post.image_url = filename + + db.session.add(new_post) + db.session.commit() + + return redirect(url_for('home')) + + return render_template('post.html') + +@app.route('/like/', methods=['POST']) +def like_post(post_id): + post = Post.query.get_or_404(post_id) + + if current_user.is_authenticated: + user_id = current_user.id + else: + # For anonymous users, use IP address as a unique identifier + user_id = request.remote_addr + + like = Like.query.filter_by(user_id=user_id, post_id=post.id).first() + + if like: + db.session.delete(like) + else: + new_like = Like(user_id=user_id, post_id=post.id) + db.session.add(new_like) + + db.session.commit() + return jsonify({'likes': len(post.likes)}) + +@app.route('/comment/', methods=['POST']) +def add_comment(post_id): + content = request.form.get('content') + + if current_user.is_authenticated: + user = current_user + new_comment = Comment(content=content, author=user, post_id=post_id) + else: + new_comment = Comment(content=content, post_id=post_id) + + db.session.add(new_comment) + db.session.commit() + + return jsonify({ + 'success': True, + 'comment_id': new_comment.id, + 'username': user.username if current_user.is_authenticated else 'Anonymous', + 'content': new_comment.content + }) + +@app.route('/like_comment/', methods=['POST']) +def like_comment(comment_id): + comment = Comment.query.get_or_404(comment_id) + + if current_user.is_authenticated: + user_id = str(current_user.id) + else: + user_id = request.remote_addr # Use IP address for anonymous users + + like = CommentLike.query.filter_by(user_id=user_id, comment_id=comment.id).first() + + if like: + db.session.delete(like) + else: + new_like = CommentLike(user_id=user_id, comment_id=comment.id) + db.session.add(new_like) + + db.session.commit() + return jsonify({'likes': len(comment.likes)}) + +@app.route('/profile/', methods=['GET', 'POST']) +def profile(username): + user = User.query.filter_by(username=username).first_or_404() + if request.method == 'POST' and current_user.is_authenticated and current_user.id == user.id: + bio = request.form.get('bio') + cropped_data = request.form.get('cropped_data') + remove_picture = request.form.get('remove_picture') + + # Process Markdown and sanitize HTML for bio + if bio: + html = markdown.markdown(bio) + allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul'] + allowed_attributes = {'a': ['href', 'title']} + user.bio = bleach.clean(html, tags=allowed_tags, attributes=allowed_attributes, strip=True) + else: + user.bio = None + + if remove_picture: + user.profile_picture = None + elif cropped_data: + filename = secure_filename(f"{user.username}_profile.png") + image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename) + + # Decode the base64 image and save it + image_data = base64.b64decode(cropped_data.split(',')[1]) + with Image.open(io.BytesIO(image_data)) as img: + if img.mode in ('RGBA', 'LA'): + # Image has an alpha channel, save as PNG + img.save(image_path, 'PNG') + else: + # Image doesn't have an alpha channel, convert to RGB and save as JPEG + img = img.convert('RGB') + img.save(image_path, 'JPEG') + + user.profile_picture = filename + + db.session.commit() + flash('Profile updated successfully', 'success') + return redirect(url_for('profile', username=username)) + + posts = Post.query.filter_by(user_id=user.id).order_by(Post.timestamp.desc()).all() + post_count = len(posts) + like_count = sum(len(post.likes) for post in posts) + comment_count = sum(len(post.comments) for post in posts) + return render_template('profile.html', user=user, posts=posts, post_count=post_count, like_count=like_count, comment_count=comment_count) + +@app.route('/edit_post/', methods=['GET', 'POST']) +def edit_post(post_id): + post = Post.query.get_or_404(post_id) + if current_user.is_authenticated: + if post.author != current_user: + abort(403) + else: + if post.user_id != session['user_id']: + abort(403) + + if request.method == 'POST': + post.content = request.form.get('content') + db.session.commit() + flash('Your post has been updated!', 'success') + return redirect(url_for('home')) + return render_template('edit_post.html', post=post) + +@app.before_request +def before_request(): + if 'user_id' not in session: + session['user_id'] = str(uuid.uuid4()) + +def create_upload_directory(): + upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER']) + if not os.path.exists(upload_dir): + os.makedirs(upload_dir) + print("Uploads directory created.") + else: + print("Uploads directory already exists.") + +def init_db(): + db_path = os.path.join(app.root_path, 'techtalks.db') + if not os.path.exists(db_path): + with app.app_context(): + db.create_all() + print("Database initialized.") + else: + print("Database already exists.") + +def admin_required(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if 'admin' not in session: + return redirect(url_for('admin_login')) + return f(*args, **kwargs) + return decorated_function + +@app.route('/admin_login', methods=['GET', 'POST']) +def admin_login(): + if request.method == 'POST': + password = request.form.get('password') + if password == "lolmaa": + session['admin'] = True + return redirect(url_for('admin_dashboard')) + else: + flash('Invalid password') + return render_template('admin_login.html') + +@app.route('/admin') +@admin_required +def admin_dashboard(): + posts = Post.query.order_by(Post.timestamp.desc()).all() + return render_template('admin_dashboard.html', posts=posts) + +@app.route('/admin/delete_post/', methods=['POST']) +@admin_required +def delete_post(post_id): + post = Post.query.get_or_404(post_id) + + # Delete all comments associated with the post + Comment.query.filter_by(post_id=post_id).delete() + + # Delete all likes associated with the post + Like.query.filter_by(post_id=post_id).delete() + + # Delete the post + db.session.delete(post) + + try: + db.session.commit() + flash('Post and associated comments deleted successfully') + except Exception as e: + db.session.rollback() + flash('An error occurred while deleting the post: ' + str(e)) + + return redirect(url_for('admin_dashboard')) + +@app.route('/admin/reset_db', methods=['POST']) +@admin_required +def reset_db(): + try: + db.drop_all() + db.create_all() + flash('Database has been reset successfully', 'success') + except Exception as e: + flash(f'An error occurred while resetting the database: {str(e)}', 'danger') + return redirect(url_for('admin_dashboard')) + +@app.route('/admin/backup_db') +@admin_required +def backup_db(): + try: + # Create a JSON representation of the database + data = { + 'users': [{'id': u.id, 'username': u.username, 'bio': u.bio} for u in User.query.all()], + 'posts': [{'id': p.id, 'content': p.content, 'user_id': p.user_id, 'timestamp': p.timestamp.isoformat()} for p in Post.query.all()], + 'comments': [{'id': c.id, 'content': c.content, 'user_id': c.user_id, 'post_id': c.post_id, 'timestamp': c.timestamp.isoformat()} for c in Comment.query.all()], + } + + # Save the JSON to a file + with open('db_backup.json', 'w') as f: + json.dump(data, f, indent=2) + + # Send the file for download + return send_file('db_backup.json', as_attachment=True) + except Exception as e: + flash(f'An error occurred while creating the database backup: {str(e)}', 'danger') + return redirect(url_for('admin_dashboard')) + +@app.route('/admin/clear_uploads', methods=['POST']) +@admin_required +def clear_uploads(): + try: + upload_folder = app.config['UPLOAD_FOLDER'] + for filename in os.listdir(upload_folder): + file_path = os.path.join(upload_folder, filename) + if os.path.isfile(file_path): + os.unlink(file_path) + flash('Upload folder has been cleared successfully', 'success') + except Exception as e: + flash(f'An error occurred while clearing the upload folder: {str(e)}', 'danger') + return redirect(url_for('admin_dashboard')) + +@app.route('/post/') +def post_detail(post_id): + post = Post.query.get_or_404(post_id) + metadata = generate_opengraph_metadata(post) + return render_template('post_detail.html', post=post, metadata=metadata) + +def generate_opengraph_metadata(post): + metadata = { + 'title': f"Post by {post.author.username if post.author else 'Anonymous'} on TechTalks", + 'type': 'article', + 'url': url_for('post_detail', post_id=post.id, _external=True, _scheme='https'), + 'description': bleach.clean(post.content, tags=[], strip=True)[:200] + '...' if len(post.content) > 200 else bleach.clean(post.content, tags=[], strip=True), + 'image': url_for('static', filename='uploads/' + post.image_url, _external=True, _scheme='https') if post.image_url else None, + } + return metadata + +if __name__ == '__main__': + init_db() + create_upload_directory() + app.run(debug=True, port=5051, host='0.0.0.0') \ No newline at end of file