from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, abort, session, send_file, render_template_string from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.utils import secure_filename import os from datetime import datetime, timedelta from sqlalchemy import func, desc import uuid from PIL import Image import io import base64 from functools import wraps import json import markdown import bleach from bs4 import BeautifulSoup from urllib.parse import urlparse, urljoin from flask_migrate import Migrate from sqlalchemy.exc import IntegrityError app = Flask(__name__) app.config['SECRET_KEY'] = 'your_secret_key' app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///techtalks.db' app.config['UPLOAD_FOLDER'] = 'static/uploads' db = SQLAlchemy(app) migrate = Migrate(app, db) login_manager = LoginManager(app) login_manager.login_view = 'login' def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if 'admin' not in session: return redirect(url_for('admin_login')) return f(*args, **kwargs) return decorated_function # Association table for group members group_members = db.Table('group_members', db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), db.Column('group_id', db.Integer, db.ForeignKey('group.id'), primary_key=True) ) # Model definitions class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) password_hash = db.Column(db.String(120), nullable=False) bio = db.Column(db.Text) rendered_bio = db.Column(db.Text) profile_picture = db.Column(db.String(200)) posts = db.relationship('Post', backref='author', lazy=True) comments = db.relationship('Comment', backref='author', lazy=True) groups = db.relationship('Group', secondary=group_members, back_populates='members', lazy='dynamic') created_groups = db.relationship('Group', backref='creator', lazy=True) group_homepage_settings = db.relationship('GroupHomepageSetting', back_populates='user', cascade='all, delete-orphan') def update_group_homepage_setting(self, group, show_in_homepage): setting = GroupHomepageSetting.query.filter_by(user_id=self.id, group_id=group.id).first() if setting: setting.show_in_homepage = show_in_homepage else: new_setting = GroupHomepageSetting(user_id=self.id, group_id=group.id, show_in_homepage=show_in_homepage) db.session.add(new_setting) def show_group_posts_in_homepage(self, group): setting = GroupHomepageSetting.query.filter_by(user_id=self.id, group_id=group.id).first() return setting.show_in_homepage if setting else False class Post(db.Model): id = db.Column(db.Integer, primary_key=True) content = db.Column(db.Text, nullable=False) original_content = db.Column(db.Text) image_url = db.Column(db.String(200)) timestamp = db.Column(db.DateTime, default=datetime.utcnow) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) anonymous_username = db.Column(db.String(80)) likes = db.relationship('Like', backref='post', lazy=True, cascade='all, delete-orphan') comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan') group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=True) class Group(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), nullable=False) description = db.Column(db.Text) vanity_url = db.Column(db.String(100), unique=True, nullable=False) image_url = db.Column(db.String(200)) visibility = db.Column(db.String(20), nullable=False) allow_requests = db.Column(db.Boolean, default=False) creator_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) members = db.relationship('User', secondary=group_members, back_populates='groups', lazy='dynamic') posts = db.relationship('Post', backref='group', lazy='dynamic') class Like(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.String(120), nullable=False) post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False) class Comment(db.Model): id = db.Column(db.Integer, primary_key=True) content = db.Column(db.Text, nullable=False) timestamp = db.Column(db.DateTime, default=datetime.utcnow) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False) likes = db.relationship('CommentLike', backref='comment', lazy=True, cascade='all, delete-orphan') class CommentLike(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.String(120), nullable=False) comment_id = db.Column(db.Integer, db.ForeignKey('comment.id'), nullable=False) class GroupJoinRequest(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.String(120), nullable=False) # Changed to String to handle both authenticated and anonymous users group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False) timestamp = db.Column(db.DateTime, default=datetime.utcnow) group = db.relationship('Group', backref='join_requests') class GroupHomepageSetting(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False) show_in_homepage = db.Column(db.Boolean, default=False) user = db.relationship('User', back_populates='group_homepage_settings') group = db.relationship('Group') @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) # Routes @app.route('/') def home(): sort = request.args.get('sort', 'newest') posts = sort_posts(sort) posts = [post for post in posts if post.group_id is None] # Exclude group posts metadata = { 'title': 'TechTalks - Twitter but for techtokers', 'type': 'website', 'url': url_for('home', _external=True, _scheme='https'), 'description': 'TechTalks is a platform for tech enthusiasts to share and discuss ideas.', 'image': url_for('static', filename='techtalks_logo.png', _external=True, _scheme='https'), } return render_template('index.html', posts=posts, metadata=metadata) @app.route('/sort_posts') def sort_posts_route(): sort = request.args.get('sort', 'newest') posts = sort_posts(sort) return render_template('posts_partial.html', posts=posts) def sort_posts(sort_option): if sort_option == 'newest': return Post.query.order_by(Post.timestamp.desc()).all() elif sort_option == 'most_liked_24h': twenty_four_hours_ago = datetime.utcnow() - timedelta(hours=24) return Post.query.outerjoin(Like).filter(Post.timestamp >= twenty_four_hours_ago).group_by(Post.id).order_by(desc(func.count(Like.id))).all() elif sort_option == 'most_commented': return Post.query.outerjoin(Comment).group_by(Post.id).order_by(desc(func.count(Comment.id))).all() elif sort_option == 'trending': three_days_ago = datetime.utcnow() - timedelta(days=3) return Post.query.outerjoin(Like).outerjoin(Comment).filter(Post.timestamp >= three_days_ago).group_by(Post.id).order_by(desc(func.count(Like.id) + func.count(Comment.id) * 2)).all() elif sort_option == 'controversial': return Post.query.outerjoin(Like).outerjoin(Comment).group_by(Post.id).order_by(desc(func.count(Like.id) * func.count(Comment.id))).all() else: return Post.query.order_by(Post.timestamp.desc()).all() @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') user = User.query.filter_by(username=username).first() if user: flash('Username already exists') return redirect(url_for('register')) new_user = User(username=username, password_hash=generate_password_hash(password, method='pbkdf2:sha256')) db.session.add(new_user) db.session.commit() login_user(new_user) # Log in the user immediately after registration flash('Registration successful') return redirect(url_for('home')) return render_template('register.html') @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') user = User.query.filter_by(username=username).first() if user and check_password_hash(user.password_hash, password): login_user(user, remember=True, duration=timedelta(days=30)) return redirect(url_for('home')) else: flash('Invalid username or password') return render_template('login.html') @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('home')) @app.route('/post', methods=['GET', 'POST']) def post(): if request.method == 'POST': content = request.form.get('content') image = request.files.get('image') username = request.form.get('username') # Sanitize and process the content sanitized_content = sanitize_html(content) if current_user.is_authenticated: user = current_user new_post = Post(content=sanitized_content, original_content=content, author=user) else: new_post = Post(content=sanitized_content, original_content=content, anonymous_username=username) if image: create_upload_directory() filename = secure_filename(image.filename) image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename) image.save(image_path) new_post.image_url = filename db.session.add(new_post) db.session.commit() return redirect(url_for('home')) return render_template('post.html') @app.route('/like/', methods=['POST']) def like_post(post_id): post = Post.query.get_or_404(post_id) if current_user.is_authenticated: user_id = current_user.id else: # For anonymous users, use IP address as a unique identifier user_id = request.remote_addr like = Like.query.filter_by(user_id=user_id, post_id=post.id).first() if like: db.session.delete(like) else: new_like = Like(user_id=user_id, post_id=post.id) db.session.add(new_like) db.session.commit() return jsonify({'likes': len(post.likes)}) @app.route('/comment/', methods=['POST']) def add_comment(post_id): content = request.form.get('content') if current_user.is_authenticated: user = current_user new_comment = Comment(content=content, author=user, post_id=post_id) else: new_comment = Comment(content=content, post_id=post_id) db.session.add(new_comment) db.session.commit() return jsonify({ 'success': True, 'comment_id': new_comment.id, 'username': user.username if current_user.is_authenticated else 'Anonymous', 'content': new_comment.content }) @app.route('/like_comment/', methods=['POST']) def like_comment(comment_id): comment = Comment.query.get_or_404(comment_id) if current_user.is_authenticated: user_id = str(current_user.id) else: user_id = request.remote_addr # Use IP address for anonymous users like = CommentLike.query.filter_by(user_id=user_id, comment_id=comment.id).first() if like: db.session.delete(like) else: new_like = CommentLike(user_id=user_id, comment_id=comment.id) db.session.add(new_like) db.session.commit() return jsonify({'likes': len(comment.likes)}) @app.route('/profile/', methods=['GET', 'POST']) def profile(username): user = User.query.filter_by(username=username).first_or_404() if request.method == 'POST' and current_user.is_authenticated and current_user.id == user.id: bio = request.form.get('bio') cropped_data = request.form.get('cropped_data') remove_picture = request.form.get('remove_picture') user.bio = bio # Store the original Markdown user.rendered_bio = sanitize_html(bio) # Store the rendered HTML if remove_picture: user.profile_picture = None elif cropped_data: filename = secure_filename(f"{user.username}_profile.png") image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename) # Decode the base64 image and save it image_data = base64.b64decode(cropped_data.split(',')[1]) with Image.open(io.BytesIO(image_data)) as img: if img.mode in ('RGBA', 'LA'): # Image has an alpha channel, save as PNG img.save(image_path, 'PNG') else: # Image doesn't have an alpha channel, convert to RGB and save as JPEG img = img.convert('RGB') img.save(image_path, 'JPEG') user.profile_picture = filename db.session.commit() flash('Profile updated successfully', 'success') return redirect(url_for('profile', username=username)) posts = Post.query.filter_by(user_id=user.id).order_by(Post.timestamp.desc()).all() post_count = len(posts) like_count = sum(len(post.likes) for post in posts) comment_count = sum(len(post.comments) for post in posts) return render_template('profile.html', user=user, posts=posts, post_count=post_count, like_count=like_count, comment_count=comment_count) @app.route('/edit_post/', methods=['GET', 'POST']) def edit_post(post_id): post = Post.query.get_or_404(post_id) if current_user.is_authenticated: if post.author != current_user: abort(403) else: if post.user_id != session['user_id']: abort(403) if request.method == 'POST': if 'delete' in request.form: db.session.delete(post) db.session.commit() flash('Your post has been deleted!', 'success') return redirect(url_for('home')) else: content = request.form.get('content') sanitized_content = sanitize_html(content) post.content = sanitized_content post.original_content = content db.session.commit() flash('Your post has been updated!', 'success') return redirect(url_for('home')) return render_template('edit_post.html', post=post) @app.route('/post/') def post_detail(post_id): post = Post.query.get_or_404(post_id) metadata = generate_opengraph_metadata(post) return render_template('post_detail.html', post=post, metadata=metadata) # Group-related routes @app.route('/groups') def groups(): if current_user.is_authenticated: public_groups = Group.query.filter(Group.visibility != 'invite_only').all() user_groups = current_user.groups.all() # Convert AppenderQuery to list invite_only_groups = Group.query.filter_by(visibility='invite_only', creator=current_user).all() groups = list(set(public_groups + user_groups + invite_only_groups)) else: groups = Group.query.filter_by(visibility='public').all() return render_template('groups.html', groups=groups) @app.route('/create_group', methods=['GET', 'POST']) @login_required def create_group(): if request.method == 'POST': name = request.form.get('name') description = request.form.get('description') vanity_url = request.form.get('vanity_url') visibility = request.form.get('visibility') allow_requests = 'allow_requests' in request.form if visibility == 'invite_only' else False cropped_data = request.form.get('cropped_data') # Check if the vanity_url is already taken existing_group = Group.query.filter_by(vanity_url=vanity_url).first() if existing_group: flash('This vanity URL is already taken. Please choose a different one.', 'error') return render_template('create_group.html') new_group = Group(name=name, description=description, vanity_url=vanity_url, visibility=visibility, allow_requests=allow_requests, creator=current_user) if cropped_data: filename = secure_filename(f"group_{vanity_url}.png") image_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) # Decode the base64 image and save it image_data = base64.b64decode(cropped_data.split(',')[1]) with Image.open(io.BytesIO(image_data)) as img: img.save(image_path, 'PNG') new_group.image_url = filename # Add the creator to the group new_group.members.append(current_user) db.session.add(new_group) db.session.commit() flash('Group created successfully', 'success') return redirect(url_for('group_detail', vanity_url=vanity_url)) return render_template('create_group.html') @app.route('/g/') def group_detail(vanity_url): group = Group.query.filter_by(vanity_url=vanity_url).first_or_404() if group.visibility == 'invite_only': if not current_user.is_authenticated: flash('You need to be logged in to view this invite-only group.', 'warning') return redirect(url_for('login')) elif current_user not in group.members and current_user != group.creator: return render_template('group_detail.html', group=group, show_request_button=group.allow_requests) elif group.visibility == 'registered_only': if not current_user.is_authenticated: flash('You need to be logged in to view this group.', 'warning') return redirect(url_for('login')) join_requests = [] if current_user.is_authenticated and current_user == group.creator: join_requests = GroupJoinRequest.query.filter_by(group_id=group.id).all() can_join = current_user.is_authenticated and current_user not in group.members and (group.visibility == 'public' or group.visibility == 'registered_only') # Sort posts by newest first sorted_posts = sorted(group.posts, key=lambda x: x.timestamp, reverse=True) return render_template('group_detail.html', group=group, posts=sorted_posts, join_requests=join_requests, can_join=can_join) @app.route('/join_group/', methods=['POST']) @login_required def join_group(group_id): group = Group.query.get_or_404(group_id) if current_user not in group.members: if group.visibility == 'public': group.members.append(current_user) db.session.commit() flash('You have joined the group.', 'success') elif group.visibility == 'registered_only': if current_user.is_authenticated: group.members.append(current_user) db.session.commit() flash('You have joined the group.', 'success') else: flash('You need to be registered and logged in to join this group.', 'warning') elif group.visibility == 'invite_only': if group.allow_requests: new_request = GroupJoinRequest(user_id=current_user.id, group_id=group_id) db.session.add(new_request) db.session.commit() flash('Your request to join has been sent.', 'success') else: flash('This group is invite-only and not accepting join requests.', 'warning') else: flash('You cannot join this group.', 'warning') else: flash('You are already a member of this group.', 'info') return redirect(url_for('group_detail', vanity_url=group.vanity_url)) @app.route('/handle_join_request//', methods=['POST']) @login_required def handle_join_request(group_id, user_id): group = Group.query.get_or_404(group_id) if current_user != group.creator: abort(403) user = User.query.get_or_404(user_id) action = request.form.get('action') join_request = GroupJoinRequest.query.filter_by(user_id=user_id, group_id=group_id).first() if not join_request: flash('Join request not found.', 'error') return redirect(url_for('group_detail', vanity_url=group.vanity_url)) if action == 'accept': group.members.append(user) db.session.delete(join_request) flash(f'{user.username} has been added to the group.', 'success') elif action == 'reject': db.session.delete(join_request) flash(f'Join request from {user.username} has been rejected.', 'info') db.session.commit() return redirect(url_for('group_detail', vanity_url=group.vanity_url)) @app.route('/update_group_settings/', methods=['POST']) @login_required def update_group_settings(group_id): group = Group.query.get_or_404(group_id) if current_user != group.creator: abort(403) group.name = request.form.get('name') group.description = request.form.get('description') group.vanity_url = request.form.get('vanity_url') group.visibility = request.form.get('visibility') group.allow_requests = 'allow_requests' in request.form and request.form.get('visibility') == 'invite_only' if 'image' in request.files: image = request.files['image'] if image.filename != '': filename = secure_filename(f"group_{group.id}_{image.filename}") image_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) image.save(image_path) group.image_url = filename db.session.commit() flash('Group settings updated successfully', 'success') return redirect(url_for('group_detail', vanity_url=group.vanity_url)) @app.route('/remove_group_member//', methods=['POST']) @login_required def remove_group_member(group_id, user_id): group = Group.query.get_or_404(group_id) if current_user != group.creator: abort(403) user = User.query.get_or_404(user_id) if user == group.creator: flash('Cannot remove the group creator', 'error') else: group.members.remove(user) db.session.commit() flash(f'{user.username} has been removed from the group.', 'success') return redirect(url_for('group_detail', vanity_url=group.vanity_url)) @app.route('/leave_group/', methods=['POST']) @login_required def leave_group(group_id): group = Group.query.get_or_404(group_id) if current_user not in group.members: abort(403) if current_user == group.creator: flash('The group creator cannot leave the group.', 'error') else: group.members.remove(current_user) db.session.commit() flash(f'You have left the group {group.name}.', 'success') return redirect(url_for('groups')) @app.route('/g//post', methods=['POST']) @login_required def group_post(vanity_url): group = Group.query.filter_by(vanity_url=vanity_url).first_or_404() if current_user not in group.members: abort(403) content = request.form.get('content') if not content or content.strip() == '': flash('Post content cannot be empty.', 'error') return redirect(url_for('group_detail', vanity_url=vanity_url)) sanitized_content = sanitize_html(content) new_post = Post(content=sanitized_content, original_content=content, author=current_user, group=group) db.session.add(new_post) db.session.commit() flash('Your post has been added to the group.', 'success') return redirect(url_for('group_detail', vanity_url=vanity_url)) @app.route('/g//post//delete', methods=['POST']) @login_required def delete_group_post(vanity_url, post_id): group = Group.query.filter_by(vanity_url=vanity_url).first_or_404() post = Post.query.filter_by(id=post_id, group_id=group.id).first_or_404() if current_user != group.creator and current_user != post.author: abort(403) db.session.delete(post) db.session.commit() flash('Post has been deleted.', 'success') return redirect(url_for('group_detail', vanity_url=vanity_url)) # Admin routes @app.route('/admin_login', methods=['GET', 'POST']) def admin_login(): if request.method == 'POST': password = request.form.get('password') if password == "lolmaa": session['admin'] = True return redirect(url_for('admin_dashboard')) else: flash('Invalid password') return render_template('admin_login.html') @app.route('/admin') @admin_required def admin_dashboard(): posts = Post.query.order_by(Post.timestamp.desc()).all() return render_template('admin_dashboard.html', posts=posts) @app.route('/admin/delete_post/', methods=['POST']) @admin_required def delete_post(post_id): post = Post.query.get_or_404(post_id) # Delete all comments associated with the post Comment.query.filter_by(post_id=post_id).delete() # Delete all likes associated with the post Like.query.filter_by(post_id=post_id).delete() # Delete the post db.session.delete(post) try: db.session.commit() flash('Post and associated comments deleted successfully') except Exception as e: db.session.rollback() flash('An error occurred while deleting the post: ' + str(e)) return redirect(url_for('admin_dashboard')) @app.route('/admin/reset_db', methods=['POST']) @admin_required def reset_db(): try: db.drop_all() db.create_all() flash('Database has been reset successfully', 'success') except Exception as e: flash(f'An error occurred while resetting the database: {str(e)}', 'danger') return redirect(url_for('admin_dashboard')) @app.route('/admin/backup_db') @admin_required def backup_db(): try: # Create a JSON representation of the database data = { 'users': [{'id': u.id, 'username': u.username, 'bio': u.bio} for u in User.query.all()], 'posts': [{'id': p.id, 'content': p.content, 'user_id': p.user_id, 'timestamp': p.timestamp.isoformat()} for p in Post.query.all()], 'comments': [{'id': c.id, 'content': c.content, 'user_id': c.user_id, 'post_id': c.post_id, 'timestamp': c.timestamp.isoformat()} for c in Comment.query.all()], } # Save the JSON to a file with open('db_backup.json', 'w') as f: json.dump(data, f, indent=2) # Send the file for download return send_file('db_backup.json', as_attachment=True) except Exception as e: flash(f'An error occurred while creating the database backup: {str(e)}', 'danger') return redirect(url_for('admin_dashboard')) @app.route('/admin/clear_uploads', methods=['POST']) @admin_required def clear_uploads(): try: upload_folder = app.config['UPLOAD_FOLDER'] for filename in os.listdir(upload_folder): file_path = os.path.join(upload_folder, filename) if os.path.isfile(file_path): os.unlink(file_path) flash('Upload folder has been cleared successfully', 'success') except Exception as e: flash(f'An error occurred while clearing the upload folder: {str(e)}', 'danger') return redirect(url_for('admin_dashboard')) # Helper functions def generate_opengraph_metadata(post): if post.author: author_name = post.author.username elif post.anonymous_username: author_name = post.anonymous_username else: author_name = 'Anonymous' metadata = { 'title': f"Post by {author_name} on TechTalks", 'type': 'article', 'url': url_for('post_detail', post_id=post.id, _external=True, _scheme='https'), 'description': bleach.clean(post.content, tags=[], strip=True)[:200] + '...' if len(post.content) > 200 else bleach.clean(post.content, tags=[], strip=True), 'image': url_for('static', filename='uploads/' + post.image_url, _external=True, _scheme='https') if post.image_url else None, } return metadata def sanitize_html(content): # Convert Markdown to HTML html_content = markdown.markdown(content, extensions=['nl2br', 'fenced_code']) # Define allowed tags and attributes allowed_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul', 'br', 'hr', 'pre', 'span'] allowed_attributes = { 'a': ['href', 'title', 'target', 'rel'], 'p': ['style'], 'span': ['style'], 'h1': ['style'], 'h2': ['style'], 'h3': ['style'], 'h4': ['style'], 'h5': ['style'], 'h6': ['style'], } # Custom linkify function to handle URLs without protocol def linkify_external(attrs, new=False): href = attrs.get((None, 'href'), '').strip() if href.startswith('//'): attrs[(None, 'href')] = 'http:' + href elif not urlparse(href).scheme: tlds = ['.com', '.org', '.net', '.edu', '.gov', '.io', '.gay', '.lol', '.in', '.it', '.hu', '.me', '.uk', '.de', '.fr', '.es', '.ru', '.jp', '.cn', '.au', '.ca', '.br', '.mx', '.nl', '.pl', '.se', '.ch', '.at', '.be', '.dk', '.fi', '.no', '.nz', '.ie', '.sg', '.kr', '.za', '.ph', '.cl', '.ar', '.co', '.ve', '.gr', '.il', '.ae', '.pt', '.cz', '.ro', '.th', '.my', '.tr', '.ua', '.hk', '.vn', '.sa', '.eg', '.ng', '.bd', '.pk', '.ma', '.dz', '.ke', '.tn', '.by', '.lv', '.kz', '.uy', '.pe', '.do', '.ec', '.gt', '.py', '.bo', '.cu', '.hn', '.sv', '.ni', '.pa', '.bz', '.gy', '.tt', '.ag', '.dm', '.kn', '.lc', '.vc', '.jm', '.bs', '.bb', '.vg', '.tc', '.ms', '.ai', '.gd', '.fj', '.to', '.vu', '.sb', '.ws', '.ck', '.nu', '.tv', '.fm', '.ki', '.nr', '.pw', '.as', '.gu', '.mp', '.pr', '.vi', '.um', '.tf', '.yt', '.re', '.mq', '.gp', '.bl', '.mf', '.pm', '.wf', '.pf', '.nc', '.sx', '.aw', '.cw', '.bq'] if any(href.endswith(tld) for tld in tlds): attrs[(None, 'href')] = 'http://' + href attrs[(None, 'target')] = '_blank' attrs[(None, 'rel')] = 'noopener noreferrer' return attrs # Clean the HTML clean_html = bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attributes) # Linkify the cleaned HTML linkified_html = bleach.linkify(clean_html, callbacks=[linkify_external]) return linkified_html @app.before_request def before_request(): if 'user_id' not in session: session['user_id'] = str(uuid.uuid4()) def create_upload_directory(): upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER']) if not os.path.exists(upload_dir): os.makedirs(upload_dir) print("Uploads directory created.") else: print("Uploads directory already exists.") @app.route('/update_user_group_settings/', methods=['POST']) @login_required def update_user_group_settings(group_id): group = Group.query.get_or_404(group_id) if current_user not in group.members: abort(403) show_in_homepage = 'show_in_homepage' in request.form # Update the user's settings for this group current_user.update_group_homepage_setting(group, show_in_homepage) db.session.commit() flash('Your settings have been updated.', 'success') return redirect(url_for('group_detail', vanity_url=group.vanity_url)) @app.route('/sort_group_posts/') def sort_group_posts(group_id): sort = request.args.get('sort', 'newest') group = Group.query.get_or_404(group_id) if sort == 'newest': posts = sorted(group.posts, key=lambda x: x.timestamp) elif sort == 'oldest': posts = sorted(group.posts, key=lambda x: x.timestamp, reverse=True) elif sort == 'most_liked': posts = sorted(group.posts, key=lambda x: len(x.likes), reverse=True) elif sort == 'most_commented': posts = sorted(group.posts, key=lambda x: len(x.comments), reverse=True) else: posts = sorted(group.posts, key=lambda x: x.timestamp) # Default to newest return render_template_string(''' {% for post in posts %}
{% if post.author %} {% if post.author.profile_picture %} Profile Picture {% else %}
{{ post.author.username[0].upper() }}
{% endif %} {{ post.author.username }} {% else %} {{ post.anonymous_username }} {% endif %} {{ post.timestamp.strftime('%Y-%m-%d %H:%M:%S') }}

{{ post.content|safe }}

{% if post.image_url %} Post image {% endif %}
Open in new tab {% if current_user.is_authenticated and current_user.id == post.author.id %} Edit {% endif %}
{% endfor %} ''', posts=posts) if __name__ == '__main__': with app.app_context(): create_upload_directory() db.create_all() print("Database initialized.") app.run(debug=True,host='0.0.0.0',port=5051)