Source code for app.run_as_pipeline

#!/usr/bin/env python3
"""
ProteoGyver Batch Pipeline

This script runs the complete batch pipeline using the same infrastructure
as the GUI, ensuring identical behavior and maintainability.

"""

import os
import sys
import argparse
import tempfile
from datetime import datetime
import logging
from pathlib import Path
from pipeline_module import pipeline_batch
from pipeline_module import pipeline_from_toml
from pipeline_module import batch_data_store_builder
from pipeline_module import batch_figure_builder_from_divs
from components import infra
from components import parsing

logger = logging.getLogger(__name__)


def _append_error_report(input_dir: str, messages) -> str:
    """Append pipeline warnings to ``ERRORS.txt`` in the input directory.

    :param input_dir: Directory in which ``ERRORS.txt`` is written.
    :param messages: Iterable of warning messages to record.
    :returns: Path of the error file that was appended to.
    """
    error_file = os.path.join(input_dir, "ERRORS.txt")
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Opened in append mode so entries from earlier runs are kept.
    with open(error_file, "a", encoding="utf-8") as f:
        f.write(f"[{ts}] Errors:\n")
        f.writelines(f"[{ts}] - {message}\n" for message in messages)
    return error_file


def run_batch_pipeline(toml_path: str) -> dict:
    """Run the complete batch pipeline.

    The run has three steps: (1) execute the batch pipeline, (2) build data
    stores from the batch output and export them, (3) generate figures.
    Failures in steps 1 and 2 are logged and re-raised. A failure in step 3
    is logged and recorded under ``figure_details`` in the returned summary
    instead of being raised.

    Exports are written to a ``PG output`` directory next to the TOML file.
    Unless ``config.keep_batch_output`` is set, intermediate batch output goes
    to a temporary directory that is removed before returning.

    :param toml_path: Path to the TOML configuration file.
    :returns: Summary dict with execution details, export paths, and figures info.
    :raises ValueError: If the pipeline summary contains both ``error`` and
        ``warnings`` keys; the warnings are appended to ``ERRORS.txt`` next to
        the TOML file first.
    """
    script_dir = Path(__file__).resolve().parent
    parameters_file = script_dir / 'config/parameters.toml'
    parameters = parsing.parse_parameters(parameters_file)

    input_dir = os.path.dirname(os.path.realpath(toml_path))
    config = pipeline_from_toml.load_config(
        toml_path,
        default_toml_dir=Path(*parameters['Pipeline module']['Default toml files directory']),
    )
    plot_formats = config.plot_formats
    keep_batch_output = config.keep_batch_output

    export_dir = os.path.join(input_dir, 'PG output')
    os.makedirs(export_dir, exist_ok=True)

    # Generate session name from timestamp
    session_name = f"{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}--pipeline"
    logger.info(f"Session name: {session_name}, output directory: {export_dir}")

    # Determine whether to use temporary directory or keep output. The pipeline
    # writes artifacts to config.outdir; ensure downstream readers use the same path.
    temp_context = None
    if keep_batch_output:
        # Ensure configured outdir exists and use it directly
        os.makedirs(config.outdir, exist_ok=True)
    else:
        # Use a temporary directory and point config.outdir to it
        temp_context = tempfile.TemporaryDirectory(prefix="proteogyver_pipeline_")
        config.outdir = temp_context.name
    batch_output_dir = config.outdir

    try:
        if temp_context:
            logger.info(f"Running batch pipeline with temporary output: {batch_output_dir}")
        else:
            logger.info(f"Running batch pipeline with permanent output: {batch_output_dir}")

        # Step 1: Run the batch pipeline
        summary = {}
        try:
            logger.info("Step 1: Running batch pipeline...")
            summary = pipeline_batch.run_pipeline(config, parameters)

            # Check if pipeline returned error due to warnings
            if "error" in summary and "warnings" in summary:
                error_msg = f"{summary['error']}"
                logger.error(error_msg)
                logger.error(f"Warnings: {summary['warnings']}")

                # Write error file to input directory
                error_file = _append_error_report(input_dir, summary['warnings'])
                logger.info(f"Warnings written to {error_file}")
                raise ValueError(error_msg)

            logger.info("Batch pipeline completed successfully")
        except Exception as e:
            logger.error(f"Batch pipeline failed: {e}")
            raise

        # Step 2: Build data stores and export
        try:
            logger.info("Step 2: Building data stores and exporting...")

            # Detect workflow from config
            workflow = config.workflow

            # Build data stores from batch output
            data_stores = batch_data_store_builder.build_data_stores_from_batch_output(
                batch_output_dir, workflow
            )
            logger.info(f"Built {len(data_stores)} data stores")

            infra.save_data_stores(data_stores, export_dir)
            logger.info("Data export completed")
        except Exception as e:
            logger.error(f"Data export failed: {e}")
            raise

        # Step 3: Generate figures
        figure_summary = {}
        try:
            logger.info("Step 3: Generating figures...")
            figure_summary = batch_figure_builder_from_divs.save_batch_figures_using_saved_divs(
                batch_output_dir=batch_output_dir,
                export_dir=export_dir,
                workflow=workflow,
                parameters=parameters,
                output_formats=plot_formats,
            )
            logger.info("Figure generation completed")
        except Exception as e:
            logger.error(f"Figure generation failed: {e}")
            # Don't raise - figures are optional
            figure_summary = {"error": str(e)}

        guide_path = os.path.join(os.path.dirname(__file__), 'data', 'output_guide.md')
        infra.write_README(export_dir, guide_path)
    finally:
        # Clean up temporary directory if used
        if temp_context:
            temp_context.cleanup()

    # Only report a figure count when figure generation returned a dict
    # without an "error" entry.
    if isinstance(figure_summary, dict) and "error" not in figure_summary:
        figures_generated = figure_summary.get("analysis_divs_count", 0)
    else:
        figures_generated = 0

    # Final summary
    result = {
        "pipeline_summary": summary,
        "export_directory": export_dir,
        "session_name": session_name,
        "workflow": workflow,
        "data_stores_built": len(data_stores),
        "batch_output_directory": batch_output_dir if keep_batch_output else "temporary",
        "figures_generated": figures_generated,
        "figure_details": figure_summary,
    }

    logger.info(f"pipeline finished. Export directory: {export_dir}")
    return result


def main():
    """Command line interface for the batch pipeline.

    Parses the command line, configures logging to the console and to a
    timestamped ``*_pipeline.log`` file next to the TOML file, runs the
    pipeline and prints a short summary. Exits with status 1 if the TOML file
    does not exist or the pipeline raises.

    :returns: None.
    """
    parser = argparse.ArgumentParser(
        description="Run ProteoGyver batch pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # Only options that this parser actually defines are shown here.
        epilog="""
Examples:
  # Run proteomics pipeline
  python run_as_pipeline.py proteomics_pipeline.toml

  # Run interactomics pipeline with verbose (INFO level) logging
  python run_as_pipeline.py interactomics_pipeline.toml --verbose

  # Run with debug logging
  python run_as_pipeline.py config.toml --debug
        """
    )

    parser.add_argument("toml_file", help="TOML configuration file for the pipeline")
    parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
    parser.add_argument("--debug", "-d", action="store_true", help="Enable debug logging")

    args = parser.parse_args()

    # Configure logging; --debug takes precedence over --verbose.
    if args.debug:
        log_level = logging.DEBUG
    elif args.verbose:
        log_level = logging.INFO
    else:
        log_level = logging.WARNING

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    toml_dir = Path(args.toml_file).resolve().parent
    log_file = toml_dir / f"{timestamp}_pipeline.log"

    handlers = [logging.StreamHandler()]
    # FileHandler opens the file immediately, so a missing or unwritable
    # directory raises here. Fall back to console-only logging so the
    # validation below can still report a missing TOML file cleanly.
    log_file_error = None
    try:
        handlers.append(logging.FileHandler(str(log_file)))
    except OSError as e:
        log_file_error = e

    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=handlers,
    )
    if log_file_error is not None:
        logger.warning(f"Could not create log file {log_file}: {log_file_error}")

    # Validate inputs
    if not os.path.exists(args.toml_file):
        logger.error(f"TOML file not found: {args.toml_file}")
        sys.exit(1)

    try:
        # Run the pipeline
        result = run_batch_pipeline(
            toml_path=args.toml_file,
        )

        # Print summary
        print("\nPipeline Complete!")
        print(f"Workflow: {result['workflow']}")

        if "error" in result.get('figure_details', {}):
            print(f"ERROR: Figure generation failed: {result['figure_details']['error']}")

        print(f"\nSession name: {result['session_name']}")
    except Exception as e:
        # Top-level boundary: log the traceback and exit non-zero.
        logger.exception(f"Pipeline failed: {e}")
        print(f"\nERROR: Pipeline Failed: {e}")
        sys.exit(1)


# Run the command line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()