app.py gelöscht

This commit is contained in:
2025-06-04 06:43:30 +00:00
parent b38153fe33
commit 71ddae09fa

272
app.py
View File

@ -1,272 +0,0 @@
from flask import Flask, render_template, request, redirect, url_for, session, flash
from flask_socketio import SocketIO
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from terminal_backend import init_terminal_routes
from cryptography.fernet import Fernet
from flask import jsonify, send_from_directory
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
import os
import uuid
import ipaddress
load_dotenv()
FERNET_SECRET = os.getenv('FERNET_SECRET')
SECRET_KEY = os.getenv('SECRET_KEY')
IP_ADDRESS = os.getenv('IP_ADDRESS')
PORT = os.getenv('PORT')
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'sftp-cache')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app = Flask(__name__)
app.secret_key = SECRET_KEY
fernet = Fernet(FERNET_SECRET.encode())
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///vaults.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
db = SQLAlchemy(app)
socketio = SocketIO(app, manage_session=False)
user_vaults = db.Table(
'user_vaults',
db.Column('user_id', db.Integer, db.ForeignKey('local_user.id')),
db.Column('vault_id', db.Integer, db.ForeignKey('vault.id'))
)
class Vault(db.Model):
id = db.Column(db.String(36), primary_key=True)
name = db.Column(db.String(100), nullable=False)
host_ip = db.Column(db.String(100))
hostname = db.Column(db.String(100))
port = db.Column(db.Integer, default=22)
user = db.Column(db.String(100))
password_hash = db.Column(db.String(255))
ssh_key = db.Column(db.Text)
auth_type = db.Column(db.String(50))
class LocalUser(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100), unique=True, nullable=False)
password_hash = db.Column(db.String(255), nullable=False)
password_changed = db.Column(db.Boolean, default=False)
role = db.Column(db.String(20), default='user')
vaults = db.relationship('Vault', secondary=user_vaults, backref='users')
@app.context_processor
def inject_user():
if 'user' in session:
current_user = LocalUser.query.filter_by(username=session['user']).first()
return {'current_user': current_user}
return {}
@app.route('/', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = LocalUser.query.filter_by(username=username).first()
if user and check_password_hash(user.password_hash, password):
session['user'] = username
if not user.password_changed:
return redirect(url_for('change_password'))
return redirect(url_for('view_vaults'))
flash("Login fehlgeschlagen", "error")
return render_template('login.html')
@app.route('/change_password', methods=['GET', 'POST'])
def change_password():
if 'user' not in session:
return redirect(url_for('login'))
if request.method == 'POST':
new_password = request.form['new_password']
if new_password:
user = LocalUser.query.filter_by(username=session['user']).first()
user.password_hash = generate_password_hash(new_password)
user.password_changed = True
db.session.commit()
return redirect(url_for('view_vaults'))
return render_template('change_password.html')
@app.route('/vaults')
def view_vaults():
if 'user' not in session:
return redirect(url_for('login'))
current_user = LocalUser.query.filter_by(username=session['user']).first()
vaults = current_user.vaults
return render_template('dashboard.html', title="Hosts verwalten", vaults=vaults, page='vaults', current_user=current_user)
@app.route('/vaults/add', methods=['POST'])
def add_vault():
name = request.form.get('vault_name')
host_or_name = request.form.get('host_or_name')
port = request.form.get('port')
user_field = request.form.get('user')
password = request.form.get('password')
ssh_key = request.form.get('ssh_key')
auth_type = request.form.get('auth_type')
try:
ipaddress.ip_address(host_or_name)
host_ip = host_or_name
hostname = None
except ValueError:
host_ip = None
hostname = host_or_name
encrypted_pw = fernet.encrypt(password.encode()).decode() if password else None
if name:
new_vault = Vault(
id=str(uuid.uuid4()),
name=name,
host_ip=host_ip,
hostname=hostname,
port=int(port) if port else 22,
user=user_field,
password_hash=encrypted_pw,
ssh_key=ssh_key,
auth_type=auth_type
)
db.session.add(new_vault)
current_user = LocalUser.query.filter_by(username=session['user']).first()
current_user.vaults.append(new_vault)
db.session.commit()
return redirect(url_for('view_vaults'))
@app.route('/users', methods=['GET', 'POST'])
def users():
if 'user' not in session:
return redirect(url_for('login'))
current_user = LocalUser.query.filter_by(username=session['user']).first()
if request.method == 'POST':
if 'password' in request.form:
new_password = request.form.get('password')
if new_password:
current_user.password_hash = generate_password_hash(new_password)
current_user.password_changed = True
db.session.commit()
flash("Passwort wurde geändert.", "success")
elif 'new_username' in request.form:
new_username = request.form.get('new_username')
if new_username and new_username != current_user.username:
if LocalUser.query.filter_by(username=new_username).first():
flash("Benutzername bereits vergeben.", "error")
else:
current_user.username = new_username
session['user'] = new_username
db.session.commit()
flash("Benutzername geändert.", "success")
return render_template('profil.html', title="Profil", current_user=current_user, page='users')
@app.route('/sftp', methods=['GET'])
def sftp():
if 'user' not in session:
return redirect(url_for('login'))
current_user = LocalUser.query.filter_by(username=session['user']).first()
vaults = Vault.query.all() if current_user.role != 'user' else current_user.vaults
return render_template('sftp-modul.html', vaults=vaults)
@app.route('/users/delete/<username>', methods=['POST'])
def delete_user(username):
return redirect(url_for('view_vaults'))
@app.route('/vaults/delete/<vault_id>', methods=['POST'])
def delete_vault(vault_id):
vault = Vault.query.get(vault_id)
if vault:
db.session.delete(vault)
db.session.commit()
return redirect(url_for('view_vaults'))
@app.route('/vaults/edit/<vault_id>', methods=['POST'])
def edit_vault(vault_id):
vault = Vault.query.get(vault_id)
if vault:
vault.name = request.form.get("vault_name")
host_or_name = request.form.get("host_or_name")
try:
ipaddress.ip_address(host_or_name)
vault.host_ip = host_or_name
vault.hostname = None
except ValueError:
vault.host_ip = None
vault.hostname = host_or_name
vault.port = int(request.form.get("port")) if request.form.get("port") else 22
vault.user = request.form.get("user")
password = request.form.get("password")
if password:
vault.password_hash = generate_password_hash(password)
vault.ssh_key = request.form.get("ssh_key")
vault.auth_type = request.form.get("auth_type")
db.session.commit()
return redirect(url_for('view_vaults'))
@app.route('/sftp/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({'error': 'Keine Datei hochgeladen'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'Leerer Dateiname'}), 400
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
return jsonify({'success': True, 'filename': filename})
@app.route('/sftp/files', methods=['GET'])
def list_uploaded_files():
files = os.listdir(app.config['UPLOAD_FOLDER'])
return jsonify(files)
@app.route('/sftp/delete/<filename>', methods=['POST'])
def delete_uploaded_file(filename):
filepath = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(filename))
if os.path.exists(filepath):
os.remove(filepath)
return jsonify({'success': True})
return jsonify({'error': 'Datei nicht gefunden'}), 404
@app.route('/sftp/download/<filename>', methods=['GET'])
def download_uploaded_file(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], secure_filename(filename), as_attachment=True)
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login'))
if __name__ == '__main__':
with app.app_context():
db.create_all()
if not LocalUser.query.filter_by(username='admin').first():
hashed_pw = generate_password_hash('changeme')
db.session.add(LocalUser(
username='admin',
password_hash=hashed_pw,
password_changed=False,
role='admin'
))
db.session.commit()
init_terminal_routes(app, socketio, db, Vault, LocalUser, fernet)
socketio.run(app, host=IP_ADDRESS, port=PORT, debug=True)