1
0
forked from cgcristi/aCloud
aCloudGaborV77/app.py

551 lines
20 KiB
Python
Raw Normal View History

from flask import Flask, request, jsonify, send_from_directory, render_template, url_for, redirect, send_file
from werkzeug.utils import secure_filename
import shortuuid
import os
from datetime import datetime
import zipfile
import sqlite3
import threading
import time
import shutil
from datetime import timedelta
from pygments import highlight
from pygments.lexers import get_lexer_by_name, guess_lexer
from pygments.formatters import HtmlFormatter
from pygments.util import ClassNotFound
import json
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
import hashlib
app = Flask(__name__)
app.secret_key = 'your_secret_key_here' # Add this line
UPLOAD_FOLDER = './uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
DATABASE = 'data.db'
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
# Database setup and helper functions
def get_db():
db = getattr(threading.current_thread(), '_database', None)
if db is None:
db = threading.current_thread()._database = sqlite3.connect(DATABASE)
return db
def init_db():
with app.app_context():
db = get_db()
with app.open_resource('schema.sql', mode='r') as f:
db.cursor().executescript(f.read())
db.commit()
@app.teardown_appcontext
def close_connection(exception):
db = getattr(threading.current_thread(), '_database', None)
if db is not None:
db.close()
# Initialize database
init_db()
# Add this function to delete old files
def delete_old_files():
while True:
db = get_db()
cursor = db.cursor()
# Delete files older than 30 days
thirty_days_ago = datetime.now() - timedelta(days=30)
cursor.execute("SELECT vanity, type, data FROM content WHERE created_at < ?", (thirty_days_ago,))
old_files = cursor.fetchall()
for vanity, content_type, data in old_files:
if content_type == 'file':
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f'{vanity}_{data}')
if os.path.exists(file_path):
os.remove(file_path)
elif content_type == 'folder':
folder_path = os.path.join(app.config['UPLOAD_FOLDER'], vanity)
if os.path.exists(folder_path):
shutil.rmtree(folder_path)
cursor.execute("DELETE FROM content WHERE created_at < ?", (thirty_days_ago,))
db.commit()
time.sleep(86400) # Sleep for 24 hours
# Start the cleanup thread
cleanup_thread = threading.Thread(target=delete_old_files)
cleanup_thread.daemon = True
cleanup_thread.start()
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
class User(UserMixin):
def __init__(self, id, username, password_hash):
self.id = id
self.username = username
self.password_hash = password_hash
@staticmethod
def hash_password(password):
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
return salt + key
@staticmethod
def verify_password(stored_password, provided_password):
salt = stored_password[:32]
stored_key = stored_password[32:]
new_key = hashlib.pbkdf2_hmac('sha256', provided_password.encode('utf-8'), salt, 100000)
return stored_key == new_key
@login_manager.user_loader
def load_user(user_id):
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if user:
return User(user[0], user[1], user[2])
return None
@app.route('/')
def index():
return render_template('index.html')
@app.route('/content/<vanity>')
def content(vanity):
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM content WHERE vanity = ?", (vanity,))
target = cursor.fetchone()
if target:
content_type, content_data = target[1], target[2]
if content_type == 'pastebin':
try:
lexer = guess_lexer(content_data)
language = lexer.aliases[0]
except ClassNotFound:
language = 'text'
lexer = get_lexer_by_name(language)
formatter = HtmlFormatter(style='monokai', linenos=True, cssclass="source")
highlighted_code = highlight(content_data, lexer, formatter)
css = formatter.get_style_defs('.source')
return render_template('content.html',
highlighted_content=highlighted_code,
css=css,
raw_content=content_data,
created_at=target[3],
vanity=vanity,
language=language)
elif content_type == 'file':
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f'{vanity}_{content_data}')
file_info = {
'name': content_data,
'size': os.path.getsize(file_path),
'modified_at': datetime.fromtimestamp(os.path.getmtime(file_path)).strftime('%Y-%m-%d %H:%M:%S'),
'url': url_for('download_file', vanity=vanity)
}
return render_template('file.html', **file_info)
elif content_type == 'url':
return render_template('content.html', url=content_data)
return 'Not Found', 404
@app.route('/download/<vanity>', methods=['GET'])
def download_file(vanity):
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM content WHERE vanity = ? AND type = 'file'", (vanity,))
target = cursor.fetchone()
if target:
filename = f'{vanity}_{target[2]}'
return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)
return 'Not Found', 404
@app.route('/upload/pastebin', methods=['POST'])
def upload_pastebin():
content = request.form['content']
2024-09-08 14:12:10 +02:00
vanity = shortuuid.uuid()[:6]
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
db = get_db()
cursor = db.cursor()
cursor.execute("INSERT INTO content (vanity, type, data, created_at) VALUES (?, ?, ?, ?)",
(vanity, 'pastebin', content, created_at))
db.commit()
return jsonify({'vanity': vanity})
@app.route('/upload/file', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return 'No file part', 400
file = request.files['file']
if file.filename == '':
return 'No selected file', 400
if file:
vanity = shortuuid.uuid()[:6]
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], f'{vanity}_{filename}')
file.save(filepath)
db = get_db()
cursor = db.cursor()
cursor.execute("INSERT INTO content (vanity, type, data) VALUES (?, ?, ?)",
(vanity, 'file', filename))
db.commit()
return jsonify({'vanity': vanity})
def save_file(file, folder_path):
filename = secure_filename(file.filename)
file_path = os.path.join(folder_path, filename)
file.save(file_path)
def handle_uploaded_folder(files, base_path):
for file in files:
if file.filename.endswith('/'):
subfolder_path = os.path.join(base_path, secure_filename(file.filename))
os.makedirs(subfolder_path, exist_ok=True)
handle_uploaded_folder(request.files.getlist(file.filename), subfolder_path)
else:
save_file(file, base_path)
@app.route('/upload/folder', methods=['POST'])
def upload_folder():
if 'file' not in request.files:
return 'No files uploaded', 400
files = request.files.getlist('file')
if not files:
return 'No files selected', 400
vanity = shortuuid.uuid()[:6]
folder_path = os.path.join(app.config['UPLOAD_FOLDER'], vanity)
os.makedirs(folder_path)
handle_uploaded_folder(files, folder_path)
db = get_db()
cursor = db.cursor()
cursor.execute("INSERT INTO content (vanity, type, data) VALUES (?, ?, ?)",
(vanity, 'folder', ','.join([file.filename for file in files])))
db.commit()
return jsonify({'vanity': vanity})
@app.route('/shorten', methods=['POST'])
def shorten_url():
original_url = request.form['url']
vanity = shortuuid.uuid()[:6]
db = get_db()
cursor = db.cursor()
cursor.execute("INSERT INTO content (vanity, type, data) VALUES (?, ?, ?)",
(vanity, 'url', original_url))
db.commit()
2024-09-08 14:12:10 +02:00
return jsonify({'vanity': vanity})
@app.route('/<vanity>', methods=['GET'])
def redirect_vanity(vanity):
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM content WHERE vanity = ?", (vanity,))
target = cursor.fetchone()
2024-09-09 09:25:58 +02:00
if target:
content_type, content_data = target[1], target[2]
if content_type == 'pastebin':
try:
lexer = guess_lexer(content_data)
language = lexer.aliases[0]
except ClassNotFound:
language = 'text'
lexer = get_lexer_by_name(language)
formatter = HtmlFormatter(style='monokai', linenos=True, cssclass="source")
highlighted_code = highlight(content_data, lexer, formatter)
css = formatter.get_style_defs('.source')
return render_template('content.html',
highlighted_content=highlighted_code,
css=css,
raw_content=content_data,
created_at=target[3],
vanity=vanity,
language=language)
elif content_type == 'file':
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f'{vanity}_{content_data}')
file_info = {
'name': content_data,
'size': os.path.getsize(file_path),
'modified_at': datetime.fromtimestamp(os.path.getmtime(file_path)).strftime('%Y-%m-%d %H:%M:%S'),
'url': url_for('download_file', vanity=vanity)
2024-09-08 14:04:22 +02:00
}
2024-09-09 09:25:58 +02:00
return render_template('file.html', **file_info)
elif content_type == 'folder':
return redirect(url_for('folder_content', vanity=vanity))
elif content_type == 'url':
return render_template('content.html', content=content_data, url=content_data)
2024-09-09 09:25:58 +02:00
return render_template('404.html'), 404
2024-09-08 14:03:18 +02:00
@app.route('/<vanity>/raw', methods=['GET'])
2024-09-09 09:25:58 +02:00
def raw_vanity(vanity):
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM content WHERE vanity = ? AND type = 'pastebin'", (vanity,))
target = cursor.fetchone()
2024-09-09 09:25:58 +02:00
2024-09-08 14:03:18 +02:00
if target:
return target[2], 200, {'Content-Type': 'text/plain; charset=utf-8'}
return 'Not Found', 404
2024-09-09 09:25:58 +02:00
@app.route('/folder/<vanity>', methods=['GET'])
def folder_content(vanity):
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM content WHERE vanity = ? AND type = 'folder'", (vanity,))
target = cursor.fetchone()
if target:
folder_path = os.path.join(app.config['UPLOAD_FOLDER'], vanity)
files = []
for root, _, filenames in os.walk(folder_path):
for filename in filenames:
file_path = os.path.join(root, filename)
relative_path = os.path.relpath(file_path, folder_path)
file_url = url_for('download_folder_file', vanity=vanity, file_name=relative_path)
files.append({'name': relative_path, 'url': file_url})
# Pagination
per_page = 10
page = int(request.args.get('page', 1))
start = (page - 1) * per_page
end = start + per_page
total_files = len(files)
files = files[start:end]
prev_url = url_for('folder_content', vanity=vanity, page=page-1) if page > 1 else None
next_url = url_for('folder_content', vanity=vanity, page=page+1) if end < total_files else None
download_all_url = url_for('download_folder_as_zip', vanity=vanity)
# Get the current folder name
current_folder = os.path.basename(folder_path)
return render_template('folder.html',
files=files,
prev_url=prev_url,
next_url=next_url,
download_all_url=download_all_url,
current_folder=current_folder)
return 'Not Found', 404
@app.route('/folder/<vanity>/download', methods=['GET'])
def download_folder_as_zip(vanity):
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM content WHERE vanity = ? AND type = 'folder'", (vanity,))
target = cursor.fetchone()
if target:
folder_path = os.path.join(app.config['UPLOAD_FOLDER'], vanity)
zip_path = os.path.join(app.config['UPLOAD_FOLDER'], f'{vanity}.zip')
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(folder_path):
for file in files:
file_path = os.path.join(root, file)
zipf.write(file_path, os.path.relpath(file_path, folder_path))
return send_from_directory(app.config['UPLOAD_FOLDER'], f'{vanity}.zip', as_attachment=True)
return 'Not Found', 404
@app.route('/folder/<vanity>/<file_name>', methods=['GET'])
def download_folder_file(vanity, file_name):
folder_path = os.path.join(app.config['UPLOAD_FOLDER'], vanity)
file_path = os.path.join(folder_path, file_name)
if os.path.isfile(file_path):
return send_from_directory(folder_path, file_name, as_attachment=True)
return 'Not Found', 404
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
if cursor.fetchone():
return "Username already exists"
hashed_password = User.hash_password(password)
cursor.execute("INSERT INTO users (username, password_hash) VALUES (?, ?)",
(username, hashed_password))
db.commit()
return redirect(url_for('login'))
return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
user = cursor.fetchone()
if user and User.verify_password(user[2], password):
login_user(User(user[0], user[1], user[2]))
return redirect(url_for('user_files', username=username))
return "Invalid username or password"
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('index'))
@app.route('/user/<username>')
def user_files(username):
if current_user.is_authenticated and current_user.username == username:
user_folder = os.path.join(app.config['UPLOAD_FOLDER'], username)
if not os.path.exists(user_folder):
os.makedirs(user_folder)
files = os.listdir(user_folder)
return render_template('user_files.html', username=username, files=files)
return "Unauthorized", 401
@app.route('/user/<username>/upload', methods=['POST'])
@login_required
def upload_user_file(username):
if current_user.username != username:
return "Unauthorized", 401
if 'file' not in request.files:
return 'No file part', 400
file = request.files['file']
if file.filename == '':
return 'No selected file', 400
if file:
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], username, filename)
file.save(file_path)
return redirect(url_for('user_files', username=username))
@app.route('/user/<username>/delete/<filename>', methods=['POST'])
@login_required
def delete_user_file(username, filename):
if current_user.username != username:
return "Unauthorized", 401
file_path = os.path.join(app.config['UPLOAD_FOLDER'], username, filename)
if os.path.exists(file_path):
os.remove(file_path)
return redirect(url_for('user_files', username=username))
@app.route('/user/<username>/rename', methods=['POST'])
@login_required
def rename_user_file(username):
if current_user.username != username:
return "Unauthorized", 401
old_filename = request.form['old_filename']
new_filename = secure_filename(request.form['new_filename'])
old_path = os.path.join(app.config['UPLOAD_FOLDER'], username, old_filename)
new_path = os.path.join(app.config['UPLOAD_FOLDER'], username, new_filename)
if os.path.exists(old_path):
os.rename(old_path, new_path)
return redirect(url_for('user_files', username=username))
@app.route('/<username>')
@app.route('/<username>/')
@app.route('/<username>/<path:filename>')
def serve_user_page(username, filename=None):
print(f"Accessing user page: {username}, filename: {filename}") # Debug print
# Check if the username exists in the database
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
user = cursor.fetchone()
if not user:
print(f"User {username} not found") # Debug print
return "User not found", 404
user_folder = os.path.join(app.config['UPLOAD_FOLDER'], username)
print(f"User folder path: {user_folder}") # Debug print
if not os.path.exists(user_folder):
print(f"User folder does not exist for {username}") # Debug print
os.makedirs(user_folder) # Create the folder if it doesn't exist
if filename is None or filename == '':
# Try to serve index.html
index_path = os.path.join(user_folder, 'index.html')
print(f"Checking for index.html at: {index_path}") # Debug print
if os.path.exists(index_path):
print(f"Serving index.html for {username}") # Debug print
return send_file(index_path)
else:
print(f"No index.html found, listing files for {username}") # Debug print
# If no index.html, list all files
files = os.listdir(user_folder)
print(f"Files in {username}'s folder: {files}") # Debug print
return render_template('user_files_public.html', username=username, files=files)
else:
# Serve the requested file
file_path = os.path.join(user_folder, filename)
print(f"Attempting to serve file: {file_path}") # Debug print
if os.path.exists(file_path) and os.path.isfile(file_path):
print(f"Serving file: {file_path}") # Debug print
return send_file(file_path)
else:
print(f"File not found: {file_path}") # Debug print
return "File not found", 404
@app.route('/debug/users')
def debug_users():
db = get_db()
cursor = db.cursor()
cursor.execute("SELECT username FROM users")
users = cursor.fetchall()
user_files = {}
for user in users:
username = user[0]
user_folder = os.path.join(app.config['UPLOAD_FOLDER'], username)
if os.path.exists(user_folder):
user_files[username] = os.listdir(user_folder)
else:
user_files[username] = []
return jsonify(user_files)
@app.route('/user/<username>/edit/<path:filename>', methods=['GET', 'POST'])
@login_required
def edit_file(username, filename):
if current_user.username != username:
return "Unauthorized", 401
file_path = os.path.join(app.config['UPLOAD_FOLDER'], username, filename)
if request.method == 'POST':
content = request.form['content']
with open(file_path, 'w') as file:
file.write(content)
return redirect(url_for('user_files', username=username))
if os.path.exists(file_path) and os.path.isfile(file_path):
with open(file_path, 'r') as file:
content = file.read()
return render_template('edit_file.html', username=username, filename=filename, content=content)
else:
return "File not found", 404
if __name__ == '__main__':
app.run(debug=True, port=7123)