diff --git a/app.py b/app.py deleted file mode 100644 index 8271937..0000000 --- a/app.py +++ /dev/null @@ -1,417 +0,0 @@ -from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, abort -from flask_sqlalchemy import SQLAlchemy -from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user -from werkzeug.security import generate_password_hash, check_password_hash -from werkzeug.utils import secure_filename -import os -from datetime import datetime -from flask import request, session -import uuid -from PIL import Image -import io -from flask import jsonify -import base64 -from functools import wraps -import os -from flask import send_file -import json -import markdown -import bleach - -app = Flask(__name__) -app.config['SECRET_KEY'] = 'your_secret_key' -app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///techtalks.db' -app.config['UPLOAD_FOLDER'] = 'static/uploads' -db = SQLAlchemy(app) -login_manager = LoginManager(app) -login_manager.login_view = 'login' - -# Model definitions -class User(UserMixin, db.Model): - id = db.Column(db.Integer, primary_key=True) - username = db.Column(db.String(80), unique=True, nullable=False) - password_hash = db.Column(db.String(120), nullable=False) - bio = db.Column(db.Text) - profile_picture = db.Column(db.String(200)) - posts = db.relationship('Post', backref='author', lazy=True) - comments = db.relationship('Comment', backref='author', lazy=True) - -class Post(db.Model): - id = db.Column(db.Integer, primary_key=True) - content = db.Column(db.Text, nullable=False) - image_url = db.Column(db.String(200)) - timestamp = db.Column(db.DateTime, default=datetime.utcnow) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) - anonymous_username = db.Column(db.String(80)) - likes = db.relationship('Like', backref='post', lazy=True, cascade='all, delete-orphan') - comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan') - -class Like(db.Model): - id = db.Column(db.Integer, primary_key=True) - user_id = db.Column(db.String(120), nullable=False) - post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False) - -class Comment(db.Model): - id = db.Column(db.Integer, primary_key=True) - content = db.Column(db.Text, nullable=False) - timestamp = db.Column(db.DateTime, default=datetime.utcnow) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) - post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False) - likes = db.relationship('CommentLike', backref='comment', lazy=True, cascade='all, delete-orphan') - -class CommentLike(db.Model): - id = db.Column(db.Integer, primary_key=True) - user_id = db.Column(db.String(120), nullable=False) - comment_id = db.Column(db.Integer, db.ForeignKey('comment.id'), nullable=False) - -@login_manager.user_loader -def load_user(user_id): - return User.query.get(int(user_id)) - -@app.route('/') -def home(): - posts = Post.query.order_by(Post.timestamp.desc()).all() - metadata = { - 'title': 'TechTalks - Twitter but for techtokers', - 'type': 'website', - 'url': url_for('home', _external=True, _scheme='https'), - 'description': 'TechTalks is a platform for tech enthusiasts to share and discuss ideas.', - 'image': url_for('static', filename='techtalks_logo.png', _external=True, _scheme='https'), # Add a logo image - } - return render_template('index.html', posts=posts, metadata=metadata) - -@app.route('/register', methods=['GET', 'POST']) -def register(): - if request.method == 'POST': - username = request.form.get('username') - password = request.form.get('password') - - user = User.query.filter_by(username=username).first() - if user: - flash('Username already exists') - return redirect(url_for('register')) - - new_user = User(username=username, password_hash=generate_password_hash(password, method='pbkdf2:sha256')) - db.session.add(new_user) - db.session.commit() - - login_user(new_user) # Log in the user immediately after registration - flash('Registration successful') - return redirect(url_for('home')) - - return render_template('register.html') - -@app.route('/login', methods=['GET', 'POST']) -def login(): - if request.method == 'POST': - username = request.form.get('username') - password = request.form.get('password') - - user = User.query.filter_by(username=username).first() - if user and check_password_hash(user.password_hash, password): - login_user(user) - return redirect(url_for('home')) - else: - flash('Invalid username or password') - - return render_template('login.html') - -@app.route('/logout') -@login_required -def logout(): - logout_user() - return redirect(url_for('home')) - -@app.route('/post', methods=['GET', 'POST']) -def post(): - if request.method == 'POST': - content = request.form.get('content') - image = request.files.get('image') - username = request.form.get('username') - - # Process Markdown and sanitize HTML - html_content = markdown.markdown(content) - allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul', 'p', 'br'] - allowed_attributes = {'a': ['href', 'title']} - sanitized_content = bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attributes, strip=True) - - if current_user.is_authenticated: - user = current_user - new_post = Post(content=sanitized_content, author=user) - else: - new_post = Post(content=sanitized_content, anonymous_username=username) - - if image: - create_upload_directory() - filename = secure_filename(image.filename) - image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename) - image.save(image_path) - new_post.image_url = filename - - db.session.add(new_post) - db.session.commit() - - return redirect(url_for('home')) - - return render_template('post.html') - -@app.route('/like/', methods=['POST']) -def like_post(post_id): - post = Post.query.get_or_404(post_id) - - if current_user.is_authenticated: - user_id = current_user.id - else: - # For anonymous users, use IP address as a unique identifier - user_id = request.remote_addr - - like = Like.query.filter_by(user_id=user_id, post_id=post.id).first() - - if like: - db.session.delete(like) - else: - new_like = Like(user_id=user_id, post_id=post.id) - db.session.add(new_like) - - db.session.commit() - return jsonify({'likes': len(post.likes)}) - -@app.route('/comment/', methods=['POST']) -def add_comment(post_id): - content = request.form.get('content') - - if current_user.is_authenticated: - user = current_user - new_comment = Comment(content=content, author=user, post_id=post_id) - else: - new_comment = Comment(content=content, post_id=post_id) - - db.session.add(new_comment) - db.session.commit() - - return jsonify({ - 'success': True, - 'comment_id': new_comment.id, - 'username': user.username if current_user.is_authenticated else 'Anonymous', - 'content': new_comment.content - }) - -@app.route('/like_comment/', methods=['POST']) -def like_comment(comment_id): - comment = Comment.query.get_or_404(comment_id) - - if current_user.is_authenticated: - user_id = str(current_user.id) - else: - user_id = request.remote_addr # Use IP address for anonymous users - - like = CommentLike.query.filter_by(user_id=user_id, comment_id=comment.id).first() - - if like: - db.session.delete(like) - else: - new_like = CommentLike(user_id=user_id, comment_id=comment.id) - db.session.add(new_like) - - db.session.commit() - return jsonify({'likes': len(comment.likes)}) - -@app.route('/profile/', methods=['GET', 'POST']) -def profile(username): - user = User.query.filter_by(username=username).first_or_404() - if request.method == 'POST' and current_user.is_authenticated and current_user.id == user.id: - bio = request.form.get('bio') - cropped_data = request.form.get('cropped_data') - remove_picture = request.form.get('remove_picture') - - # Process Markdown and sanitize HTML for bio - if bio: - html = markdown.markdown(bio) - allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul'] - allowed_attributes = {'a': ['href', 'title']} - user.bio = bleach.clean(html, tags=allowed_tags, attributes=allowed_attributes, strip=True) - else: - user.bio = None - - if remove_picture: - user.profile_picture = None - elif cropped_data: - filename = secure_filename(f"{user.username}_profile.png") - image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename) - - # Decode the base64 image and save it - image_data = base64.b64decode(cropped_data.split(',')[1]) - with Image.open(io.BytesIO(image_data)) as img: - if img.mode in ('RGBA', 'LA'): - # Image has an alpha channel, save as PNG - img.save(image_path, 'PNG') - else: - # Image doesn't have an alpha channel, convert to RGB and save as JPEG - img = img.convert('RGB') - img.save(image_path, 'JPEG') - - user.profile_picture = filename - - db.session.commit() - flash('Profile updated successfully', 'success') - return redirect(url_for('profile', username=username)) - - posts = Post.query.filter_by(user_id=user.id).order_by(Post.timestamp.desc()).all() - post_count = len(posts) - like_count = sum(len(post.likes) for post in posts) - comment_count = sum(len(post.comments) for post in posts) - return render_template('profile.html', user=user, posts=posts, post_count=post_count, like_count=like_count, comment_count=comment_count) - -@app.route('/edit_post/', methods=['GET', 'POST']) -def edit_post(post_id): - post = Post.query.get_or_404(post_id) - if current_user.is_authenticated: - if post.author != current_user: - abort(403) - else: - if post.user_id != session['user_id']: - abort(403) - - if request.method == 'POST': - post.content = request.form.get('content') - db.session.commit() - flash('Your post has been updated!', 'success') - return redirect(url_for('home')) - return render_template('edit_post.html', post=post) - -@app.before_request -def before_request(): - if 'user_id' not in session: - session['user_id'] = str(uuid.uuid4()) - -def create_upload_directory(): - upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER']) - if not os.path.exists(upload_dir): - os.makedirs(upload_dir) - print("Uploads directory created.") - else: - print("Uploads directory already exists.") - -def init_db(): - db_path = os.path.join(app.root_path, 'techtalks.db') - if not os.path.exists(db_path): - with app.app_context(): - db.create_all() - print("Database initialized.") - else: - print("Database already exists.") - -def admin_required(f): - @wraps(f) - def decorated_function(*args, **kwargs): - if 'admin' not in session: - return redirect(url_for('admin_login')) - return f(*args, **kwargs) - return decorated_function - -@app.route('/admin_login', methods=['GET', 'POST']) -def admin_login(): - if request.method == 'POST': - password = request.form.get('password') - if password == "lolmaa": - session['admin'] = True - return redirect(url_for('admin_dashboard')) - else: - flash('Invalid password') - return render_template('admin_login.html') - -@app.route('/admin') -@admin_required -def admin_dashboard(): - posts = Post.query.order_by(Post.timestamp.desc()).all() - return render_template('admin_dashboard.html', posts=posts) - -@app.route('/admin/delete_post/', methods=['POST']) -@admin_required -def delete_post(post_id): - post = Post.query.get_or_404(post_id) - - # Delete all comments associated with the post - Comment.query.filter_by(post_id=post_id).delete() - - # Delete all likes associated with the post - Like.query.filter_by(post_id=post_id).delete() - - # Delete the post - db.session.delete(post) - - try: - db.session.commit() - flash('Post and associated comments deleted successfully') - except Exception as e: - db.session.rollback() - flash('An error occurred while deleting the post: ' + str(e)) - - return redirect(url_for('admin_dashboard')) - -@app.route('/admin/reset_db', methods=['POST']) -@admin_required -def reset_db(): - try: - db.drop_all() - db.create_all() - flash('Database has been reset successfully', 'success') - except Exception as e: - flash(f'An error occurred while resetting the database: {str(e)}', 'danger') - return redirect(url_for('admin_dashboard')) - -@app.route('/admin/backup_db') -@admin_required -def backup_db(): - try: - # Create a JSON representation of the database - data = { - 'users': [{'id': u.id, 'username': u.username, 'bio': u.bio} for u in User.query.all()], - 'posts': [{'id': p.id, 'content': p.content, 'user_id': p.user_id, 'timestamp': p.timestamp.isoformat()} for p in Post.query.all()], - 'comments': [{'id': c.id, 'content': c.content, 'user_id': c.user_id, 'post_id': c.post_id, 'timestamp': c.timestamp.isoformat()} for c in Comment.query.all()], - } - - # Save the JSON to a file - with open('db_backup.json', 'w') as f: - json.dump(data, f, indent=2) - - # Send the file for download - return send_file('db_backup.json', as_attachment=True) - except Exception as e: - flash(f'An error occurred while creating the database backup: {str(e)}', 'danger') - return redirect(url_for('admin_dashboard')) - -@app.route('/admin/clear_uploads', methods=['POST']) -@admin_required -def clear_uploads(): - try: - upload_folder = app.config['UPLOAD_FOLDER'] - for filename in os.listdir(upload_folder): - file_path = os.path.join(upload_folder, filename) - if os.path.isfile(file_path): - os.unlink(file_path) - flash('Upload folder has been cleared successfully', 'success') - except Exception as e: - flash(f'An error occurred while clearing the upload folder: {str(e)}', 'danger') - return redirect(url_for('admin_dashboard')) - -@app.route('/post/') -def post_detail(post_id): - post = Post.query.get_or_404(post_id) - metadata = generate_opengraph_metadata(post) - return render_template('post_detail.html', post=post, metadata=metadata) - -def generate_opengraph_metadata(post): - metadata = { - 'title': f"Post by {post.author.username if post.author else 'Anonymous'} on TechTalks", - 'type': 'article', - 'url': url_for('post_detail', post_id=post.id, _external=True, _scheme='https'), - 'description': bleach.clean(post.content, tags=[], strip=True)[:200] + '...' if len(post.content) > 200 else bleach.clean(post.content, tags=[], strip=True), - 'image': url_for('static', filename='uploads/' + post.image_url, _external=True, _scheme='https') if post.image_url else None, - } - return metadata - -if __name__ == '__main__': - init_db() - create_upload_directory() - app.run(debug=True, port=5051, host='0.0.0.0') \ No newline at end of file