#!/usr/bin/env python3
# Run Dash analysis headlessly, step-by-step.
from __future__ import annotations
import os, json, base64, time
import logging
import pandas as pd
from io import StringIO
from dataclasses import dataclass, field, asdict, is_dataclass
from typing import Any, Dict, List, Optional, Tuple
import pickle
from collections.abc import Mapping
from pathlib import Path
from components import parsing, qc_analysis, proteomics, interactomics, db_functions
from components.figures import color_tools
from _version import __version__
# [docs]
def dash_to_wire(obj):
    """Recursively convert Dash/Plotly components to JSON-serializable structures.

    - Leaves primitives (str, int, float, bool, None) and raw byte buffers
      (bytes, bytearray, memoryview) untouched.
    - Converts NumPy scalars to built-in Python scalars when NumPy is importable.
    - Converts any object exposing ``to_plotly_json()`` (Dash components, go.Figure).
    - Dataclass *instances* are converted via ``asdict()`` then recursed.
      Dataclass *types* are returned unchanged, because ``asdict()`` only
      accepts instances.
    - Recurses through mappings (values only; keys are kept as-is) and
      lists/tuples (tuples come back as lists).

    :param obj: Any Python object (Dash component, go.Figure, dict/list, primitives).
    :returns: Structure with components replaced by dicts/lists. Objects that
        match none of the rules above are returned unchanged.
    """
    # Fast path: primitives / "don't touch"
    if obj is None or isinstance(obj, (str, int, float, bool, bytes, bytearray, memoryview)):
        return obj

    # NumPy is optional here; only a missing installation is tolerated.
    try:
        import numpy as np  # type: ignore
    except ImportError:
        np = None
    if np is not None and isinstance(obj, np.generic):
        return obj.item()

    # Dash/Plotly components & figures expose this
    to_json = getattr(obj, "to_plotly_json", None)
    if callable(to_json):
        return dash_to_wire(to_json())

    # is_dataclass() is also True for the dataclass *class* itself, and
    # asdict() raises TypeError for classes, so restrict this to instances.
    if is_dataclass(obj) and not isinstance(obj, type):
        return dash_to_wire(asdict(obj))

    # Mappings -> dict, recurse on values
    if isinstance(obj, Mapping):
        return {key: dash_to_wire(value) for key, value in obj.items()}

    # Sequences (lists/tuples) -> list, recurse per element
    if isinstance(obj, (list, tuple)):
        return [dash_to_wire(item) for item in obj]

    # Anything else: leave as-is
    return obj
# -------- Config --------
# [docs]
@dataclass
class BatchConfig:
    """Settings for a single headless batch run.

    Only the two input table paths are mandatory; every other field has a
    default. ``workflow`` selects which analysis branch ``run_pipeline``
    dispatches to ("proteomics" or "interactomics").
    """

    # ---- input tables (required) ----
    data_table_path: str  # e.g. "data/your_maxquant_proteingroups.tsv"
    sample_table_path: str  # e.g. "data/experimental_design.tsv"

    # ---- general options ----
    outdir: str = "batch_out"  # directory receiving the JSON/pickle artifacts
    figure_template: str = "plotly_white"  # set as the default Plotly template
    remove_common_contaminants: bool = True  # False -> empty contaminant list is used when formatting
    rename_replicates: bool = False
    unique_only: bool = False
    workflow: str = "proteomics"  # compared case-insensitively

    # ---- pipeline output options ----
    plot_formats: List[str] = field(default_factory=lambda: ["png", "html", "pdf"])
    keep_batch_output: bool = False

    # ---- proteomics settings ----
    na_filter_percent: int = 70
    na_filter_type: str = "sample-group"  # "sample-group" | "sample-set"
    normalization: str = "no_normalization"  # "Median" | "Quantile" | "Vsn" | "no_normalization"
    imputation: str = "QRILC"  # "knn" | "mean" | ...
    control_group: Optional[str] = None  # if None, provide comparison_file instead
    comparison_file: Optional[str] = None
    fc_threshold: float = 2
    p_threshold: float = 0.05
    test_type: str = "independent"

    # ---- interactomics settings ----
    uploaded_controls: List[str] = field(default_factory=list)
    additional_controls: List[str] = field(default_factory=list)
    crapome_sets: List[str] = field(default_factory=list)
    proximity_filtering: bool = False
    n_controls: int = 3
    saint_bfdr_threshold: float = 0.05
    crapome_percentage_threshold: int = 20
    crapome_fc_threshold: int = 2
    rescue_enabled: bool = False
    chosen_enrichments: List[str] = field(default_factory=list)
    force_supervenn: bool = False
# -------- Helpers that mimic Dash's upload content --------
def _upload_contents_for_path(path: str) -> Tuple[str, str, int]:
    """Build the (contents, filename, mtime_ms) triple a Dash Upload would deliver.

    The file is read in binary mode and embedded in a data URI of the form
    ``data:application/octet-stream;base64,<payload>``.

    :param path: Path to a file on disk.
    :returns: Tuple of (base64 data-URI string, base name of the file,
        last-modified time in whole milliseconds).
    """
    with open(path, "rb") as handle:
        raw_bytes = handle.read()
    payload = base64.b64encode(raw_bytes).decode("ascii")
    # os.path.getmtime returns seconds; the tuple carries milliseconds.
    modified_ms = int(os.path.getmtime(path) * 1000)
    return (
        f"data:application/octet-stream;base64,{payload}",
        os.path.basename(path),
        modified_ms,
    )
def _ensure_dir(path: str):
    """Create directory ``path`` including missing parents; do nothing if it already exists.

    :param path: Directory path to create.
    """
    os.makedirs(name=path, exist_ok=True)
def _dump_json(outdir: str, name: str, obj: Any):
    """Serialize ``obj`` as indented JSON to ``<outdir>/<name>.json``.

    The output directory is created first if it does not exist. The file is
    written as UTF-8 with a two-space indent.

    :param outdir: Directory that receives the file.
    :param name: File name without the ``.json`` extension.
    :param obj: Object to serialize; must be acceptable to ``json.dump``.
    """
    _ensure_dir(outdir)
    target_path = os.path.join(outdir, f"{name}.json")
    with open(target_path, "w", encoding="utf-8") as handle:
        json.dump(obj, handle, indent=2)
def _collect_version_info(db_file: str) -> Dict[str, Any]:
    """Collect version information for Proteogyver, database, and external data.

    Mirrors the save_version_info function from QC_and_data_analysis.py.

    Reading the ``data_versions`` table is best-effort: any failure there is
    logged and the versions gathered so far are still returned. Errors from
    ``get_database_versions`` or ``create_connection`` are not caught.

    :param db_file: Path to SQLite database file.
    :returns: Dictionary mapping entity names to versions.
    """
    version_dict: Dict[str, Any] = {
        'Proteogyver version': __version__,
    }

    # Database versions, one entry per update type.
    for update_type, version in db_functions.get_database_versions(db_file).items():
        version_dict[f'Database: {update_type}'] = version

    # External data versions; each row is unpacked as (dataset, version, <ignored>).
    conn = db_functions.create_connection(db_file, mode='ro')
    try:
        rows = db_functions.get_full_table_as_pd(conn, 'data_versions').values
        for dataset, version, _ in rows:
            version_dict[dataset] = version
    except Exception as e:
        # Deliberately broad: missing external versions must not abort the batch run.
        logging.getLogger(__name__).error('Error getting external versions: %s', e)
    finally:
        # NOTE(review): the original "# type: ignore" suggests create_connection
        # may return None; guard so cleanup cannot raise AttributeError and
        # override the best-effort handling above.
        if conn is not None:
            conn.close()
    return version_dict
# -------- Pipeline --------
# [docs]
def run_pipeline(cfg: BatchConfig, params: dict) -> Dict[str, Any]:
    """Execute the batch pipeline mirroring the app's QC and analysis steps.

    Steps: collect version info, parse the data and sample tables, format the
    data, assign replicate colors, run the QC plots, then dispatch to the
    proteomics or interactomics workflow. JSON and pickle artifacts are
    written to ``cfg.outdir`` along the way.

    If input parsing produced warnings, nothing beyond the version info is
    written and an error dict is returned instead of a summary. Data-table
    and sample-table warnings are reported together.

    :param cfg: Batch configuration object.
    :param params: Parsed application parameters.
    :returns: The workflow summary dict, or an error dict with the keys
        ``workflow``, ``session_name``, ``error``, ``warnings`` and ``outdir``.
    :raises ValueError: If ``cfg.workflow`` is neither "proteomics" nor
        "interactomics" (raised after the QC artifacts have been written).
    """
    format_hint = '- This might be due to file format. Supported formats are: csv (comma separated); tsv, txt, tab (tab separated); xlsx, xls (excel)'

    # 1) Load parameters & db/contaminants (mirrors QC_and_data_analysis.py)
    db_file = os.path.join(*params["Data paths"]["Database file"])
    contaminant_list = db_functions.get_contaminants(db_file)

    # 1.5) Collect and save version information (mirrors save_version_info callback)
    version_info = _collect_version_info(db_file)
    _dump_json(cfg.outdir, "00_version_info", version_info)

    # 2) "Upload" data & sample tables from disk (use same parsing functions the app uses)
    data_contents, data_name, data_mtime = _upload_contents_for_path(cfg.data_table_path)
    sample_contents, sample_name, sample_mtime = _upload_contents_for_path(cfg.sample_table_path)

    # The app passes a style dict and the file-loading config; a dummy style is enough here.
    dummy_style = {}
    file_loading_cfg = params["file loading"]
    # The last element of the parser result is not used by the batch run.
    _, data_info, data_tables, warnings, _mztab_sampletable = parsing.parse_data_file(
        data_contents, data_name, data_mtime, dummy_style, file_loading_cfg
    )
    if len(warnings) > 0:
        warnings.insert(0, 'Data table warnings')
        warnings.append(format_hint)

    _, expdes_info, expdes_table = parsing.parse_sample_table(
        sample_contents, sample_name, sample_mtime, dummy_style, params['Sample table parsing']['SDRF']
    )
    exp_cols_found: list[str] = expdes_info['required columns found']
    if len(exp_cols_found) < 2:
        req_cols: list[str] = ['sample name', 'sample group']
        fcols = ', '.join([expdes_info[col] for col in req_cols if col in expdes_info])
        # Append rather than replace, so data-table warnings collected above
        # are not lost when the sample table is also faulty.
        warnings.extend([
            'Sample table warnings',
            f'- Experimental design table is missing required columns. Found columns: {fcols}, required columns: {", ".join(req_cols)}.',
            format_hint,
        ])

    session_name = f'{time.strftime("%Y-%m-%d-%H-%M-%S")}--batch'
    if len(warnings) > 0:
        # Return error information to be handled by pipeline_input_watcher
        return {
            "workflow": cfg.workflow,
            "session_name": session_name,
            "error": "Pipeline terminated due to warnings in input files",
            "warnings": warnings,
            "outdir": cfg.outdir,
        }

    # 3) Format the data (mirrors `validate_data` callback) and set the figure template
    import plotly.io as pio
    pio.templates.default = cfg.figure_template
    data_dictionary = parsing.format_data(
        session_name,
        data_tables,
        data_info,
        expdes_table,
        expdes_info,
        contaminant_list if cfg.remove_common_contaminants else [],
        cfg.rename_replicates,
        cfg.unique_only,
        params["workflow parameters"]["interactomics"]["control indicators"],
        params["file loading"]["Bait ID column names"],
    )
    data_dictionary['info'] = data_info
    data_dictionary['input_data_tables'] = data_tables
    data_dictionary['input_sample_table'] = expdes_table
    _dump_json(cfg.outdir, "01_data_dictionary", data_dictionary)

    # 4) Assign replicate colors (mirrors assign_replicate_colors)
    rep_colors, rep_colors_with_cont = color_tools.get_assigned_colors(
        data_dictionary["sample groups"]["norm"]
    )
    _dump_json(cfg.outdir, "02_replicate_colors", rep_colors)
    _dump_json(cfg.outdir, "02_replicate_colors_with_cont", rep_colors_with_cont)

    # 5) QC chain (call the same functions used in callbacks)
    artifacts: Dict[str, Any] = {}
    divs = {}
    tables = data_dictionary["data tables"]
    sample_groups = data_dictionary["sample groups"]
    full_height = params["Figure defaults"]["full-height"]
    half_height = params["Figure defaults"]["half-height"]
    table_to_use = tables["table to use"]
    qc_table = tables[table_to_use]

    # TIC
    divs["tic"], artifacts["tic"] = qc_analysis.parse_tic_data(
        tables["experimental design"],
        rep_colors,
        db_file,
        full_height,
    )

    # Counts (uses the table that still contains contaminants)
    divs["counts"], artifacts["counts"] = qc_analysis.count_plot(
        tables["with-contaminants"][table_to_use],
        rep_colors_with_cont,
        contaminant_list,
        full_height,
    )

    # Common proteins
    divs["common_proteins"], artifacts["common_proteins"] = qc_analysis.common_proteins(
        qc_table,
        db_file,
        full_height,
        additional_groups={"Other contaminants": contaminant_list},
        id_str="qc",
    )

    # Coverage
    divs["coverage"], artifacts["coverage"] = qc_analysis.coverage_plot(
        qc_table,
        half_height,
    )

    # Reproducibility
    divs["reproducibility"], artifacts["reproducibility"] = qc_analysis.reproducibility_plot(
        qc_table,
        sample_groups["norm"],
        table_to_use,
        full_height,
    )

    # Missing
    divs["missing"], artifacts["missing"] = qc_analysis.missing_plot(
        qc_table,
        rep_colors,
        half_height,
    )

    # Sum
    divs["sum"], artifacts["sum"] = qc_analysis.sum_plot(
        qc_table,
        rep_colors,
        half_height,
    )

    # Mean
    divs["mean"], artifacts["mean"] = qc_analysis.mean_plot(
        qc_table,
        rep_colors,
        half_height,
    )

    # Distribution
    title = parsing.get_distribution_title(table_to_use)
    divs["distribution"], artifacts["distribution"] = qc_analysis.distribution_plot(
        qc_table,
        rep_colors,
        sample_groups["rev"],
        full_height,
        title,
    )

    # Commonality (also yields a PDF string that is kept with the artifacts)
    commonality_div, commonality_data, pdf_str = qc_analysis.commonality_plot(
        qc_table,
        sample_groups["rev"],
        full_height,
    )
    artifacts["commonality"] = commonality_data
    artifacts["commonality_pdf"] = pdf_str
    divs["commonality"] = commonality_div

    # cfg.outdir already exists here: the _dump_json calls above created it.
    with open(os.path.join(cfg.outdir, "03_qc_divs.pickle"), "wb") as f:
        pickle.dump(divs, f)
    _dump_json(cfg.outdir, "03_qc_artifacts", artifacts)

    # 6) Workflow-specific analysis
    workflow = cfg.workflow.lower()
    if workflow == "proteomics":
        return _run_proteomics_workflow(cfg, data_dictionary, rep_colors, params, artifacts)
    if workflow == "interactomics":
        return _run_interactomics_workflow(cfg, data_dictionary, rep_colors, rep_colors_with_cont, params, artifacts)
    raise ValueError(f"Unknown workflow: {cfg.workflow}")
def _run_proteomics_workflow(cfg: BatchConfig, data_dictionary: Dict[str, Any],
                             rep_colors: Dict[str, Any], params: Dict[str, Any],
                             artifacts: Dict[str, Any]) -> Dict[str, Any]:
    """Run the proteomics analysis workflow.

    Order of steps: NA filter, normalization, imputation (only when
    normalization returned data), PCA, CV plot, clustermap, and differential
    abundance ("volcano"). Steps that consume imputed data are skipped when
    imputation did not run. Differential abundance additionally requires a
    control group or a valid comparison file.

    Each step writes a JSON artifact to ``cfg.outdir``. The figure containers
    are pickled to ``04_proteomics_divs.pickle`` and the summary is written
    to ``00_summary.json``.

    :param cfg: Batch configuration.
    :param data_dictionary: Parsed/validated inputs and groups.
    :param rep_colors: Replicate color assignments.
    :param params: Parsed application parameters.
    :param artifacts: QC artifacts dict.
    :returns: Proteomics summary dict.
    """
    divs = {}
    full_height = params["Figure defaults"]["full-height"]
    sample_groups = data_dictionary["sample groups"]

    # NA filter
    na_filter_div, na_filtered = proteomics.na_filter(
        data_dictionary,
        cfg.na_filter_percent,
        full_height,
        filter_type=cfg.na_filter_type,
    )
    _dump_json(cfg.outdir, "10_na_filtered", na_filtered)
    divs["na_filter"] = na_filter_div

    # Normalization
    normalization_div, normalized = proteomics.normalization(
        na_filtered, cfg.normalization,
        full_height,
        params["Config"]["script error file"],
    )
    _dump_json(cfg.outdir, "11_normalized", normalized)
    divs["normalization"] = normalization_div

    # Imputation (only possible when normalization produced data)
    imputed = None
    if normalized is not None:
        divs["missing_values_in_other_samples"] = proteomics.missing_values_in_other_samples(
            normalized,
            full_height,
        )
        imputation_div, imputed = proteomics.imputation(
            normalized, cfg.imputation,
            full_height,
            params["Config"]["script error file"],
            sample_groups_rev=sample_groups["rev"]
        )
        _dump_json(cfg.outdir, "12_imputed", imputed)
        divs["imputation"] = imputation_div

    # PCA (optional)
    if imputed is not None:
        pca_div, pca_data = proteomics.pca(
            imputed,
            sample_groups["rev"],
            full_height,
            rep_colors,
        )
        _dump_json(cfg.outdir, "13_pca", pca_data)
        divs["pca"] = pca_div

    # CV analysis: its inputs are the raw table and the NA-filtered data, so
    # it does not depend on imputation.
    # NOTE(review): the original guarded this with "if True:"; confirm it was
    # meant to run unconditionally.
    cv_div, cv_data = proteomics.perc_cvplot(
        data_dictionary['data tables'][data_dictionary['data tables']['table to use']],
        na_filtered,
        sample_groups["norm"],
        rep_colors,
        full_height,
    )
    _dump_json(cfg.outdir, "13_cv", cv_data)
    divs["cv"] = cv_div

    # Clustermap/correlation clustering consumes imputed data, so skip it
    # (like PCA) when imputation did not run instead of passing None.
    if imputed is not None:
        clustermap_div, clustermap_data = proteomics.clustermap(
            imputed,
            full_height
        )
        _dump_json(cfg.outdir, "13_clustermap", clustermap_data)
        divs["clustermap"] = clustermap_div

    # Volcano (control vs comparisons): optional, needs controls/comparisons.
    # The comparison file is only read after the isfile check below; the
    # earlier unguarded pandas pre-read (whose result was never used) is gone,
    # so a missing file no longer aborts the run.
    volcano = None
    if imputed is not None:
        sgroups = sample_groups["norm"]

        # If a comparisons file is provided and exists, validate it like the UI does
        comp_data = None
        comp_style = {"background-color": "green"}
        comparison_file = cfg.comparison_file
        if (
            isinstance(comparison_file, str)
            and comparison_file.strip()
            and os.path.isfile(comparison_file)
        ):
            comp_contents, comp_name, _ = _upload_contents_for_path(comparison_file)
            comp_style, comp_data = parsing.check_comparison_file(
                comp_contents, comp_name, sgroups, comp_style
            )

        # Normalize control group (treat empty/whitespace-only string as None)
        control_group_clean = cfg.control_group if (cfg.control_group and str(cfg.control_group).strip() != "") else None

        # Only run DA if we have either a control group or valid comparisons
        has_controls_or_comparisons = bool(control_group_clean) or (comp_data is not None and len(comp_data) > 0)
        if has_controls_or_comparisons:
            comparisons = parsing.parse_comparisons(
                control_group_clean, comp_data, sgroups
            )
            volcano_div, volcano = proteomics.differential_abundance(
                imputed,
                sgroups,
                comparisons,
                cfg.fc_threshold,
                cfg.p_threshold,
                full_height,
                cfg.test_type,
                os.path.join(*params["Data paths"]["Database file"]),
            )
            _dump_json(cfg.outdir, "14_volcano", volcano)
            divs["volcano"] = volcano_div

    with open(os.path.join(cfg.outdir, "04_proteomics_divs.pickle"), "wb") as f:
        pickle.dump(divs, f)

    summary = {
        "workflow": "proteomics",
        "session_name": data_dictionary["other"]["session name"],
        "artifacts": artifacts,
        "na_filtered": na_filtered,
        "normalized": (normalized is not None),
        "imputed": (imputed is not None),
        "volcano": volcano is not None,
        "outdir": cfg.outdir,
    }
    _dump_json(cfg.outdir, "00_summary", summary)
    return summary
def _run_interactomics_workflow(cfg: BatchConfig, data_dictionary: Dict[str, Any],
                                rep_colors: Dict[str, Any], rep_colors_with_cont: Dict[str, Any], params: Dict[str, Any],
                                artifacts: Dict[str, Any]) -> Dict[str, Any]:
    """Run the interactomics analysis workflow.

    Prepares and runs SAINT, optionally merges CRAPome data, filters the
    result, then produces the known-interactor, common-protein, network, PCA,
    optional enrichment and MS-microscopy outputs. Each step writes a JSON
    artifact to ``cfg.outdir``. The figure containers are pickled to
    ``05_interactomics_divs.pickle`` and the summary goes to
    ``00_summary.json``.

    The function returns early with an ``error`` entry (and without writing a
    summary file) when there is no spectral count data, when the SAINT input
    is empty, or when SAINT reports failure.

    :param cfg: Batch configuration.
    :param data_dictionary: Parsed/validated inputs and groups.
    :param rep_colors: Replicate color assignments.
    :param rep_colors_with_cont: Replicate colors incl. contaminants.
    :param params: Parsed application parameters.
    :param artifacts: QC artifacts dict.
    :returns: Interactomics summary dict, or an error dict on early exit.
    """
    db_file = os.path.join(*params["Data paths"]["Database file"])
    contaminant_list = db_functions.get_contaminants(db_file)
    divs = {}

    def _abort(message, session, **extra):
        # Shared shape of the early-exit result; "outdir" stays the last key.
        result = {
            "workflow": "interactomics",
            "session_name": session,
            "artifacts": artifacts,
            "error": message,
        }
        result.update(extra)
        result["outdir"] = cfg.outdir
        return result

    # Spectral count data is required; the table holds a "No data" marker otherwise.
    if '"No data"' in data_dictionary["data tables"]["spc"]:
        return _abort(
            "No spectral count data available for interactomics analysis",
            data_dictionary["other"]["session name"],
        )

    # 1) Generate SAINT container (prepare controls and data)
    saint_div, saint_dict, crapome_data = interactomics.generate_saint_container(
        data_dictionary,
        cfg.uploaded_controls,
        cfg.additional_controls,
        cfg.crapome_sets,
        db_file,
        cfg.proximity_filtering,
        cfg.n_controls
    )
    _dump_json(cfg.outdir, "20_saint_dict", saint_dict)
    _dump_json(cfg.outdir, "20_crapome_data", crapome_data)
    divs["saint"] = saint_div

    # 2) Run SAINT analysis (an empty container means there is nothing to run)
    if not saint_dict:
        return _abort(
            "Insufficient data for SAINT analysis",
            data_dictionary["other"]["session name"],
        )

    other_info = data_dictionary["other"]
    session_name = other_info["session name"]
    bait_uniprots = other_info.get("bait uniprots", {})
    saint_output, saint_failed = interactomics.run_saint(
        saint_dict,
        params["External tools"]["SAINT tempdir"],
        session_name,
        bait_uniprots,
        cleanup=True
    )
    if "SAINT failed" in saint_output:
        return _abort("SAINT analysis failed", session_name, saint_failed=True)
    _dump_json(cfg.outdir, "21_saint_output_raw", saint_output)

    # 3) Add CRAPome data if available (skip the empty split-oriented frame)
    empty_frame_json = '{"columns":[],"index":[],"data":[]}'
    if crapome_data and crapome_data != empty_frame_json:
        saint_output = interactomics.add_crapome(saint_output, crapome_data)
        _dump_json(cfg.outdir, "22_saint_with_crapome", saint_output)

    # 4) Filter SAINT results, then map intensities and annotate known interactors
    filtered_saint = interactomics.saint_filtering(
        saint_output,
        cfg.saint_bfdr_threshold,
        cfg.crapome_percentage_threshold,
        cfg.crapome_fc_threshold,
        cfg.rescue_enabled
    )
    _dump_json(cfg.outdir, "23_saint_filtered", filtered_saint)

    filtered_saint = interactomics.map_intensity(
        filtered_saint,
        data_dictionary['data tables']['intensity'],
        data_dictionary['sample groups']['norm'],
    )
    _dump_json(cfg.outdir, "23_saint_filtered_and_intensity_mapped", filtered_saint)

    figure_defaults = params['Figure defaults']
    known_div, filtered_saint_with_knowns = interactomics.known_plot(
        filtered_saint, db_file, rep_colors_with_cont, figure_defaults['half-height']
    )
    _dump_json(cfg.outdir, "23_saint_filtered_and_intensity_mapped_with_knowns", filtered_saint_with_knowns)
    divs["known"] = known_div

    # 4.5) Common proteins plot
    saint_matrix = interactomics.get_saint_matrix(filtered_saint)
    matrix_json = saint_matrix.to_json(orient='split')
    full_height = figure_defaults["full-height"]
    common_proteins_div, common_proteins_data = qc_analysis.common_proteins(
        matrix_json,
        db_file,
        full_height,
        additional_groups={"Other contaminants": contaminant_list},
        id_str="interactomics",
    )
    _dump_json(cfg.outdir, "23_common_proteins", common_proteins_data)
    divs["common_proteins"] = common_proteins_div

    # 5) Generate network plot (takes the bare height value, not the whole defaults dict)
    network_div, network_elements, interactions = interactomics.do_network(
        filtered_saint,
        full_height["height"]
    )
    _dump_json(cfg.outdir, "24_network_elements", network_elements)
    _dump_json(cfg.outdir, "24_interactions", interactions)
    divs["network"] = network_div

    # 6) PCA analysis
    pca_div, pca_data = interactomics.pca(
        filtered_saint,
        full_height,
        rep_colors
    )
    _dump_json(cfg.outdir, "25_pca", pca_data)
    divs["pca"] = pca_div

    # 7) Enrichment analysis (if requested)
    enrichment_data = None
    enrichment_info = None
    # parameters.toml is looked up under config/ one directory above this file's directory.
    root_dir = Path(__file__).resolve().parents[1]
    parameters_path = os.path.join(root_dir, 'config', 'parameters.toml')
    if cfg.chosen_enrichments:
        enrichment_div, enrichment_data, enrichment_info = interactomics.enrich(
            filtered_saint,
            cfg.chosen_enrichments,
            full_height,
            parameters_file=parameters_path
        )
        _dump_json(cfg.outdir, "26_enrichment_data", enrichment_data)
        _dump_json(cfg.outdir, "26_enrichment_info", enrichment_info)
        divs["enrichment"] = enrichment_div

    # 8) MS-microscopy analysis
    msmic_div, msmic_data = interactomics.do_ms_microscopy(
        filtered_saint,
        db_file,
        full_height
    )
    _dump_json(cfg.outdir, "27_msmic_data", msmic_data)
    divs["msmic"] = msmic_div

    pickle_path = os.path.join(cfg.outdir, "05_interactomics_divs.pickle")
    with open(pickle_path, "wb") as handle:
        pickle.dump(divs, handle)

    n_network_elements = len(network_elements) if network_elements else 0
    n_interactions = len(interactions) if interactions else 0
    summary = {
        "workflow": "interactomics",
        "session_name": session_name,
        "artifacts": artifacts,
        "saint_output": saint_output,
        "saint_failed": saint_failed,
        "filtered_saint": filtered_saint,
        "network_elements": n_network_elements,
        "interactions": n_interactions,
        "enrichment_analysis": enrichment_data is not None,
        "msmic_analysis": msmic_data is not None,
        "outdir": cfg.outdir,
    }
    _dump_json(cfg.outdir, "00_summary", summary)
    return summary