"""API endpoints and Celery task for accepting and saving pipeline input files.

This module provides Flask API endpoints that accept file uploads (data_table,
sample_table, pipeline_toml, and optionally proteomics_comparisons), report the
status of a pipeline run, and serve the "PG output" directory as a zip file,
plus a Celery background task that saves the uploaded files to a specified
directory.

The API runs continuously as part of the Flask server, and file saving is handled
asynchronously by Celery workers.
"""
import os
import logging
import zipfile
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime
from flask import request, jsonify, send_file
from werkzeug.utils import secure_filename
from celery import shared_task
from uuid import uuid4
from components.tools.utils import save_toml, read_toml, load_toml
logger = logging.getLogger("file_upload_api")
@shared_task
def save_uploaded_files(
    data_table_content: bytes,
    data_table_filename: str,
    sample_table_content: bytes,
    sample_table_filename: str,
    pipeline_toml_content: bytes,
    pipeline_toml_filename: str,
    proteomics_comparisons_content: Optional[bytes] = None,
    proteomics_comparisons_filename: Optional[str] = None,
    output_directory: Optional[str] = None,
    upload_dir_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Save uploaded pipeline input files into a per-upload directory.

    The files are written to ``<output_directory>/<upload_dir_name>``. After
    saving, the pipeline TOML is rewritten so that ``general.data``,
    ``general['sample table']`` and (when a comparisons file was supplied)
    ``proteomics.comparison_file`` hold the names the files were saved under.
    A failure while rewriting the TOML is logged but does not fail the task,
    because the files themselves are already on disk.

    Client-supplied filenames are sanitised with ``secure_filename``. If the
    sanitised name is empty, a fixed fallback name is used; if two uploads
    would end up with the same name, the later one is prefixed with its role
    so that neither file is overwritten.

    :param data_table_content: Binary content of the data table file.
    :param data_table_filename: Original filename of the data table.
    :param sample_table_content: Binary content of the sample table file.
    :param sample_table_filename: Original filename of the sample table.
    :param pipeline_toml_content: Binary content of the pipeline TOML file.
    :param pipeline_toml_filename: Original filename of the pipeline TOML.
    :param proteomics_comparisons_content: Optional binary content of the
        proteomics comparisons file.
    :param proteomics_comparisons_filename: Optional original filename of the
        proteomics comparisons file. The file is only saved when both the
        content and a non-empty filename are given.
    :param output_directory: Optional output directory path. If None, the
        'Input watch directory' of the 'Pipeline module' section in
        config/parameters.toml is used.
    :param upload_dir_name: Optional upload directory name. If None, a
        timestamped name with a random UUID suffix is generated.
    :returns: Dictionary with 'status' ('success' or 'error'), 'message',
        'upload_directory', 'upload_directory_name' and 'saved_files'
        (mapping of upload role to saved path). Exceptions are not
        propagated; they are reported through the 'error' result.
    """
    try:
        # Only consult parameters.toml when a default is actually needed, so an
        # explicit output directory does not depend on the config being readable.
        if output_directory is None:
            output_directory = _default_upload_root()
        os.makedirs(output_directory, exist_ok=True)

        # Create the per-upload subdirectory.
        if upload_dir_name is None:
            timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
            upload_dir_name = f'{timestamp}--api_upload_{uuid4()}'
        upload_dir = os.path.join(output_directory, upload_dir_name)
        os.makedirs(upload_dir, exist_ok=True)

        # (result key, log label, client filename, fallback name, content)
        uploads = [
            ('data_table', 'data table', data_table_filename,
             'data_table', data_table_content),
            ('sample_table', 'sample table', sample_table_filename,
             'sample_table', sample_table_content),
            ('pipeline_toml', 'pipeline TOML', pipeline_toml_filename,
             'pipeline.toml', pipeline_toml_content),
        ]
        has_comparisons = (
            proteomics_comparisons_content is not None
            and bool(proteomics_comparisons_filename)
        )
        if has_comparisons:
            uploads.append((
                'proteomics_comparisons', 'proteomics comparisons',
                proteomics_comparisons_filename, 'proteomics_comparisons',
                proteomics_comparisons_content,
            ))

        saved_files = {}
        saved_names = {}
        taken = set()
        for key, label, filename, fallback, content in uploads:
            name = _safe_upload_name(filename, fallback, key, taken)
            taken.add(name.lower())
            path = os.path.join(upload_dir, name)
            with open(path, 'wb') as f:
                f.write(content)
            saved_files[key] = path
            saved_names[key] = name
            logger.info(f"Saved {label} to {path}")

        # Point the pipeline TOML at the files as they were actually saved.
        pipeline_toml_path = saved_files['pipeline_toml']
        try:
            doc = load_toml(Path(pipeline_toml_path))
            if 'general' not in doc:
                doc['general'] = {}
            data_filename = saved_names['data_table']
            doc['general']['data'] = data_filename
            logger.info(f"Updated pipeline TOML: general.data = {data_filename}")
            sample_table_filename_secure = saved_names['sample_table']
            doc['general']['sample table'] = sample_table_filename_secure
            logger.info(f"Updated pipeline TOML: general['sample table'] = {sample_table_filename_secure}")
            if has_comparisons:
                if 'proteomics' not in doc:
                    doc['proteomics'] = {}
                proteomics_comparisons_filename_secure = saved_names['proteomics_comparisons']
                doc['proteomics']['comparison_file'] = proteomics_comparisons_filename_secure
                logger.info(f"Updated pipeline TOML: proteomics.comparison_file = {proteomics_comparisons_filename_secure}")
            save_toml(doc, Path(pipeline_toml_path))
            logger.info(f"Successfully modified pipeline TOML at {pipeline_toml_path}")
        except Exception as e:
            # Deliberately best-effort: the uploads are already saved.
            error_msg = f"Error modifying pipeline TOML: {str(e)}"
            logger.warning(error_msg, exc_info=True)

        return {
            'status': 'success',
            'message': f'Files saved successfully to {upload_dir}',
            'upload_directory': upload_dir,
            'upload_directory_name': upload_dir_name,
            'saved_files': saved_files
        }
    except Exception as e:
        # Task boundary: report the failure in the result instead of raising.
        error_msg = f"Error saving uploaded files: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {
            'status': 'error',
            'message': error_msg,
            'upload_directory': None,
            'upload_directory_name': upload_dir_name,
            'saved_files': {}
        }


def _default_upload_root() -> str:
    """Return the default upload root configured in config/parameters.toml.

    Reads the 'Input watch directory' entry of the 'Pipeline module' section,
    which may be a single path or a list of path components (None components
    are skipped).

    :returns: The configured directory as a string path.
    :raises ValueError: If the entry is missing or has no usable components.
        Without this check an empty list would be turned into the literal
        directory name "[]".
    """
    root_dir = Path(__file__).resolve().parents[1]
    parameters = read_toml(root_dir / 'config' / 'parameters.toml')
    configured = parameters.get('Pipeline module', {}).get('Input watch directory')
    if isinstance(configured, (list, tuple)):
        parts = [str(part) for part in configured if part is not None]
        if parts:
            return os.path.join(*parts)
    elif configured:
        return str(configured)
    raise ValueError(
        "'Input watch directory' is not configured in the 'Pipeline module' "
        "section of parameters.toml"
    )


def _safe_upload_name(filename: Optional[str], fallback: str, role: str, taken: set) -> str:
    """Return a sanitised file name that does not clash with names in *taken*.

    :param filename: Client-supplied filename (may be empty or None).
    :param fallback: Name to use when sanitising leaves nothing.
    :param role: Upload role, used as a prefix to resolve name clashes.
    :param taken: Lower-cased names already used in the upload directory.
    :returns: File name to save the upload under.
    """
    # secure_filename() may return '' (e.g. for "../"); joining that onto the
    # upload directory would make open() target the directory itself.
    name = secure_filename(filename or '') or fallback
    # Compare case-insensitively so the result is also safe on
    # case-insensitive filesystems.
    while name.lower() in taken:
        name = f'{role}_{name}'
    return name
# --- Flask API endpoints ---
def register_file_upload_api(server, celery_app_instance=None):
    """Register the pipeline file API endpoints with the Flask server.

    Three routes are registered:

    - POST '/api/upload-pipeline-files': accepts multipart/form-data uploads
      (data_table, sample_table, pipeline_toml required; proteomics_comparisons
      optional) and queues a Celery task that saves them.
    - GET '/api/pipeline-status': reports the state of a pipeline run based on
      marker files in the upload directory.
    - GET '/api/download-output': returns the "PG output" directory of an
      upload as a zip file.

    :param server: Flask server instance (typically app.server from Dash).
    :param celery_app_instance: Optional Celery app instance to use for task
        execution. If None, ``app.celery_app`` is tried, and finally the
        ``save_uploaded_files`` shared task is called directly.
    :returns: None
    """
    # Captured by the upload endpoint's closure.
    celery_app_for_task = celery_app_instance

    @server.route('/api/upload-pipeline-files', methods=['POST'])
    def upload_pipeline_files():
        """API endpoint for uploading pipeline input files.

        Accepts POST requests with multipart/form-data containing:
        - data_table (required): File upload
        - sample_table (required): File upload
        - pipeline_toml (required): File upload
        - proteomics_comparisons (optional): File upload
        - output_directory (optional): Custom output directory path

        :returns: JSON response with task ID and upload directory name (202),
            or an error response (400 for missing/unnamed files, 503 when the
            task broker cannot be reached, 500 otherwise).
        """
        try:
            required_fields = ('data_table', 'sample_table', 'pipeline_toml')
            # All presence checks run before any "empty" check, so the first
            # reported problem is always a missing field if there is one.
            for field in required_fields:
                if field not in request.files:
                    return jsonify({
                        'status': 'error',
                        'message': f'Missing required file: {field}'
                    }), 400
            uploads = {field: request.files[field] for field in required_fields}
            # A form field submitted without a selected file has no filename.
            for field in required_fields:
                if not uploads[field].filename:
                    return jsonify({
                        'status': 'error',
                        'message': f'{field} file is empty'
                    }), 400

            # Optional proteomics_comparisons file.
            proteomics_comparisons_content = None
            proteomics_comparisons_filename = None
            if 'proteomics_comparisons' in request.files:
                proteomics_comparisons_file = request.files['proteomics_comparisons']
                if proteomics_comparisons_file.filename:
                    proteomics_comparisons_content = proteomics_comparisons_file.read()
                    proteomics_comparisons_filename = proteomics_comparisons_file.filename

            # NOTE(review): output_directory comes straight from the client and
            # is used by the worker as the location to create directories and
            # write files. Confirm this endpoint is only reachable by trusted
            # callers, or restrict the accepted locations.
            output_directory = request.form.get('output_directory', None)

            # Generate the upload directory name here so it can be returned
            # immediately; the task receives it and creates that directory.
            timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
            upload_dir_name = f'{timestamp}--api_upload_{uuid4()}'

            # NOTE(review): file contents are passed as bytes in the task
            # payload; this presumably relies on a Celery serializer that
            # supports bytes - confirm against the Celery configuration.
            task_kwargs = {
                'data_table_content': uploads['data_table'].read(),
                'data_table_filename': uploads['data_table'].filename,
                'sample_table_content': uploads['sample_table'].read(),
                'sample_table_filename': uploads['sample_table'].filename,
                'pipeline_toml_content': uploads['pipeline_toml'].read(),
                'pipeline_toml_filename': uploads['pipeline_toml'].filename,
                'proteomics_comparisons_content': proteomics_comparisons_content,
                'proteomics_comparisons_filename': proteomics_comparisons_filename,
                'output_directory': output_directory,
                'upload_dir_name': upload_dir_name
            }

            try:
                # Prefer the explicitly provided Celery app, then app.celery_app.
                app_instance = celery_app_for_task
                if app_instance is None:
                    try:
                        from app import celery_app
                        app_instance = celery_app
                    except ImportError:
                        app_instance = None
                if app_instance is not None:
                    task = app_instance.send_task(
                        'components.file_upload_api.save_uploaded_files',
                        args=[],
                        kwargs=task_kwargs
                    )
                else:
                    # Fallback to using shared_task directly
                    task = save_uploaded_files.delay(**task_kwargs)
                logger.info(f"File upload task queued: {task.id}, upload directory: {upload_dir_name}")
                return jsonify({
                    'status': 'accepted',
                    'message': 'Files uploaded successfully, processing in background',
                    'task_id': task.id,
                    'upload_directory_name': upload_dir_name
                }), 202
            except Exception as celery_error:
                # Heuristic detection of a broker connection failure
                # ('111' is the "connection refused" errno in the message).
                error_str = str(celery_error)
                if ('Connection refused' in error_str or '111' in error_str
                        or 'ConnectionError' in type(celery_error).__name__):
                    logger.error(f"Celery/Redis connection error: {celery_error}", exc_info=True)
                    return jsonify({
                        'status': 'error',
                        'message': 'Cannot connect to Celery/Redis. Please ensure Redis is running and Celery workers are started.'
                    }), 503  # Service Unavailable
                # Anything else is handled by the outer handler.
                raise
        except Exception as e:
            error_msg = f"Error processing file upload request: {str(e)}"
            logger.error(error_msg, exc_info=True)
            return jsonify({
                'status': 'error',
                'message': error_msg
            }), 500

    @server.route('/api/pipeline-status', methods=['GET'])
    def check_pipeline_status():
        """API endpoint for checking pipeline processing status.

        Accepts GET requests with query parameter:
        - upload_directory_name (required): Name of the upload directory

        Returns the status of the pipeline run:
        - 'processing': .pg_analyzing.lock exists, or only watcher.log exists
        - 'success': pipeline.success.txt exists
        - 'error': pipeline.failure.txt exists (includes ERRORS.txt contents)
        - 'not_found': Upload directory not found
        - 'unknown': No status files found (may not have started yet)

        :returns: JSON response with status and optional error message.
        """
        try:
            upload_dir_name = request.args.get('upload_directory_name')
            if not upload_dir_name:
                return jsonify({
                    'status': 'error',
                    'message': 'Missing required parameter: upload_directory_name'
                }), 400

            upload_dir_path = _locate_upload_dir(_configured_watch_directory(), upload_dir_name)
            if upload_dir_path is None:
                # The name points outside the watch directory (e.g. '..' or
                # an absolute path); refuse instead of probing the filesystem.
                return jsonify({
                    'status': 'error',
                    'message': 'Invalid upload_directory_name'
                }), 400
            if not upload_dir_path.is_dir():
                return jsonify({
                    'status': 'not_found',
                    'message': f'Upload directory not found: {upload_dir_name}',
                    'upload_directory_name': upload_dir_name
                }), 404

            lock_file = upload_dir_path / '.pg_analyzing.lock'
            watcher_log = upload_dir_path / 'watcher.log'
            success_file = upload_dir_path / 'pipeline.success.txt'
            failure_file = upload_dir_path / 'pipeline.failure.txt'
            errors_file = upload_dir_path / 'ERRORS.txt'

            if lock_file.exists():
                return jsonify({
                    'status': 'processing',
                    'message': 'Pipeline is currently running',
                    'upload_directory_name': upload_dir_name
                }), 200
            if success_file.exists():
                return jsonify({
                    'status': 'success',
                    'message': 'Pipeline completed successfully',
                    'upload_directory_name': upload_dir_name,
                    'success_timestamp': _read_marker_text(success_file)
                }), 200
            if failure_file.exists():
                error_content = None
                if errors_file.exists():
                    try:
                        error_content = errors_file.read_text(encoding='utf-8')
                    except Exception as e:
                        logger.warning(f"Error reading ERRORS.txt: {e}")
                        error_content = f"Error reading ERRORS.txt: {str(e)}"
                return jsonify({
                    'status': 'error',
                    'message': 'Pipeline execution failed',
                    'upload_directory_name': upload_dir_name,
                    'error_message': error_content,
                    'failure_timestamp': _read_marker_text(failure_file)
                }), 200
            # watcher.log remains after the run is done, so it is checked
            # last: on its own it means "being watched, not finished".
            if watcher_log.exists():
                return jsonify({
                    'status': 'processing',
                    'message': 'Pipeline is currently running',
                    'upload_directory_name': upload_dir_name
                }), 200
            return jsonify({
                'status': 'unknown',
                'message': 'No status files found. Pipeline may not have started yet.',
                'upload_directory_name': upload_dir_name
            }), 200
        except Exception as e:
            error_msg = f"Error checking pipeline status: {str(e)}"
            logger.error(error_msg, exc_info=True)
            return jsonify({
                'status': 'error',
                'message': error_msg
            }), 500

    @server.route('/api/download-output', methods=['GET'])
    def download_output():
        """API endpoint for downloading the PG output directory as a zip file.

        Accepts GET requests with query parameter:
        - upload_directory_name (required): Name of the upload directory

        Returns the contents of the "PG output" directory as a zip file named
        "PG output.zip", with archive paths relative to that directory.
        If the PG output directory doesn't exist, returns a 404 error.

        :returns: ZIP file download or error response.
        """
        try:
            upload_dir_name = request.args.get('upload_directory_name')
            if not upload_dir_name:
                return jsonify({
                    'status': 'error',
                    'message': 'Missing required parameter: upload_directory_name'
                }), 400

            upload_dir_path = _locate_upload_dir(_configured_watch_directory(), upload_dir_name)
            if upload_dir_path is None:
                # Never serve directories outside the watch directory.
                return jsonify({
                    'status': 'error',
                    'message': 'Invalid upload_directory_name'
                }), 400
            if not upload_dir_path.is_dir():
                return jsonify({
                    'status': 'not_found',
                    'message': f'Upload directory not found: {upload_dir_name}'
                }), 404

            pg_output_dir = upload_dir_path / 'PG output'
            if not pg_output_dir.is_dir():
                return jsonify({
                    'status': 'not_found',
                    'message': 'PG output directory not found. Pipeline may not have completed yet.'
                }), 404

            # The zip is built on disk; the temp file is closed right away so
            # zipfile can reopen it by path.
            temp_zip = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
            temp_zip_path = Path(temp_zip.name)
            temp_zip.close()
            try:
                with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                    # Only the PG output directory is archived, not the
                    # uploaded inputs or logs next to it.
                    for root, _dirs, files in os.walk(pg_output_dir):
                        for file in files:
                            file_path = Path(root) / file
                            zipf.write(file_path, file_path.relative_to(pg_output_dir))
                logger.info(f"Created zip file for PG output: {temp_zip_path}")
                response = send_file(
                    str(temp_zip_path),
                    mimetype='application/zip',
                    as_attachment=True,
                    download_name='PG output.zip'
                )
                # Remove the temp file once the response has been sent;
                # otherwise every download leaves a zip behind.
                response.call_on_close(lambda: _discard_file(temp_zip_path))
                return response
            except Exception as zip_error:
                _discard_file(temp_zip_path)
                logger.error(f"Error creating zip file: {zip_error}", exc_info=True)
                return jsonify({
                    'status': 'error',
                    'message': f'Error creating zip file: {str(zip_error)}'
                }), 500
        except Exception as e:
            error_msg = f"Error downloading output: {str(e)}"
            logger.error(error_msg, exc_info=True)
            return jsonify({
                'status': 'error',
                'message': error_msg
            }), 500


def _configured_watch_directory() -> Path:
    """Return the 'Input watch directory' configured in config/parameters.toml.

    The entry lives in the 'Pipeline module' section and may be a single path
    or a list of path components (None components are skipped).

    :returns: The configured directory.
    :raises ValueError: If the entry is missing or has no usable components.
        Without this check an empty list would be turned into the literal
        directory name "[]".
    """
    root_dir = Path(__file__).resolve().parents[1]
    parameters = read_toml(root_dir / 'config' / 'parameters.toml')
    configured = parameters.get('Pipeline module', {}).get('Input watch directory')
    if isinstance(configured, (list, tuple)):
        parts = [str(part) for part in configured if part is not None]
        if parts:
            return Path(os.path.join(*parts))
    elif configured:
        return Path(str(configured))
    raise ValueError(
        "'Input watch directory' is not configured in the 'Pipeline module' "
        "section of parameters.toml"
    )


def _locate_upload_dir(base_directory: Path, upload_dir_name: str) -> Optional[Path]:
    """Resolve *upload_dir_name* beneath *base_directory*.

    :param base_directory: Directory that contains the upload directories.
    :param upload_dir_name: Client-supplied directory name.
    :returns: The resolved path, or None when the name does not resolve to a
        location strictly inside *base_directory* (e.g. '..', '.', or an
        absolute path). Existence of the path is not checked.
    """
    try:
        base = base_directory.resolve()
        candidate = (base / upload_dir_name).resolve()
        # Raises ValueError when candidate is not located under base.
        candidate.relative_to(base)
    except (ValueError, OSError, RuntimeError):
        return None
    if candidate == base:
        return None
    return candidate


def _read_marker_text(path: Path) -> Optional[str]:
    """Return the stripped UTF-8 text of *path*, or None if it no longer exists."""
    try:
        return path.read_text(encoding='utf-8').strip()
    except FileNotFoundError:
        # The marker can disappear between the existence check and the read.
        return None


def _discard_file(path: Path) -> None:
    """Delete *path*, ignoring a missing file and logging other OS errors."""
    try:
        path.unlink()
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"Could not remove temporary file {path}: {e}")