diff --git a/app.py b/app.py new file mode 100644 index 0000000..cd766e0 --- /dev/null +++ b/app.py @@ -0,0 +1,867 @@ +from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, abort, session, send_file, render_template_string +from flask_sqlalchemy import SQLAlchemy +from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user +from werkzeug.security import generate_password_hash, check_password_hash +from werkzeug.utils import secure_filename +import os +from datetime import datetime, timedelta +from sqlalchemy import func, desc +import uuid +from PIL import Image +import io +import base64 +from functools import wraps +import json +import markdown +import bleach +from bs4 import BeautifulSoup +from urllib.parse import urlparse, urljoin +from flask_migrate import Migrate +from sqlalchemy.exc import IntegrityError + +app = Flask(__name__) +app.config['SECRET_KEY'] = 'your_secret_key' +app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///techtalks.db' +app.config['UPLOAD_FOLDER'] = 'static/uploads' +db = SQLAlchemy(app) +migrate = Migrate(app, db) +login_manager = LoginManager(app) +login_manager.login_view = 'login' + +def admin_required(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if 'admin' not in session: + return redirect(url_for('admin_login')) + return f(*args, **kwargs) + return decorated_function + +# Association table for group members +group_members = db.Table('group_members', + db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), + db.Column('group_id', db.Integer, db.ForeignKey('group.id'), primary_key=True) +) + +# Model definitions +class User(UserMixin, db.Model): + id = db.Column(db.Integer, primary_key=True) + username = db.Column(db.String(80), unique=True, nullable=False) + password_hash = db.Column(db.String(120), nullable=False) + bio = db.Column(db.Text) + rendered_bio = db.Column(db.Text) + profile_picture = db.Column(db.String(200)) + posts = db.relationship('Post', backref='author', lazy=True) + comments = db.relationship('Comment', backref='author', lazy=True) + groups = db.relationship('Group', secondary=group_members, back_populates='members', lazy='dynamic') + created_groups = db.relationship('Group', backref='creator', lazy=True) + group_homepage_settings = db.relationship('GroupHomepageSetting', back_populates='user', cascade='all, delete-orphan') + + def update_group_homepage_setting(self, group, show_in_homepage): + setting = GroupHomepageSetting.query.filter_by(user_id=self.id, group_id=group.id).first() + if setting: + setting.show_in_homepage = show_in_homepage + else: + new_setting = GroupHomepageSetting(user_id=self.id, group_id=group.id, show_in_homepage=show_in_homepage) + db.session.add(new_setting) + + def show_group_posts_in_homepage(self, group): + setting = GroupHomepageSetting.query.filter_by(user_id=self.id, group_id=group.id).first() + return setting.show_in_homepage if setting else False + +class Post(db.Model): + id = db.Column(db.Integer, primary_key=True) + content = db.Column(db.Text, nullable=False) + original_content = db.Column(db.Text) + image_url = db.Column(db.String(200)) + timestamp = db.Column(db.DateTime, default=datetime.utcnow) + user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) + anonymous_username = db.Column(db.String(80)) + likes = db.relationship('Like', backref='post', lazy=True, cascade='all, delete-orphan') + comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan') + group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=True) + +class Group(db.Model): + id = db.Column(db.Integer, primary_key=True) + name = db.Column(db.String(100), nullable=False) + description = db.Column(db.Text) + vanity_url = db.Column(db.String(100), unique=True, nullable=False) + image_url = db.Column(db.String(200)) + visibility = db.Column(db.String(20), nullable=False) + allow_requests = db.Column(db.Boolean, default=False) + creator_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) + members = db.relationship('User', secondary=group_members, back_populates='groups', lazy='dynamic') + posts = db.relationship('Post', backref='group', lazy='dynamic') + +class Like(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.String(120), nullable=False) + post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False) + +class Comment(db.Model): + id = db.Column(db.Integer, primary_key=True) + content = db.Column(db.Text, nullable=False) + timestamp = db.Column(db.DateTime, default=datetime.utcnow) + user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) + post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False) + likes = db.relationship('CommentLike', backref='comment', lazy=True, cascade='all, delete-orphan') + +class CommentLike(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.String(120), nullable=False) + comment_id = db.Column(db.Integer, db.ForeignKey('comment.id'), nullable=False) + + +class GroupJoinRequest(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.String(120), nullable=False) # Changed to String to handle both authenticated and anonymous users + group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False) + timestamp = db.Column(db.DateTime, default=datetime.utcnow) + group = db.relationship('Group', backref='join_requests') + +class GroupHomepageSetting(db.Model): + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) + group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False) + show_in_homepage = db.Column(db.Boolean, default=False) + user = db.relationship('User', back_populates='group_homepage_settings') + group = db.relationship('Group') + +@login_manager.user_loader +def load_user(user_id): + return User.query.get(int(user_id)) + +# Routes +@app.route('/') +def home(): + sort = request.args.get('sort', 'newest') + posts = sort_posts(sort) + posts = [post for post in posts if post.group_id is None] # Exclude group posts + metadata = { + 'title': 'TechTalks - Twitter but for techtokers', + 'type': 'website', + 'url': url_for('home', _external=True, _scheme='https'), + 'description': 'TechTalks is a platform for tech enthusiasts to share and discuss ideas.', + 'image': url_for('static', filename='techtalks_logo.png', _external=True, _scheme='https'), + } + return render_template('index.html', posts=posts, metadata=metadata) + +@app.route('/sort_posts') +def sort_posts_route(): + sort = request.args.get('sort', 'newest') + posts = sort_posts(sort) + return render_template('posts_partial.html', posts=posts) + +def sort_posts(sort_option): + if sort_option == 'newest': + return Post.query.order_by(Post.timestamp.desc()).all() + elif sort_option == 'most_liked_24h': + twenty_four_hours_ago = datetime.utcnow() - timedelta(hours=24) + return Post.query.outerjoin(Like).filter(Post.timestamp >= twenty_four_hours_ago).group_by(Post.id).order_by(desc(func.count(Like.id))).all() + elif sort_option == 'most_commented': + return Post.query.outerjoin(Comment).group_by(Post.id).order_by(desc(func.count(Comment.id))).all() + elif sort_option == 'trending': + three_days_ago = datetime.utcnow() - timedelta(days=3) + return Post.query.outerjoin(Like).outerjoin(Comment).filter(Post.timestamp >= three_days_ago).group_by(Post.id).order_by(desc(func.count(Like.id) + func.count(Comment.id) * 2)).all() + elif sort_option == 'controversial': + return Post.query.outerjoin(Like).outerjoin(Comment).group_by(Post.id).order_by(desc(func.count(Like.id) * func.count(Comment.id))).all() + else: + return Post.query.order_by(Post.timestamp.desc()).all() + +@app.route('/register', methods=['GET', 'POST']) +def register(): + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + + user = User.query.filter_by(username=username).first() + if user: + flash('Username already exists') + return redirect(url_for('register')) + + new_user = User(username=username, password_hash=generate_password_hash(password, method='pbkdf2:sha256')) + db.session.add(new_user) + db.session.commit() + + login_user(new_user) # Log in the user immediately after registration + flash('Registration successful') + return redirect(url_for('home')) + + return render_template('register.html') + +@app.route('/login', methods=['GET', 'POST']) +def login(): + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + + user = User.query.filter_by(username=username).first() + if user and check_password_hash(user.password_hash, password): + login_user(user, remember=True, duration=timedelta(days=30)) + return redirect(url_for('home')) + else: + flash('Invalid username or password') + + return render_template('login.html') + +@app.route('/logout') +@login_required +def logout(): + logout_user() + return redirect(url_for('home')) + +@app.route('/post', methods=['GET', 'POST']) +def post(): + if request.method == 'POST': + content = request.form.get('content') + image = request.files.get('image') + username = request.form.get('username') + + # Sanitize and process the content + sanitized_content = sanitize_html(content) + + if current_user.is_authenticated: + user = current_user + new_post = Post(content=sanitized_content, original_content=content, author=user) + else: + new_post = Post(content=sanitized_content, original_content=content, anonymous_username=username) + + if image: + create_upload_directory() + filename = secure_filename(image.filename) + image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename) + image.save(image_path) + new_post.image_url = filename + + db.session.add(new_post) + db.session.commit() + + return redirect(url_for('home')) + + return render_template('post.html') + +@app.route('/like/', methods=['POST']) +def like_post(post_id): + post = Post.query.get_or_404(post_id) + + if current_user.is_authenticated: + user_id = current_user.id + else: + # For anonymous users, use IP address as a unique identifier + user_id = request.remote_addr + + like = Like.query.filter_by(user_id=user_id, post_id=post.id).first() + + if like: + db.session.delete(like) + else: + new_like = Like(user_id=user_id, post_id=post.id) + db.session.add(new_like) + + db.session.commit() + return jsonify({'likes': len(post.likes)}) + +@app.route('/comment/', methods=['POST']) +def add_comment(post_id): + content = request.form.get('content') + + if current_user.is_authenticated: + user = current_user + new_comment = Comment(content=content, author=user, post_id=post_id) + else: + new_comment = Comment(content=content, post_id=post_id) + + db.session.add(new_comment) + db.session.commit() + + return jsonify({ + 'success': True, + 'comment_id': new_comment.id, + 'username': user.username if current_user.is_authenticated else 'Anonymous', + 'content': new_comment.content + }) + +@app.route('/like_comment/', methods=['POST']) +def like_comment(comment_id): + comment = Comment.query.get_or_404(comment_id) + + if current_user.is_authenticated: + user_id = str(current_user.id) + else: + user_id = request.remote_addr # Use IP address for anonymous users + + like = CommentLike.query.filter_by(user_id=user_id, comment_id=comment.id).first() + + if like: + db.session.delete(like) + else: + new_like = CommentLike(user_id=user_id, comment_id=comment.id) + db.session.add(new_like) + + db.session.commit() + return jsonify({'likes': len(comment.likes)}) + +@app.route('/profile/', methods=['GET', 'POST']) +def profile(username): + user = User.query.filter_by(username=username).first_or_404() + if request.method == 'POST' and current_user.is_authenticated and current_user.id == user.id: + bio = request.form.get('bio') + cropped_data = request.form.get('cropped_data') + remove_picture = request.form.get('remove_picture') + + user.bio = bio # Store the original Markdown + user.rendered_bio = sanitize_html(bio) # Store the rendered HTML + + if remove_picture: + user.profile_picture = None + elif cropped_data: + filename = secure_filename(f"{user.username}_profile.png") + image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename) + + # Decode the base64 image and save it + image_data = base64.b64decode(cropped_data.split(',')[1]) + with Image.open(io.BytesIO(image_data)) as img: + if img.mode in ('RGBA', 'LA'): + # Image has an alpha channel, save as PNG + img.save(image_path, 'PNG') + else: + # Image doesn't have an alpha channel, convert to RGB and save as JPEG + img = img.convert('RGB') + img.save(image_path, 'JPEG') + + user.profile_picture = filename + + db.session.commit() + flash('Profile updated successfully', 'success') + return redirect(url_for('profile', username=username)) + + posts = Post.query.filter_by(user_id=user.id).order_by(Post.timestamp.desc()).all() + post_count = len(posts) + like_count = sum(len(post.likes) for post in posts) + comment_count = sum(len(post.comments) for post in posts) + return render_template('profile.html', user=user, posts=posts, post_count=post_count, like_count=like_count, comment_count=comment_count) + +@app.route('/edit_post/', methods=['GET', 'POST']) +def edit_post(post_id): + post = Post.query.get_or_404(post_id) + if current_user.is_authenticated: + if post.author != current_user: + abort(403) + else: + if post.user_id != session['user_id']: + abort(403) + + if request.method == 'POST': + if 'delete' in request.form: + db.session.delete(post) + db.session.commit() + flash('Your post has been deleted!', 'success') + return redirect(url_for('home')) + else: + content = request.form.get('content') + sanitized_content = sanitize_html(content) + post.content = sanitized_content + post.original_content = content + db.session.commit() + flash('Your post has been updated!', 'success') + return redirect(url_for('home')) + + return render_template('edit_post.html', post=post) + +@app.route('/post/') +def post_detail(post_id): + post = Post.query.get_or_404(post_id) + metadata = generate_opengraph_metadata(post) + return render_template('post_detail.html', post=post, metadata=metadata) + +# Group-related routes +@app.route('/groups') +def groups(): + if current_user.is_authenticated: + public_groups = Group.query.filter(Group.visibility != 'invite_only').all() + user_groups = current_user.groups.all() # Convert AppenderQuery to list + invite_only_groups = Group.query.filter_by(visibility='invite_only', creator=current_user).all() + groups = list(set(public_groups + user_groups + invite_only_groups)) + else: + groups = Group.query.filter_by(visibility='public').all() + return render_template('groups.html', groups=groups) + +@app.route('/create_group', methods=['GET', 'POST']) +@login_required +def create_group(): + if request.method == 'POST': + name = request.form.get('name') + description = request.form.get('description') + vanity_url = request.form.get('vanity_url') + visibility = request.form.get('visibility') + allow_requests = 'allow_requests' in request.form if visibility == 'invite_only' else False + cropped_data = request.form.get('cropped_data') + + # Check if the vanity_url is already taken + existing_group = Group.query.filter_by(vanity_url=vanity_url).first() + if existing_group: + flash('This vanity URL is already taken. Please choose a different one.', 'error') + return render_template('create_group.html') + + new_group = Group(name=name, description=description, vanity_url=vanity_url, + visibility=visibility, allow_requests=allow_requests, + creator=current_user) + + if cropped_data: + filename = secure_filename(f"group_{vanity_url}.png") + image_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) + + # Decode the base64 image and save it + image_data = base64.b64decode(cropped_data.split(',')[1]) + with Image.open(io.BytesIO(image_data)) as img: + img.save(image_path, 'PNG') + + new_group.image_url = filename + + # Add the creator to the group + new_group.members.append(current_user) + db.session.add(new_group) + db.session.commit() + + flash('Group created successfully', 'success') + return redirect(url_for('group_detail', vanity_url=vanity_url)) + + return render_template('create_group.html') + +@app.route('/g/') +def group_detail(vanity_url): + group = Group.query.filter_by(vanity_url=vanity_url).first_or_404() + + if group.visibility == 'invite_only': + if not current_user.is_authenticated: + flash('You need to be logged in to view this invite-only group.', 'warning') + return redirect(url_for('login')) + elif current_user not in group.members and current_user != group.creator: + return render_template('group_detail.html', group=group, show_request_button=group.allow_requests) + elif group.visibility == 'registered_only': + if not current_user.is_authenticated: + flash('You need to be logged in to view this group.', 'warning') + return redirect(url_for('login')) + + join_requests = [] + if current_user.is_authenticated and current_user == group.creator: + join_requests = GroupJoinRequest.query.filter_by(group_id=group.id).all() + + can_join = current_user.is_authenticated and current_user not in group.members and (group.visibility == 'public' or group.visibility == 'registered_only') + + # Sort posts by newest first + sorted_posts = sorted(group.posts, key=lambda x: x.timestamp, reverse=True) + + return render_template('group_detail.html', group=group, posts=sorted_posts, join_requests=join_requests, can_join=can_join) + +@app.route('/join_group/', methods=['POST']) +@login_required +def join_group(group_id): + group = Group.query.get_or_404(group_id) + if current_user not in group.members: + if group.visibility == 'public': + group.members.append(current_user) + db.session.commit() + flash('You have joined the group.', 'success') + elif group.visibility == 'registered_only': + if current_user.is_authenticated: + group.members.append(current_user) + db.session.commit() + flash('You have joined the group.', 'success') + else: + flash('You need to be registered and logged in to join this group.', 'warning') + elif group.visibility == 'invite_only': + if group.allow_requests: + new_request = GroupJoinRequest(user_id=current_user.id, group_id=group_id) + db.session.add(new_request) + db.session.commit() + flash('Your request to join has been sent.', 'success') + else: + flash('This group is invite-only and not accepting join requests.', 'warning') + else: + flash('You cannot join this group.', 'warning') + else: + flash('You are already a member of this group.', 'info') + return redirect(url_for('group_detail', vanity_url=group.vanity_url)) + +@app.route('/handle_join_request//', methods=['POST']) +@login_required +def handle_join_request(group_id, user_id): + group = Group.query.get_or_404(group_id) + if current_user != group.creator: + abort(403) + + user = User.query.get_or_404(user_id) + action = request.form.get('action') + + join_request = GroupJoinRequest.query.filter_by(user_id=user_id, group_id=group_id).first() + if not join_request: + flash('Join request not found.', 'error') + return redirect(url_for('group_detail', vanity_url=group.vanity_url)) + + if action == 'accept': + group.members.append(user) + db.session.delete(join_request) + flash(f'{user.username} has been added to the group.', 'success') + elif action == 'reject': + db.session.delete(join_request) + flash(f'Join request from {user.username} has been rejected.', 'info') + + db.session.commit() + return redirect(url_for('group_detail', vanity_url=group.vanity_url)) + +@app.route('/update_group_settings/', methods=['POST']) +@login_required +def update_group_settings(group_id): + group = Group.query.get_or_404(group_id) + if current_user != group.creator: + abort(403) + + group.name = request.form.get('name') + group.description = request.form.get('description') + group.vanity_url = request.form.get('vanity_url') + group.visibility = request.form.get('visibility') + group.allow_requests = 'allow_requests' in request.form and request.form.get('visibility') == 'invite_only' + + if 'image' in request.files: + image = request.files['image'] + if image.filename != '': + filename = secure_filename(f"group_{group.id}_{image.filename}") + image_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) + image.save(image_path) + group.image_url = filename + + db.session.commit() + flash('Group settings updated successfully', 'success') + return redirect(url_for('group_detail', vanity_url=group.vanity_url)) + +@app.route('/remove_group_member//', methods=['POST']) +@login_required +def remove_group_member(group_id, user_id): + group = Group.query.get_or_404(group_id) + if current_user != group.creator: + abort(403) + + user = User.query.get_or_404(user_id) + if user == group.creator: + flash('Cannot remove the group creator', 'error') + else: + group.members.remove(user) + db.session.commit() + flash(f'{user.username} has been removed from the group.', 'success') + + return redirect(url_for('group_detail', vanity_url=group.vanity_url)) + +@app.route('/leave_group/', methods=['POST']) +@login_required +def leave_group(group_id): + group = Group.query.get_or_404(group_id) + if current_user not in group.members: + abort(403) + if current_user == group.creator: + flash('The group creator cannot leave the group.', 'error') + else: + group.members.remove(current_user) + db.session.commit() + flash(f'You have left the group {group.name}.', 'success') + + return redirect(url_for('groups')) + +@app.route('/g//post', methods=['POST']) +@login_required +def group_post(vanity_url): + group = Group.query.filter_by(vanity_url=vanity_url).first_or_404() + if current_user not in group.members: + abort(403) + + content = request.form.get('content') + if not content or content.strip() == '': + flash('Post content cannot be empty.', 'error') + return redirect(url_for('group_detail', vanity_url=vanity_url)) + + sanitized_content = sanitize_html(content) + new_post = Post(content=sanitized_content, original_content=content, author=current_user, group=group) + db.session.add(new_post) + db.session.commit() + + flash('Your post has been added to the group.', 'success') + return redirect(url_for('group_detail', vanity_url=vanity_url)) + +@app.route('/g//post//delete', methods=['POST']) +@login_required +def delete_group_post(vanity_url, post_id): + group = Group.query.filter_by(vanity_url=vanity_url).first_or_404() + post = Post.query.filter_by(id=post_id, group_id=group.id).first_or_404() + + if current_user != group.creator and current_user != post.author: + abort(403) + + db.session.delete(post) + db.session.commit() + flash('Post has been deleted.', 'success') + return redirect(url_for('group_detail', vanity_url=vanity_url)) + +# Admin routes +@app.route('/admin_login', methods=['GET', 'POST']) +def admin_login(): + if request.method == 'POST': + password = request.form.get('password') + if password == "lolmaa": + session['admin'] = True + return redirect(url_for('admin_dashboard')) + else: + flash('Invalid password') + return render_template('admin_login.html') + +@app.route('/admin') +@admin_required +def admin_dashboard(): + posts = Post.query.order_by(Post.timestamp.desc()).all() + return render_template('admin_dashboard.html', posts=posts) + +@app.route('/admin/delete_post/', methods=['POST']) +@admin_required +def delete_post(post_id): + post = Post.query.get_or_404(post_id) + + # Delete all comments associated with the post + Comment.query.filter_by(post_id=post_id).delete() + + # Delete all likes associated with the post + Like.query.filter_by(post_id=post_id).delete() + + # Delete the post + db.session.delete(post) + + try: + db.session.commit() + flash('Post and associated comments deleted successfully') + except Exception as e: + db.session.rollback() + flash('An error occurred while deleting the post: ' + str(e)) + + return redirect(url_for('admin_dashboard')) + +@app.route('/admin/reset_db', methods=['POST']) +@admin_required +def reset_db(): + try: + db.drop_all() + db.create_all() + flash('Database has been reset successfully', 'success') + except Exception as e: + flash(f'An error occurred while resetting the database: {str(e)}', 'danger') + return redirect(url_for('admin_dashboard')) + +@app.route('/admin/backup_db') +@admin_required +def backup_db(): + try: + # Create a JSON representation of the database + data = { + 'users': [{'id': u.id, 'username': u.username, 'bio': u.bio} for u in User.query.all()], + 'posts': [{'id': p.id, 'content': p.content, 'user_id': p.user_id, 'timestamp': p.timestamp.isoformat()} for p in Post.query.all()], + 'comments': [{'id': c.id, 'content': c.content, 'user_id': c.user_id, 'post_id': c.post_id, 'timestamp': c.timestamp.isoformat()} for c in Comment.query.all()], + } + + # Save the JSON to a file + with open('db_backup.json', 'w') as f: + json.dump(data, f, indent=2) + + # Send the file for download + return send_file('db_backup.json', as_attachment=True) + except Exception as e: + flash(f'An error occurred while creating the database backup: {str(e)}', 'danger') + return redirect(url_for('admin_dashboard')) + +@app.route('/admin/clear_uploads', methods=['POST']) +@admin_required +def clear_uploads(): + try: + upload_folder = app.config['UPLOAD_FOLDER'] + for filename in os.listdir(upload_folder): + file_path = os.path.join(upload_folder, filename) + if os.path.isfile(file_path): + os.unlink(file_path) + flash('Upload folder has been cleared successfully', 'success') + except Exception as e: + flash(f'An error occurred while clearing the upload folder: {str(e)}', 'danger') + return redirect(url_for('admin_dashboard')) + +# Helper functions +def generate_opengraph_metadata(post): + if post.author: + author_name = post.author.username + elif post.anonymous_username: + author_name = post.anonymous_username + else: + author_name = 'Anonymous' + + metadata = { + 'title': f"Post by {author_name} on TechTalks", + 'type': 'article', + 'url': url_for('post_detail', post_id=post.id, _external=True, _scheme='https'), + 'description': bleach.clean(post.content, tags=[], strip=True)[:200] + '...' if len(post.content) > 200 else bleach.clean(post.content, tags=[], strip=True), + 'image': url_for('static', filename='uploads/' + post.image_url, _external=True, _scheme='https') if post.image_url else None, + } + return metadata + +def sanitize_html(content): + # Convert Markdown to HTML + html_content = markdown.markdown(content, extensions=['nl2br', 'fenced_code']) + + # Define allowed tags and attributes + allowed_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul', 'br', 'hr', 'pre', 'span'] + allowed_attributes = { + 'a': ['href', 'title', 'target', 'rel'], + 'p': ['style'], + 'span': ['style'], + 'h1': ['style'], + 'h2': ['style'], + 'h3': ['style'], + 'h4': ['style'], + 'h5': ['style'], + 'h6': ['style'], + } + + # Custom linkify function to handle URLs without protocol + def linkify_external(attrs, new=False): + href = attrs.get((None, 'href'), '').strip() + if href.startswith('//'): + attrs[(None, 'href')] = 'http:' + href + elif not urlparse(href).scheme: + tlds = ['.com', '.org', '.net', '.edu', '.gov', '.io', '.gay', '.lol', '.in', '.it', '.hu', '.me', '.uk', '.de', '.fr', '.es', '.ru', '.jp', '.cn', '.au', '.ca', '.br', '.mx', '.nl', '.pl', '.se', '.ch', '.at', '.be', '.dk', '.fi', '.no', '.nz', '.ie', '.sg', '.kr', '.za', '.ph', '.cl', '.ar', '.co', '.ve', '.gr', '.il', '.ae', '.pt', '.cz', '.ro', '.th', '.my', '.tr', '.ua', '.hk', '.vn', '.sa', '.eg', '.ng', '.bd', '.pk', '.ma', '.dz', '.ke', '.tn', '.by', '.lv', '.kz', '.uy', '.pe', '.do', '.ec', '.gt', '.py', '.bo', '.cu', '.hn', '.sv', '.ni', '.pa', '.bz', '.gy', '.tt', '.ag', '.dm', '.kn', '.lc', '.vc', '.jm', '.bs', '.bb', '.vg', '.tc', '.ms', '.ai', '.gd', '.fj', '.to', '.vu', '.sb', '.ws', '.ck', '.nu', '.tv', '.fm', '.ki', '.nr', '.pw', '.as', '.gu', '.mp', '.pr', '.vi', '.um', '.tf', '.yt', '.re', '.mq', '.gp', '.bl', '.mf', '.pm', '.wf', '.pf', '.nc', '.sx', '.aw', '.cw', '.bq'] + if any(href.endswith(tld) for tld in tlds): + attrs[(None, 'href')] = 'http://' + href + attrs[(None, 'target')] = '_blank' + attrs[(None, 'rel')] = 'noopener noreferrer' + return attrs + + # Clean the HTML + clean_html = bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attributes) + + # Linkify the cleaned HTML + linkified_html = bleach.linkify(clean_html, callbacks=[linkify_external]) + + return linkified_html + +@app.before_request +def before_request(): + if 'user_id' not in session: + session['user_id'] = str(uuid.uuid4()) + +def create_upload_directory(): + upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER']) + if not os.path.exists(upload_dir): + os.makedirs(upload_dir) + print("Uploads directory created.") + else: + print("Uploads directory already exists.") + +@app.route('/update_user_group_settings/', methods=['POST']) +@login_required +def update_user_group_settings(group_id): + group = Group.query.get_or_404(group_id) + if current_user not in group.members: + abort(403) + + show_in_homepage = 'show_in_homepage' in request.form + + # Update the user's settings for this group + current_user.update_group_homepage_setting(group, show_in_homepage) + + db.session.commit() + flash('Your settings have been updated.', 'success') + return redirect(url_for('group_detail', vanity_url=group.vanity_url)) + +@app.route('/sort_group_posts/') +def sort_group_posts(group_id): + sort = request.args.get('sort', 'newest') + group = Group.query.get_or_404(group_id) + + if sort == 'newest': + posts = sorted(group.posts, key=lambda x: x.timestamp) + elif sort == 'oldest': + posts = sorted(group.posts, key=lambda x: x.timestamp, reverse=True) + elif sort == 'most_liked': + posts = sorted(group.posts, key=lambda x: len(x.likes), reverse=True) + elif sort == 'most_commented': + posts = sorted(group.posts, key=lambda x: len(x.comments), reverse=True) + else: + posts = sorted(group.posts, key=lambda x: x.timestamp) # Default to newest + + return render_template_string(''' + {% for post in posts %} +
+
+
+ {% if post.author %} + {% if post.author.profile_picture %} + Profile Picture + {% else %} +
+ {{ post.author.username[0].upper() }} +
+ {% endif %} + + {{ post.author.username }} + + {% else %} + {{ post.anonymous_username }} + {% endif %} + {{ post.timestamp.strftime('%Y-%m-%d %H:%M:%S') }} +
+

{{ post.content|safe }}

+ {% if post.image_url %} + Post image + {% endif %} +
+
+ + +
+
+ + Open in new tab + + + {% if current_user.is_authenticated and current_user.id == post.author.id %} + + Edit + + {% endif %} +
+
+
+ +
+ {% endfor %} + ''', posts=posts) + +if __name__ == '__main__': + with app.app_context(): + create_upload_directory() + + db.create_all() + print("Database initialized.") + app.run(debug=True,host='0.0.0.0',port=5051) \ No newline at end of file