old ver
This commit is contained in:
parent
f1a78f83b7
commit
45f1960afc
417
app.py
417
app.py
@ -1,417 +0,0 @@
|
|||||||
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, abort
|
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
|
||||||
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
|
||||||
from werkzeug.security import generate_password_hash, check_password_hash
|
|
||||||
from werkzeug.utils import secure_filename
|
|
||||||
import os
|
|
||||||
from datetime import datetime
|
|
||||||
from flask import request, session
|
|
||||||
import uuid
|
|
||||||
from PIL import Image
|
|
||||||
import io
|
|
||||||
from flask import jsonify
|
|
||||||
import base64
|
|
||||||
from functools import wraps
|
|
||||||
import os
|
|
||||||
from flask import send_file
|
|
||||||
import json
|
|
||||||
import markdown
|
|
||||||
import bleach
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
|
||||||
app.config['SECRET_KEY'] = 'your_secret_key'
|
|
||||||
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///techtalks.db'
|
|
||||||
app.config['UPLOAD_FOLDER'] = 'static/uploads'
|
|
||||||
db = SQLAlchemy(app)
|
|
||||||
login_manager = LoginManager(app)
|
|
||||||
login_manager.login_view = 'login'
|
|
||||||
|
|
||||||
# Model definitions
|
|
||||||
class User(UserMixin, db.Model):
|
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
|
||||||
username = db.Column(db.String(80), unique=True, nullable=False)
|
|
||||||
password_hash = db.Column(db.String(120), nullable=False)
|
|
||||||
bio = db.Column(db.Text)
|
|
||||||
profile_picture = db.Column(db.String(200))
|
|
||||||
posts = db.relationship('Post', backref='author', lazy=True)
|
|
||||||
comments = db.relationship('Comment', backref='author', lazy=True)
|
|
||||||
|
|
||||||
class Post(db.Model):
|
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
|
||||||
content = db.Column(db.Text, nullable=False)
|
|
||||||
image_url = db.Column(db.String(200))
|
|
||||||
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
|
|
||||||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
|
|
||||||
anonymous_username = db.Column(db.String(80))
|
|
||||||
likes = db.relationship('Like', backref='post', lazy=True, cascade='all, delete-orphan')
|
|
||||||
comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan')
|
|
||||||
|
|
||||||
class Like(db.Model):
|
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
|
||||||
user_id = db.Column(db.String(120), nullable=False)
|
|
||||||
post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False)
|
|
||||||
|
|
||||||
class Comment(db.Model):
|
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
|
||||||
content = db.Column(db.Text, nullable=False)
|
|
||||||
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
|
|
||||||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
|
|
||||||
post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False)
|
|
||||||
likes = db.relationship('CommentLike', backref='comment', lazy=True, cascade='all, delete-orphan')
|
|
||||||
|
|
||||||
class CommentLike(db.Model):
|
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
|
||||||
user_id = db.Column(db.String(120), nullable=False)
|
|
||||||
comment_id = db.Column(db.Integer, db.ForeignKey('comment.id'), nullable=False)
|
|
||||||
|
|
||||||
@login_manager.user_loader
|
|
||||||
def load_user(user_id):
|
|
||||||
return User.query.get(int(user_id))
|
|
||||||
|
|
||||||
@app.route('/')
|
|
||||||
def home():
|
|
||||||
posts = Post.query.order_by(Post.timestamp.desc()).all()
|
|
||||||
metadata = {
|
|
||||||
'title': 'TechTalks - Twitter but for techtokers',
|
|
||||||
'type': 'website',
|
|
||||||
'url': url_for('home', _external=True, _scheme='https'),
|
|
||||||
'description': 'TechTalks is a platform for tech enthusiasts to share and discuss ideas.',
|
|
||||||
'image': url_for('static', filename='techtalks_logo.png', _external=True, _scheme='https'), # Add a logo image
|
|
||||||
}
|
|
||||||
return render_template('index.html', posts=posts, metadata=metadata)
|
|
||||||
|
|
||||||
@app.route('/register', methods=['GET', 'POST'])
|
|
||||||
def register():
|
|
||||||
if request.method == 'POST':
|
|
||||||
username = request.form.get('username')
|
|
||||||
password = request.form.get('password')
|
|
||||||
|
|
||||||
user = User.query.filter_by(username=username).first()
|
|
||||||
if user:
|
|
||||||
flash('Username already exists')
|
|
||||||
return redirect(url_for('register'))
|
|
||||||
|
|
||||||
new_user = User(username=username, password_hash=generate_password_hash(password, method='pbkdf2:sha256'))
|
|
||||||
db.session.add(new_user)
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
login_user(new_user) # Log in the user immediately after registration
|
|
||||||
flash('Registration successful')
|
|
||||||
return redirect(url_for('home'))
|
|
||||||
|
|
||||||
return render_template('register.html')
|
|
||||||
|
|
||||||
@app.route('/login', methods=['GET', 'POST'])
|
|
||||||
def login():
|
|
||||||
if request.method == 'POST':
|
|
||||||
username = request.form.get('username')
|
|
||||||
password = request.form.get('password')
|
|
||||||
|
|
||||||
user = User.query.filter_by(username=username).first()
|
|
||||||
if user and check_password_hash(user.password_hash, password):
|
|
||||||
login_user(user)
|
|
||||||
return redirect(url_for('home'))
|
|
||||||
else:
|
|
||||||
flash('Invalid username or password')
|
|
||||||
|
|
||||||
return render_template('login.html')
|
|
||||||
|
|
||||||
@app.route('/logout')
|
|
||||||
@login_required
|
|
||||||
def logout():
|
|
||||||
logout_user()
|
|
||||||
return redirect(url_for('home'))
|
|
||||||
|
|
||||||
@app.route('/post', methods=['GET', 'POST'])
|
|
||||||
def post():
|
|
||||||
if request.method == 'POST':
|
|
||||||
content = request.form.get('content')
|
|
||||||
image = request.files.get('image')
|
|
||||||
username = request.form.get('username')
|
|
||||||
|
|
||||||
# Process Markdown and sanitize HTML
|
|
||||||
html_content = markdown.markdown(content)
|
|
||||||
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul', 'p', 'br']
|
|
||||||
allowed_attributes = {'a': ['href', 'title']}
|
|
||||||
sanitized_content = bleach.clean(html_content, tags=allowed_tags, attributes=allowed_attributes, strip=True)
|
|
||||||
|
|
||||||
if current_user.is_authenticated:
|
|
||||||
user = current_user
|
|
||||||
new_post = Post(content=sanitized_content, author=user)
|
|
||||||
else:
|
|
||||||
new_post = Post(content=sanitized_content, anonymous_username=username)
|
|
||||||
|
|
||||||
if image:
|
|
||||||
create_upload_directory()
|
|
||||||
filename = secure_filename(image.filename)
|
|
||||||
image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)
|
|
||||||
image.save(image_path)
|
|
||||||
new_post.image_url = filename
|
|
||||||
|
|
||||||
db.session.add(new_post)
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
return redirect(url_for('home'))
|
|
||||||
|
|
||||||
return render_template('post.html')
|
|
||||||
|
|
||||||
@app.route('/like/<int:post_id>', methods=['POST'])
|
|
||||||
def like_post(post_id):
|
|
||||||
post = Post.query.get_or_404(post_id)
|
|
||||||
|
|
||||||
if current_user.is_authenticated:
|
|
||||||
user_id = current_user.id
|
|
||||||
else:
|
|
||||||
# For anonymous users, use IP address as a unique identifier
|
|
||||||
user_id = request.remote_addr
|
|
||||||
|
|
||||||
like = Like.query.filter_by(user_id=user_id, post_id=post.id).first()
|
|
||||||
|
|
||||||
if like:
|
|
||||||
db.session.delete(like)
|
|
||||||
else:
|
|
||||||
new_like = Like(user_id=user_id, post_id=post.id)
|
|
||||||
db.session.add(new_like)
|
|
||||||
|
|
||||||
db.session.commit()
|
|
||||||
return jsonify({'likes': len(post.likes)})
|
|
||||||
|
|
||||||
@app.route('/comment/<int:post_id>', methods=['POST'])
|
|
||||||
def add_comment(post_id):
|
|
||||||
content = request.form.get('content')
|
|
||||||
|
|
||||||
if current_user.is_authenticated:
|
|
||||||
user = current_user
|
|
||||||
new_comment = Comment(content=content, author=user, post_id=post_id)
|
|
||||||
else:
|
|
||||||
new_comment = Comment(content=content, post_id=post_id)
|
|
||||||
|
|
||||||
db.session.add(new_comment)
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
return jsonify({
|
|
||||||
'success': True,
|
|
||||||
'comment_id': new_comment.id,
|
|
||||||
'username': user.username if current_user.is_authenticated else 'Anonymous',
|
|
||||||
'content': new_comment.content
|
|
||||||
})
|
|
||||||
|
|
||||||
@app.route('/like_comment/<int:comment_id>', methods=['POST'])
|
|
||||||
def like_comment(comment_id):
|
|
||||||
comment = Comment.query.get_or_404(comment_id)
|
|
||||||
|
|
||||||
if current_user.is_authenticated:
|
|
||||||
user_id = str(current_user.id)
|
|
||||||
else:
|
|
||||||
user_id = request.remote_addr # Use IP address for anonymous users
|
|
||||||
|
|
||||||
like = CommentLike.query.filter_by(user_id=user_id, comment_id=comment.id).first()
|
|
||||||
|
|
||||||
if like:
|
|
||||||
db.session.delete(like)
|
|
||||||
else:
|
|
||||||
new_like = CommentLike(user_id=user_id, comment_id=comment.id)
|
|
||||||
db.session.add(new_like)
|
|
||||||
|
|
||||||
db.session.commit()
|
|
||||||
return jsonify({'likes': len(comment.likes)})
|
|
||||||
|
|
||||||
@app.route('/profile/<username>', methods=['GET', 'POST'])
|
|
||||||
def profile(username):
|
|
||||||
user = User.query.filter_by(username=username).first_or_404()
|
|
||||||
if request.method == 'POST' and current_user.is_authenticated and current_user.id == user.id:
|
|
||||||
bio = request.form.get('bio')
|
|
||||||
cropped_data = request.form.get('cropped_data')
|
|
||||||
remove_picture = request.form.get('remove_picture')
|
|
||||||
|
|
||||||
# Process Markdown and sanitize HTML for bio
|
|
||||||
if bio:
|
|
||||||
html = markdown.markdown(bio)
|
|
||||||
allowed_tags = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'strong', 'ul']
|
|
||||||
allowed_attributes = {'a': ['href', 'title']}
|
|
||||||
user.bio = bleach.clean(html, tags=allowed_tags, attributes=allowed_attributes, strip=True)
|
|
||||||
else:
|
|
||||||
user.bio = None
|
|
||||||
|
|
||||||
if remove_picture:
|
|
||||||
user.profile_picture = None
|
|
||||||
elif cropped_data:
|
|
||||||
filename = secure_filename(f"{user.username}_profile.png")
|
|
||||||
image_path = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)
|
|
||||||
|
|
||||||
# Decode the base64 image and save it
|
|
||||||
image_data = base64.b64decode(cropped_data.split(',')[1])
|
|
||||||
with Image.open(io.BytesIO(image_data)) as img:
|
|
||||||
if img.mode in ('RGBA', 'LA'):
|
|
||||||
# Image has an alpha channel, save as PNG
|
|
||||||
img.save(image_path, 'PNG')
|
|
||||||
else:
|
|
||||||
# Image doesn't have an alpha channel, convert to RGB and save as JPEG
|
|
||||||
img = img.convert('RGB')
|
|
||||||
img.save(image_path, 'JPEG')
|
|
||||||
|
|
||||||
user.profile_picture = filename
|
|
||||||
|
|
||||||
db.session.commit()
|
|
||||||
flash('Profile updated successfully', 'success')
|
|
||||||
return redirect(url_for('profile', username=username))
|
|
||||||
|
|
||||||
posts = Post.query.filter_by(user_id=user.id).order_by(Post.timestamp.desc()).all()
|
|
||||||
post_count = len(posts)
|
|
||||||
like_count = sum(len(post.likes) for post in posts)
|
|
||||||
comment_count = sum(len(post.comments) for post in posts)
|
|
||||||
return render_template('profile.html', user=user, posts=posts, post_count=post_count, like_count=like_count, comment_count=comment_count)
|
|
||||||
|
|
||||||
@app.route('/edit_post/<int:post_id>', methods=['GET', 'POST'])
|
|
||||||
def edit_post(post_id):
|
|
||||||
post = Post.query.get_or_404(post_id)
|
|
||||||
if current_user.is_authenticated:
|
|
||||||
if post.author != current_user:
|
|
||||||
abort(403)
|
|
||||||
else:
|
|
||||||
if post.user_id != session['user_id']:
|
|
||||||
abort(403)
|
|
||||||
|
|
||||||
if request.method == 'POST':
|
|
||||||
post.content = request.form.get('content')
|
|
||||||
db.session.commit()
|
|
||||||
flash('Your post has been updated!', 'success')
|
|
||||||
return redirect(url_for('home'))
|
|
||||||
return render_template('edit_post.html', post=post)
|
|
||||||
|
|
||||||
@app.before_request
|
|
||||||
def before_request():
|
|
||||||
if 'user_id' not in session:
|
|
||||||
session['user_id'] = str(uuid.uuid4())
|
|
||||||
|
|
||||||
def create_upload_directory():
|
|
||||||
upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'])
|
|
||||||
if not os.path.exists(upload_dir):
|
|
||||||
os.makedirs(upload_dir)
|
|
||||||
print("Uploads directory created.")
|
|
||||||
else:
|
|
||||||
print("Uploads directory already exists.")
|
|
||||||
|
|
||||||
def init_db():
|
|
||||||
db_path = os.path.join(app.root_path, 'techtalks.db')
|
|
||||||
if not os.path.exists(db_path):
|
|
||||||
with app.app_context():
|
|
||||||
db.create_all()
|
|
||||||
print("Database initialized.")
|
|
||||||
else:
|
|
||||||
print("Database already exists.")
|
|
||||||
|
|
||||||
def admin_required(f):
|
|
||||||
@wraps(f)
|
|
||||||
def decorated_function(*args, **kwargs):
|
|
||||||
if 'admin' not in session:
|
|
||||||
return redirect(url_for('admin_login'))
|
|
||||||
return f(*args, **kwargs)
|
|
||||||
return decorated_function
|
|
||||||
|
|
||||||
@app.route('/admin_login', methods=['GET', 'POST'])
|
|
||||||
def admin_login():
|
|
||||||
if request.method == 'POST':
|
|
||||||
password = request.form.get('password')
|
|
||||||
if password == "lolmaa":
|
|
||||||
session['admin'] = True
|
|
||||||
return redirect(url_for('admin_dashboard'))
|
|
||||||
else:
|
|
||||||
flash('Invalid password')
|
|
||||||
return render_template('admin_login.html')
|
|
||||||
|
|
||||||
@app.route('/admin')
|
|
||||||
@admin_required
|
|
||||||
def admin_dashboard():
|
|
||||||
posts = Post.query.order_by(Post.timestamp.desc()).all()
|
|
||||||
return render_template('admin_dashboard.html', posts=posts)
|
|
||||||
|
|
||||||
@app.route('/admin/delete_post/<int:post_id>', methods=['POST'])
|
|
||||||
@admin_required
|
|
||||||
def delete_post(post_id):
|
|
||||||
post = Post.query.get_or_404(post_id)
|
|
||||||
|
|
||||||
# Delete all comments associated with the post
|
|
||||||
Comment.query.filter_by(post_id=post_id).delete()
|
|
||||||
|
|
||||||
# Delete all likes associated with the post
|
|
||||||
Like.query.filter_by(post_id=post_id).delete()
|
|
||||||
|
|
||||||
# Delete the post
|
|
||||||
db.session.delete(post)
|
|
||||||
|
|
||||||
try:
|
|
||||||
db.session.commit()
|
|
||||||
flash('Post and associated comments deleted successfully')
|
|
||||||
except Exception as e:
|
|
||||||
db.session.rollback()
|
|
||||||
flash('An error occurred while deleting the post: ' + str(e))
|
|
||||||
|
|
||||||
return redirect(url_for('admin_dashboard'))
|
|
||||||
|
|
||||||
@app.route('/admin/reset_db', methods=['POST'])
|
|
||||||
@admin_required
|
|
||||||
def reset_db():
|
|
||||||
try:
|
|
||||||
db.drop_all()
|
|
||||||
db.create_all()
|
|
||||||
flash('Database has been reset successfully', 'success')
|
|
||||||
except Exception as e:
|
|
||||||
flash(f'An error occurred while resetting the database: {str(e)}', 'danger')
|
|
||||||
return redirect(url_for('admin_dashboard'))
|
|
||||||
|
|
||||||
@app.route('/admin/backup_db')
|
|
||||||
@admin_required
|
|
||||||
def backup_db():
|
|
||||||
try:
|
|
||||||
# Create a JSON representation of the database
|
|
||||||
data = {
|
|
||||||
'users': [{'id': u.id, 'username': u.username, 'bio': u.bio} for u in User.query.all()],
|
|
||||||
'posts': [{'id': p.id, 'content': p.content, 'user_id': p.user_id, 'timestamp': p.timestamp.isoformat()} for p in Post.query.all()],
|
|
||||||
'comments': [{'id': c.id, 'content': c.content, 'user_id': c.user_id, 'post_id': c.post_id, 'timestamp': c.timestamp.isoformat()} for c in Comment.query.all()],
|
|
||||||
}
|
|
||||||
|
|
||||||
# Save the JSON to a file
|
|
||||||
with open('db_backup.json', 'w') as f:
|
|
||||||
json.dump(data, f, indent=2)
|
|
||||||
|
|
||||||
# Send the file for download
|
|
||||||
return send_file('db_backup.json', as_attachment=True)
|
|
||||||
except Exception as e:
|
|
||||||
flash(f'An error occurred while creating the database backup: {str(e)}', 'danger')
|
|
||||||
return redirect(url_for('admin_dashboard'))
|
|
||||||
|
|
||||||
@app.route('/admin/clear_uploads', methods=['POST'])
|
|
||||||
@admin_required
|
|
||||||
def clear_uploads():
|
|
||||||
try:
|
|
||||||
upload_folder = app.config['UPLOAD_FOLDER']
|
|
||||||
for filename in os.listdir(upload_folder):
|
|
||||||
file_path = os.path.join(upload_folder, filename)
|
|
||||||
if os.path.isfile(file_path):
|
|
||||||
os.unlink(file_path)
|
|
||||||
flash('Upload folder has been cleared successfully', 'success')
|
|
||||||
except Exception as e:
|
|
||||||
flash(f'An error occurred while clearing the upload folder: {str(e)}', 'danger')
|
|
||||||
return redirect(url_for('admin_dashboard'))
|
|
||||||
|
|
||||||
@app.route('/post/<int:post_id>')
|
|
||||||
def post_detail(post_id):
|
|
||||||
post = Post.query.get_or_404(post_id)
|
|
||||||
metadata = generate_opengraph_metadata(post)
|
|
||||||
return render_template('post_detail.html', post=post, metadata=metadata)
|
|
||||||
|
|
||||||
def generate_opengraph_metadata(post):
|
|
||||||
metadata = {
|
|
||||||
'title': f"Post by {post.author.username if post.author else 'Anonymous'} on TechTalks",
|
|
||||||
'type': 'article',
|
|
||||||
'url': url_for('post_detail', post_id=post.id, _external=True, _scheme='https'),
|
|
||||||
'description': bleach.clean(post.content, tags=[], strip=True)[:200] + '...' if len(post.content) > 200 else bleach.clean(post.content, tags=[], strip=True),
|
|
||||||
'image': url_for('static', filename='uploads/' + post.image_url, _external=True, _scheme='https') if post.image_url else None,
|
|
||||||
}
|
|
||||||
return metadata
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
init_db()
|
|
||||||
create_upload_directory()
|
|
||||||
app.run(debug=True, port=5051, host='0.0.0.0')
|
|
Loading…
Reference in New Issue
Block a user