ifnot pfx_password ornot recipient_email: return jsonify({"error": "Recipient email and PFX password are required."}), 400
# Trigger the background task from tasks import send_pfx_email_task send_pfx_email_task.delay(cert_id, pfx_password, recipient_email)
return jsonify({"message": f"Certificate is being sent to {recipient_email} in the background."}), 202
如果在windows中运行,则需要celery -A tasks.celery_app worker -l info -P eventlet
tasks.py
@celery_app.task(name="tasks.send_pfx_file_with_email") defsend_pfx_email_task(cert_id, pfx_password, recipient_email): """ Celery task to generate and email a PFX file in the background. """ with app.app_context(): crypto_key = CryptoKey.query.get(cert_id) cert_info = Certificate.query.get(cert_id)
ifnot crypto_key ornot crypto_key.certificate_pem ornot crypto_key.private_key_pem ornot cert_info: print(f"Error in task: Could not find certificate or key for {cert_id}") return
body = f"Please find your requested certificate ({cert_info.subject}) attached as a password-protected PFX file.\n\nThe password to open the file is: {pfx_password}" msg.attach(MIMEText(body, 'plain'))
part = MIMEBase('application', 'octet-stream') part.set_payload(pfx_bytes) encoders.encode_base64(part) part.add_header('Content-Disposition', f'attachment; filename="{cert_info.subject}.pfx"') msg.attach(part) with smtplib.SMTP(app.config['SMTP_SERVER'], app.config['SMTP_PORT']) as server: server.starttls() server.login(app.config['SMTP_USERNAME'], app.config['SMTP_PASSWORD']) server.send_message(msg) print(f"Successfully sent PFX for {cert_id} to {recipient_email}")
except Exception as e: print(f"Failed to send email for {cert_id}: {e}") traceback.print_exc()
3.4 容器化
使用docker来打包代码, 用来打包python代码的Dockerfile
FROM python:3.10-slim
WORKDIR /app
RUNmkdir -p /app/data
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
import os import unittest import json import sqlite3 from datetime import datetime, timedelta from app import app, get_db_connection, init_db
classCryptoVaultTestCase(unittest.TestCase): """Test suite for the CryptoVault Flask application."""
defsetUp(self): """Set up a new test client and a temporary database.""" # Use an in-memory SQLite database for testing self.db_fd, app.config['DATABASE'] = ":memory:", ":memory:" app.config['TESTING'] = True self.client = app.test_client()
# The application context is needed to work with the database with app.app_context(): init_db()
deftearDown(self): """Clean up the database after each test.""" # The in-memory database vanishes on its own, so no file to close. pass
deftest_01_init_db(self): """Test if the database initialization creates all necessary tables.""" with app.app_context(): conn = get_db_connection() cursor = conn.cursor() tables = ["keys", "certificates", "history", "crypto_keys"] for table in tables: cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'") self.assertIsNotNone(cursor.fetchone(), f"Table '{table}' was not created.") conn.close()
deftest_02_create_and_get_ca(self): """Test the creation and retrieval of the internal CA.""" with app.app_context(): # Test CA creation endpoint response = self.client.post('/api/internal_ca/create') self.assertEqual(response.status_code, 200) json_data = json.loads(response.data) self.assertIn("Internal CA created", json_data['message'])
# Test CA retrieval endpoint response = self.client.get('/api/internal_ca') self.assertEqual(response.status_code, 200) json_data = json.loads(response.data) self.assertIn("CN=cryptovault-ca.local", json_data['subject']) self.assertIn(".pem", json_data['pem'])
deftest_03_apply_for_rsa_key(self): """Test applying for a new RSA key.""" response = self.client.post('/api/apply', data=json.dumps({ "type": "key", "name": "Test RSA Key", "keyType": "RSA-4096" }), content_type='application/json' ) self.assertEqual(response.status_code, 201) json_data = json.loads(response.data) self.assertIn('key created successfully', json_data['message']) key_id = json_data['id']
# Verify the key is in the database with app.app_context(): conn = get_db_connection() key_row = conn.execute("SELECT * FROM keys WHERE id = ?", (key_id,)).fetchone() self.assertIsNotNone(key_row) self.assertEqual(key_row['name'], "Test RSA Key") crypto_row = conn.execute("SELECT * FROM crypto_keys WHERE asset_id = ?", (key_id,)).fetchone() self.assertIsNotNone(crypto_row['private_key_pem']) self.assertIsNotNone(crypto_row['public_key_pem']) conn.close()
使用locust进行压力测试
import random import string import time from locust import HttpUser, task, between, events
# --- Configuration --- # The base URL of your running Flask application HOST_URL = "http://127.0.0.1:5000"
# --- Helper Functions --- defget_random_string(length=128): """Generates a random string of fixed length.""" letters = string.ascii_lowercase + string.digits return''.join(random.choice(letters) for i inrange(length))
classCryptoApiUser(HttpUser): """ A user class that simulates a client interacting with the crypto API. It will first ensure an AES key exists, then continuously test the encrypt and decrypt endpoints. """ # Wait between 0.5 and 2 seconds between tasks wait_time = between(0.5, 2.0) host = HOST_URL
defon_start(self): """ Called when a Locust start event is triggered. This method ensures that a valid AES key exists for the test. It will try to find an existing 'AES-256' key, and if none are found, it will create one. """ self.key_id = None print("Initializing user, finding or creating an AES key...")
try: # 1. Fetch all existing keys with self.client.get("/api/keys", catch_response=True) as response: ifnot response.ok: response.failure("Failed to get keys list.") return keys = response.json() # 2. Find the first active AES-256 key for key in keys: if key.get("type") == "AES-256"and key.get("status") == "Active": self.key_id = key["id"] print(f"User found existing active AES key: {self.key_id}") break # 3. If no key was found, create a new one ifnot self.key_id: print("No active AES key found. Creating a new one for the test.") payload = { "type": "key", "name": f"perf-test-key-{int(time.time())}", "keyType": "AES-256" } with self.client.post("/api/apply", json=payload, catch_response=True) as response: if response.ok: self.key_id = response.json().get("id") print(f"User created new AES key: {self.key_id}") else: response.failure("Failed to create a new AES key for the test.") print("Could not create key. User will be unable to run tasks.") except Exception as e: print(f"An exception occurred during user setup: {e}") # This user will not be able to proceed. self.key_id = None
@task defencrypt_and_decrypt_flow(self): """ This task simulates a full user flow: 1. Encrypt a piece of random data. 2. Decrypt the resulting ciphertext. """ ifnot self.key_id: # If the key setup failed, we can't run the test. # We can skip this task for this user. print("Skipping task: key_id not set.") time.sleep(self.wait_time()) return plaintext = get_random_string(256) # Test with 256 bytes of data ciphertext = None
# --- Encrypt Task --- encrypt_payload = {"keyId": self.key_id, "text": plaintext} with self.client.post( "/api/crypto/encrypt", json=encrypt_payload, name="/api/crypto/encrypt", catch_response=True ) as response: if response.ok: try: ciphertext = response.json().get("result") ifnot ciphertext: response.failure("Encrypt endpoint returned OK but no result.") except Exception: response.failure("Failed to parse JSON from encrypt response.") else: response.failure(f"Encrypt request failed with status {response.status_code}") return# Can't proceed to decrypt if encrypt failed
# Wait a moment before decrypting time.sleep(0.1)
# --- Decrypt Task --- if ciphertext: decrypt_payload = {"keyId": self.key_id, "text": ciphertext} with self.client.post( "/api/crypto/decrypt", json=decrypt_payload, name="/api/crypto/decrypt", catch_response=True ) as response: if response.ok: try: decrypted_text = response.json().get("result") if decrypted_text != plaintext: response.failure("Decryption result did not match original plaintext.") except Exception: response.failure("Failed to parse JSON from decrypt response.") else: response.failure(f"Decrypt request failed with status {response.status_code}")