"""Functions for processing and visualizing protein-protein interaction data.
This module provides functionality for analyzing mass spectrometry-based
interactomics data, including:
- Running and processing SAINT analysis for scoring protein interactions
- Filtering results based on BFDR and CRAPome metrics
- Creating visualizations (networks, heatmaps, PCA plots)
- Performing enrichment analysis
- MS-microscopy analysis for protein localization
- Processing known interaction data
The module integrates with a SQLite database for retrieving reference data
and uses Dash components for creating interactive visualizations.
Typical usage example:
>>> saint_dict = make_saint_dict(spc_table, sample_groups, controls, proteins)
>>> saint_output = run_saint(saint_dict, temp_dir, session_id, bait_ids)
>>> filtered_output = saint_filtering(saint_output, bfdr=0.01, crapome_pct=0.1)
>>> network_plot = do_network(filtered_output, plot_height=600)
Attributes:
logger: Logger instance for module-level logging
"""
from typing import Dict, List, Tuple, Set, Optional, Any, Union
from dash import html, dash_table
import pandas as pd
from io import StringIO
from components import db_functions
import numpy as np
import shutil
import os
import tempfile
import sh
import sqlite3
from components.figures import histogram, bar_graph, scatter, heatmaps, network_plot
from components import matrix_functions, db_functions, ms_microscopy
from components.figures.figure_legends import INTERACTOMICS_LEGENDS as legends
from components.figures.figure_legends import enrichment_legend, leg_rep
from components.text_handling import replace_accent_and_special_characters
from components import EnrichmentAdmin as ea
from dash_bootstrap_components import Card, CardBody, Tab, Tabs
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
[docs]
def count_knowns(saint_output: pd.DataFrame,
replicate_colors: Dict[str, Dict[str, Dict[str, str]]]) -> pd.DataFrame:
"""Count known interactions per bait protein.
:param saint_output: SAINT output with columns including ``Bait`` and ``Known interaction``.
:param replicate_colors: Mapping with structure ``{'contaminant': {'sample groups': {bait: color}}, 'non-contaminant': {...}}``.
:returns: DataFrame with columns ``Bait``, ``Known interaction``, ``Prey count``, and ``Color``.
"""
data: pd.DataFrame = saint_output[['Bait', 'Known interaction']].\
value_counts().to_frame().reset_index().rename(
columns={'count': 'Prey count'})
color_col: list = []
for _, row in data.iterrows():
if row['Known interaction']:
color_col.append(
replicate_colors['contaminant']['sample groups'][row['Bait']])
else:
color_col.append(
replicate_colors['non-contaminant']['sample groups'][row['Bait']])
data['Color'] = color_col
return data
[docs]
def do_network(saint_output_json: str,
plot_height: int) -> Tuple[html.Div, List[Dict[str, Any]], Dict[str, Any]]:
"""Create a Cytoscape network from filtered SAINT output.
:param saint_output_json: SAINT output in pandas split-JSON format.
:param plot_height: Height of the network plot in pixels.
:returns: Tuple of (plot container Div, cytoscape elements, interactions dict).
"""
saint_output: pd.DataFrame = pd.read_json(
StringIO(saint_output_json), orient='split')
cyto_elements, interactions = network_plot.get_cytoscape_elements_and_ints(saint_output)
plot_container = network_plot.get_cytoscape_container(cyto_elements, full_height=plot_height)
return (plot_container, cyto_elements, interactions)
[docs]
def network_display_data(
node_data: dict[str, list[dict]],
int_data: dict[str, dict[str, list[str|float]]],
table_height: int,
datatype: str = 'Cytoscape'
) -> list[html.Label | dash_table.DataTable]:
"""Create a table for network connections.
:param node_data: Node data; for Cytoscape use ``{'edgesData': [{'source','target'},...]}``; for visdcc use ``{'edges': ['source_-_target', ...]}``.
:param int_data: Mapping ``source -> target -> [gene_name, avg_spec]``.
:param table_height: Table height in pixels.
:param datatype: ``'Cytoscape'`` or ``'visdcc'``.
:returns: List containing a label and a DataTable with Bait, Prey, PreyGene, AvgSpec.
"""
ret = [['Bait','Prey', 'PreyGene','AvgSpec']]
if datatype == 'Cytoscape':
for e in node_data['edgesData']:
ret.append([e['source'], e['target']])
ret[-1].extend(int_data[e['source']][e['target']])
elif datatype == 'visdcc':
for e in node_data['edges']:
source, target = e.split('_-_')
ret.append([
source,
target,
int_data[source][target]
])
df = pd.DataFrame(data=ret[1:], columns=ret[0])
div_contents = [
html.Label('Selected node connections:'),
dash_table.DataTable(
df.to_dict('records'),
[{"name": i, "id": i} for i in df.columns],
fixed_rows={'headers': True},
style_table={'height': table_height}
)
]
return div_contents
[docs]
def known_plot(filtered_saint_input_json: str,
db_file: str,
rep_colors_with_cont: Dict[str, Dict[str, str]],
figure_defaults: Dict[str, Any],
isoform_agnostic: bool = False) -> Tuple[html.Div, str]:
"""Plot known interactions per bait.
:param filtered_saint_input_json: Filtered SAINT output in pandas split-JSON format.
:param db_file: Path to SQLite database file.
:param rep_colors_with_cont: Mapping for contaminant and non-contaminant colors by bait.
:param figure_defaults: Figure defaults for plotting.
:param isoform_agnostic: If ``True``, match using base UniProt IDs (no isoforms).
:returns: Tuple of (plot Div, processed SAINT output JSON).
"""
logger.info(f'known_plot - started: {datetime.now()}')
upid_a_col: str = 'uniprot_id_a'
upid_b_col: str = 'uniprot_id_b'
if isoform_agnostic:
upid_a_col += '_noiso'
upid_b_col += '_noiso'
saint_output: pd.DataFrame = pd.read_json(
StringIO(filtered_saint_input_json), orient='split')
db_conn = db_functions.create_connection(db_file)
col_order: list = list(saint_output.columns)
knowns: pd.DataFrame = db_functions.get_from_table_by_list_criteria(
db_conn, 'known_interactions', upid_a_col, list(
saint_output['Bait uniprot'].unique())
)
db_conn.close()
# TODO: multibait
saint_output = pd.merge(
saint_output,
knowns,
left_on=['Bait uniprot', 'Prey'],
right_on=[upid_a_col, upid_b_col],
how='left'
)
saint_output['Known interaction'] = saint_output['update_time'].notna()
logger.info(
f'known_plot - knowns: {saint_output["Known interaction"].value_counts()}')
col_order.append('Known interaction')
col_order.extend([c for c in saint_output.columns if c not in col_order])
saint_output = saint_output[col_order]
figure_data: pd.DataFrame = count_knowns(
saint_output, rep_colors_with_cont)
figure_data.sort_values(by=['Bait', 'Known interaction'], ascending=[
True, False], inplace=True)
figure_data.index = figure_data['Bait']
figure_data.drop(columns=['Bait'], inplace=True)
bait_map: dict = {bu: b for b, bu in saint_output[[
'Bait', 'Bait uniprot']].drop_duplicates().values if bu != 'No bait uniprot'}
known_str: str = 'Known interactions found per bait (Known / All):'
no_knowns_found: set = set()
done: set = set()
for bait in figure_data.index:
if bait in done:
continue
done.add(bait)
bdata: pd.DataFrame = figure_data[figure_data.index == bait]
known_sum: int = bdata[bdata["Known interaction"]]["Prey count"].sum()
if known_sum == 0:
no_knowns_found.add(bait)
else:
known_str += f'{bait}: {known_sum} / {bdata["Prey count"].sum()}, '
known_str = known_str.strip().strip(', ') + '. '
known_str += f'No known interactions found: {", ".join(sorted(list(no_knowns_found)))}. '
more_known = 'Known preys available for these baits in the database: '
for index, value in knowns[upid_a_col].value_counts().items():
more_known += f'{bait_map[index]} ({value}), '
more_known = more_known.strip().strip(', ') + '. '
if len(no_knowns_found) == len(figure_data.index.values):
more_known = ''
figtitle = 'High-confidence interactions and identified known interactions'
return (
html.Div(
id='interactomics-saint-known-plot',
children=[
html.H4(id='interactomics-known-header',
children=figtitle),
bar_graph.make_graph(
'interactomics-saint-filt-int-known-graph',
figure_defaults,
figtitle,
figure_data,
'', color_discrete_map=True, y_name='Prey count', x_label='Bait'
),
legends['known'],
html.P(known_str),
html.P(more_known)
],
style={
'overflowX': 'auto',
'whiteSpace': 'nowrap'
}
),
saint_output.to_json(orient='split')
)
[docs]
def pca(saint_output_data: str,
defaults: Dict[str, Any],
replicate_colors: Dict[str, str]) -> Tuple[html.Div, str]:
"""Perform PCA on SAINT output and plot bait relationships.
:param saint_output_data: SAINT output in pandas split-JSON format.
:param defaults: Figure defaults.
:param replicate_colors: Mapping ``'sample groups'`` -> color.
:returns: Tuple of (plot Div, PCA data JSON). Returns empty plot if <2 baits.
"""
data_table: pd.DataFrame = pd.read_json(StringIO(saint_output_data),orient='split')
if len(data_table['Bait'].unique()) < 2:
gdiv = ['Too few samle groups for PCA']
pca_data = ''
else:
data_table = data_table.pivot_table(
index='Prey', columns='Bait', values='AvgSpec')
pc1: str
pc2: str
pca_result: pd.DataFrame
# Compute PCA of the data
spoofed_sample_groups: dict = {i: i for i in data_table.columns}
pc1, pc2, pca_result = matrix_functions.do_pca(
data_table.fillna(0), spoofed_sample_groups, n_components=2)
pca_result.sort_values(by=pc1, ascending=True, inplace=True)
pca_result['Sample group color'] = [replicate_colors['sample groups'][grp] for grp in pca_result['Sample group']]
dlname = 'SPC PCA'
gdiv = [
html.H4(id='interactomics-pca-header', children=dlname),
scatter.make_graph(
'interactomics-pca-plot',
defaults,
dlname,
pca_result,
pc1,
pc2,
'Sample group color',
'Sample group',
hover_data=['Sample group', 'Sample name', pc1,pc2]
),
legends['pca']
]
pca_data = pca_result.to_json(orient='split')
return (
html.Div(
id='interactomics-pca-plot-div',
children=gdiv
),
pca_data
)
[docs]
def enrich(saint_output_json: str,
chosen_enrichments: List[str],
figure_defaults: Dict[str, Any],
keep_all: bool = False,
sig_threshold: float = 0.01,
parameters_file: str = 'config/parameters.toml') -> Tuple[List[html.Div], Dict[str, Any], List[Any]]:
"""Run selected enrichment methods and visualize results.
:param saint_output_json: SAINT output in pandas split-JSON format.
:param chosen_enrichments: List of enrichment method names.
:param figure_defaults: Figure defaults for plotting.
:param keep_all: If ``True``, include non-significant rows meeting fold criteria.
:param sig_threshold: Significance cutoff.
:param parameters_file: Path to parameters TOML used by enrichment admin.
:returns: Tuple of (list of result Divs, dict of enrichment data, list of info).
"""
div_contents:list = []
enrichment_data: dict = {}
enrichment_information: list = []
chosen_enrichments = [e for e in chosen_enrichments if len(e.strip()) > 0]
if len(chosen_enrichments) == 0:
return (
div_contents,
enrichment_data,
enrichment_information
)
e_admin = ea.EnrichmentAdmin(parameters_file)
saint_output: pd.DataFrame = pd.read_json(
StringIO(saint_output_json), orient='split')
enrichment_names: list
enrichment_results: list
enrichment_names, enrichment_results, enrichment_information = e_admin.enrich_all(
saint_output,
chosen_enrichments,
id_column='Prey',
split_by_column='Bait',
split_name='Bait'
)
tablist: list = []
for i, (rescol, sigcol, namecol, result) in enumerate(enrichment_results):
if keep_all:
keep_these: set = set(result[result[rescol] >= 2][namecol].values)
keep_these = keep_these & set(
result[result[sigcol] < sig_threshold][namecol].values)
filtered_result: pd.DataFrame = result[result[namecol].isin(
keep_these)]
else:
filtered_result = result[(result[sigcol]<sig_threshold) & (result[rescol]>=2)]
matrix: pd.DataFrame = pd.pivot_table(
filtered_result,
index=namecol,
columns='Bait',
values=rescol
).fillna(0)
if filtered_result.shape[0] == 0:
graph = html.P('Nothing enriched.')
else:
enrichment_data[enrichment_names[i]] = {
'sigcol': sigcol,
'rescol': rescol,
'namecol': namecol,
'result': result.to_json(orient='split')
}
graph = heatmaps.make_heatmap_graph(
matrix,
f'interactomics-enrichment-{enrichment_names[i]}',
rescol.replace('_', ' '),
figure_defaults,
cmap = 'dense',
dlname = enrichment_names[i],
symmetrical = False
)
table_label: str = f'{enrichment_names[i]} data table'
table: dash_table.DataTable = dash_table.DataTable(
data=filtered_result.to_dict('records'),
columns=[{"name": i, "id": i} for i in filtered_result.columns],
page_size=15,
style_table={
'maxHeight': 600
},
style_data={
'width': '100px', 'minWidth': '25px', 'maxWidth': '250px',
'overflow': 'hidden',
'textOverflow': 'ellipsis',
},
filter_action='native',
id=f'interactomics-enrichment-{table_label.replace(" ","-")}',
)
e_legend: html.P = enrichment_legend(
replace_accent_and_special_characters(enrichment_names[i]),
enrichment_names[i],
rescol,
2,
sigcol,
sig_threshold
)
enrichment_tab: Card = Card(
CardBody(
[
html.H5(f'{enrichment_names[i]} heatmap'),
graph,
e_legend,
html.P(f'{enrichment_names[i]} data table'),
table
],
# style={'width': '98%'}
),
#style={'width': '98%'}
)
tablist.append(
Tab(
enrichment_tab, label=enrichment_names[i],
# style={'width': '98%'}
)
)
if len(enrichment_results) > 0:
div_contents: list = [
html.H4(id='interactomics-enrichment-header', children='Enrichment'),
Tabs(
id='interactomics-enrichment-tabs',
children=tablist,
style={'width': '98%'}
)]
return (div_contents,
enrichment_data,
enrichment_information
)
[docs]
def map_intensity(saint_output_json: str,
intensity_table_json: str,
sample_groups: Dict[str, str]) -> str:
"""Map averaged intensity per group onto SAINT output rows.
:param saint_output_json: SAINT output in pandas split-JSON format.
:param intensity_table_json: Intensity table in pandas split-JSON format.
:param sample_groups: Mapping bait -> group name.
:returns: SAINT output JSON with optional ``Averaged intensity`` column.
"""
intensity_table: pd.DataFrame = pd.read_json(
StringIO(intensity_table_json), orient='split')
saint_output: pd.DataFrame = pd.read_json(
StringIO(saint_output_json), orient='split')
has_intensity: bool = False
intensity_column: list = [np.nan for _ in saint_output.index]
if intensity_table.shape[0] > 1:
if intensity_table.shape[1] > 1:
if intensity_table.columns[0] != 'No data':
has_intensity = True
if has_intensity:
intensity_column = []
for _, row in saint_output.iterrows():
try:
intensity_column.append(
intensity_table[sample_groups[row['Bait']]].loc[row['Prey']].mean())
except KeyError:
intensity_column.append(np.nan)
saint_output['Averaged intensity'] = intensity_column
return saint_output.to_json(orient='split')
[docs]
def saint_histogram(saint_output_json: str,
figure_defaults: Dict[str, Any]) -> Tuple[html.Div, str]:
"""Create a histogram of BFDR scores from SAINT output.
:param saint_output_json: SAINT output in pandas split-JSON format.
:param figure_defaults: Figure defaults for plotting.
:returns: Tuple of (histogram Div, histogram data JSON).
"""
saint_output: pd.DataFrame = pd.read_json(
StringIO(saint_output_json), orient='split')
return (
histogram.make_figure(saint_output, 'BFDR', '', figure_defaults),
saint_output.to_json(orient='split')
)
[docs]
def add_bait_column(saint_output: pd.DataFrame,
bait_uniprot_dict: Dict[str, str]) -> pd.DataFrame:
"""Add bait UniProt and bait-self flags to SAINT output.
:param saint_output: SAINT output DataFrame with ``Bait`` and ``Prey``.
:param bait_uniprot_dict: Mapping bait name -> UniProt IDs (``;`` separated allowed).
:returns: DataFrame with ``Bait uniprot`` and ``Prey is bait`` added.
"""
saint_output['Bait'] = [b.rsplit('_',maxsplit=1)[0] for b in saint_output['Bait'].values]
bu_column: list = []
prey_is_bait: list = []
for _, row in saint_output.iterrows():
if row['Bait'] in bait_uniprot_dict:
bu_column.append(bait_uniprot_dict[row['Bait']])
prey_is_bait.append(row['Prey'].lower().strip() in [b.lower().strip() for b in bu_column[-1].split(';')])
else:
bu_column.append('No bait uniprot')
prey_is_bait.append(False)
saint_output['Bait uniprot'] = bu_column
saint_output['Prey is bait'] = prey_is_bait
return saint_output
[docs]
def saint_cmd(saint_input: Dict[str, List[List[str]]],
saint_tempdir: List[str],
session_uid: str) -> str:
"""Run SAINTexpressSpc on prepared input files.
:param saint_input: Dict with keys ``bait``, ``prey``, ``int`` containing row lists.
:param saint_tempdir: List of path segments for temp dir base.
:param session_uid: Unique identifier to isolate run directory.
:returns: Path to directory containing ``list.txt`` (or dummy if SAINT missing).
:raises OSError: On temp dir creation failure.
:raises sh.CommandNotFound: If SAINTexpressSpc is not available.
"""
temp_dir: str = os.path.join(*(saint_tempdir))
temp_dir = os.path.join(temp_dir, session_uid)
if not os.path.isdir(temp_dir):
os.makedirs(temp_dir)
with (
tempfile.NamedTemporaryFile() as baitfile,
tempfile.NamedTemporaryFile() as preyfile,
tempfile.NamedTemporaryFile() as intfile,
):
baitfile.write(
('\n'.join([
'\t'.join(x) for x in saint_input['bait']
])).encode('utf-8')
)
preyfile.write(
('\n'.join([
'\t'.join(x) for x in saint_input['prey']
])).encode('utf-8')
)
intfile.write(
('\n'.join([
'\t'.join(x) for x in saint_input['int']
])).encode('utf-8')
)
baitfile.flush()
preyfile.flush()
intfile.flush()
try:
sh.SAINTexpressSpc(intfile.name, preyfile.name, baitfile.name, _cwd=temp_dir)
except sh.CommandNotFound:
create_dummy_list_txt(temp_dir, saint_input)
return temp_dir
[docs]
def create_dummy_list_txt(temp_dir: str, saint_input: Dict[str, List[List[str]]]) -> None:
"""Create a dummy SAINT ``list.txt`` when SAINTexpressSpc is unavailable.
Generates a plausible-looking SAINT output file using random values so that
downstream steps can proceed in demo or fallback mode.
:param temp_dir: Target directory to write ``list.txt`` and marker file.
:param saint_input: SAINT input dict with keys ``bait``, ``prey``, ``int``.
:returns: None
"""
baits = {}
baitmap = {}
for baitrun, group, ctrl in saint_input['bait']:
baits.setdefault(ctrl, {}).setdefault(group, [])
baits[ctrl][group].append(baitrun)
baitmap[baitrun] = (ctrl, group)
preys = {}
for prey, _, gname in saint_input['prey']:
preys[prey] = gname
counts = {}
max_b_len = 0
for baitrun, group, prey, spc in saint_input['int']:
counts.setdefault(group, {}).setdefault(prey, [])
counts[group][prey].append(spc)
max_b_len = max(max_b_len, len(counts[group][prey]))
control_counts = {}
max_ctrl_len = 0
for baitgrp in baits['C'].keys():
for prey, spc in counts[baitgrp].items():
control_counts.setdefault(prey,[])
control_counts[prey].extend(spc)
max_ctrl_len = max(max_ctrl_len, len(control_counts[prey]))
def pad(li, le):
if len(li) > le:
return li
rlist = li
for i in range(len(li), le):
rlist.append('0')
return rlist
list_txt = []
alpha = 1
beta = 0.3
for group, pdic in counts.items():
if group in baits['C']: continue
for prey, spclist in pdic.items():
bfdr_random = np.random.beta(alpha, beta)
score_random = 1-bfdr_random*3
p_ctrl_list = []
if prey in control_counts:
p_ctrl_list = control_counts[prey]
spclist = pad(spclist, max_b_len)
p_ctrl_list = pad(p_ctrl_list, max_ctrl_len)
list_txt.append([
group,
prey,
preys[prey],
'|'.join(spclist),
sum([int(x) for x in spc])/len(spc),
sum([int(x) for x in spc]),
len(baits['T'][group]),
'|'.join(p_ctrl_list),
0,0,0,0,score_random, 1200, bfdr_random, np.nan])
lt = pd.DataFrame(data=list_txt, columns=['Bait', 'Prey', 'PreyGene', 'Spec', 'SpecSum', 'AvgSpec', 'NumReplicates', 'ctrlCounts', 'AvgP', 'MaxP', 'TopoAvgP', 'TopoMaxP', 'SaintScore', 'FoldChange', 'BFDR', 'boosted_by'])
lt.to_csv(os.path.join(temp_dir, 'list.txt'), sep='\t', index=False)
with open(os.path.join(temp_dir, 'list_is_dummy.txt'), 'w') as f:
f.write('this list has been created by dummy saint simulator that produces nonsense. This happened because SAINTexpressSpc was not found.')
[docs]
def run_saint(saint_input: Dict[str, List[List[str]]],
saint_tempdir: List[str],
session_uid: str,
bait_uniprots: Dict[str, str],
cleanup: bool = True) -> Tuple[str, bool]:
"""Execute SAINT pipeline and return processed output.
:param saint_input: SAINT input dict.
:param saint_tempdir: Temp directory base as path segments.
:param session_uid: Unique run identifier.
:param bait_uniprots: Mapping bait -> UniProt IDs.
:param cleanup: If ``True``, remove temp files after success.
:returns: Tuple of (output JSON or error string, saint_missing_flag).
"""
# Can not use logging in this function, since it's called from a background_callback_manager using celery, and logging will lead to a hang.
# Instead, we can use print statements, and they will show up as WARNINGS in celery log.
temp_dir: str = ''
if ('bait' in saint_input) and ('prey' in saint_input):
temp_dir = saint_cmd(saint_input, saint_tempdir, session_uid)
failed: bool = not os.path.isfile(os.path.join(temp_dir, 'list.txt'))
saintfail: bool = os.path.isfile(os.path.join(temp_dir, 'list_is_dummy.txt'))
if failed:
ret: str = 'SAINT failed. Can not proceed.'
else:
ret = add_bait_column(pd.read_csv(os.path.join(
temp_dir, 'list.txt'), sep='\t'), bait_uniprots)
ret = ret.to_json(orient='split')
if cleanup:
try:
shutil.rmtree(temp_dir)
except PermissionError as e:
print(
f'run_saint: Could not clean up after SAINT run: {datetime.now()} {e}')
return (ret, saintfail)
[docs]
def prepare_crapome(db_conn: sqlite3.Connection,
crapomes: List[str]) -> pd.DataFrame:
"""Prepare CRAPome tables for downstream filtering.
:param db_conn: SQLite connection.
:param crapomes: List of CRAPome table names (possibly with suffixes).
:returns: DataFrame with per-CRAPome frequency and spc averages plus max frequency.
"""
crapomes = [c.rsplit('_(',maxsplit=1)[0] for c in crapomes]
crapome_tables: list = [
db_functions.get_full_table_as_pd(db_conn, tablename, index_col='protein_id') for tablename in crapomes
]
crapome_table: pd.DataFrame = pd.concat(
[
crapome_tables[i][['frequency', 'spc_avg']].
rename(columns={
'frequency': f'{table_name}_frequency',
'spc_avg': f'{table_name}_spc_avg'
})
for i, table_name in enumerate(crapomes)
],
axis=1
)
crapome_freq_cols: list = [
c for c in crapome_table.columns if '_frequency' in c]
crapome_table['Max crapome frequency'] = crapome_table[crapome_freq_cols].max(
axis=1)
return crapome_table
[docs]
def prepare_controls(input_data_dict: Dict[str, Any],
uploaded_controls: List[str],
additional_controls: List[str],
db_conn: sqlite3.Connection,
select_most_similar_only: bool = False,
top_n: int = 30) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Assemble uploaded and DB controls for SAINT.
:param input_data_dict: Inputs including sample groups and SPC data tables.
:param uploaded_controls: Names of uploaded control groups.
:param additional_controls: Additional DB control table names.
:param db_conn: SQLite connection.
:param select_most_similar_only: If ``True``, keep only most similar controls.
:param top_n: Number of controls to keep per-sample when filtering.
:returns: Tuple of (SPC table without control columns, combined control table).
"""
logger.debug(f'additional controls: {additional_controls}')
additional_controls = [c.rsplit('_(',maxsplit=1)[0] for c in additional_controls]
logger.debug(f'preparing uploaded controls: {uploaded_controls}')
logger.debug(f'preparing additional controls: {additional_controls}')
sample_groups: dict = input_data_dict['sample groups']['norm']
spc_table: pd.DataFrame = pd.read_json(
StringIO(input_data_dict['data tables']['spc']), orient='split')
controls: list = []
for control_name in additional_controls:
ctable: pd.DataFrame = db_functions.get_full_table_as_pd(
db_conn, control_name, index_col='PROTID')
ctable.index.name = ''
controls.append(ctable)
logger.debug(f'control {control_name} shape: {ctable.shape}, indexvals: {list(ctable.index)[:5]}')
if (len(controls) > 0) and select_most_similar_only:
# groupby to merge possible duplicate columns that are annotated in multiple sets
# mean grouping should have no effect, since PSM values SHOULD be the same in any case.
control_table = filter_controls_by_similarity(spc_table, controls, top_n)
controls = [control_table]
control_cols: list = []
for cg in uploaded_controls:
control_cols.extend(sample_groups[cg])
controls.append(spc_table[control_cols])
spc_table = spc_table[[c for c in spc_table.columns if c not in control_cols]]
control_table: pd.DataFrame = pd.concat(controls, axis=1).T.groupby(level=0).mean().T
logger.debug(f'Controls concatenated: {control_table.shape}, indexvals: {list(control_table.index)[:5]}')
logger.debug(f'SPC table index: {list(spc_table.index)[:5]}')
# Discard any control preys that are not identified in baits. It will not affect SAINT results.
control_table.drop(index=set(control_table.index) -
set(spc_table.index), inplace=True)
logger.debug(f'non-detected preys dropped: {control_table.shape}')
return (spc_table, control_table)
[docs]
def filter_controls_by_similarity(spc_table: pd.DataFrame,
controls: List[pd.DataFrame],
top_n: int) -> pd.DataFrame:
"""Filter control runs by similarity to experiment runs.
:param spc_table: Spectral count table for experiment samples.
:param controls: List of candidate control tables.
:param top_n: Number of top similar controls to keep per sample.
:returns: Filtered control table with selected columns.
"""
control_table: pd.DataFrame = pd.concat(controls, axis=1).T.groupby(level=0).mean().T
chosen_controls: list = []
for c in spc_table.columns:
controls_ranked_by_similarity: list = matrix_functions.ranked_dist(
spc_table[[c]], control_table)
chosen_controls.extend([s[0] for s in controls_ranked_by_similarity[:top_n]])
control_table = control_table[list(set(chosen_controls))]
return control_table
[docs]
def add_crapome(saint_output_json: str,
crapome_json: str) -> str:
"""Merge CRAPome annotations into SAINT output JSON.
:param saint_output_json: SAINT output in pandas split-JSON format.
:param crapome_json: CRAPome table in pandas split-JSON format.
:returns: Merged SAINT output JSON.
"""
if 'Saint failed.' in saint_output_json:
return saint_output_json
saint_output: pd.DataFrame = pd.read_json(
StringIO(saint_output_json), orient='split')
crapome: pd.DataFrame = pd.read_json(StringIO(crapome_json),orient='split')
return pd.merge(
saint_output,
crapome,
left_on='Prey',
right_index=True,
how='left'
).to_json(orient='split')
[docs]
def make_saint_dict(spc_table: pd.DataFrame,
rev_sample_groups: Dict[str, str],
control_table: pd.DataFrame,
protein_table: pd.DataFrame) -> Dict[str, List[List[str]]]:
"""Create SAINT input dict from SPC and metadata tables.
:param spc_table: Spectral count data.
:param rev_sample_groups: Mapping sample -> group.
:param control_table: Control spectral count table.
:param protein_table: Protein info with columns ``uniprot_id``, ``length``, ``gene_name``.
:returns: Dict with keys ``bait``, ``prey``, ``int`` as lists of rows.
"""
protein_lenghts_and_names = {}
logger.info(
f'make_saint_dict: start: {datetime.now()}')
for _, row in protein_table.iterrows():
protein_lenghts_and_names[row['uniprot_id']] = {
'length': row['length'], 'gene name': row['gene_name']}
bait: list = []
prey: list = []
inter: list = []
for col in spc_table.columns:
bait.append([col, rev_sample_groups[col]+'_bait', 'T'])
for col in control_table.columns:
if col in rev_sample_groups:
bait.append([col, rev_sample_groups[col]+'_bait', 'C'])
else:
bait.append([col, 'inbuilt_ctrl', 'C'])
logger.info(
f'make_saint_dict: Baits prepared: {datetime.now()}')
logger.info(
f'make_saint_dict: Control table shape: {control_table.shape}')
control_melt: pd.DataFrame = pd.melt(
control_table, ignore_index=False).replace(0, np.nan).dropna().reset_index()
sgroups = []
for _, srow in control_melt.iterrows():
sgroup = 'inbuilt_ctrl'
if srow['variable'] in rev_sample_groups:
sgroup = rev_sample_groups[srow['variable']]+'_bait'
sgroups.append(sgroup)
control_melt['sgroup'] = sgroups
control_melt = control_melt.reindex(
columns=['variable', 'sgroup', 'index', 'value'])
control_melt['value'] = control_melt['value'].astype(int)
inter.extend(control_melt.values
.astype(str).tolist())
logger.info(
f'make_saint_dict: Control table melted: {control_melt.shape}: {datetime.now()}')
logger.info(
f'make_saint_dict: Control interactions prepared: {datetime.now()}')
for uniprot, srow in pd.melt(spc_table, ignore_index=False).replace(0, np.nan).dropna().iterrows():
sgroup: str = 'inbuilt_ctrl'
if srow['variable'] in rev_sample_groups:
sgroup = rev_sample_groups[srow['variable']]+'_bait'
inter.append([srow['variable'], sgroup, uniprot, str(int(srow['value']))])
logger.info(
f'make_saint_dict: SPC table interactions prepared: {datetime.now()}')
for uniprotid in (set(control_table.index.values) | set(spc_table.index.values)):
try:
plen: str = str(protein_lenghts_and_names[uniprotid]['length'])
gname: str = str(protein_lenghts_and_names[uniprotid]['gene name'])
except KeyError:
logger.warning(
f'make_saint_dict: No length found for uniprot: {uniprotid}')
plen = '200'
gname = str(uniprotid)
prey.append([str(uniprotid), plen, gname])
logger.info(
f'make_saint_dict: Preys prepared: {datetime.now()}')
return {'bait': bait, 'prey': prey, 'int': inter}
[docs]
def do_ms_microscopy(saint_output_json: str,
db_file: str,
figure_defaults: Dict[str, Any],
version: str = 'v1.0') -> Tuple[html.Div, str]:
"""Perform MS-microscopy localization analysis and visualize.
:param saint_output_json: SAINT output in pandas split-JSON format.
:param db_file: SQLite DB path for MS-microscopy reference.
:param figure_defaults: Figure defaults.
:param version: Analysis version tag.
:returns: Tuple of (plots Div, results JSON).
"""
saint_output: pd.DataFrame = pd.read_json(
StringIO(saint_output_json), orient='split'
)
db_conn = db_functions.create_connection(db_file)
msmic_reference = db_functions.get_full_table_as_pd(
db_conn, 'msmicroscopy', index_col='Interaction'
)
db_conn.close() # type: ignore
msmic_results: pd.DataFrame = ms_microscopy.generate_msmic_dataframes(saint_output, msmic_reference, )
polar_plots: list = [
(bait, ms_microscopy.localization_graph(f'interactomics-msmic-{bait}',figure_defaults, 'polar', bait, data_row))
for bait, data_row in msmic_results.iterrows()
]
msmic_heatmap = ms_microscopy.localization_graph(f'interactomics-msmic-heatmap', figure_defaults, 'heatmap', 'All baits', msmic_results)
tablist: list = [
Tab(
Card(
CardBody(
[
html.H5('MS-microscopy heatmap'),
msmic_heatmap,
legends['ms-microscopy-all']
],
# style={'width': '98%'}
),
# style={'width': '98%'}
),
label = 'Overall results',
#style={'width': '98%'}
)
]
i = 0
for bait, polar_graph in polar_plots:
tablist.append(
Tab(
Card(
CardBody(
[
html.H5(f'MS-microscopy for {bait}'),
polar_graph,
leg_rep(
legends['ms-microscopy-single'],
'BAITSTRING',
bait
)
],
# style={'width': '98%'}
),
#style={'width': '98%'}
),
label = bait,
#style={'width': '98%'}
)
)
return (
html.Div(
id='interactomics-msmicroscopy-plot-div',
children=[
html.H4(id='interactomics-msmic-header', children='MS-microscopy'),
Tabs(
id = 'interactomics-msmicroscopy-tabs',
children = tablist,
style = {'width': '98%'}
),
]
),
msmic_results.to_json(orient='split')
)
[docs]
def generate_saint_container(input_data_dict: Dict[str, Any],
uploaded_controls: List[str],
additional_controls: List[str],
crapomes: List[str],
db_file: str,
select_most_similar_only: bool = False,
n_controls: int = 30) -> Tuple[html.Div, Dict[str, List[List[str]]], str]:
"""Build SAINT UI container and prepare inputs.
:param input_data_dict: Input data and metadata including sample groups.
:param uploaded_controls: Uploaded control group names.
:param additional_controls: Additional DB control names.
:param crapomes: CRAPome dataset names.
:param db_file: SQLite database path.
:param select_most_similar_only: If ``True``, filter controls by similarity.
:param n_controls: Number of controls to keep when filtering.
:returns: Tuple of (container Div, SAINT input dict, CRAPome JSON).
"""
if '["No data"]' in input_data_dict['data tables']['spc']:
return (html.Div(['No spectral count data in input, cannot run SAINT.']),{},'')
logger.info(
f'generate_saint_container: preparations started: {datetime.now()}')
db_conn = db_functions.create_connection(db_file)
additional_controls = [
f'control_{ctrl_name.lower().replace(" ","_")}' for ctrl_name in additional_controls]
crapomes = [
f'crapome_{crap_name.lower().replace(" ","_")}' for crap_name in crapomes]
logger.info(f'generate_saint_container: DB connected')
spc_table: pd.DataFrame
control_table: pd.DataFrame
spc_table, control_table = prepare_controls(
input_data_dict, uploaded_controls, additional_controls, db_conn, select_most_similar_only, n_controls)
logger.info(f'generate_saint_container: Controls prepared')
protein_list: list = list(
set(spc_table.index.values) | set(control_table.index))
protein_table: pd.DataFrame = db_functions.get_from_table(
db_conn,
'proteins',
select_col=[
'uniprot_id',
'length',
'gene_name'
],
as_pandas=True
)
logger.info(f'generate_saint_container: Protein table retrieved')
protein_table = protein_table[protein_table['uniprot_id'].isin(
protein_list)]
if len(crapomes) > 0:
crapome: pd.DataFrame = prepare_crapome(db_conn, crapomes)
crapome.drop(index=set(crapome.index) -
set(spc_table.index), inplace=True)
else:
crapome = pd.DataFrame()
db_conn.close()
saint_dict: dict = make_saint_dict(
spc_table, input_data_dict['sample groups']['rev'], control_table, protein_table)
logger.info(
f'generate_saint_container: SAINT dict done: {datetime.now()}')
return (
html.Div(
id='interactomics-saint-container',
children=[
html.Div(id='interactomics-saint-filtering-container')
]
),
saint_dict,
crapome.to_json(orient='split')
)
[docs]
def saint_filtering(saint_output_json: str,
bfdr_threshold: float,
crapome_percentage: float,
crapome_fc: float,
do_rescue: bool = False) -> str:
"""Filter SAINT output by BFDR and CRAPome thresholds.
:param saint_output_json: SAINT output in pandas split-JSON format.
:param bfdr_threshold: BFDR threshold for filtering.
:param crapome_percentage: CRAPome frequency threshold.
:param crapome_fc: CRAPome fold-change threshold for rescue.
:param do_rescue: If ``True``, keep preys that pass in any bait.
:returns: Filtered SAINT output JSON.
"""
saint_output: pd.DataFrame = pd.read_json(
StringIO(saint_output_json), orient='split')
logger.info(f'saint filtering - beginning: {saint_output.shape}')
logger.info(
f'saint filtering - beginning nodupes: {saint_output.drop_duplicates().shape}')
saint_output = saint_output.drop_duplicates()
crapome_columns: list = []
for column in saint_output.columns:
if '_frequency' in column:
crapome_columns.append(
(column, column.replace('_frequency', '_spc_avg')))
keep_col: list = []
bfdr_disc = 0
crapome_disc = 0
keep_preys: set = set()
for _, row in saint_output.iterrows():
keep: bool = True
if row['BFDR'] > bfdr_threshold:
keep = False
bfdr_disc += 1
elif 'Max crapome frequency' in saint_output.columns:
if row['Max crapome frequency'] > crapome_percentage:
for freq_col, fc_col in crapome_columns:
if row[freq_col] >= crapome_percentage:
if row[fc_col] <= crapome_fc:
keep = False
crapome_disc += 1
break
if keep:
keep_preys.add(row['Prey'])
keep_col.append(keep)
logger.info(
f'saint filtering - Preys pass filter: {len(keep_preys)}')
saint_output['Passes filter'] = keep_col
logger.info(
f'saint filtering - Saint output pass filter: {saint_output["Passes filter"].value_counts()}')
saint_output['Passes filter with rescue'] = saint_output['Prey'].isin(
keep_preys)
logger.info(
f'saint filtering - Saint output pass filter with rescue: {saint_output["Passes filter with rescue"].value_counts()}')
if do_rescue:
use_col: str = 'Passes filter with rescue'
else:
use_col = 'Passes filter'
filtered_saint_output: pd.DataFrame = saint_output[
saint_output[use_col]
].copy()
logger.info(
f'saint filtering - filtered size: {filtered_saint_output.shape}')
if 'Bait uniprot' in filtered_saint_output.columns:
filtered_saint_output = filtered_saint_output[
filtered_saint_output['Prey is bait']==False
]
colorder: list = ['Bait', 'Bait uniprot', 'Prey', 'PreyGene', 'Prey is bait',
'Passes filter', 'Passes filter with rescue', 'AvgSpec']
colorder.extend(
[c for c in filtered_saint_output.columns if c not in colorder])
filtered_saint_output = filtered_saint_output[colorder]
logger.info(
f'saint filtering - bait removed filtered size: {filtered_saint_output.shape}')
logger.info(
f'saint filtering - bait removed filtered size nodupes: {filtered_saint_output.drop_duplicates().shape}')
return filtered_saint_output.reset_index().drop(columns=['index']).to_json(orient='split')
[docs]
def get_saint_matrix(saint_data_json: str) -> pd.DataFrame:
"""Convert SAINT output JSON to prey x bait matrix of AvgSpec.
:param saint_data_json: SAINT output in pandas split-JSON format.
:returns: Pivot table DataFrame (rows=Prey, cols=Bait, values=AvgSpec).
"""
df = pd.read_json(StringIO(saint_data_json),orient='split')
return df.pivot_table(index='Prey',columns='Bait',values='AvgSpec')
[docs]
def saint_counts(filtered_output_json: str,
figure_defaults: Dict[str, Any],
replicate_colors: Dict[str, str]) -> Tuple[html.Div, str]:
"""Count prey per bait and plot as a bar chart.
:param filtered_output_json: Filtered SAINT output in pandas split-JSON format.
:param figure_defaults: Figure defaults for plotting.
:param replicate_colors: Mapping ``'sample groups'`` -> color.
:returns: Tuple of (bar plot Div, count data JSON).
"""
count_df: pd.DataFrame = pd.read_json(StringIO(filtered_output_json),orient='split')['Bait'].\
value_counts().\
to_frame(name='Prey count')
count_df['Color'] = [
replicate_colors['sample groups'][index] for index in count_df.index.values
]
return (
bar_graph.bar_plot(
figure_defaults,
count_df,
title='',
hide_legend=True,
x_label='Sample group'
),
count_df.to_json(orient='split')
)