"""
Quick statistical utilities for QC and differential analysis.
Includes ANOVA, pairwise differential testing, and helpers to compute
basic per-sample metrics used across figures.
"""
import pandas as pd
import numpy as np
from statsmodels.stats import multitest
from scipy.stats import ttest_ind, ttest_rel, f_oneway
from typing import Any
from components.db_functions import map_protein_info
[docs]
def anova(dataframe: pd.DataFrame, sample_groups: dict) -> pd.DataFrame:
"""Run one-way ANOVA across sample groups for each row.
:param dataframe: Wide matrix with samples as columns and proteins as rows.
:param sample_groups: Mapping group -> list of column names.
:returns: DataFrame with F-statistic, p-value, and Benjamini-Hochberg q-value.
"""
# Create an empty DataFrame to store the results
results = []
index = []
for protein in dataframe.index:
# Extract the values for each group for the current protein
groups = [dataframe.loc[protein, samples].dropna().values for samples in sample_groups.values()]
# Perform the ANOVA test for the current protein across all sample groups
f_stat, p_value = f_oneway(*groups)
results.append([f_stat, p_value])
index.append(protein)
ret_df: pd.DataFrame = pd.DataFrame(data=results,index=index,columns=['F-static','p-value'])
_, p_value_adj, _, _ = multitest.multipletests(ret_df['p-value'], method='fdr_bh')
ret_df['q-value'] = p_value_adj
return ret_df
[docs]
def differential(data_table: pd.DataFrame, sample_groups: dict, comparisons: list, data_is_log2_transformed: bool = True, namemap: dict = None, adj_p_thr: float = 0.01, fc_thr:float = 1.0, test_type: str = 'independent', db_file_path: str = None) -> pd.DataFrame:
"""Compute pairwise differential statistics for specified comparisons.
:param data_table: Wide matrix with samples as columns and proteins as rows.
:param sample_groups: Mapping from group name to list of column names.
:param comparisons: List of ``(sample_group, control_group)`` pairs.
:param data_is_log2_transformed: If ``True``, data is already log2; otherwise log2 means are computed.
:param namemap: Optional mapping from index to display name.
:param adj_p_thr: FDR q-value threshold used for the ``Significant`` flag.
:param fc_thr: Absolute log2 fold change threshold for ``Significant``.
:param test_type: ``'independent'`` or ``'paired'`` t-test.
:param db_file_path: Optional database path for gene mapping.
:returns: Long-form DataFrame with FC, p-values, q-values and metadata per comparison.
"""
sig_data: list = []
for sample, control in comparisons:
sample_columns: list = sample_groups[sample]
control_columns: list = sample_groups[control]
if data_is_log2_transformed:
log2_fold_change: pd.Series = data_table[sample_columns].mean(
axis=1) - data_table[control_columns].mean(axis=1)
else:
log2_fold_change: pd.Series = np.log2(data_table[sample_columns].mean(
axis=1)) - np.log2(data_table[control_columns].mean(axis=1))
sample_mean_val: pd.Series = data_table[sample_columns].mean(axis=1)
control_mean_val: pd.Series = data_table[control_columns].mean(axis=1)
# Calculate the p-value for each protein using a two-sample t-test
if test_type == 'independent':
p_value: float = data_table.apply(lambda x: ttest_ind(x[sample_columns], x[control_columns])[1], axis=1)
elif test_type == 'paired':
p_value: float = data_table.apply(lambda x: ttest_rel(x[sample_columns], x[control_columns])[1], axis=1)
# Adjust the p-values for multiple testing using the Benjamini-Hochberg correction method
_: Any
p_value_adj: np.ndarray
_, p_value_adj, _, _ = multitest.multipletests(p_value, method='fdr_bh')
# Create a new dataframe containing the fold change and adjusted p-value for each protein
result: pd.DataFrame = pd.DataFrame(
{
'fold_change': log2_fold_change,
'p_value_adj': p_value_adj,
'p_value_adj_neg_log10': -np.log10(p_value_adj),
'p_value': p_value,
'sample_mean_value': sample_mean_val,
'control_mean_value': control_mean_val})
if namemap:
result['Name'] = [namemap[i] for i in data_table.index.values]
result['Identifier'] = data_table.index
else:
result['Name'] = data_table.index.values
result['Gene'] = map_protein_info(result.index, db_file_path=db_file_path)
result['Sample'] = sample
result['Control'] = control
result['Significant'] = ((result['p_value_adj']<adj_p_thr) & (result['fold_change'].abs() > fc_thr))
result.sort_values(by='Significant',ascending=True,inplace=True)
#result['p_value_adj_neg_log10'] = -np.log10(result['p_value_adj'])
sig_data.append(result)
if len(sig_data) == 0:
return None
return pd.concat(sig_data,ignore_index=True)[
['Sample',
'Control',
'Name',
'Gene',
'Significant',
'fold_change',
'p_value',
'p_value_adj',
'p_value_adj_neg_log10',
'sample_mean_value',
'control_mean_value'
]]
[docs]
def get_count_data(data_table: pd.DataFrame, contaminant_list: list = None) -> pd.DataFrame:
"""Count non-NA entries per column, optionally excluding contaminants.
:param data_table: Input DataFrame.
:param contaminant_list: Optional list of identifiers to treat as contaminants.
:returns: DataFrame with counts and an ``Is contaminant`` flag when applicable.
"""
data: pd.DataFrame
data = data_table.\
notna().sum().\
to_frame(name='Protein count')
data.index.name = 'Sample name'
if contaminant_list is not None:
contaminants = data_table[data_table.index.isin(
contaminant_list)].notna().sum()
data['Protein count'] = data['Protein count'] - contaminants
data['Is contaminant'] = False
cont_data: pd.DataFrame = contaminants.to_frame(name='Protein count')
cont_data['Is contaminant'] = True
data = pd.concat([cont_data, data]).reset_index().rename(
columns={'index': 'Sample name'})
data.index = data['Sample name']
data = data.drop(columns='Sample name')
return data
[docs]
def get_coverage_data(data_table: pd.DataFrame) -> pd.DataFrame:
"""Compute identification coverage counts.
:param data_table: Input DataFrame.
:returns: DataFrame of value counts of the number of samples each protein is identified in.
"""
return pd.DataFrame(
data_table.notna()
.astype(int)
.sum(axis=1)
.value_counts()
).rename(columns={'count': 'Protein count'})
[docs]
def get_na_data(data_table: pd.DataFrame) -> pd.DataFrame:
"""Compute NA percentage per column.
:param data_table: Input DataFrame.
:returns: DataFrame with ``Missing value %`` per sample.
"""
data: pd.DataFrame = ((data_table.
isna().sum() / data_table.shape[0]) * 100).\
to_frame(name='Missing value %')
data.index.name = 'Sample name'
return data
[docs]
def get_sum_data(data_table) -> pd.DataFrame:
"""Compute column sums.
:param data_table: Input DataFrame.
:returns: DataFrame with ``Value sum`` per sample.
"""
data: pd.DataFrame = data_table.sum().\
to_frame(name='Value sum')
data.index.name = 'Sample name'
return data
[docs]
def get_mean_data(data_table) -> pd.DataFrame:
"""Compute column means.
:param data_table: Input DataFrame.
:returns: DataFrame with ``Value mean`` per sample.
"""
data: pd.DataFrame = data_table.mean().\
to_frame(name='Value mean')
data.index.name = 'Sample name'
return data
[docs]
def get_comparative_data(data_table, sample_groups) -> tuple:
"""Split data into a list of group-specific DataFrames.
:param data_table: Wide matrix with samples as columns.
:param sample_groups: Mapping from sample to group name.
:returns: Tuple of (list of group names, list of DataFrames).
"""
sample_group_names: list = sorted(
list(set([g for _, g in sample_groups.items()])))
comparative_data: list = []
for sample_group_name in sample_group_names:
sample_columns: list = [
sn for sn, sg in sample_groups.items() if sg == sample_group_name]
comparative_data.append(data_table[sample_columns])
return (
sample_group_names,
comparative_data
)
[docs]
def get_common_data(data_table: pd.DataFrame, rev_sample_groups: dict, only_groups: list|None = None) -> dict:
"""Collect sets of identified proteins per group.
:param data_table: Input DataFrame with proteins as index and samples as columns.
:param rev_sample_groups: Mapping from sample name to group name.
:param only_groups: Optional subset of group names to include.
:returns: Dict mapping group name to set of identified proteins.
"""
group_sets: dict = {}
for column in data_table.columns:
col_proteins: set = set(data_table[[column]].dropna().index.values)
group_name: str = rev_sample_groups[column]
if (only_groups is not None) and (group_name not in only_groups):
continue
group_sets.setdefault(group_name, set())
group_sets[group_name] |= col_proteins
return group_sets