Upload files to "/"

This commit is contained in:
spitkov 2024-09-19 22:34:49 +02:00
commit f1a78f83b7

417
app.py Normal file
View File

@ -0,0 +1,417 @@
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, abort
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
import os
from datetime import datetime
from flask import request, session
import uuid
from PIL import Image
import io
from flask import jsonify
import base64
from functools import wraps
import os
from flask import send_file
import json
import markdown
import bleach
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///techtalks.db'
app.config['UPLOAD_FOLDER'] = 'static/uploads'
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
# Model definitions
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(120), nullable=False)
bio = db.Column(db.Text)
profile_picture = db.Column(db.String(200))
posts = db.relationship('Post', backref='author', lazy=True)
comments = db.relationship('Comment', backref='author', lazy=True)
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text, nullable=False)
image_url = db.Column(db.String(200))
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
anonymous_username = db.Column(db.String(80))
likes = db.relationship('Like', backref='post', lazy=True, cascade='all, delete-orphan')
comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan')
class Like(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.String(120), nullable=False)
post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False)
class Comment(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text, nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False)
likes = db.relationship('CommentLike', backref='comment', lazy=True, cascade='all, delete-orphan')
class CommentLike(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.String(120), nullable=False)
comment_id = db.Column(db.Integer, db.ForeignKey('comment.id'), nullable=False)
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
@app.route('/')
def home():
posts = Post.query.order_by(Post.timestamp.desc()).all()
metadata = {
'title': 'TechTalks - Twitter but for techtokers',
'type': 'website',
'url': url_for('home', _external=True, _scheme='https'),
'description': 'TechTalks is a platform for tech enthusiasts to share and discuss ideas.',
'image': url_for('static', filename='techtalks_logo.png', _external=True, _scheme='https'), # Add a logo image
}
return render_template('index.html', posts=posts, metadata=metadata)
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user:
flash('Username already exists')
return redirect(url_for('register'))
new_user = User(username=username, password_hash=generate_password_hash(password, method='pbkdf2:sha256'))
db.session.add(new_user)
db.session.commit()
login_user(new_user) # Log in the user immediately after registration
flash('Registration successful')
return redirect(url_for('home'))
return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user and check_password_hash(user.password_hash, password):
login_user(user)
return redirect(url_for('home'))
else:
flash('Invalid username or password')
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('home'))
@app.route('/post', methods=['GET', 'POST'])
def post():
if request.method == 'POST':
content = request.form.get('content')
image = request.files.get('image')
username = request.form.get('username')
# Process Markdown and sanitize HTML
html_content = markdown.markdown(content)
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul', 'p', 'br']
allowed_attributes = {'a': ['href', 'title']}
sanitized_content = bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attributes, strip=True)
if current_user.is_authenticated:
user = current_user
new_post = Post(content=sanitized_content, author=user)
else:
new_post = Post(content=sanitized_content, anonymous_username=username)
if image:
create_upload_directory()
filename = secure_filename(image.filename)
image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)
image.save(image_path)
new_post.image_url = filename
db.session.add(new_post)
db.session.commit()
return redirect(url_for('home'))
return render_template('post.html')
@app.route('/like/<int:post_id>', methods=['POST'])
def like_post(post_id):
post = Post.query.get_or_404(post_id)
if current_user.is_authenticated:
user_id = current_user.id
else:
# For anonymous users, use IP address as a unique identifier
user_id = request.remote_addr
like = Like.query.filter_by(user_id=user_id, post_id=post.id).first()
if like:
db.session.delete(like)
else:
new_like = Like(user_id=user_id, post_id=post.id)
db.session.add(new_like)
db.session.commit()
return jsonify({'likes': len(post.likes)})
@app.route('/comment/<int:post_id>', methods=['POST'])
def add_comment(post_id):
content = request.form.get('content')
if current_user.is_authenticated:
user = current_user
new_comment = Comment(content=content, author=user, post_id=post_id)
else:
new_comment = Comment(content=content, post_id=post_id)
db.session.add(new_comment)
db.session.commit()
return jsonify({
'success': True,
'comment_id': new_comment.id,
'username': user.username if current_user.is_authenticated else 'Anonymous',
'content': new_comment.content
})
@app.route('/like_comment/<int:comment_id>', methods=['POST'])
def like_comment(comment_id):
comment = Comment.query.get_or_404(comment_id)
if current_user.is_authenticated:
user_id = str(current_user.id)
else:
user_id = request.remote_addr # Use IP address for anonymous users
like = CommentLike.query.filter_by(user_id=user_id, comment_id=comment.id).first()
if like:
db.session.delete(like)
else:
new_like = CommentLike(user_id=user_id, comment_id=comment.id)
db.session.add(new_like)
db.session.commit()
return jsonify({'likes': len(comment.likes)})
@app.route('/profile/<username>', methods=['GET', 'POST'])
def profile(username):
user = User.query.filter_by(username=username).first_or_404()
if request.method == 'POST' and current_user.is_authenticated and current_user.id == user.id:
bio = request.form.get('bio')
cropped_data = request.form.get('cropped_data')
remove_picture = request.form.get('remove_picture')
# Process Markdown and sanitize HTML for bio
if bio:
html = markdown.markdown(bio)
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul']
allowed_attributes = {'a': ['href', 'title']}
user.bio = bleach.clean(html, tags=allowed_tags, attributes=allowed_attributes, strip=True)
else:
user.bio = None
if remove_picture:
user.profile_picture = None
elif cropped_data:
filename = secure_filename(f"{user.username}_profile.png")
image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)
# Decode the base64 image and save it
image_data = base64.b64decode(cropped_data.split(',')[1])
with Image.open(io.BytesIO(image_data)) as img:
if img.mode in ('RGBA', 'LA'):
# Image has an alpha channel, save as PNG
img.save(image_path, 'PNG')
else:
# Image doesn't have an alpha channel, convert to RGB and save as JPEG
img = img.convert('RGB')
img.save(image_path, 'JPEG')
user.profile_picture = filename
db.session.commit()
flash('Profile updated successfully', 'success')
return redirect(url_for('profile', username=username))
posts = Post.query.filter_by(user_id=user.id).order_by(Post.timestamp.desc()).all()
post_count = len(posts)
like_count = sum(len(post.likes) for post in posts)
comment_count = sum(len(post.comments) for post in posts)
return render_template('profile.html', user=user, posts=posts, post_count=post_count, like_count=like_count, comment_count=comment_count)
@app.route('/edit_post/<int:post_id>', methods=['GET', 'POST'])
def edit_post(post_id):
post = Post.query.get_or_404(post_id)
if current_user.is_authenticated:
if post.author != current_user:
abort(403)
else:
if post.user_id != session['user_id']:
abort(403)
if request.method == 'POST':
post.content = request.form.get('content')
db.session.commit()
flash('Your post has been updated!', 'success')
return redirect(url_for('home'))
return render_template('edit_post.html', post=post)
@app.before_request
def before_request():
if 'user_id' not in session:
session['user_id'] = str(uuid.uuid4())
def create_upload_directory():
upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'])
if not os.path.exists(upload_dir):
os.makedirs(upload_dir)
print("Uploads directory created.")
else:
print("Uploads directory already exists.")
def init_db():
db_path = os.path.join(app.root_path, 'techtalks.db')
if not os.path.exists(db_path):
with app.app_context():
db.create_all()
print("Database initialized.")
else:
print("Database already exists.")
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'admin' not in session:
return redirect(url_for('admin_login'))
return f(*args, **kwargs)
return decorated_function
@app.route('/admin_login', methods=['GET', 'POST'])
def admin_login():
if request.method == 'POST':
password = request.form.get('password')
if password == "lolmaa":
session['admin'] = True
return redirect(url_for('admin_dashboard'))
else:
flash('Invalid password')
return render_template('admin_login.html')
@app.route('/admin')
@admin_required
def admin_dashboard():
posts = Post.query.order_by(Post.timestamp.desc()).all()
return render_template('admin_dashboard.html', posts=posts)
@app.route('/admin/delete_post/<int:post_id>', methods=['POST'])
@admin_required
def delete_post(post_id):
post = Post.query.get_or_404(post_id)
# Delete all comments associated with the post
Comment.query.filter_by(post_id=post_id).delete()
# Delete all likes associated with the post
Like.query.filter_by(post_id=post_id).delete()
# Delete the post
db.session.delete(post)
try:
db.session.commit()
flash('Post and associated comments deleted successfully')
except Exception as e:
db.session.rollback()
flash('An error occurred while deleting the post: ' + str(e))
return redirect(url_for('admin_dashboard'))
@app.route('/admin/reset_db', methods=['POST'])
@admin_required
def reset_db():
try:
db.drop_all()
db.create_all()
flash('Database has been reset successfully', 'success')
except Exception as e:
flash(f'An error occurred while resetting the database: {str(e)}', 'danger')
return redirect(url_for('admin_dashboard'))
@app.route('/admin/backup_db')
@admin_required
def backup_db():
try:
# Create a JSON representation of the database
data = {
'users': [{'id': u.id, 'username': u.username, 'bio': u.bio} for u in User.query.all()],
'posts': [{'id': p.id, 'content': p.content, 'user_id': p.user_id, 'timestamp': p.timestamp.isoformat()} for p in Post.query.all()],
'comments': [{'id': c.id, 'content': c.content, 'user_id': c.user_id, 'post_id': c.post_id, 'timestamp': c.timestamp.isoformat()} for c in Comment.query.all()],
}
# Save the JSON to a file
with open('db_backup.json', 'w') as f:
json.dump(data, f, indent=2)
# Send the file for download
return send_file('db_backup.json', as_attachment=True)
except Exception as e:
flash(f'An error occurred while creating the database backup: {str(e)}', 'danger')
return redirect(url_for('admin_dashboard'))
@app.route('/admin/clear_uploads', methods=['POST'])
@admin_required
def clear_uploads():
try:
upload_folder = app.config['UPLOAD_FOLDER']
for filename in os.listdir(upload_folder):
file_path = os.path.join(upload_folder, filename)
if os.path.isfile(file_path):
os.unlink(file_path)
flash('Upload folder has been cleared successfully', 'success')
except Exception as e:
flash(f'An error occurred while clearing the upload folder: {str(e)}', 'danger')
return redirect(url_for('admin_dashboard'))
@app.route('/post/<int:post_id>')
def post_detail(post_id):
post = Post.query.get_or_404(post_id)
metadata = generate_opengraph_metadata(post)
return render_template('post_detail.html', post=post, metadata=metadata)
def generate_opengraph_metadata(post):
metadata = {
'title': f"Post by {post.author.username if post.author else 'Anonymous'} on TechTalks",
'type': 'article',
'url': url_for('post_detail', post_id=post.id, _external=True, _scheme='https'),
'description': bleach.clean(post.content, tags=[], strip=True)[:200] + '...' if len(post.content) > 200 else bleach.clean(post.content, tags=[], strip=True),
'image': url_for('static', filename='uploads/' + post.image_url, _external=True, _scheme='https') if post.image_url else None,
}
return metadata
if __name__ == '__main__':
init_db()
create_upload_directory()
app.run(debug=True, port=5051, host='0.0.0.0')