# Listing metadata: 867 lines, 37 KiB, Python
|
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, abort, session, send_file, render_template_string
|
||
|
from flask_sqlalchemy import SQLAlchemy
|
||
|
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
||
|
from werkzeug.security import generate_password_hash, check_password_hash
|
||
|
from werkzeug.utils import secure_filename
|
||
|
import os
|
||
|
from datetime import datetime, timedelta
|
||
|
from sqlalchemy import func, desc
|
||
|
import uuid
|
||
|
from PIL import Image
|
||
|
import io
|
||
|
import base64
|
||
|
from functools import wraps
|
||
|
import json
|
||
|
import markdown
|
||
|
import bleach
|
||
|
from bs4 import BeautifulSoup
|
||
|
from urllib.parse import urlparse, urljoin
|
||
|
from flask_migrate import Migrate
|
||
|
from sqlalchemy.exc import IntegrityError
|
||
|
|
||
|
# Application object and the extensions bound to it.
app = Flask(__name__)

# SECURITY: the secret key signs the session cookie, which is also where the
# 'admin' flag checked by admin_required lives. Prefer a value supplied through
# the SECRET_KEY environment variable; the literal is kept only as a fallback so
# existing deployments keep starting, and should be replaced in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your_secret_key')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///techtalks.db'
# Upload folder, joined onto app.root_path by the upload helpers.
app.config['UPLOAD_FOLDER'] = 'static/uploads'

db = SQLAlchemy(app)
migrate = Migrate(app, db)

login_manager = LoginManager(app)
# Endpoint that @login_required redirects unauthenticated users to.
login_manager.login_view = 'login'
|
||
|
|
||
|
def admin_required(f):
    """Decorate a view so it only runs when the session carries an 'admin' key.

    Requests without that key are redirected to the ``admin_login`` endpoint.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'admin' in session:
            return f(*args, **kwargs)
        return redirect(url_for('admin_login'))

    return guarded_view
|
||
|
|
||
|
# Association table for group members: one row per (user, group) pair, with
# both foreign keys forming the composite primary key.
group_members = db.Table(
    'group_members',
    db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
    db.Column('group_id', db.Integer, db.ForeignKey('group.id'), primary_key=True),
)
|
||
|
|
||
|
# Model definitions
|
||
|
class User(UserMixin, db.Model):
    """Registered account, with its posts, comments, groups and homepage settings."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(120), nullable=False)
    # Raw bio text as entered, and the HTML produced from it by sanitize_html().
    bio = db.Column(db.Text)
    rendered_bio = db.Column(db.Text)
    profile_picture = db.Column(db.String(200))
    posts = db.relationship('Post', backref='author', lazy=True)
    comments = db.relationship('Comment', backref='author', lazy=True)
    # Memberships go through the group_members association table.
    groups = db.relationship('Group', secondary=group_members, back_populates='members', lazy='dynamic')
    created_groups = db.relationship('Group', backref='creator', lazy=True)
    group_homepage_settings = db.relationship('GroupHomepageSetting', back_populates='user', cascade='all, delete-orphan')

    def _homepage_setting_for(self, group):
        """Return this user's GroupHomepageSetting row for ``group``, or None."""
        return GroupHomepageSetting.query.filter_by(user_id=self.id, group_id=group.id).first()

    def update_group_homepage_setting(self, group, show_in_homepage):
        """Set the homepage flag for ``group``, creating the row if it is missing.

        The change is only staged on the session; the caller commits.
        """
        existing = self._homepage_setting_for(group)
        if existing is None:
            db.session.add(
                GroupHomepageSetting(user_id=self.id, group_id=group.id, show_in_homepage=show_in_homepage)
            )
            return
        existing.show_in_homepage = show_in_homepage

    def show_group_posts_in_homepage(self, group):
        """Return the stored homepage flag for ``group``; False when no row exists."""
        existing = self._homepage_setting_for(group)
        if existing is None:
            return False
        return existing.show_in_homepage
|
||
|
|
||
|
class Post(db.Model):
    """A post, optionally tied to a user account and/or a group."""

    id = db.Column(db.Integer, primary_key=True)
    # Sanitized HTML that is rendered, and the text exactly as submitted.
    content = db.Column(db.Text, nullable=False)
    original_content = db.Column(db.Text)
    image_url = db.Column(db.String(200))
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    # Nullable: posts from visitors who are not logged in have no user row and
    # carry the name they typed in anonymous_username instead.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
    anonymous_username = db.Column(db.String(80))
    # Deleting a post through the ORM also deletes its likes and comments.
    likes = db.relationship('Like', backref='post', lazy=True, cascade='all, delete-orphan')
    comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan')
    # Null for posts that do not belong to a group.
    group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=True)
|
||
|
|
||
|
class Group(db.Model):
    """A group of users, addressed by a unique vanity URL."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    vanity_url = db.Column(db.String(100), unique=True, nullable=False)
    image_url = db.Column(db.String(200))
    # The routes compare this against 'public', 'registered_only' and 'invite_only'.
    visibility = db.Column(db.String(20), nullable=False)
    allow_requests = db.Column(db.Boolean, default=False)
    creator_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # Both collections are dynamic, i.e. query objects rather than loaded lists.
    members = db.relationship('User', secondary=group_members, back_populates='groups', lazy='dynamic')
    posts = db.relationship('Post', backref='group', lazy='dynamic')
|
||
|
|
||
|
class Like(db.Model):
    """A like on a post."""

    id = db.Column(db.Integer, primary_key=True)
    # String, not a foreign key: the like route stores either an account id or,
    # for visitors who are not logged in, the request's remote address.
    user_id = db.Column(db.String(120), nullable=False)
    post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False)
|
||
|
|
||
|
class Comment(db.Model):
    """A comment on a post; user_id is null for commenters who are not logged in."""

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
    post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False)
    # Deleting a comment through the ORM also deletes its likes.
    likes = db.relationship('CommentLike', backref='comment', lazy=True, cascade='all, delete-orphan')
|
||
|
|
||
|
class CommentLike(db.Model):
    """A like on a comment."""

    id = db.Column(db.Integer, primary_key=True)
    # String: holds str(account id) or, for anonymous visitors, the remote address.
    user_id = db.Column(db.String(120), nullable=False)
    comment_id = db.Column(db.Integer, db.ForeignKey('comment.id'), nullable=False)
|
||
|
|
||
|
|
||
|
class GroupJoinRequest(db.Model):
    """A pending request from a user to join a group."""

    id = db.Column(db.Integer, primary_key=True)
    # Changed to String to handle both authenticated and anonymous users
    user_id = db.Column(db.String(120), nullable=False)
    group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    # Adds a ``join_requests`` collection on Group.
    group = db.relationship('Group', backref='join_requests')
|
||
|
|
||
|
class GroupHomepageSetting(db.Model):
    """Per-user, per-group flag saying whether the group's posts show on the homepage."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False)
    show_in_homepage = db.Column(db.Boolean, default=False)
    # Paired with User.group_homepage_settings.
    user = db.relationship('User', back_populates='group_homepage_settings')
    group = db.relationship('Group')
|
||
|
|
||
|
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the login session, or None.

    A value that cannot be converted to an integer is treated as "no such
    user" instead of raising, so a malformed session id does not turn every
    request into a server error.
    """
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        return None
|
||
|
|
||
|
# Routes
|
||
|
@app.route('/')
def home():
    """Render the homepage feed in the order chosen by the ``sort`` query argument."""
    sort = request.args.get('sort', 'newest')
    # Exclude group posts
    feed = [entry for entry in sort_posts(sort) if entry.group_id is None]

    home_url = url_for('home', _external=True, _scheme='https')
    logo_url = url_for('static', filename='techtalks_logo.png', _external=True, _scheme='https')
    metadata = {
        'title': 'TechTalks - Twitter but for techtokers',
        'type': 'website',
        'url': home_url,
        'description': 'TechTalks is a platform for tech enthusiasts to share and discuss ideas.',
        'image': logo_url,
    }
    return render_template('index.html', posts=feed, metadata=metadata)
|
||
|
|
||
|
@app.route('/sort_posts')
def sort_posts_route():
    """Render the posts partial in the order chosen by the ``sort`` query argument.

    Group posts are excluded, as on the homepage. This route has no login
    check, so without the filter it would serve every group's posts,
    invite-only groups included, to any visitor.
    """
    sort = request.args.get('sort', 'newest')
    posts = [post for post in sort_posts(sort) if post.group_id is None]
    return render_template('posts_partial.html', posts=posts)
|
||
|
|
||
|
def sort_posts(sort_option):
    """Return all posts as a list, ordered according to ``sort_option``.

    Recognised options:
      * 'newest'          - newest first (also used for any unknown option)
      * 'most_liked_24h'  - posts from the last 24 hours, most likes first
      * 'most_commented'  - most comments first
      * 'trending'        - posts from the last 3 days, by likes + 2 * comments
      * 'controversial'   - by likes * comments
    """
    if sort_option == 'most_liked_24h':
        twenty_four_hours_ago = datetime.utcnow() - timedelta(hours=24)
        return (
            Post.query.outerjoin(Like)
            .filter(Post.timestamp >= twenty_four_hours_ago)
            .group_by(Post.id)
            .order_by(desc(func.count(Like.id)))
            .all()
        )

    if sort_option == 'most_commented':
        return (
            Post.query.outerjoin(Comment)
            .group_by(Post.id)
            .order_by(desc(func.count(Comment.id)))
            .all()
        )

    # The two options below join BOTH likes and comments, which yields one row
    # per (like, comment) pair for each post. A plain COUNT would therefore
    # report likes * comments for each term; counting DISTINCT ids restores the
    # real per-post totals.
    like_total = func.count(Like.id.distinct())
    comment_total = func.count(Comment.id.distinct())

    if sort_option == 'trending':
        three_days_ago = datetime.utcnow() - timedelta(days=3)
        return (
            Post.query.outerjoin(Like).outerjoin(Comment)
            .filter(Post.timestamp >= three_days_ago)
            .group_by(Post.id)
            .order_by(desc(like_total + comment_total * 2))
            .all()
        )

    if sort_option == 'controversial':
        return (
            Post.query.outerjoin(Like).outerjoin(Comment)
            .group_by(Post.id)
            .order_by(desc(like_total * comment_total))
            .all()
        )

    # 'newest' and anything unrecognised.
    return Post.query.order_by(Post.timestamp.desc()).all()
|
||
|
|
||
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create an account and log it in (POST).

    A POST with a missing/blank username or password, an over-long username,
    or a username that is already taken flashes a message and redirects back
    to the form.
    """
    if request.method != 'POST':
        return render_template('register.html')

    username = request.form.get('username') or ''
    password = request.form.get('password') or ''

    # Without this check a missing field would reach generate_password_hash()
    # or the NOT NULL username column as None and fail with a server error.
    if not username.strip() or not password:
        flash('Username and password are required')
        return redirect(url_for('register'))

    # Matches the String(80) declared on User.username.
    if len(username) > 80:
        flash('Username must be at most 80 characters')
        return redirect(url_for('register'))

    if User.query.filter_by(username=username).first():
        flash('Username already exists')
        return redirect(url_for('register'))

    new_user = User(username=username, password_hash=generate_password_hash(password, method='pbkdf2:sha256'))
    db.session.add(new_user)
    try:
        db.session.commit()
    except IntegrityError:
        # Another request registered the same name between the lookup above
        # and this commit; the unique constraint on username caught it.
        db.session.rollback()
        flash('Username already exists')
        return redirect(url_for('register'))

    login_user(new_user)  # Log in the user immediately after registration
    flash('Registration successful')
    return redirect(url_for('home'))
|
||
|
|
||
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    On success the user is logged in with a 30-day "remember me" duration and
    redirected home; on failure a message is flashed and the form is shown again.
    """
    if request.method == 'POST':
        # Default to '' so a request without these fields is rejected as bad
        # credentials instead of passing None to check_password_hash().
        username = request.form.get('username', '')
        password = request.form.get('password', '')

        user = User.query.filter_by(username=username).first()
        if user is not None and check_password_hash(user.password_hash, password):
            login_user(user, remember=True, duration=timedelta(days=30))
            return redirect(url_for('home'))

        flash('Invalid username or password')

    return render_template('login.html')
|
||
|
|
||
|
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the homepage."""
    logout_user()
    destination = url_for('home')
    return redirect(destination)
|
||
|
|
||
|
@app.route('/post', methods=['GET', 'POST'])
def post():
    """Show the new-post form (GET) or create a post (POST).

    Logged-in users are recorded as the author; other visitors get the
    submitted ``username`` stored as the post's anonymous name. An optional
    ``image`` upload is saved into the upload folder.
    """
    if request.method != 'POST':
        return render_template('post.html')

    content = request.form.get('content')
    image = request.files.get('image')
    username = request.form.get('username')

    # A missing field would reach the Markdown renderer as None, and the
    # content column is NOT NULL; reject it up front.
    if not content or not content.strip():
        flash('Post content cannot be empty.', 'error')
        return redirect(url_for('post'))

    # Sanitize and process the content
    sanitized_content = sanitize_html(content)

    if current_user.is_authenticated:
        new_post = Post(content=sanitized_content, original_content=content, author=current_user)
    else:
        new_post = Post(content=sanitized_content, original_content=content, anonymous_username=username)

    if image and image.filename:
        safe_name = secure_filename(image.filename)
        # secure_filename() can return '' (for example for a name made only of
        # unsafe characters); there is nothing to save in that case.
        if safe_name:
            create_upload_directory()
            # Prefix with a random token so two uploads that share a name do
            # not overwrite each other's file.
            filename = f"{uuid.uuid4().hex}_{safe_name}"
            image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)
            image.save(image_path)
            new_post.image_url = filename

    db.session.add(new_post)
    db.session.commit()

    return redirect(url_for('home'))
|
||
|
|
||
|
@app.route('/like/<int:post_id>', methods=['POST'])
def like_post(post_id):
    """Toggle the caller's like on a post and return the new like count as JSON.

    Responds 404 when the post does not exist.
    """
    post = Post.query.get_or_404(post_id)

    if current_user.is_authenticated:
        # Like.user_id is a String column, so store and compare the id as text,
        # the same way like_comment() does.
        user_id = str(current_user.id)
    else:
        # For anonymous users, use IP address as a unique identifier
        user_id = request.remote_addr

    like = Like.query.filter_by(user_id=user_id, post_id=post.id).first()

    if like:
        db.session.delete(like)
    else:
        db.session.add(Like(user_id=user_id, post_id=post.id))

    db.session.commit()
    return jsonify({'likes': len(post.likes)})
|
||
|
|
||
|
@app.route('/comment/<int:post_id>', methods=['POST'])
def add_comment(post_id):
    """Attach a comment to a post and return it as JSON.

    Responds 404 when the post does not exist and 400 (with
    ``success: False``) when the comment text is missing or blank.
    """
    # Resolve the post first so a comment can never reference a missing post.
    post = Post.query.get_or_404(post_id)

    content = request.form.get('content')
    if not content or not content.strip():
        # Comment.content is NOT NULL; fail with a clear client error instead
        # of a database error.
        return jsonify({'success': False, 'error': 'Comment content cannot be empty.'}), 400

    if current_user.is_authenticated:
        new_comment = Comment(content=content, author=current_user, post_id=post.id)
        display_name = current_user.username
    else:
        new_comment = Comment(content=content, post_id=post.id)
        display_name = 'Anonymous'

    db.session.add(new_comment)
    db.session.commit()

    # NOTE(review): the text is stored and returned exactly as submitted; the
    # consumer of this JSON must escape it before inserting it into a page.
    return jsonify({
        'success': True,
        'comment_id': new_comment.id,
        'username': display_name,
        'content': new_comment.content
    })
|
||
|
|
||
|
@app.route('/like_comment/<int:comment_id>', methods=['POST'])
def like_comment(comment_id):
    """Toggle the caller's like on a comment and return the new like count as JSON."""
    comment = Comment.query.get_or_404(comment_id)

    # Logged-in users are identified by their id as text; everyone else by
    # the request's remote address.
    liker = str(current_user.id) if current_user.is_authenticated else request.remote_addr

    existing = CommentLike.query.filter_by(user_id=liker, comment_id=comment.id).first()
    if existing is None:
        db.session.add(CommentLike(user_id=liker, comment_id=comment.id))
    else:
        db.session.delete(existing)

    db.session.commit()
    return jsonify({'likes': len(comment.likes)})
|
||
|
|
||
|
@app.route('/profile/<username>', methods=['GET', 'POST'])
def profile(username):
    """Show a user's profile; a POST from that same user updates bio and picture.

    A POST from anyone else is ignored and the profile is rendered as for GET.
    Responds 404 when no user has the given username.
    """
    user = User.query.filter_by(username=username).first_or_404()
    if request.method == 'POST' and current_user.is_authenticated and current_user.id == user.id:
        # A missing field becomes an empty bio instead of passing None to the
        # Markdown renderer.
        bio = request.form.get('bio') or ''
        cropped_data = request.form.get('cropped_data')
        remove_picture = request.form.get('remove_picture')

        user.bio = bio  # Store the original Markdown
        user.rendered_bio = sanitize_html(bio)  # Store the rendered HTML

        if remove_picture:
            user.profile_picture = None
        elif cropped_data:
            filename = secure_filename(f"{user.username}_profile.png")
            # The upload folder may not exist yet if nothing was uploaded before.
            create_upload_directory()
            image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)

            try:
                # cropped_data is expected to look like "<prefix>,<base64>";
                # decode the part after the comma and save it.
                image_data = base64.b64decode(cropped_data.split(',')[1])
                with Image.open(io.BytesIO(image_data)) as img:
                    if img.mode in ('RGBA', 'LA'):
                        # Image has an alpha channel, save as PNG
                        img.save(image_path, 'PNG')
                    else:
                        # Image doesn't have an alpha channel, convert to RGB and save as JPEG
                        # NOTE(review): the file name still ends in .png here.
                        img.convert('RGB').save(image_path, 'JPEG')
            except (IndexError, ValueError, OSError):
                # No comma in the payload, invalid base64, or bytes that are
                # not a readable image: discard the pending changes and report
                # it rather than failing with a server error.
                db.session.rollback()
                flash('The uploaded picture could not be processed.', 'error')
                return redirect(url_for('profile', username=username))

            user.profile_picture = filename

        db.session.commit()
        flash('Profile updated successfully', 'success')
        return redirect(url_for('profile', username=username))

    posts = Post.query.filter_by(user_id=user.id).order_by(Post.timestamp.desc()).all()
    post_count = len(posts)
    like_count = sum(len(post.likes) for post in posts)
    comment_count = sum(len(post.comments) for post in posts)
    return render_template('profile.html', user=user, posts=posts, post_count=post_count, like_count=like_count, comment_count=comment_count)
|
||
|
|
||
|
@app.route('/edit_post/<int:post_id>', methods=['GET', 'POST'])
def edit_post(post_id):
    """Show the edit form for a post (GET) or update/delete it (POST).

    Responds 404 when the post does not exist and 403 when the caller is not
    allowed to edit it. A POST containing a ``delete`` field deletes the post;
    any other POST replaces its content.
    """
    post = Post.query.get_or_404(post_id)
    if current_user.is_authenticated:
        if post.author != current_user:
            abort(403)
    else:
        # NOTE(review): Post.user_id is an integer (or None) while
        # session['user_id'] is the UUID string set in before_request(), so
        # this comparison appears to always differ and anonymous visitors
        # always get 403. Kept as-is; confirm whether that is intended.
        if post.user_id != session['user_id']:
            abort(403)

    if request.method == 'POST':
        if 'delete' in request.form:
            db.session.delete(post)
            db.session.commit()
            flash('Your post has been deleted!', 'success')
            return redirect(url_for('home'))

        content = request.form.get('content')
        # A missing field would reach the Markdown renderer as None, and the
        # content column is NOT NULL.
        if not content or not content.strip():
            flash('Post content cannot be empty.', 'error')
            return redirect(url_for('edit_post', post_id=post.id))

        post.content = sanitize_html(content)
        post.original_content = content
        db.session.commit()
        flash('Your post has been updated!', 'success')
        return redirect(url_for('home'))

    return render_template('edit_post.html', post=post)
|
||
|
|
||
|
@app.route('/post/<int:post_id>')
def post_detail(post_id):
    """Render a single post with its OpenGraph metadata; 404 if it does not exist."""
    post = Post.query.get_or_404(post_id)
    return render_template(
        'post_detail.html',
        post=post,
        metadata=generate_opengraph_metadata(post),
    )
|
||
|
|
||
|
# Group-related routes
|
||
|
@app.route('/groups')
def groups():
    """List the groups the caller may see.

    Logged-in users see every group that is not invite-only, the groups they
    belong to, and the invite-only groups they created. Other visitors see
    only groups whose visibility is 'public'.
    """
    if current_user.is_authenticated:
        candidates = (
            Group.query.filter(Group.visibility != 'invite_only').all()
            + current_user.groups.all()
            + Group.query.filter_by(visibility='invite_only', creator_id=current_user.id).all()
        )
        # De-duplicate by id, keeping first-seen order. Going through a set
        # would give an arbitrary order that can change between requests.
        unique_by_id = {}
        for group in candidates:
            unique_by_id.setdefault(group.id, group)
        visible_groups = list(unique_by_id.values())
    else:
        visible_groups = Group.query.filter_by(visibility='public').all()
    return render_template('groups.html', groups=visible_groups)
|
||
|
|
||
|
@app.route('/create_group', methods=['GET', 'POST'])
@login_required
def create_group():
    """Show the group creation form (GET) or create a group (POST).

    The creator is added as the first member. Missing required fields, a
    vanity URL that is already taken, or an unreadable image re-render the
    form with a flashed error.
    """
    if request.method != 'POST':
        return render_template('create_group.html')

    name = request.form.get('name')
    description = request.form.get('description')
    vanity_url = request.form.get('vanity_url')
    visibility = request.form.get('visibility')
    # Join requests are only meaningful for invite-only groups.
    allow_requests = visibility == 'invite_only' and 'allow_requests' in request.form
    cropped_data = request.form.get('cropped_data')

    # These three map to NOT NULL columns on Group.
    if not (name and vanity_url and visibility):
        flash('Name, vanity URL and visibility are required.', 'error')
        return render_template('create_group.html')

    # Check if the vanity_url is already taken
    if Group.query.filter_by(vanity_url=vanity_url).first():
        flash('This vanity URL is already taken. Please choose a different one.', 'error')
        return render_template('create_group.html')

    # Handle the image before building the Group so a bad upload leaves
    # nothing pending in the session.
    image_filename = None
    if cropped_data:
        image_filename = secure_filename(f"group_{vanity_url}.png")
        create_upload_directory()
        # Same location that post() and profile() save to (under app.root_path).
        image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], image_filename)
        try:
            # Decode the base64 image (the part after the comma) and save it
            image_data = base64.b64decode(cropped_data.split(',')[1])
            with Image.open(io.BytesIO(image_data)) as img:
                img.save(image_path, 'PNG')
        except (IndexError, ValueError, OSError):
            flash('The uploaded image could not be processed.', 'error')
            return render_template('create_group.html')

    new_group = Group(name=name, description=description, vanity_url=vanity_url,
                      visibility=visibility, allow_requests=allow_requests,
                      creator=current_user)
    if image_filename:
        new_group.image_url = image_filename

    # Add the creator to the group
    new_group.members.append(current_user)
    db.session.add(new_group)
    try:
        db.session.commit()
    except IntegrityError:
        # A concurrent request took the vanity URL after the check above.
        db.session.rollback()
        flash('This vanity URL is already taken. Please choose a different one.', 'error')
        return render_template('create_group.html')

    flash('Group created successfully', 'success')
    return redirect(url_for('group_detail', vanity_url=vanity_url))
|
||
|
|
||
|
@app.route('/g/<vanity_url>')
def group_detail(vanity_url):
    """Render a group's page, applying its visibility rules.

    * invite_only: anonymous visitors are sent to the login page; logged-in
      users who are neither members nor the creator get the page without
      posts, with the request button shown when the group allows requests.
    * registered_only: anonymous visitors are sent to the login page.
    """
    group = Group.query.filter_by(vanity_url=vanity_url).first_or_404()
    logged_in = current_user.is_authenticated

    if group.visibility == 'invite_only':
        if not logged_in:
            flash('You need to be logged in to view this invite-only group.', 'warning')
            return redirect(url_for('login'))
        if current_user not in group.members and current_user != group.creator:
            return render_template('group_detail.html', group=group, show_request_button=group.allow_requests)
    elif group.visibility == 'registered_only' and not logged_in:
        flash('You need to be logged in to view this group.', 'warning')
        return redirect(url_for('login'))

    # Only the creator gets to see the pending join requests.
    if logged_in and current_user == group.creator:
        join_requests = GroupJoinRequest.query.filter_by(group_id=group.id).all()
    else:
        join_requests = []

    can_join = (
        current_user.is_authenticated
        and current_user not in group.members
        and group.visibility in ('public', 'registered_only')
    )

    # Sort posts by newest first
    sorted_posts = list(group.posts)
    sorted_posts.sort(key=lambda entry: entry.timestamp, reverse=True)

    return render_template('group_detail.html', group=group, posts=sorted_posts, join_requests=join_requests, can_join=can_join)
|
||
|
|
||
|
@app.route('/join_group/<int:group_id>', methods=['POST'])
@login_required
def join_group(group_id):
    """Join a group, or file a join request for an invite-only group.

    Public and registered-only groups are joined immediately. Invite-only
    groups record a GroupJoinRequest when they allow requests. Always
    redirects back to the group's page.
    """
    group = Group.query.get_or_404(group_id)

    if current_user in group.members:
        flash('You are already a member of this group.', 'info')
    elif group.visibility in ('public', 'registered_only'):
        # @login_required already guarantees an authenticated user, so both
        # visibilities are handled the same way.
        group.members.append(current_user)
        db.session.commit()
        flash('You have joined the group.', 'success')
    elif group.visibility == 'invite_only':
        if not group.allow_requests:
            flash('This group is invite-only and not accepting join requests.', 'warning')
        else:
            # GroupJoinRequest.user_id is a String column; store the id as text.
            requester_id = str(current_user.id)
            already_pending = GroupJoinRequest.query.filter_by(
                user_id=requester_id, group_id=group.id
            ).first()
            if already_pending:
                # Repeated submissions must not pile up duplicate requests.
                flash('You have already requested to join this group.', 'info')
            else:
                db.session.add(GroupJoinRequest(user_id=requester_id, group_id=group.id))
                db.session.commit()
                flash('Your request to join has been sent.', 'success')
    else:
        flash('You cannot join this group.', 'warning')

    return redirect(url_for('group_detail', vanity_url=group.vanity_url))
|
||
|
|
||
|
@app.route('/handle_join_request/<int:group_id>/<int:user_id>', methods=['POST'])
@login_required
def handle_join_request(group_id, user_id):
    """Accept or reject a pending join request; only the group creator may do so.

    The form field ``action`` must be 'accept' or 'reject'. Responds 404 for
    an unknown group or user and 403 when the caller is not the creator.
    """
    group = Group.query.get_or_404(group_id)
    if current_user != group.creator:
        abort(403)

    user = User.query.get_or_404(user_id)
    action = request.form.get('action')
    group_page = url_for('group_detail', vanity_url=group.vanity_url)

    # GroupJoinRequest.user_id is a String column, so compare as text.
    join_request = GroupJoinRequest.query.filter_by(user_id=str(user_id), group_id=group_id).first()
    if not join_request:
        flash('Join request not found.', 'error')
        return redirect(group_page)

    if action == 'accept':
        # Skip the append when the user is already a member; a second row
        # would violate the association table's composite primary key.
        if user not in group.members:
            group.members.append(user)
        db.session.delete(join_request)
        flash(f'{user.username} has been added to the group.', 'success')
    elif action == 'reject':
        db.session.delete(join_request)
        flash(f'Join request from {user.username} has been rejected.', 'info')
    else:
        # Leave the request untouched and say so, instead of committing nothing
        # silently.
        flash('Unknown action.', 'error')
        return redirect(group_page)

    db.session.commit()
    return redirect(group_page)
|
||
|
|
||
|
@app.route('/update_group_settings/<int:group_id>', methods=['POST'])
@login_required
def update_group_settings(group_id):
    """Update a group's name, description, vanity URL, visibility and image.

    Only the group creator may call this (403 otherwise). Missing required
    fields or a vanity URL used by another group flash an error and leave the
    group unchanged.
    """
    group = Group.query.get_or_404(group_id)
    if current_user != group.creator:
        abort(403)

    name = request.form.get('name')
    vanity_url = request.form.get('vanity_url')
    visibility = request.form.get('visibility')
    current_page = url_for('group_detail', vanity_url=group.vanity_url)

    # These three map to NOT NULL columns on Group.
    if not (name and vanity_url and visibility):
        flash('Name, vanity URL and visibility are required.', 'error')
        return redirect(current_page)

    # vanity_url is unique; refuse a value that belongs to a different group
    # rather than failing at commit time.
    taken = Group.query.filter(Group.vanity_url == vanity_url, Group.id != group.id).first()
    if taken:
        flash('This vanity URL is already taken. Please choose a different one.', 'error')
        return redirect(current_page)

    group.name = name
    group.description = request.form.get('description')
    group.vanity_url = vanity_url
    group.visibility = visibility
    group.allow_requests = 'allow_requests' in request.form and visibility == 'invite_only'

    image = request.files.get('image')
    if image and image.filename != '':
        filename = secure_filename(f"group_{group.id}_{image.filename}")
        create_upload_directory()
        # Same location that post() and profile() save to (under app.root_path).
        image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)
        image.save(image_path)
        group.image_url = filename

    try:
        db.session.commit()
    except IntegrityError:
        # A concurrent request took the vanity URL after the check above.
        db.session.rollback()
        flash('This vanity URL is already taken. Please choose a different one.', 'error')
        return redirect(current_page)

    flash('Group settings updated successfully', 'success')
    return redirect(url_for('group_detail', vanity_url=group.vanity_url))
|
||
|
|
||
|
@app.route('/remove_group_member/<int:group_id>/<int:user_id>', methods=['POST'])
@login_required
def remove_group_member(group_id, user_id):
    """Remove a member from a group; only the group creator may do so.

    The creator cannot be removed. Responds 404 for an unknown group or user
    and 403 when the caller is not the creator.
    """
    group = Group.query.get_or_404(group_id)
    if current_user != group.creator:
        abort(403)

    user = User.query.get_or_404(user_id)
    if user == group.creator:
        flash('Cannot remove the group creator', 'error')
    elif user not in group.members:
        # Nothing to remove; report it instead of asking the ORM to delete a
        # membership row that does not exist.
        flash(f'{user.username} is not a member of this group.', 'info')
    else:
        group.members.remove(user)
        db.session.commit()
        flash(f'{user.username} has been removed from the group.', 'success')

    return redirect(url_for('group_detail', vanity_url=group.vanity_url))
|
||
|
|
||
|
@app.route('/leave_group/<int:group_id>', methods=['POST'])
@login_required
def leave_group(group_id):
    """Remove the current user from a group, then return to the group list.

    Responds 403 when the caller is not a member. The creator is not allowed
    to leave and only gets an error message.
    """
    group = Group.query.get_or_404(group_id)
    if current_user not in group.members:
        abort(403)

    if current_user != group.creator:
        group.members.remove(current_user)
        db.session.commit()
        flash(f'You have left the group {group.name}.', 'success')
    else:
        flash('The group creator cannot leave the group.', 'error')

    return redirect(url_for('groups'))
|
||
|
|
||
|
@app.route('/g/<vanity_url>/post', methods=['POST'])
@login_required
def group_post(vanity_url):
    """Create a post inside a group; the caller must be a member (403 otherwise)."""
    group = Group.query.filter_by(vanity_url=vanity_url).first_or_404()
    if current_user not in group.members:
        abort(403)

    group_page = url_for('group_detail', vanity_url=vanity_url)

    content = request.form.get('content')
    if content is None or not content.strip():
        flash('Post content cannot be empty.', 'error')
        return redirect(group_page)

    db.session.add(
        Post(
            content=sanitize_html(content),
            original_content=content,
            author=current_user,
            group=group,
        )
    )
    db.session.commit()

    flash('Your post has been added to the group.', 'success')
    return redirect(group_page)
|
||
|
|
||
|
@app.route('/g/<vanity_url>/post/<int:post_id>/delete', methods=['POST'])
@login_required
def delete_group_post(vanity_url, post_id):
    """Delete a post from a group.

    Allowed for the group creator and for the post's author; anyone else gets
    403. Responds 404 when the group or the post within it does not exist.
    """
    group = Group.query.filter_by(vanity_url=vanity_url).first_or_404()
    target = Post.query.filter_by(id=post_id, group_id=group.id).first_or_404()

    if not (current_user == group.creator or current_user == target.author):
        abort(403)

    db.session.delete(target)
    db.session.commit()
    flash('Post has been deleted.', 'success')
    return redirect(url_for('group_detail', vanity_url=vanity_url))
|
||
|
|
||
|
# Admin routes
|
||
|
@app.route('/admin_login', methods=['GET', 'POST'])
def admin_login():
    """Show the admin login form (GET) or check the admin password (POST).

    A correct password sets ``session['admin']`` and redirects to the
    dashboard; a wrong one flashes a message and shows the form again.
    """
    import hmac

    if request.method == 'POST':
        # '' when the field is missing, so the comparison below never sees None.
        supplied = request.form.get('password', '')
        # SECURITY: take the password from the ADMIN_PASSWORD environment
        # variable. The literal is kept only as a fallback so existing setups
        # keep working; it is visible in source and should be overridden.
        expected = os.environ.get('ADMIN_PASSWORD', 'lolmaa')
        # Constant-time comparison; bytes are used because compare_digest()
        # accepts str only when it is ASCII.
        if hmac.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8')):
            session['admin'] = True
            return redirect(url_for('admin_dashboard'))
        flash('Invalid password')
    return render_template('admin_login.html')
|
||
|
|
||
|
@app.route('/admin')
@admin_required
def admin_dashboard():
    """Render the admin dashboard listing every post, newest first."""
    newest_first = Post.timestamp.desc()
    all_posts = Post.query.order_by(newest_first).all()
    return render_template('admin_dashboard.html', posts=all_posts)
|
||
|
|
||
|
@app.route('/admin/delete_post/<int:post_id>', methods=['POST'])
@admin_required
def delete_post(post_id):
    """Delete a post together with its likes, comments and comment likes.

    Responds 404 when the post does not exist. Always redirects back to the
    admin dashboard, flashing either a success or an error message.
    """
    post = Post.query.get_or_404(post_id)

    # Delete through the ORM only. Post.likes and Post.comments cascade
    # deletes, and Comment.likes cascades in turn, so this one call removes
    # everything. Bulk query deletes of the comments would skip that cascade
    # and leave CommentLike rows pointing at comments that no longer exist.
    db.session.delete(post)

    try:
        db.session.commit()
        flash('Post and associated comments deleted successfully')
    except Exception as e:
        db.session.rollback()
        flash('An error occurred while deleting the post: ' + str(e))

    return redirect(url_for('admin_dashboard'))
|
||
|
|
||
|
@app.route('/admin/reset_db', methods=['POST'])
@admin_required
def reset_db():
    """Drop every table and recreate the schema, then return to the dashboard.

    Any exception is reported through a flashed message instead of propagating.
    """
    try:
        # Destructive: all stored data is gone after drop_all().
        db.drop_all()
        db.create_all()
        flash('Database has been reset successfully', 'success')
    except Exception as exc:
        flash(f'An error occurred while resetting the database: {exc}', 'danger')

    dashboard = url_for('admin_dashboard')
    return redirect(dashboard)
|
||
|
|
||
|
@app.route('/admin/backup_db')
@admin_required
def backup_db():
    """Dump users, posts and comments to JSON and send the file as a download.

    On any error a message is flashed and the admin is redirected to the
    dashboard.
    """
    def _iso(moment):
        # A row may have no timestamp; export null rather than failing the
        # whole backup.
        return moment.isoformat() if moment is not None else None

    try:
        # Create a JSON representation of the database
        data = {
            'users': [{'id': u.id, 'username': u.username, 'bio': u.bio} for u in User.query.all()],
            'posts': [{'id': p.id, 'content': p.content, 'user_id': p.user_id, 'timestamp': _iso(p.timestamp)} for p in Post.query.all()],
            'comments': [{'id': c.id, 'content': c.content, 'user_id': c.user_id, 'post_id': c.post_id, 'timestamp': _iso(c.timestamp)} for c in Comment.query.all()],
        }

        # Write and send the same absolute path. open() resolves a relative
        # name against the working directory while send_file() resolves it
        # against the application root, so a bare file name could write one
        # file and try to send another.
        backup_path = os.path.join(app.root_path, 'db_backup.json')
        with open(backup_path, 'w') as f:
            json.dump(data, f, indent=2)

        # Send the file for download
        return send_file(backup_path, as_attachment=True)
    except Exception as e:
        flash(f'An error occurred while creating the database backup: {str(e)}', 'danger')
        return redirect(url_for('admin_dashboard'))
|
||
|
|
||
|
@app.route('/admin/clear_uploads', methods=['POST'])
@admin_required
def clear_uploads():
    """Delete every regular file in the upload folder, then return to the dashboard.

    Sub-directories are left alone. Any error is reported through a flashed
    message.
    """
    try:
        # Resolve the folder against app.root_path, which is where post() and
        # profile() save uploads; a bare relative path would depend on the
        # process's working directory.
        upload_folder = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'])
        for filename in os.listdir(upload_folder):
            file_path = os.path.join(upload_folder, filename)
            if os.path.isfile(file_path):
                os.unlink(file_path)
        flash('Upload folder has been cleared successfully', 'success')
    except Exception as e:
        flash(f'An error occurred while clearing the upload folder: {str(e)}', 'danger')
    return redirect(url_for('admin_dashboard'))
|
||
|
|
||
|
# Helper functions
|
||
|
def generate_opengraph_metadata(post):
    """Build the OpenGraph metadata dict for a post's detail page.

    Returns a dict with the keys 'title', 'type', 'url', 'description' and
    'image'. 'image' is None when the post has no image.
    """
    if post.author:
        author_name = post.author.username
    elif post.anonymous_username:
        author_name = post.anonymous_username
    else:
        author_name = 'Anonymous'

    # Strip all tags once, then decide on truncation from the visible text.
    # Measuring the stored HTML instead would add '...' to short posts whose
    # markup alone exceeds 200 characters.
    plain_text = bleach.clean(post.content, tags=[], strip=True)
    if len(plain_text) > 200:
        description = plain_text[:200] + '...'
    else:
        description = plain_text

    if post.image_url:
        image = url_for('static', filename='uploads/' + post.image_url, _external=True, _scheme='https')
    else:
        image = None

    metadata = {
        'title': f"Post by {author_name} on TechTalks",
        'type': 'article',
        'url': url_for('post_detail', post_id=post.id, _external=True, _scheme='https'),
        'description': description,
        'image': image,
    }
    return metadata
|
||
|
|
||
|
# Domain endings that make a scheme-less link target look like a bare domain.
# Kept as a module-level tuple so it is built once and can be passed directly
# to str.endswith().
_BARE_DOMAIN_TLDS = (
    '.com', '.org', '.net', '.edu', '.gov', '.io', '.gay', '.lol', '.in', '.it',
    '.hu', '.me', '.uk', '.de', '.fr', '.es', '.ru', '.jp', '.cn', '.au',
    '.ca', '.br', '.mx', '.nl', '.pl', '.se', '.ch', '.at', '.be', '.dk',
    '.fi', '.no', '.nz', '.ie', '.sg', '.kr', '.za', '.ph', '.cl', '.ar',
    '.co', '.ve', '.gr', '.il', '.ae', '.pt', '.cz', '.ro', '.th', '.my',
    '.tr', '.ua', '.hk', '.vn', '.sa', '.eg', '.ng', '.bd', '.pk', '.ma',
    '.dz', '.ke', '.tn', '.by', '.lv', '.kz', '.uy', '.pe', '.do', '.ec',
    '.gt', '.py', '.bo', '.cu', '.hn', '.sv', '.ni', '.pa', '.bz', '.gy',
    '.tt', '.ag', '.dm', '.kn', '.lc', '.vc', '.jm', '.bs', '.bb', '.vg',
    '.tc', '.ms', '.ai', '.gd', '.fj', '.to', '.vu', '.sb', '.ws', '.ck',
    '.nu', '.tv', '.fm', '.ki', '.nr', '.pw', '.as', '.gu', '.mp', '.pr',
    '.vi', '.um', '.tf', '.yt', '.re', '.mq', '.gp', '.bl', '.mf', '.pm',
    '.wf', '.pf', '.nc', '.sx', '.aw', '.cw', '.bq',
)


def sanitize_html(content):
    """Render Markdown to HTML, strip disallowed markup and linkify URLs.

    ``content`` is the Markdown text; None is treated as an empty string.
    Returns the cleaned HTML in which every link opens in a new tab with
    ``rel="noopener noreferrer"``.
    """
    # Convert Markdown to HTML
    html_content = markdown.markdown(content or '', extensions=['nl2br', 'fenced_code'])

    # Define allowed tags and attributes
    allowed_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul', 'br', 'hr', 'pre', 'span']
    # NOTE(review): 'style' is allowed on several tags. How (and whether) the
    # CSS inside it gets filtered depends on the installed bleach version and
    # its CSS sanitizer configuration - confirm this is acceptable.
    allowed_attributes = {
        'a': ['href', 'title', 'target', 'rel'],
        'p': ['style'],
        'span': ['style'],
        'h1': ['style'],
        'h2': ['style'],
        'h3': ['style'],
        'h4': ['style'],
        'h5': ['style'],
        'h6': ['style'],
    }

    # Custom linkify function to handle URLs without protocol
    def linkify_external(attrs, new=False):
        href = attrs.get((None, 'href'), '').strip()
        if href.startswith('//'):
            # Protocol-relative URL: pin it to http.
            attrs[(None, 'href')] = 'http:' + href
        elif not urlparse(href).scheme:
            # No scheme at all: treat it as an external site only when it ends
            # in one of the known domain endings.
            if href.endswith(_BARE_DOMAIN_TLDS):
                attrs[(None, 'href')] = 'http://' + href
        attrs[(None, 'target')] = '_blank'
        attrs[(None, 'rel')] = 'noopener noreferrer'
        return attrs

    # Clean the HTML
    clean_html = bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attributes)

    # Linkify the cleaned HTML
    linkified_html = bleach.linkify(clean_html, callbacks=[linkify_external])

    return linkified_html
|
||
|
|
||
|
@app.before_request
def before_request():
    """Ensure the session carries a ``user_id``, generating a random UUID string if absent."""
    if 'user_id' in session:
        return
    session['user_id'] = str(uuid.uuid4())
|
||
|
|
||
|
def create_upload_directory():
    """Create the upload directory under the application root if it is missing.

    The path is ``app.root_path`` joined with ``app.config['UPLOAD_FOLDER']``.
    A short status message is printed in either case.
    """
    target_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'])

    if os.path.exists(target_dir):
        print("Uploads directory already exists.")
        return

    os.makedirs(target_dir)
    print("Uploads directory created.")
|
||
|
|
||
|
@app.route('/update_user_group_settings/<int:group_id>', methods=['POST'])
@login_required
def update_user_group_settings(group_id):
    """Store the current user's homepage-visibility preference for a group.

    Responds 404 if the group does not exist and 403 if the current user is
    not one of its members. On success the change is committed and the user
    is redirected to the group's detail page.
    """
    group = Group.query.get_or_404(group_id)

    is_member = current_user in group.members
    if not is_member:
        abort(403)

    # The mere presence of the field enables the setting
    # (presumably an HTML checkbox - confirm against the form template).
    wants_homepage = 'show_in_homepage' in request.form
    current_user.update_group_homepage_setting(group, wants_homepage)
    db.session.commit()

    flash('Your settings have been updated.', 'success')
    return redirect(url_for('group_detail', vanity_url=group.vanity_url))
|
||
|
|
||
|
@app.route('/sort_group_posts/<int:group_id>')
def sort_group_posts(group_id):
    """Render a group's posts as an HTML fragment in the requested order.

    The ``sort`` query parameter selects the ordering: ``newest`` (default),
    ``oldest``, ``most_liked`` or ``most_commented``. Unknown values fall
    back to ``newest``. Responds 404 if the group does not exist.

    Returns:
        The rendered HTML for the post cards (no surrounding page layout).
    """
    sort = request.args.get('sort', 'newest')
    group = Group.query.get_or_404(group_id)

    # Each entry is (key function, descending?). "newest" must be descending
    # by timestamp so the most recent post comes first; "oldest" ascending.
    orderings = {
        'newest': (lambda post: post.timestamp, True),
        'oldest': (lambda post: post.timestamp, False),
        'most_liked': (lambda post: len(post.likes), True),
        'most_commented': (lambda post: len(post.comments), True),
    }
    key_func, descending = orderings.get(sort, orderings['newest'])
    posts = sorted(group.posts, key=key_func, reverse=descending)

    # NOTE(review): the template emits post.content with |safe, so the stored
    # content must already be sanitized (e.g. via sanitize_html) - confirm.
    return render_template_string('''
    {% for post in posts %}
    <div class="card mb-3" id="post-{{ post.id }}">
        <div class="card-body">
            <div class="d-flex align-items-center mb-2">
                {% if post.author %}
                    {% if post.author.profile_picture %}
                        <img src="{{ url_for('static', filename='uploads/' + post.author.profile_picture) }}" class="rounded-circle me-2" alt="Profile Picture" style="width: 30px; height: 30px; object-fit: cover;">
                    {% else %}
                        <div class="rounded-circle me-2 d-flex justify-content-center align-items-center bg-primary" style="width: 30px; height: 30px;">
                            <span class="text-white" style="font-size: 0.8rem;">{{ post.author.username[0].upper() }}</span>
                        </div>
                    {% endif %}
                    <small class="text-muted">
                        <a href="{{ url_for('profile', username=post.author.username) }}" class="text-decoration-none">{{ post.author.username }}</a>
                    </small>
                {% else %}
                    <small class="text-muted">{{ post.anonymous_username }}</small>
                {% endif %}
                <span class="ms-auto text-muted">{{ post.timestamp.strftime('%Y-%m-%d %H:%M:%S') }}</span>
            </div>
            <p class="card-text">{{ post.content|safe }}</p>
            {% if post.image_url %}
                <img src="{{ url_for('static', filename='uploads/' + post.image_url) }}" class="img-fluid mb-2 post-image" alt="Post image" data-bs-toggle="modal" data-bs-target="#imageModal{{ post.id }}">
            {% endif %}
            <div class="d-flex flex-wrap justify-content-between align-items-center">
                <div class="btn-group mb-2">
                    <button class="btn btn-outline-primary btn-sm like-btn" data-post-id="{{ post.id }}">
                        <i class="bi bi-heart-fill"></i> Like (<span class="like-count">{{ post.likes|length }}</span>)
                    </button>
                    <button class="btn btn-outline-secondary btn-sm comment-btn" data-post-id="{{ post.id }}">
                        <i class="bi bi-chat-fill"></i> Comment ({{ post.comments|length }})
                    </button>
                </div>
                <div class="btn-group mb-2">
                    <a href="{{ url_for('post_detail', post_id=post.id) }}" class="btn btn-outline-info btn-sm" target="_blank">
                        <i class="bi bi-box-arrow-up-right"></i> Open in new tab
                    </a>
                    <button class="btn btn-outline-info btn-sm copy-link-btn" data-post-url="{{ url_for('post_detail', post_id=post.id, _external=True, _scheme='https') }}">
                        <i class="bi bi-link-45deg"></i> Copy Link
                    </button>
                    {% if current_user.is_authenticated and post.author and current_user.id == post.author.id %}
                        <a href="{{ url_for('edit_post', post_id=post.id) }}" class="btn btn-outline-warning btn-sm">
                            <i class="bi bi-pencil-fill"></i> Edit
                        </a>
                    {% endif %}
                </div>
            </div>
        </div>
        <div class="comment-section" id="comment-section-{{ post.id }}" style="display: none;">
            <div class="card-body pt-0">
                <h6 class="mb-3">Comments:</h6>
                <ul class="list-unstyled mb-3">
                    {% for comment in post.comments %}
                        {% include 'comment.html' %}
                    {% endfor %}
                </ul>
                <form class="comment-form" data-post-id="{{ post.id }}">
                    <div class="input-group">
                        <input type="text" class="form-control" placeholder="Add a comment" name="content" required>
                        <button class="btn btn-outline-secondary" type="submit">Send</button>
                    </div>
                </form>
            </div>
        </div>
    </div>
    {% endfor %}
    ''', posts=posts)
|
||
|
|
||
|
if __name__ == '__main__':
    # One-time setup that needs an application context: make sure the upload
    # directory exists and create any missing database tables.
    with app.app_context():
        create_upload_directory()

        db.create_all()
        print("Database initialized.")

    # NOTE(review): debug=True combined with host='0.0.0.0' exposes the
    # interactive debugger on every network interface - development use only.
    app.run(debug=True, host='0.0.0.0', port=5051)
|