techtalk/app.py

867 lines
37 KiB
Python
Raw Normal View History

from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, abort, session, send_file, render_template_string
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
import os
from datetime import datetime, timedelta
from sqlalchemy import func, desc
import uuid
from PIL import Image
import io
import base64
from functools import wraps
import json
import markdown
import bleach
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from flask_migrate import Migrate
from sqlalchemy.exc import IntegrityError
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///techtalks.db'
app.config['UPLOAD_FOLDER'] = 'static/uploads'
db = SQLAlchemy(app)
migrate = Migrate(app, db)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'admin' not in session:
return redirect(url_for('admin_login'))
return f(*args, **kwargs)
return decorated_function
# Association table for group members
group_members = db.Table('group_members',
db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
db.Column('group_id', db.Integer, db.ForeignKey('group.id'), primary_key=True)
)
# Model definitions
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(120), nullable=False)
bio = db.Column(db.Text)
rendered_bio = db.Column(db.Text)
profile_picture = db.Column(db.String(200))
posts = db.relationship('Post', backref='author', lazy=True)
comments = db.relationship('Comment', backref='author', lazy=True)
groups = db.relationship('Group', secondary=group_members, back_populates='members', lazy='dynamic')
created_groups = db.relationship('Group', backref='creator', lazy=True)
group_homepage_settings = db.relationship('GroupHomepageSetting', back_populates='user', cascade='all, delete-orphan')
def update_group_homepage_setting(self, group, show_in_homepage):
setting = GroupHomepageSetting.query.filter_by(user_id=self.id, group_id=group.id).first()
if setting:
setting.show_in_homepage = show_in_homepage
else:
new_setting = GroupHomepageSetting(user_id=self.id, group_id=group.id, show_in_homepage=show_in_homepage)
db.session.add(new_setting)
def show_group_posts_in_homepage(self, group):
setting = GroupHomepageSetting.query.filter_by(user_id=self.id, group_id=group.id).first()
return setting.show_in_homepage if setting else False
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text, nullable=False)
original_content = db.Column(db.Text)
image_url = db.Column(db.String(200))
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
anonymous_username = db.Column(db.String(80))
likes = db.relationship('Like', backref='post', lazy=True, cascade='all, delete-orphan')
comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan')
group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=True)
class Group(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
description = db.Column(db.Text)
vanity_url = db.Column(db.String(100), unique=True, nullable=False)
image_url = db.Column(db.String(200))
visibility = db.Column(db.String(20), nullable=False)
allow_requests = db.Column(db.Boolean, default=False)
creator_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
members = db.relationship('User', secondary=group_members, back_populates='groups', lazy='dynamic')
posts = db.relationship('Post', backref='group', lazy='dynamic')
class Like(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.String(120), nullable=False)
post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False)
class Comment(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text, nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False)
likes = db.relationship('CommentLike', backref='comment', lazy=True, cascade='all, delete-orphan')
class CommentLike(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.String(120), nullable=False)
comment_id = db.Column(db.Integer, db.ForeignKey('comment.id'), nullable=False)
class GroupJoinRequest(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.String(120), nullable=False) # Changed to String to handle both authenticated and anonymous users
group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
group = db.relationship('Group', backref='join_requests')
class GroupHomepageSetting(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False)
show_in_homepage = db.Column(db.Boolean, default=False)
user = db.relationship('User', back_populates='group_homepage_settings')
group = db.relationship('Group')
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# Routes
@app.route('/')
def home():
sort = request.args.get('sort', 'newest')
posts = sort_posts(sort)
posts = [post for post in posts if post.group_id is None] # Exclude group posts
metadata = {
'title': 'TechTalks - Twitter but for techtokers',
'type': 'website',
'url': url_for('home', _external=True, _scheme='https'),
'description': 'TechTalks is a platform for tech enthusiasts to share and discuss ideas.',
'image': url_for('static', filename='techtalks_logo.png', _external=True, _scheme='https'),
}
return render_template('index.html', posts=posts, metadata=metadata)
@app.route('/sort_posts')
def sort_posts_route():
sort = request.args.get('sort', 'newest')
posts = sort_posts(sort)
return render_template('posts_partial.html', posts=posts)
def sort_posts(sort_option):
if sort_option == 'newest':
return Post.query.order_by(Post.timestamp.desc()).all()
elif sort_option == 'most_liked_24h':
twenty_four_hours_ago = datetime.utcnow() - timedelta(hours=24)
return Post.query.outerjoin(Like).filter(Post.timestamp >= twenty_four_hours_ago).group_by(Post.id).order_by(desc(func.count(Like.id))).all()
elif sort_option == 'most_commented':
return Post.query.outerjoin(Comment).group_by(Post.id).order_by(desc(func.count(Comment.id))).all()
elif sort_option == 'trending':
three_days_ago = datetime.utcnow() - timedelta(days=3)
return Post.query.outerjoin(Like).outerjoin(Comment).filter(Post.timestamp >= three_days_ago).group_by(Post.id).order_by(desc(func.count(Like.id) + func.count(Comment.id) * 2)).all()
elif sort_option == 'controversial':
return Post.query.outerjoin(Like).outerjoin(Comment).group_by(Post.id).order_by(desc(func.count(Like.id) * func.count(Comment.id))).all()
else:
return Post.query.order_by(Post.timestamp.desc()).all()
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user:
flash('Username already exists')
return redirect(url_for('register'))
new_user = User(username=username, password_hash=generate_password_hash(password, method='pbkdf2:sha256'))
db.session.add(new_user)
db.session.commit()
login_user(new_user) # Log in the user immediately after registration
flash('Registration successful')
return redirect(url_for('home'))
return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user and check_password_hash(user.password_hash, password):
login_user(user, remember=True, duration=timedelta(days=30))
return redirect(url_for('home'))
else:
flash('Invalid username or password')
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('home'))
@app.route('/post', methods=['GET', 'POST'])
def post():
if request.method == 'POST':
content = request.form.get('content')
image = request.files.get('image')
username = request.form.get('username')
# Sanitize and process the content
sanitized_content = sanitize_html(content)
if current_user.is_authenticated:
user = current_user
new_post = Post(content=sanitized_content, original_content=content, author=user)
else:
new_post = Post(content=sanitized_content, original_content=content, anonymous_username=username)
if image:
create_upload_directory()
filename = secure_filename(image.filename)
image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)
image.save(image_path)
new_post.image_url = filename
db.session.add(new_post)
db.session.commit()
return redirect(url_for('home'))
return render_template('post.html')
@app.route('/like/<int:post_id>', methods=['POST'])
def like_post(post_id):
post = Post.query.get_or_404(post_id)
if current_user.is_authenticated:
user_id = current_user.id
else:
# For anonymous users, use IP address as a unique identifier
user_id = request.remote_addr
like = Like.query.filter_by(user_id=user_id, post_id=post.id).first()
if like:
db.session.delete(like)
else:
new_like = Like(user_id=user_id, post_id=post.id)
db.session.add(new_like)
db.session.commit()
return jsonify({'likes': len(post.likes)})
@app.route('/comment/<int:post_id>', methods=['POST'])
def add_comment(post_id):
content = request.form.get('content')
if current_user.is_authenticated:
user = current_user
new_comment = Comment(content=content, author=user, post_id=post_id)
else:
new_comment = Comment(content=content, post_id=post_id)
db.session.add(new_comment)
db.session.commit()
return jsonify({
'success': True,
'comment_id': new_comment.id,
'username': user.username if current_user.is_authenticated else 'Anonymous',
'content': new_comment.content
})
@app.route('/like_comment/<int:comment_id>', methods=['POST'])
def like_comment(comment_id):
comment = Comment.query.get_or_404(comment_id)
if current_user.is_authenticated:
user_id = str(current_user.id)
else:
user_id = request.remote_addr # Use IP address for anonymous users
like = CommentLike.query.filter_by(user_id=user_id, comment_id=comment.id).first()
if like:
db.session.delete(like)
else:
new_like = CommentLike(user_id=user_id, comment_id=comment.id)
db.session.add(new_like)
db.session.commit()
return jsonify({'likes': len(comment.likes)})
@app.route('/profile/<username>', methods=['GET', 'POST'])
def profile(username):
user = User.query.filter_by(username=username).first_or_404()
if request.method == 'POST' and current_user.is_authenticated and current_user.id == user.id:
bio = request.form.get('bio')
cropped_data = request.form.get('cropped_data')
remove_picture = request.form.get('remove_picture')
user.bio = bio # Store the original Markdown
user.rendered_bio = sanitize_html(bio) # Store the rendered HTML
if remove_picture:
user.profile_picture = None
elif cropped_data:
filename = secure_filename(f"{user.username}_profile.png")
image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)
# Decode the base64 image and save it
image_data = base64.b64decode(cropped_data.split(',')[1])
with Image.open(io.BytesIO(image_data)) as img:
if img.mode in ('RGBA', 'LA'):
# Image has an alpha channel, save as PNG
img.save(image_path, 'PNG')
else:
# Image doesn't have an alpha channel, convert to RGB and save as JPEG
img = img.convert('RGB')
img.save(image_path, 'JPEG')
user.profile_picture = filename
db.session.commit()
flash('Profile updated successfully', 'success')
return redirect(url_for('profile', username=username))
posts = Post.query.filter_by(user_id=user.id).order_by(Post.timestamp.desc()).all()
post_count = len(posts)
like_count = sum(len(post.likes) for post in posts)
comment_count = sum(len(post.comments) for post in posts)
return render_template('profile.html', user=user, posts=posts, post_count=post_count, like_count=like_count, comment_count=comment_count)
@app.route('/edit_post/<int:post_id>', methods=['GET', 'POST'])
def edit_post(post_id):
post = Post.query.get_or_404(post_id)
if current_user.is_authenticated:
if post.author != current_user:
abort(403)
else:
if post.user_id != session['user_id']:
abort(403)
if request.method == 'POST':
if 'delete' in request.form:
db.session.delete(post)
db.session.commit()
flash('Your post has been deleted!', 'success')
return redirect(url_for('home'))
else:
content = request.form.get('content')
sanitized_content = sanitize_html(content)
post.content = sanitized_content
post.original_content = content
db.session.commit()
flash('Your post has been updated!', 'success')
return redirect(url_for('home'))
return render_template('edit_post.html', post=post)
@app.route('/post/<int:post_id>')
def post_detail(post_id):
post = Post.query.get_or_404(post_id)
metadata = generate_opengraph_metadata(post)
return render_template('post_detail.html', post=post, metadata=metadata)
# Group-related routes
@app.route('/groups')
def groups():
if current_user.is_authenticated:
public_groups = Group.query.filter(Group.visibility != 'invite_only').all()
user_groups = current_user.groups.all() # Convert AppenderQuery to list
invite_only_groups = Group.query.filter_by(visibility='invite_only', creator=current_user).all()
groups = list(set(public_groups + user_groups + invite_only_groups))
else:
groups = Group.query.filter_by(visibility='public').all()
return render_template('groups.html', groups=groups)
@app.route('/create_group', methods=['GET', 'POST'])
@login_required
def create_group():
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description')
vanity_url = request.form.get('vanity_url')
visibility = request.form.get('visibility')
allow_requests = 'allow_requests' in request.form if visibility == 'invite_only' else False
cropped_data = request.form.get('cropped_data')
# Check if the vanity_url is already taken
existing_group = Group.query.filter_by(vanity_url=vanity_url).first()
if existing_group:
flash('This vanity URL is already taken. Please choose a different one.', 'error')
return render_template('create_group.html')
new_group = Group(name=name, description=description, vanity_url=vanity_url,
visibility=visibility, allow_requests=allow_requests,
creator=current_user)
if cropped_data:
filename = secure_filename(f"group_{vanity_url}.png")
image_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# Decode the base64 image and save it
image_data = base64.b64decode(cropped_data.split(',')[1])
with Image.open(io.BytesIO(image_data)) as img:
img.save(image_path, 'PNG')
new_group.image_url = filename
# Add the creator to the group
new_group.members.append(current_user)
db.session.add(new_group)
db.session.commit()
flash('Group created successfully', 'success')
return redirect(url_for('group_detail', vanity_url=vanity_url))
return render_template('create_group.html')
@app.route('/g/<vanity_url>')
def group_detail(vanity_url):
group = Group.query.filter_by(vanity_url=vanity_url).first_or_404()
if group.visibility == 'invite_only':
if not current_user.is_authenticated:
flash('You need to be logged in to view this invite-only group.', 'warning')
return redirect(url_for('login'))
elif current_user not in group.members and current_user != group.creator:
return render_template('group_detail.html', group=group, show_request_button=group.allow_requests)
elif group.visibility == 'registered_only':
if not current_user.is_authenticated:
flash('You need to be logged in to view this group.', 'warning')
return redirect(url_for('login'))
join_requests = []
if current_user.is_authenticated and current_user == group.creator:
join_requests = GroupJoinRequest.query.filter_by(group_id=group.id).all()
can_join = current_user.is_authenticated and current_user not in group.members and (group.visibility == 'public' or group.visibility == 'registered_only')
# Sort posts by newest first
sorted_posts = sorted(group.posts, key=lambda x: x.timestamp, reverse=True)
return render_template('group_detail.html', group=group, posts=sorted_posts, join_requests=join_requests, can_join=can_join)
@app.route('/join_group/<int:group_id>', methods=['POST'])
@login_required
def join_group(group_id):
group = Group.query.get_or_404(group_id)
if current_user not in group.members:
if group.visibility == 'public':
group.members.append(current_user)
db.session.commit()
flash('You have joined the group.', 'success')
elif group.visibility == 'registered_only':
if current_user.is_authenticated:
group.members.append(current_user)
db.session.commit()
flash('You have joined the group.', 'success')
else:
flash('You need to be registered and logged in to join this group.', 'warning')
elif group.visibility == 'invite_only':
if group.allow_requests:
new_request = GroupJoinRequest(user_id=current_user.id, group_id=group_id)
db.session.add(new_request)
db.session.commit()
flash('Your request to join has been sent.', 'success')
else:
flash('This group is invite-only and not accepting join requests.', 'warning')
else:
flash('You cannot join this group.', 'warning')
else:
flash('You are already a member of this group.', 'info')
return redirect(url_for('group_detail', vanity_url=group.vanity_url))
@app.route('/handle_join_request/<int:group_id>/<int:user_id>', methods=['POST'])
@login_required
def handle_join_request(group_id, user_id):
group = Group.query.get_or_404(group_id)
if current_user != group.creator:
abort(403)
user = User.query.get_or_404(user_id)
action = request.form.get('action')
join_request = GroupJoinRequest.query.filter_by(user_id=user_id, group_id=group_id).first()
if not join_request:
flash('Join request not found.', 'error')
return redirect(url_for('group_detail', vanity_url=group.vanity_url))
if action == 'accept':
group.members.append(user)
db.session.delete(join_request)
flash(f'{user.username} has been added to the group.', 'success')
elif action == 'reject':
db.session.delete(join_request)
flash(f'Join request from {user.username} has been rejected.', 'info')
db.session.commit()
return redirect(url_for('group_detail', vanity_url=group.vanity_url))
@app.route('/update_group_settings/<int:group_id>', methods=['POST'])
@login_required
def update_group_settings(group_id):
group = Group.query.get_or_404(group_id)
if current_user != group.creator:
abort(403)
group.name = request.form.get('name')
group.description = request.form.get('description')
group.vanity_url = request.form.get('vanity_url')
group.visibility = request.form.get('visibility')
group.allow_requests = 'allow_requests' in request.form and request.form.get('visibility') == 'invite_only'
if 'image' in request.files:
image = request.files['image']
if image.filename != '':
filename = secure_filename(f"group_{group.id}_{image.filename}")
image_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
image.save(image_path)
group.image_url = filename
db.session.commit()
flash('Group settings updated successfully', 'success')
return redirect(url_for('group_detail', vanity_url=group.vanity_url))
@app.route('/remove_group_member/<int:group_id>/<int:user_id>', methods=['POST'])
@login_required
def remove_group_member(group_id, user_id):
group = Group.query.get_or_404(group_id)
if current_user != group.creator:
abort(403)
user = User.query.get_or_404(user_id)
if user == group.creator:
flash('Cannot remove the group creator', 'error')
else:
group.members.remove(user)
db.session.commit()
flash(f'{user.username} has been removed from the group.', 'success')
return redirect(url_for('group_detail', vanity_url=group.vanity_url))
@app.route('/leave_group/<int:group_id>', methods=['POST'])
@login_required
def leave_group(group_id):
group = Group.query.get_or_404(group_id)
if current_user not in group.members:
abort(403)
if current_user == group.creator:
flash('The group creator cannot leave the group.', 'error')
else:
group.members.remove(current_user)
db.session.commit()
flash(f'You have left the group {group.name}.', 'success')
return redirect(url_for('groups'))
@app.route('/g/<vanity_url>/post', methods=['POST'])
@login_required
def group_post(vanity_url):
group = Group.query.filter_by(vanity_url=vanity_url).first_or_404()
if current_user not in group.members:
abort(403)
content = request.form.get('content')
if not content or content.strip() == '':
flash('Post content cannot be empty.', 'error')
return redirect(url_for('group_detail', vanity_url=vanity_url))
sanitized_content = sanitize_html(content)
new_post = Post(content=sanitized_content, original_content=content, author=current_user, group=group)
db.session.add(new_post)
db.session.commit()
flash('Your post has been added to the group.', 'success')
return redirect(url_for('group_detail', vanity_url=vanity_url))
@app.route('/g/<vanity_url>/post/<int:post_id>/delete', methods=['POST'])
@login_required
def delete_group_post(vanity_url, post_id):
group = Group.query.filter_by(vanity_url=vanity_url).first_or_404()
post = Post.query.filter_by(id=post_id, group_id=group.id).first_or_404()
if current_user != group.creator and current_user != post.author:
abort(403)
db.session.delete(post)
db.session.commit()
flash('Post has been deleted.', 'success')
return redirect(url_for('group_detail', vanity_url=vanity_url))
# Admin routes
@app.route('/admin_login', methods=['GET', 'POST'])
def admin_login():
if request.method == 'POST':
password = request.form.get('password')
if password == "lolmaa":
session['admin'] = True
return redirect(url_for('admin_dashboard'))
else:
flash('Invalid password')
return render_template('admin_login.html')
@app.route('/admin')
@admin_required
def admin_dashboard():
posts = Post.query.order_by(Post.timestamp.desc()).all()
return render_template('admin_dashboard.html', posts=posts)
@app.route('/admin/delete_post/<int:post_id>', methods=['POST'])
@admin_required
def delete_post(post_id):
post = Post.query.get_or_404(post_id)
# Delete all comments associated with the post
Comment.query.filter_by(post_id=post_id).delete()
# Delete all likes associated with the post
Like.query.filter_by(post_id=post_id).delete()
# Delete the post
db.session.delete(post)
try:
db.session.commit()
flash('Post and associated comments deleted successfully')
except Exception as e:
db.session.rollback()
flash('An error occurred while deleting the post: ' + str(e))
return redirect(url_for('admin_dashboard'))
@app.route('/admin/reset_db', methods=['POST'])
@admin_required
def reset_db():
try:
db.drop_all()
db.create_all()
flash('Database has been reset successfully', 'success')
except Exception as e:
flash(f'An error occurred while resetting the database: {str(e)}', 'danger')
return redirect(url_for('admin_dashboard'))
@app.route('/admin/backup_db')
@admin_required
def backup_db():
try:
# Create a JSON representation of the database
data = {
'users': [{'id': u.id, 'username': u.username, 'bio': u.bio} for u in User.query.all()],
'posts': [{'id': p.id, 'content': p.content, 'user_id': p.user_id, 'timestamp': p.timestamp.isoformat()} for p in Post.query.all()],
'comments': [{'id': c.id, 'content': c.content, 'user_id': c.user_id, 'post_id': c.post_id, 'timestamp': c.timestamp.isoformat()} for c in Comment.query.all()],
}
# Save the JSON to a file
with open('db_backup.json', 'w') as f:
json.dump(data, f, indent=2)
# Send the file for download
return send_file('db_backup.json', as_attachment=True)
except Exception as e:
flash(f'An error occurred while creating the database backup: {str(e)}', 'danger')
return redirect(url_for('admin_dashboard'))
@app.route('/admin/clear_uploads', methods=['POST'])
@admin_required
def clear_uploads():
try:
upload_folder = app.config['UPLOAD_FOLDER']
for filename in os.listdir(upload_folder):
file_path = os.path.join(upload_folder, filename)
if os.path.isfile(file_path):
os.unlink(file_path)
flash('Upload folder has been cleared successfully', 'success')
except Exception as e:
flash(f'An error occurred while clearing the upload folder: {str(e)}', 'danger')
return redirect(url_for('admin_dashboard'))
# Helper functions
def generate_opengraph_metadata(post):
if post.author:
author_name = post.author.username
elif post.anonymous_username:
author_name = post.anonymous_username
else:
author_name = 'Anonymous'
metadata = {
'title': f"Post by {author_name} on TechTalks",
'type': 'article',
'url': url_for('post_detail', post_id=post.id, _external=True, _scheme='https'),
'description': bleach.clean(post.content, tags=[], strip=True)[:200] + '...' if len(post.content) > 200 else bleach.clean(post.content, tags=[], strip=True),
'image': url_for('static', filename='uploads/' + post.image_url, _external=True, _scheme='https') if post.image_url else None,
}
return metadata
def sanitize_html(content):
# Convert Markdown to HTML
html_content = markdown.markdown(content, extensions=['nl2br', 'fenced_code'])
# Define allowed tags and attributes
allowed_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul', 'br', 'hr', 'pre', 'span']
allowed_attributes = {
'a': ['href', 'title', 'target', 'rel'],
'p': ['style'],
'span': ['style'],
'h1': ['style'],
'h2': ['style'],
'h3': ['style'],
'h4': ['style'],
'h5': ['style'],
'h6': ['style'],
}
# Custom linkify function to handle URLs without protocol
def linkify_external(attrs, new=False):
href = attrs.get((None, 'href'), '').strip()
if href.startswith('//'):
attrs[(None, 'href')] = 'http:' + href
elif not urlparse(href).scheme:
tlds = ['.com', '.org', '.net', '.edu', '.gov', '.io', '.gay', '.lol', '.in', '.it', '.hu', '.me', '.uk', '.de', '.fr', '.es', '.ru', '.jp', '.cn', '.au', '.ca', '.br', '.mx', '.nl', '.pl', '.se', '.ch', '.at', '.be', '.dk', '.fi', '.no', '.nz', '.ie', '.sg', '.kr', '.za', '.ph', '.cl', '.ar', '.co', '.ve', '.gr', '.il', '.ae', '.pt', '.cz', '.ro', '.th', '.my', '.tr', '.ua', '.hk', '.vn', '.sa', '.eg', '.ng', '.bd', '.pk', '.ma', '.dz', '.ke', '.tn', '.by', '.lv', '.kz', '.uy', '.pe', '.do', '.ec', '.gt', '.py', '.bo', '.cu', '.hn', '.sv', '.ni', '.pa', '.bz', '.gy', '.tt', '.ag', '.dm', '.kn', '.lc', '.vc', '.jm', '.bs', '.bb', '.vg', '.tc', '.ms', '.ai', '.gd', '.fj', '.to', '.vu', '.sb', '.ws', '.ck', '.nu', '.tv', '.fm', '.ki', '.nr', '.pw', '.as', '.gu', '.mp', '.pr', '.vi', '.um', '.tf', '.yt', '.re', '.mq', '.gp', '.bl', '.mf', '.pm', '.wf', '.pf', '.nc', '.sx', '.aw', '.cw', '.bq']
if any(href.endswith(tld) for tld in tlds):
attrs[(None, 'href')] = 'http://' + href
attrs[(None, 'target')] = '_blank'
attrs[(None, 'rel')] = 'noopener noreferrer'
return attrs
# Clean the HTML
clean_html = bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attributes)
# Linkify the cleaned HTML
linkified_html = bleach.linkify(clean_html, callbacks=[linkify_external])
return linkified_html
@app.before_request
def before_request():
if 'user_id' not in session:
session['user_id'] = str(uuid.uuid4())
def create_upload_directory():
upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'])
if not os.path.exists(upload_dir):
os.makedirs(upload_dir)
print("Uploads directory created.")
else:
print("Uploads directory already exists.")
@app.route('/update_user_group_settings/<int:group_id>', methods=['POST'])
@login_required
def update_user_group_settings(group_id):
group = Group.query.get_or_404(group_id)
if current_user not in group.members:
abort(403)
show_in_homepage = 'show_in_homepage' in request.form
# Update the user's settings for this group
current_user.update_group_homepage_setting(group, show_in_homepage)
db.session.commit()
flash('Your settings have been updated.', 'success')
return redirect(url_for('group_detail', vanity_url=group.vanity_url))
@app.route('/sort_group_posts/<int:group_id>')
def sort_group_posts(group_id):
sort = request.args.get('sort', 'newest')
group = Group.query.get_or_404(group_id)
if sort == 'newest':
posts = sorted(group.posts, key=lambda x: x.timestamp)
elif sort == 'oldest':
posts = sorted(group.posts, key=lambda x: x.timestamp, reverse=True)
elif sort == 'most_liked':
posts = sorted(group.posts, key=lambda x: len(x.likes), reverse=True)
elif sort == 'most_commented':
posts = sorted(group.posts, key=lambda x: len(x.comments), reverse=True)
else:
posts = sorted(group.posts, key=lambda x: x.timestamp) # Default to newest
return render_template_string('''
{% for post in posts %}
<div class="card mb-3" id="post-{{ post.id }}">
<div class="card-body">
<div class="d-flex align-items-center mb-2">
{% if post.author %}
{% if post.author.profile_picture %}
<img src="{{ url_for('static', filename='uploads/' + post.author.profile_picture) }}" class="rounded-circle me-2" alt="Profile Picture" style="width: 30px; height: 30px; object-fit: cover;">
{% else %}
<div class="rounded-circle me-2 d-flex justify-content-center align-items-center bg-primary" style="width: 30px; height: 30px;">
<span class="text-white" style="font-size: 0.8rem;">{{ post.author.username[0].upper() }}</span>
</div>
{% endif %}
<small class="text-muted">
<a href="{{ url_for('profile', username=post.author.username) }}" class="text-decoration-none">{{ post.author.username }}</a>
</small>
{% else %}
<small class="text-muted">{{ post.anonymous_username }}</small>
{% endif %}
<span class="ms-auto text-muted">{{ post.timestamp.strftime('%Y-%m-%d %H:%M:%S') }}</span>
</div>
<p class="card-text">{{ post.content|safe }}</p>
{% if post.image_url %}
<img src="{{ url_for('static', filename='uploads/' + post.image_url) }}" class="img-fluid mb-2 post-image" alt="Post image" data-bs-toggle="modal" data-bs-target="#imageModal{{ post.id }}">
{% endif %}
<div class="d-flex flex-wrap justify-content-between align-items-center">
<div class="btn-group mb-2">
<button class="btn btn-outline-primary btn-sm like-btn" data-post-id="{{ post.id }}">
<i class="bi bi-heart-fill"></i> Like (<span class="like-count">{{ post.likes|length }}</span>)
</button>
<button class="btn btn-outline-secondary btn-sm comment-btn" data-post-id="{{ post.id }}">
<i class="bi bi-chat-fill"></i> Comment ({{ post.comments|length }})
</button>
</div>
<div class="btn-group mb-2">
<a href="{{ url_for('post_detail', post_id=post.id) }}" class="btn btn-outline-info btn-sm" target="_blank">
<i class="bi bi-box-arrow-up-right"></i> Open in new tab
</a>
<button class="btn btn-outline-info btn-sm copy-link-btn" data-post-url="{{ url_for('post_detail', post_id=post.id, _external=True, _scheme='https') }}">
<i class="bi bi-link-45deg"></i> Copy Link
</button>
{% if current_user.is_authenticated and current_user.id == post.author.id %}
<a href="{{ url_for('edit_post', post_id=post.id) }}" class="btn btn-outline-warning btn-sm">
<i class="bi bi-pencil-fill"></i> Edit
</a>
{% endif %}
</div>
</div>
</div>
<div class="comment-section" id="comment-section-{{ post.id }}" style="display: none;">
<div class="card-body pt-0">
<h6 class="mb-3">Comments:</h6>
<ul class="list-unstyled mb-3">
{% for comment in post.comments %}
{% include 'comment.html' %}
{% endfor %}
</ul>
<form class="comment-form" data-post-id="{{ post.id }}">
<div class="input-group">
<input type="text" class="form-control" placeholder="Add a comment" name="content" required>
<button class="btn btn-outline-secondary" type="submit">Send</button>
</div>
</form>
</div>
</div>
</div>
{% endfor %}
''', posts=posts)
if __name__ == '__main__':
with app.app_context():
create_upload_directory()
db.create_all()
print("Database initialized.")
app.run(debug=True,host='0.0.0.0',port=5051)