957 lines
34 KiB
Python
957 lines
34 KiB
Python
import os
import re
import sqlite3
from datetime import datetime

import bleach
import markdown
from flask import Flask, render_template, request, redirect, url_for, session, g, flash, abort
from flask import current_app
from flask import jsonify
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
|
|
|
|
def render_markdown(text):
    """Convert Markdown *text* to sanitised HTML.

    The text is rendered by ``markdown.markdown`` with the ``nl2br``
    extension enabled, and the result is passed through ``bleach.clean`` so
    that only the tags and attributes listed below survive; with
    ``strip=True`` everything else is removed rather than escaped.

    Returns:
        The cleaned HTML string.
    """
    rendered = markdown.markdown(text, extensions=['nl2br'])
    return bleach.clean(
        rendered,
        tags=['p', 'br', 'strong', 'em', 'a', 'ul', 'ol', 'li'],
        attributes={'a': ['href', 'title']},
        strip=True,
    )
|
|
|
|
|
|
def process_tweet_content(content):
    """Turn ``#hashtag`` and ``@mention`` tokens in *content* into HTML links.

    Hashtags become anchors to ``/hashtag/<tag>`` and mentions become anchors
    to ``/profile/<name>``. Hashtags are rewritten first, then mentions; all
    other text is returned unchanged.

    Returns:
        The content string with the anchors inserted.
    """
    link_rules = (
        (r'#(\w+)', r'<a href="/hashtag/\1" class="hashtag">#\1</a>'),
        (r'@(\w+)', r'<a href="/profile/\1" class="mention">@\1</a>'),
    )
    linked = content
    for pattern, replacement in link_rules:
        linked = re.sub(pattern, replacement, linked)
    return linked
|
|
|
|
# The Flask application object is created once, further down, right next to
# its configuration (secret key and Jinja globals). A second instance built
# here was never used before being rebound, so it has been dropped.
|
|
def get_user_by_id(user_id):
    """Fetch one row from ``users`` by primary key.

    Returns:
        The row converted to a plain ``dict``, or ``None`` when no user has
        that id.
    """
    row = get_db().execute(
        'SELECT * FROM users WHERE id = ?', (user_id,)
    ).fetchone()
    if not row:
        return None
    return dict(row)
|
|
|
|
def init_jinja_env(app):
    """Register ``get_user_by_id`` as a global in *app*'s Jinja environment."""
    app.jinja_env.globals['get_user_by_id'] = get_user_by_id
|
|
|
|
# Application object shared by every route and helper in this module.
app = Flask(__name__)
# Key used to sign the session cookie. Prefer a value supplied through the
# SECRET_KEY environment variable; the literal fallback keeps existing
# deployments (and their already-issued sessions) working, but it is visible
# in the source and should not be relied on in production.
app.secret_key = os.environ.get('SECRET_KEY', 'verysecretok')
# Make get_user_by_id callable from templates.
init_jinja_env(app)
|
|
|
|
@app.template_filter('replace_mentions')
def replace_mentions(content):
    """Jinja filter ``replace_mentions``: link ``@name`` tokens in *content*.

    Every ``@word`` is wrapped in an anchor pointing at ``/profile/<word>``;
    the resulting string is then handed to ``bleach.linkify`` and its return
    value is the filter's output.
    """
    with_mentions = re.sub(r'@(\w+)', r'<a href="/profile/\1">@\1</a>', content)
    return bleach.linkify(with_mentions)
|
|
|
|
# SQLite database file; a relative path, so it is resolved against the
# process working directory when get_db() connects.
DATABASE = 'db.sqlite3'
# NOTE(review): nothing visible in this file reads UPLOAD_FOLDER; the routes
# use the absolute path stored in app.config['UPLOAD_FOLDER'] below.
UPLOAD_FOLDER = 'static/uploads/'
# Absolute path of <directory of this file>/static/uploads.
app.config.update(
    UPLOAD_FOLDER=os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static', 'uploads'),
)
# Lower-case file extensions accepted by allowed_file().
ALLOWED_EXTENSIONS = {'gif', 'jpeg', 'jpg', 'png'}
|
|
|
|
def add_rendered_content_column():
    """Add the ``rendered_content`` TEXT column to ``tweets`` if it is missing.

    A message is printed either way. An ``OperationalError`` whose text
    contains "duplicate column name" is treated as "already done".

    Raises:
        sqlite3.OperationalError: for any other failure of the ALTER TABLE.
    """
    db = get_db()
    try:
        db.execute('ALTER TABLE tweets ADD COLUMN rendered_content TEXT')
        db.commit()
    except sqlite3.OperationalError as exc:
        if "duplicate column name" not in str(exc):
            raise exc
        print("rendered_content column already exists")
    else:
        print("Added rendered_content column to tweets table")
|
|
|
|
|
|
def get_db():
    """Return the SQLite connection cached on ``flask.g``.

    The connection is opened on first use, stored as ``g._database`` and
    reused by later calls. Its ``row_factory`` is ``sqlite3.Row``, so columns
    can be read by name.
    """
    conn = getattr(g, '_database', None)
    if conn is not None:
        return conn
    conn = sqlite3.connect(DATABASE)
    g._database = conn
    conn.row_factory = sqlite3.Row
    return conn
|
|
|
|
|
|
def _table_columns(db, table):
    """Return the column names of *table* as reported by ``PRAGMA table_info``.

    *table* is interpolated into the statement, so it must be one of the
    literal table names used by ``create_tables`` -- never user input.
    """
    return [info[1] for info in db.execute(f"PRAGMA table_info({table})").fetchall()]


def create_tables():
    """Create every table the application uses and upgrade older schemas.

    Steps, in order:

    1. Create ``users``. If an existing ``users`` table lacks ``created_at``
       it is rebuilt with the current layout and its rows are copied over
       (a missing ``email`` is filled with ``'example@email.com'``).
    2. Create ``tweets``, ``comments``, ``likes``, ``group_members``,
       ``messages``, ``posts`` and ``groups``.
    3. Rebuild ``groups`` when it lacks ``vanity_url`` and make sure the
       unique index on that column exists.
    4. Make sure ``tweets`` has the ``rendered_content``, ``image`` and
       ``original_tweet_id`` columns that the tweet and retweet routes write
       to, then back-fill ``rendered_content`` for rows that lack it.

    Every step is written so that running the function again is harmless.
    """
    db = get_db()

    users_layout = '''(
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT NOT NULL UNIQUE,
        password TEXT NOT NULL,
        email TEXT NOT NULL,
        pfp TEXT,
        banner TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )'''
    db.execute(f'CREATE TABLE IF NOT EXISTS users {users_layout}')

    old_columns = _table_columns(db, 'users')
    if 'created_at' not in old_columns:
        # Rebuild as "create new, copy, drop old, rename new". Renaming the
        # live table away first would make SQLite (3.26+) rewrite the
        # REFERENCES users(id) clauses of the other tables to point at the
        # temporary name, leaving them dangling once that table is dropped.
        # This relies on foreign-key enforcement being off, which is the
        # SQLite default and is never changed by get_db().
        db.execute(f'CREATE TABLE users_new {users_layout}')

        insert_columns = ['id', 'username', 'password', 'email']
        select_columns = ['id', 'username', 'password']
        if 'email' in old_columns:
            select_columns.append('email')
        else:
            # email is NOT NULL in the new layout, so supply a placeholder.
            select_columns.append("'example@email.com' AS email")
        for optional in ('pfp', 'banner'):
            if optional in old_columns:
                insert_columns.append(optional)
                select_columns.append(optional)

        db.execute(f'''
            INSERT INTO users_new({', '.join(insert_columns)})
            SELECT {', '.join(select_columns)} FROM users
        ''')
        db.execute('DROP TABLE users')
        db.execute('ALTER TABLE users_new RENAME TO users')
        db.commit()

    db.execute('''CREATE TABLE IF NOT EXISTS tweets (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        content TEXT NOT NULL,
        user_id INTEGER,
        likes INTEGER DEFAULT 0,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY(user_id) REFERENCES users(id)
    )''')
    db.execute('''CREATE TABLE IF NOT EXISTS comments (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        content TEXT NOT NULL,
        tweet_id INTEGER,
        user_id INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY(tweet_id) REFERENCES tweets(id),
        FOREIGN KEY(user_id) REFERENCES users(id)
    )''')
    db.execute('''CREATE TABLE IF NOT EXISTS likes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER,
        tweet_id INTEGER,
        FOREIGN KEY(user_id) REFERENCES users(id),
        FOREIGN KEY(tweet_id) REFERENCES tweets(id),
        UNIQUE(user_id, tweet_id)
    )''')
    db.execute('''CREATE TABLE IF NOT EXISTS group_members (
        group_id INTEGER,
        user_id INTEGER,
        joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (group_id) REFERENCES groups (id),
        FOREIGN KEY (user_id) REFERENCES users (id),
        PRIMARY KEY (group_id, user_id)
    )''')
    db.execute('''CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        sender_id INTEGER,
        receiver_id INTEGER,
        content TEXT,
        image TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (sender_id) REFERENCES users (id),
        FOREIGN KEY (receiver_id) REFERENCES users (id)
    )''')
    db.execute('''CREATE TABLE IF NOT EXISTS posts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER,
        group_id INTEGER,
        content TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users (id),
        FOREIGN KEY (group_id) REFERENCES groups (id)
    )''')
    db.execute('''CREATE TABLE IF NOT EXISTS groups (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        description TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        avatar TEXT,
        vanity_url TEXT
    )''')

    if 'vanity_url' not in _table_columns(db, 'groups'):
        # Older databases: rebuild groups with the vanity_url column, leaving
        # it NULL for every existing group.
        db.execute('''CREATE TABLE groups_new (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            avatar TEXT,
            vanity_url TEXT UNIQUE
        )''')
        db.execute('INSERT INTO groups_new SELECT id, name, description, created_at, avatar, NULL FROM groups')
        db.execute('DROP TABLE groups')
        db.execute('ALTER TABLE groups_new RENAME TO groups')

    # IF NOT EXISTS makes this safe to run every time; new_group() reports a
    # duplicate vanity URL by catching the IntegrityError this index raises.
    # NOTE(review): the pasted source lost its indentation, so whether this
    # originally ran only inside the rebuild branch cannot be told -- confirm.
    db.execute('CREATE UNIQUE INDEX IF NOT EXISTS idx_groups_vanity_url ON groups (vanity_url)')

    # These used to be guarded by "created_at missing from users", which can
    # never be true at this point, so the column was never added and the
    # back-fill never ran. Both helpers are safe to call repeatedly.
    add_rendered_content_column()
    tweet_columns = _table_columns(db, 'tweets')
    for column, declaration in (('image', 'TEXT'), ('original_tweet_id', 'INTEGER')):
        if column not in tweet_columns:
            db.execute(f'ALTER TABLE tweets ADD COLUMN {column} {declaration}')
    update_existing_tweets()
    db.commit()
|
|
|
|
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``g`` when the app context is torn down.

    *exception* is supplied by Flask and is not used. Nothing happens when no
    connection was opened.
    """
    conn = getattr(g, '_database', None)
    if conn is None:
        return
    conn.close()
|
|
|
|
|
|
|
|
def add_columns():
    """Best-effort schema patch-up: add late columns and the followers table.

    Each statement is attempted independently and any
    ``sqlite3.OperationalError`` it raises is ignored, so a statement that
    cannot be applied (for example because the column already exists) does
    not stop the remaining ones.
    """
    db = get_db()
    statements = (
        'ALTER TABLE users ADD COLUMN pfp TEXT',
        'ALTER TABLE tweets ADD COLUMN likes INTEGER DEFAULT 0',
        'ALTER TABLE users ADD COLUMN banner TEXT',
        'ALTER TABLE messages ADD COLUMN image TEXT',
        '''
        CREATE TABLE IF NOT EXISTS followers (
        follower_id INTEGER,
        followed_id INTEGER,
        FOREIGN KEY (follower_id) REFERENCES users (id),
        FOREIGN KEY (followed_id) REFERENCES users (id),
        PRIMARY KEY (follower_id, followed_id)
        )
        ''',
    )
    for statement in statements:
        try:
            db.execute(statement)
        except sqlite3.OperationalError:
            # Deliberately ignored: this function is meant to be re-runnable.
            pass
|
|
|
|
|
|
|
|
def allowed_file(filename):
    """Return whether *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The text after the last dot is compared case-insensitively; a name with
    no dot is rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
|
@app.route('/signup', methods=['GET', 'POST'])
def signup():
    """Render the sign-up form (GET) or create an account (POST).

    On POST the password is hashed, reserved usernames are rejected, and the
    user row is inserted. Success redirects to the login page; a reserved
    name or an ``IntegrityError`` from the insert re-renders the form with a
    flashed error.
    """
    if request.method != 'POST':
        return render_template('signup.html')

    username = request.form['username']
    password = generate_password_hash(request.form['password'])
    email = request.form['email']

    restricted_usernames = ['avery', 'cgcristi', 'cg', 'ceegee']
    if username.lower() in restricted_usernames:
        flash('This username is not allowed. Please choose a different one.', 'error')
        return render_template('signup.html')

    db = get_db()
    try:
        db.execute('INSERT INTO users (username, password, email) VALUES (?, ?, ?)', (username, password, email))
        db.commit()
    except sqlite3.IntegrityError:
        flash('Username already exists. Please choose a different one.', 'error')
        return render_template('signup.html')

    flash('Account created successfully. Please log in.', 'success')
    return redirect(url_for('login'))
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or authenticate a user (POST).

    On valid credentials ``user_id`` and ``username`` are stored in the
    session. Users without a profile picture are sent to their profile page,
    everyone else to the index. Invalid credentials re-render the form.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']
    user = get_db().execute('SELECT * FROM users WHERE username = ?', (username,)).fetchone()

    if not user or not check_password_hash(user['password'], password):
        return render_template('login.html')

    session['user_id'] = user['id']
    session['username'] = user['username']
    if user['pfp']:
        return redirect(url_for('index'))
    return redirect(url_for('profile', username=user['username']))
|
|
|
|
@app.route('/logout')
def logout():
    """Remove the login-related keys from the session and go to the login page."""
    for key in ('user_id', 'username', 'pfp'):
        session.pop(key, None)
    return redirect(url_for('login'))
|
|
|
|
@app.route('/')
def index():
    """Render the home feed: every tweet, newest first.

    Anonymous visitors are redirected to the login page. Each tweet's
    ``created_at`` is parsed into a ``datetime`` and its displayed content
    (``rendered_content`` when set, otherwise ``content``) is passed through
    ``bleach.clean``. If the session's user row no longer exists the visitor
    is sent to the logout route.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    feed_rows = db.execute('''
        SELECT t.*, u.username, u.pfp as user_pfp,
        COALESCE(t.rendered_content, t.content) as displayed_content
        FROM tweets t
        JOIN users u ON t.user_id = u.id
        ORDER BY t.created_at DESC
    ''').fetchall()

    tweets = []
    for feed_row in feed_rows:
        entry = dict(feed_row)
        entry['created_at'] = datetime.strptime(entry['created_at'], '%Y-%m-%d %H:%M:%S')
        entry['displayed_content'] = bleach.clean(entry['displayed_content'], strip=True)
        tweets.append(entry)

    like_rows = db.execute('SELECT tweet_id FROM likes WHERE user_id = ?', (session['user_id'],)).fetchall()
    liked_tweet_ids = {like_row['tweet_id'] for like_row in like_rows}

    user_row = db.execute('SELECT * FROM users WHERE id = ?', (session['user_id'],)).fetchone()
    if user_row is None:
        flash('User not found. Please log in again.', 'error')
        return redirect(url_for('logout'))

    user = dict(user_row)
    user['created_at'] = datetime.strptime(user['created_at'], '%Y-%m-%d %H:%M:%S')

    return render_template(
        'index.html',
        tweets=tweets,
        liked_tweet_ids=liked_tweet_ids,
        user=user,
        get_user_by_id=get_user_by_id,
    )
|
|
|
|
|
|
def update_existing_tweets():
    """Back-fill ``rendered_content`` for tweets where it is still NULL.

    Each such tweet's ``content`` is run through ``render_markdown`` and the
    result is stored; the change is committed and the number of processed
    tweets is printed.
    """
    db = get_db()
    pending = db.execute('SELECT id, content FROM tweets WHERE rendered_content IS NULL').fetchall()
    updates = [(render_markdown(row['content']), row['id']) for row in pending]
    db.executemany('UPDATE tweets SET rendered_content = ? WHERE id = ?', updates)
    db.commit()
    print(f"Updated {len(pending)} existing tweets with rendered content")
|
|
|
|
|
|
@app.route('/tweet', methods=['POST'])
def tweet():
    """Store a new tweet for the logged-in user, then return to the index.

    Content longer than 280 characters is dropped without a message. The raw
    content is stored alongside an HTML rendering produced by
    ``process_tweet_content`` followed by ``render_markdown``. An uploaded
    image with an allowed extension is saved to the upload folder and its
    file name stored with the tweet. Anonymous requests just redirect.
    """
    if 'user_id' not in session:
        return redirect(url_for('index'))

    content = request.form['content']
    if len(content) > 280:
        return redirect(url_for('index'))

    rendered_content = render_markdown(process_tweet_content(content))
    image = request.files.get('image')
    db = get_db()

    has_image = image and allowed_file(image.filename)
    if not has_image:
        db.execute('INSERT INTO tweets (content, rendered_content, user_id) VALUES (?, ?, ?)',
                   (content, rendered_content, session['user_id']))
    else:
        filename = secure_filename(image.filename)
        image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        db.execute('INSERT INTO tweets (content, rendered_content, user_id, image) VALUES (?, ?, ?, ?)',
                   (content, rendered_content, session['user_id'], filename))
    db.commit()
    return redirect(url_for('index'))
|
|
|
|
@app.route('/retweet/<int:tweet_id>', methods=['POST'])
def retweet(tweet_id):
    """Copy tweet *tweet_id* into the logged-in user's timeline.

    The new row reuses the original's ``content`` and ``rendered_content``
    and records *tweet_id* in ``original_tweet_id``. Unknown tweets and
    anonymous requests are ignored. Always redirects to the index.
    """
    if 'user_id' not in session:
        return redirect(url_for('index'))

    db = get_db()
    source = db.execute('SELECT * FROM tweets WHERE id = ?', (tweet_id,)).fetchone()
    if source:
        db.execute('INSERT INTO tweets (content, rendered_content, user_id, original_tweet_id) VALUES (?, ?, ?, ?)',
                   (source['content'], source['rendered_content'], session['user_id'], tweet_id))
        db.commit()
    return redirect(url_for('index'))
|
|
|
|
@app.route('/search', methods=['GET'])
def search():
    """List up to ten users whose username contains the ``q`` query parameter."""
    query = request.args.get('q', '')
    like_pattern = '%' + query + '%'
    users = get_db().execute(
        'SELECT * FROM users WHERE username LIKE ? LIMIT 10', (like_pattern,)
    ).fetchall()
    return render_template('search_results.html', users=users, query=query)
|
|
|
|
@app.route('/hashtag/<hashtag>')
def hashtag(hashtag):
    """Show tweets whose content contains ``#<hashtag>``, newest first.

    Matching is a plain ``LIKE '%#<hashtag>%'`` on the raw tweet content.
    """
    like_pattern = '%#' + hashtag + '%'
    tweets = get_db().execute('''
        SELECT t.*, u.username, u.pfp as user_pfp
        FROM tweets t
        JOIN users u ON t.user_id = u.id
        WHERE t.content LIKE ?
        ORDER BY t.created_at DESC
    ''', (like_pattern,)).fetchall()
    return render_template('hashtag.html', tweets=tweets, hashtag=hashtag)
|
|
|
|
@app.route('/tweet/<int:tweet_id>')
def tweet_detail(tweet_id):
    """Show one tweet with its comments; requires a logged-in user.

    The template also receives the ids of all tweets the current user has
    liked. ``tweet`` is ``None`` in the template context when *tweet_id*
    matches no row.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    tweet = db.execute(
        'SELECT t.id, t.content, u.username, u.pfp, t.likes FROM tweets t JOIN users u ON t.user_id = u.id WHERE t.id = ?',
        (tweet_id,),
    ).fetchone()
    comments = db.execute(
        'SELECT c.content, u.username FROM comments c JOIN users u ON c.user_id = u.id WHERE c.tweet_id = ?',
        (tweet_id,),
    ).fetchall()
    like_rows = db.execute('SELECT tweet_id FROM likes WHERE user_id = ?', (session['user_id'],)).fetchall()
    liked_tweet_ids = {like_row['tweet_id'] for like_row in like_rows}
    return render_template('tweet_detail.html', tweet=tweet, comments=comments, liked_tweet_ids=liked_tweet_ids)
|
|
|
|
@app.route('/comment/<int:tweet_id>', methods=['POST'])
def comment(tweet_id):
    """Attach a comment by the logged-in user to tweet *tweet_id*.

    Responds with JSON: ``{'success': True}`` after the insert, or a
    ``success: False`` payload when nobody is logged in.
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'User not logged in'})

    text = request.form['content']
    db = get_db()
    db.execute(
        'INSERT INTO comments (content, tweet_id, user_id) VALUES (?, ?, ?)',
        (text, tweet_id, session['user_id']),
    )
    db.commit()
    return jsonify({'success': True})
|
|
|
|
|
|
@app.route('/like/<int:tweet_id>', methods=['POST'])
def like(tweet_id):
    """Toggle the logged-in user's like on tweet *tweet_id*.

    An existing like is removed and the tweet's counter decremented;
    otherwise a like row is inserted and the counter incremented. Responds
    with JSON carrying the updated counter, 401 when not logged in, or 500
    (after a rollback) when anything in the toggle raises.
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'User not logged in'}), 401

    db = get_db()
    user_id = session['user_id']

    try:
        already_liked = db.execute(
            'SELECT * FROM likes WHERE user_id = ? AND tweet_id = ?', (user_id, tweet_id)
        ).fetchone()

        if already_liked:
            like_sql = 'DELETE FROM likes WHERE user_id = ? AND tweet_id = ?'
            counter_sql = 'UPDATE tweets SET likes = likes - 1 WHERE id = ?'
        else:
            like_sql = 'INSERT INTO likes (user_id, tweet_id) VALUES (?, ?)'
            counter_sql = 'UPDATE tweets SET likes = likes + 1 WHERE id = ?'
        db.execute(like_sql, (user_id, tweet_id))
        db.execute(counter_sql, (tweet_id,))

        db.commit()
        counter_row = db.execute('SELECT likes FROM tweets WHERE id = ?', (tweet_id,)).fetchone()
        return jsonify({'success': True, 'likes': counter_row['likes']})
    except Exception as e:
        db.rollback()
        print(f"Error in like route: {str(e)}")
        return jsonify({'success': False, 'message': 'An error occurred while processing your request'}), 500
|
|
|
|
@app.route('/profile/<username>')
def profile(username):
    """Show *username*'s profile page with their tweets, newest first.

    Requires a logged-in user; calls ``abort(404)`` when the username does
    not exist. When the user row has no ``created_at`` value, the date of
    their earliest tweet is used instead, falling back to the current time.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    user_row = db.execute('SELECT * FROM users WHERE username = ?', (username,)).fetchone()
    if not user_row:
        abort(404)
    user = dict(user_row)

    if user.get('created_at'):
        user['created_at'] = datetime.strptime(user['created_at'], '%Y-%m-%d %H:%M:%S')
    else:
        first_tweet = db.execute(
            'SELECT MIN(created_at) as first_tweet_date FROM tweets WHERE user_id = ?', (user['id'],)
        ).fetchone()
        if first_tweet and first_tweet['first_tweet_date']:
            user['created_at'] = datetime.strptime(first_tweet['first_tweet_date'], '%Y-%m-%d %H:%M:%S')
        else:
            user['created_at'] = datetime.now()

    tweet_rows = db.execute(
        'SELECT * FROM tweets WHERE user_id = ? ORDER BY created_at DESC', (user['id'],)
    ).fetchall()
    tweets = []
    for tweet_row in tweet_rows:
        entry = dict(tweet_row)
        entry['created_at'] = datetime.strptime(entry['created_at'], '%Y-%m-%d %H:%M:%S')
        tweets.append(entry)

    return render_template('profile.html', user=user, tweets=tweets, get_user_by_id=get_user_by_id)
|
|
|
|
|
|
@app.route('/change_profile_picture', methods=['POST'])
def change_profile_picture():
    """Replace the logged-in user's profile picture with an uploaded file.

    The file must arrive in the ``profile_picture`` form field and have an
    allowed extension; it is saved to the upload folder and its sanitised
    file name stored in ``users.pfp``. The outcome is flashed.

    Returns:
        A redirect to the user's own profile page (or to the login page when
        nobody is logged in).
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    user_id = session['user_id']
    file = request.files.get('profile_picture')

    if file is None:
        flash("No file uploaded.")
    elif file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))

        db = get_db()
        db.execute("UPDATE users SET pfp = ? WHERE id = ?", (filename, user_id))
        db.commit()
        flash("Profile picture updated successfully.")
    else:
        flash("Invalid file type. Please upload a PNG, JPG, JPEG, or GIF.")

    # The profile route needs a username; url_for('profile') without one
    # cannot be built and used to turn every request into a server error.
    username = session.get('username')
    if username is None:
        return redirect(url_for('index'))
    return redirect(url_for('profile', username=username))
|
|
|
|
@app.route('/change_banner', methods=['POST'])
def change_banner():
    """Replace the logged-in user's profile banner with an uploaded file.

    The file must arrive in the ``banner`` form field and have an allowed
    extension; it is saved to the upload folder and its sanitised file name
    stored in ``users.banner``. The outcome is flashed.

    Returns:
        A redirect to the user's own profile page (or to the login page when
        nobody is logged in).
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    user_id = session['user_id']
    file = request.files.get('banner')

    if file is None:
        flash("No file uploaded.")
    elif file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))

        db = get_db()
        db.execute("UPDATE users SET banner = ? WHERE id = ?", (filename, user_id))
        db.commit()
        flash("Banner updated successfully.")
    else:
        flash("Invalid file type. Please upload a PNG, JPG, JPEG, or GIF.")

    # The profile route needs a username; url_for('profile') without one
    # cannot be built and used to turn every request into a server error.
    username = session.get('username')
    if username is None:
        return redirect(url_for('index'))
    return redirect(url_for('profile', username=username))
|
|
|
|
@app.route('/edit_tweet/<int:tweet_id>', methods=['GET', 'POST'])
def edit_tweet(tweet_id):
    """Let the author of tweet *tweet_id* edit its content.

    GET renders the edit form; POST stores the new content together with a
    fresh HTML rendering (the same pipeline the ``tweet`` route uses) and
    redirects to the index. Only the tweet's author may do either: a tweet
    that does not exist or belongs to someone else yields a 404.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    tweet = db.execute(
        'SELECT * FROM tweets WHERE id = ? AND user_id = ?', (tweet_id, session['user_id'])
    ).fetchone()
    if tweet is None:
        # Previously the ownership filter above was never checked, so any
        # logged-in user could overwrite any tweet.
        abort(404)

    if request.method == 'POST':
        new_content = request.form['content']
        # The index feed displays rendered_content in preference to content,
        # so it has to be regenerated or the edit would never become visible.
        rendered_content = render_markdown(process_tweet_content(new_content))
        db.execute(
            'UPDATE tweets SET content = ?, rendered_content = ? WHERE id = ? AND user_id = ?',
            (new_content, rendered_content, tweet_id, session['user_id']),
        )
        db.commit()
        return redirect(url_for('index'))

    return render_template('edit_tweet.html', tweet=tweet)
|
|
|
|
@app.route('/delete_tweet/<int:tweet_id>', methods=['POST'])
def delete_tweet(tweet_id):
    """Delete tweet *tweet_id* if it belongs to the logged-in user.

    The DELETE is filtered by both tweet id and the session's user id, so
    other users' tweets are left untouched. Redirects to the index.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    owner_id = session['user_id']
    db = get_db()
    db.execute('DELETE FROM tweets WHERE id = ? AND user_id = ?', (tweet_id, owner_id))
    db.commit()
    return redirect(url_for('index'))
|
|
|
|
@app.route('/groups')
def groups():
    """List groups ordered by name, optionally filtered by ``?search=``.

    Requires a logged-in user. A non-empty search term restricts the list to
    groups whose name contains it.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    search_query = request.args.get('search', '')
    if not search_query:
        rows = db.execute('SELECT * FROM groups ORDER BY name').fetchall()
    else:
        rows = db.execute(
            'SELECT * FROM groups WHERE name LIKE ? ORDER BY name', ('%' + search_query + '%',)
        ).fetchall()
    return render_template('g.html', groups=rows, get_user_by_id=get_user_by_id, search_query=search_query)
|
|
|
|
@app.route('/new_group', methods=['POST'])
def new_group():
    """Create a group, enrol its creator, and optionally store an avatar.

    Reads ``group_name``, ``vanity_url`` and ``description`` from the form.
    The logged-in user becomes the first member. A ``group_picture`` upload
    with an allowed extension is saved and recorded as the group's avatar.
    An ``IntegrityError`` anywhere in that sequence is reported via flash.
    Always redirects to the group list.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    name = request.form['group_name']
    vanity_url = request.form['vanity_url']
    description = request.form['description']

    db = get_db()
    try:
        db.execute(
            'INSERT INTO groups (name, vanity_url, description) VALUES (?, ?, ?)',
            (name, vanity_url, description),
        )
        db.commit()

        group_id = db.execute('SELECT last_insert_rowid()').fetchone()[0]
        db.execute(
            'INSERT INTO group_members (group_id, user_id) VALUES (?, ?)',
            (group_id, session['user_id']),
        )
        db.commit()

        picture = request.files.get('group_picture')
        if picture and allowed_file(picture.filename):
            filename = secure_filename(picture.filename)
            picture.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            db.execute('UPDATE groups SET avatar = ? WHERE id = ?', (filename, group_id))
            db.commit()

        flash('Group created successfully!', 'success')
    except sqlite3.IntegrityError:
        flash('Group name or vanity URL already exists. Please choose a different one.', 'error')

    return redirect(url_for('groups'))
|
|
|
|
@app.route('/group/<vanity_url>')
def group_detail(vanity_url):
    """Show a group's page: its posts (newest first) and membership status.

    Requires a logged-in user; calls ``abort(404)`` when no group has this
    vanity URL. ``is_member`` tells the template whether the current user
    has a row in ``group_members`` for the group.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    group = db.execute('SELECT * FROM groups WHERE vanity_url = ?', (vanity_url,)).fetchone()
    if not group:
        abort(404)

    group_id = group['id']
    posts = db.execute('''
        SELECT p.*, u.username, u.pfp
        FROM posts p
        JOIN users u ON p.user_id = u.id
        WHERE p.group_id = ?
        ORDER BY p.created_at DESC
    ''', (group_id,)).fetchall()

    membership = db.execute(
        'SELECT * FROM group_members WHERE group_id = ? AND user_id = ?',
        (group_id, session['user_id']),
    ).fetchone()
    is_member = membership is not None

    return render_template(
        'group_detail.html',
        group=group,
        posts=posts,
        is_member=is_member,
        get_user_by_id=get_user_by_id,
    )
|
|
|
|
|
|
@app.route('/join_group/<int:group_id>', methods=['POST'])
def join_group(group_id):
    """Add the logged-in user to group *group_id* and return to its page.

    An unknown group id yields a 404 before anything is written. Joining a
    group the user already belongs to is reported with a flash message
    instead of failing on the ``(group_id, user_id)`` primary key.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    # Look the group up first so a bad id cannot create a membership row for
    # a group that does not exist.
    group = db.execute('SELECT vanity_url FROM groups WHERE id = ?', (group_id,)).fetchone()
    if group is None:
        abort(404)

    try:
        db.execute(
            'INSERT INTO group_members (group_id, user_id) VALUES (?, ?)',
            (group_id, session['user_id']),
        )
        db.commit()
    except sqlite3.IntegrityError:
        flash('You are already a member of this group.', 'error')
    else:
        flash('You have joined the group!', 'success')

    return redirect(url_for('group_detail', vanity_url=group['vanity_url']))
|
|
|
|
@app.route('/leave_group/<int:group_id>', methods=['POST'])
def leave_group(group_id):
    """Remove the logged-in user from group *group_id* and return to its page.

    An unknown group id yields a 404 (it used to crash while building the
    redirect). Leaving a group the user is not in is a no-op.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    group = db.execute('SELECT * FROM groups WHERE id = ?', (group_id,)).fetchone()
    if group is None:
        abort(404)

    db.execute(
        'DELETE FROM group_members WHERE user_id = ? AND group_id = ?',
        (session['user_id'], group_id),
    )
    db.commit()
    return redirect(url_for('group_detail', vanity_url=group['vanity_url']))
|
|
|
|
@app.route('/post_in_group/<int:group_id>', methods=['POST'])
def post_in_group(group_id):
    """Add a post by the logged-in user to group *group_id*.

    Only members of the group may post; non-members get a flashed error. An
    unknown group id yields a 404 (it used to crash while building the
    redirect). Redirects back to the group's page.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    group = db.execute('SELECT vanity_url FROM groups WHERE id = ?', (group_id,)).fetchone()
    if group is None:
        abort(404)

    membership = db.execute(
        'SELECT * FROM group_members WHERE group_id = ? AND user_id = ?',
        (group_id, session['user_id']),
    ).fetchone()

    if membership is None:
        flash('You must be a member of the group to post.', 'error')
    else:
        content = request.form['content']
        db.execute(
            'INSERT INTO posts (user_id, group_id, content) VALUES (?, ?, ?)',
            (session['user_id'], group_id, content),
        )
        db.commit()
        flash('Your post has been added to the group!', 'success')

    return redirect(url_for('group_detail', vanity_url=group['vanity_url']))
|
|
|
|
|
|
@app.route('/dms')
def dms():
    """List the logged-in user's direct-message conversations, ten per page.

    Conversations are identified by the other participant and ordered by the
    time of the most recent message. ``?page=`` selects the page (values
    below 1 are treated as 1). Conversations whose other participant no
    longer exists are left out of the list.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    per_page = 10
    page = max(request.args.get('page', 1, type=int), 1)
    offset = (page - 1) * per_page
    me = session['user_id']

    conversations = db.execute('''
        SELECT DISTINCT
            CASE
                WHEN sender_id = ? THEN receiver_id
                ELSE sender_id
            END AS other_user_id,
            MAX(created_at) as last_message_time
        FROM messages
        WHERE sender_id = ? OR receiver_id = ?
        GROUP BY other_user_id
        ORDER BY last_message_time DESC
        LIMIT ? OFFSET ?
    ''', (me, me, me, per_page, offset)).fetchall()

    threads = []
    for conv in conversations:
        other_user = get_user_by_id(conv['other_user_id'])
        if other_user is None:
            # Messages can outlive the account they reference; indexing into
            # None here used to crash the whole page.
            continue
        threads.append({
            'username': other_user['username'],
            'pfp': other_user['pfp'],
        })

    # Based on the rows fetched, not on the rows shown, so skipping a deleted
    # account does not hide the "next page" link.
    has_more = len(conversations) == per_page
    return render_template('dm.html', dms=threads, page=page, has_more=has_more)
|
|
|
|
@app.route('/dm/<username>')
def dm_conversation(username):
    """Show the message thread between the logged-in user and *username*.

    Requires a logged-in user; calls ``abort(404)`` when *username* does not
    exist. Messages in either direction are listed oldest first, each with
    its sender's username and profile picture.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    other_user = db.execute('SELECT * FROM users WHERE username = ?', (username,)).fetchone()
    if not other_user:
        abort(404)

    me = session['user_id']
    them = other_user['id']
    messages = db.execute('''
        SELECT m.*, u.username, u.pfp
        FROM messages m
        JOIN users u ON m.sender_id = u.id
        WHERE (m.sender_id = ? AND m.receiver_id = ?) OR (m.sender_id = ? AND m.receiver_id = ?)
        ORDER BY m.created_at ASC
    ''', (me, them, them, me)).fetchall()

    return render_template('dm_conversation.html', other_user=other_user, messages=messages)
|
|
|
|
|
|
@app.route('/edit_group/<int:group_id>', methods=['GET', 'POST'])
def edit_group(group_id):
    """Edit the name and description of group *group_id*.

    GET renders the edit form; POST stores the new values and redirects to
    the group's page. An unknown group id yields a 404 (POST used to crash
    while building the redirect, GET rendered the form with no group).
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    group = db.execute('SELECT * FROM groups WHERE id = ?', (group_id,)).fetchone()
    if group is None:
        abort(404)

    # NOTE(review): any logged-in user can edit any group. The groups table
    # shown in this file has no owner column to check against -- confirm
    # whether that is intended.
    if request.method == 'POST':
        name = request.form['name']
        description = request.form['description']

        db.execute(
            'UPDATE groups SET name = ?, description = ? WHERE id = ?',
            (name, description, group_id),
        )
        db.commit()

        flash('Group updated successfully!', 'success')
        return redirect(url_for('group_detail', vanity_url=group['vanity_url']))

    return render_template('edit_group.html', group=group)
|
|
|
|
|
|
@app.route('/dm/<username>/send', methods=['POST'])
def send_message(username):
    """Send a direct message (text plus optional image) to *username*.

    An uploaded image with an allowed extension is saved to the upload
    folder and its file name stored with the message. The message timestamp
    is ``datetime.now()``. Nothing is stored when *username* does not exist.
    Always redirects to the conversation page.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    content = request.form['content']
    image = request.files.get('image')

    db = get_db()
    receiver = db.execute('SELECT id FROM users WHERE username = ?', (username,)).fetchone()

    if not receiver:
        return redirect(url_for('dm_conversation', username=username))

    image_filename = None
    if image and allowed_file(image.filename):
        image_filename = secure_filename(image.filename)
        image.save(os.path.join(app.config['UPLOAD_FOLDER'], image_filename))

    db.execute(
        'INSERT INTO messages (sender_id, receiver_id, content, image, created_at) VALUES (?, ?, ?, ?, ?)',
        (session['user_id'], receiver['id'], content, image_filename, datetime.now()),
    )
    db.commit()

    return redirect(url_for('dm_conversation', username=username))
|
|
|
|
@app.route('/check_new_messages/<username>', methods=['GET'])
def check_new_messages(username):
    """Return, as JSON, messages exchanged with *username* newer than a given id.

    ``?last_message_id=`` (default 0) is the highest id the client already
    has. The response has ``success: True`` and a ``messages`` list when
    there is something new; otherwise ``success: False`` with a reason
    (not logged in, unknown user, or no new messages).
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Unauthorized'})

    db = get_db()
    other_user = db.execute('SELECT id FROM users WHERE username = ?', (username,)).fetchone()
    if not other_user:
        return jsonify({'success': False, 'message': 'User not found'})

    last_message_id = request.args.get('last_message_id', 0, type=int)
    me = session['user_id']
    them = other_user['id']

    new_messages = db.execute('''
        SELECT m.*, u.username, u.pfp
        FROM messages m
        JOIN users u ON m.sender_id = u.id
        WHERE ((m.sender_id = ? AND m.receiver_id = ?) OR (m.sender_id = ? AND m.receiver_id = ?))
        AND m.id > ?
        ORDER BY m.created_at ASC
    ''', (me, them, them, me, last_message_id)).fetchall()

    if not new_messages:
        return jsonify({'success': False, 'message': 'No new messages'})

    exported_fields = ('id', 'content', 'sender_id', 'username', 'pfp', 'created_at', 'image')
    payload = [
        {field: message[field] for field in exported_fields}
        for message in new_messages
    ]
    return jsonify({'success': True, 'messages': payload})
|
|
|
|
@app.route('/delete_group/<int:group_id>', methods=['POST'])
def delete_group(group_id):
    """Delete the ``groups`` row with id *group_id* and return to the group list.

    Requires a logged-in user. Only the ``groups`` table is touched.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    conn = get_db()
    conn.execute('DELETE FROM groups WHERE id = ?', (group_id,))
    conn.commit()

    return redirect(url_for('groups'))
|
|
|
|
@app.route('/start_dm', methods=['POST'])
def start_dm():
    """Redirect to the conversation page for the username posted in the form."""
    if 'user_id' not in session:
        return redirect(url_for('login'))

    target = request.form['username']
    return redirect(url_for('dm_conversation', username=target))
|
|
|
|
@app.route('/edit_message/<int:message_id>', methods=['POST'])
def edit_message(message_id):
    """Replace the text of a message sent by the logged-in user.

    Calls ``abort(404)`` when the message does not exist or was sent by
    someone else. Redirects to the conversation named by the form's
    ``username`` field.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    db = get_db()
    own_message = db.execute(
        'SELECT * FROM messages WHERE id = ? AND sender_id = ?', (message_id, session['user_id'])
    ).fetchone()
    if not own_message:
        abort(404)

    new_content = request.form['content']
    db.execute('UPDATE messages SET content = ? WHERE id = ?', (new_content, message_id))
    db.commit()

    return redirect(url_for('dm_conversation', username=request.form['username']))
|
|
|
|
@app.route('/delete_message/<int:message_id>', methods=['POST'])
def delete_message(message_id):
    """Delete a message if it was sent by the logged-in user.

    The DELETE is filtered by both message id and sender id. Redirects to
    the conversation named by the form's ``username`` field.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    sender_id = session['user_id']
    db = get_db()
    db.execute('DELETE FROM messages WHERE id = ? AND sender_id = ?', (message_id, sender_id))
    db.commit()

    return redirect(url_for('dm_conversation', username=request.form['username']))
|
|
|
|
@app.route('/admin')
def admin_panel():
    """Render the admin overview: all users, tweets and groups plus totals.

    Access is limited to the logged-in user whose username is ``'123'``;
    everyone else is redirected to the login page.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    # NOTE(review): this route checks for the username '123' while the
    # /admin/delete_* endpoints check for 'avery' -- confirm which is meant.
    current = get_user_by_id(session['user_id'])
    if not current or current['username'] != '123':
        return redirect(url_for('login'))

    db = get_db()
    context = {}
    context['users'] = db.execute('SELECT * FROM users ORDER BY created_at DESC').fetchall()
    context['tweets'] = db.execute('''
        SELECT t.*, u.username
        FROM tweets t
        JOIN users u ON t.user_id = u.id
        ORDER BY t.created_at DESC
    ''').fetchall()
    context['groups'] = db.execute('''
        SELECT g.*, COUNT(gm.user_id) as member_count
        FROM groups g
        LEFT JOIN group_members gm ON g.id = gm.group_id
        GROUP BY g.id
        ORDER BY g.created_at DESC
    ''').fetchall()

    context['user_count'] = db.execute('SELECT COUNT(*) as count FROM users').fetchone()['count']
    context['tweet_count'] = db.execute('SELECT COUNT(*) as count FROM tweets').fetchone()['count']
    context['group_count'] = db.execute('SELECT COUNT(*) as count FROM groups').fetchone()['count']

    return render_template('admin_panel.html', **context)
|
|
|
|
def get_all_users():
    """Return every row of ``users`` as a list of plain dicts."""
    rows = get_db().execute('SELECT * FROM users').fetchall()
    return list(map(dict, rows))
|
|
|
|
def get_all_tweets():
    """Return every tweet with its author's username, newest first, as dicts."""
    rows = get_db().execute(
        'SELECT t.*, u.username FROM tweets t JOIN users u ON t.user_id = u.id ORDER BY t.created_at DESC'
    ).fetchall()
    return list(map(dict, rows))
|
|
|
|
def get_all_groups():
    """Return every row of ``groups``, newest first, as a list of plain dicts."""
    rows = get_db().execute('SELECT * FROM groups ORDER BY created_at DESC').fetchall()
    return list(map(dict, rows))
|
|
|
|
@app.route('/admin/delete_user/<int:user_id>', methods=['POST'])
def delete_user(user_id):
    """Admin endpoint: delete the ``users`` row with id *user_id*.

    Only the logged-in user named ``'avery'`` is authorised; everyone else
    receives ``{'success': False, 'message': 'Unauthorized'}``.
    """
    # get_user_by_id returns a dict (or None), so the username has to be read
    # by key; attribute access raised AttributeError on every request.
    current = get_user_by_id(session['user_id']) if 'user_id' in session else None
    if current is None or current['username'] != 'avery':
        return jsonify({'success': False, 'message': 'Unauthorized'})

    db = get_db()
    db.execute('DELETE FROM users WHERE id = ?', (user_id,))
    db.commit()
    return jsonify({'success': True})
|
|
|
|
@app.route('/admin/delete_tweet/<int:tweet_id>', methods=['POST'])
def delete_tweet_admin(tweet_id):
    """Admin endpoint: delete the ``tweets`` row with id *tweet_id*.

    Only the logged-in user named ``'avery'`` is authorised; everyone else
    receives ``{'success': False, 'message': 'Unauthorized'}``.
    """
    # get_user_by_id returns a dict (or None), so the username has to be read
    # by key; attribute access raised AttributeError on every request.
    current = get_user_by_id(session['user_id']) if 'user_id' in session else None
    if current is None or current['username'] != 'avery':
        return jsonify({'success': False, 'message': 'Unauthorized'})

    db = get_db()
    db.execute('DELETE FROM tweets WHERE id = ?', (tweet_id,))
    db.commit()
    return jsonify({'success': True})
|
|
|
|
@app.route('/admin/delete_group/<int:group_id>', methods=['POST'])
def delete_group_admin(group_id):
    """Admin endpoint: delete group *group_id* with its memberships and posts.

    Only the logged-in user named ``'avery'`` is authorised; everyone else
    receives ``{'success': False, 'message': 'Unauthorized'}``.
    """
    # A session can outlive its user row; get_user_by_id then returns None,
    # which must count as unauthorised instead of raising TypeError.
    current = get_user_by_id(session['user_id']) if 'user_id' in session else None
    if current is None or current['username'] != 'avery':
        return jsonify({'success': False, 'message': 'Unauthorized'})

    db = get_db()
    db.execute('DELETE FROM groups WHERE id = ?', (group_id,))
    db.execute('DELETE FROM group_members WHERE group_id = ?', (group_id,))
    db.execute('DELETE FROM posts WHERE group_id = ?', (group_id,))
    db.commit()
    return jsonify({'success': True})
|
|
|
|
|
|
if __name__ == '__main__':
    # Run Flask's built-in server on port 4771 with debug mode on.
    # NOTE(review): nothing visible in this file calls create_tables() or
    # add_columns() -- confirm the schema is set up elsewhere.
    app.run(port=4771, debug=True)
|