Source code for app.components.tools.R_tools

from datetime import datetime
import os
import uuid
import pandas as pd
import tempfile
import sh
import numpy as np

def vsn(dataframe: pd.DataFrame, random_seed: int, errorfile: str) -> pd.DataFrame:
    """Apply VSN normalization via R's vsn::justvsn.

    The input frame is not modified: the index is labelled ``PROTID`` on a
    renamed copy, and that copy is what gets written out for R to read.

    :param dataframe: Input DataFrame (rows=ids in index; columns=samples).
    :param random_seed: Seed for reproducibility in R.
    :param errorfile: Base path to append Rscript stderr on failure.
    :returns: DataFrame of VSN-transformed values, aligned to input.
    """
    # Both values are unique placeholder tokens; run_rscript substitutes the
    # real temp file path and temp directory path into the script lines.
    tempname: str = str(uuid.uuid4())
    tempdir: str = str(uuid.uuid4())

    # Name the index on a copy so the caller's DataFrame keeps its own
    # index name. R uses this name to locate the row-name column.
    index_name: str = 'PROTID'
    named_frame: pd.DataFrame = dataframe.rename_axis(index_name)

    # Resolve the working directory once; it is used for both setwd and R_USER.
    workdir: str = sh.pwd().strip()

    script: list = [
        f'tempdir <- "{tempdir}"',
        'dir.create(tempdir, showWarnings = FALSE, recursive = TRUE)',
        'Sys.setenv(TMPDIR=tempdir)',  # Set temporary directory
        'library("vsn")',
        f'set.seed({random_seed})',
        f'setwd("{workdir}")',
        f'Sys.setenv(R_USER="{workdir}")',  # Set R user directory
        f'data <- read.table("{tempname}",sep="\\t",header=TRUE,row.names="{index_name}")',
        'm = justvsn(data.matrix(data))',
        f'write.table(m,file="{tempname}",sep="\\t",col.names=NA,quote=FALSE)',
        '',
    ]
    return run_rscript(script, named_frame, tempname, errorfile, replace_dir=tempdir)
[docs] def run_rscript(r_script_contents:list, r_script_data: pd.DataFrame, replace_name: str, errorfile: str|list, replace_dir:str|None = None, input_df_has_index:bool = True): """Execute an R script with a temp data file and return parsed output. :param r_script_contents: Lines of the R script; occurrences of ``replace_name`` are replaced with temp paths. :param r_script_data: DataFrame to write to temp file for R to read. :param replace_name: Placeholder token to be replaced with temp filename. :param errorfile: Base path (e.g. 'logs/errors.log') or list of a file path (e.g. ['logs','errors.log']) for error log on failure. :param replace_dir: Optional directory placeholder to replace with temp dir. :param input_df_has_index: Whether to include index when writing CSV/TSV. :returns: DataFrame parsed from R output file. :raises Exception: Re-raises Rscript execution errors after logging. """ if isinstance(errorfile, list): errorfile = os.path.join(*errorfile) with tempfile.TemporaryDirectory() as tmpdir: with tempfile.NamedTemporaryFile() as datafile: repwith = datafile.name r_script_contents = [line.replace(replace_name,repwith) for line in r_script_contents] # Ensure R finds packages installed into the CI user library lib_setup = [ 'lib_user <- Sys.getenv("R_LIBS_USER")', 'if (nzchar(lib_user)) .libPaths(c(lib_user, .libPaths()))' ] r_script_contents = lib_setup + r_script_contents if replace_dir: r_script_contents = [line.replace(replace_dir,tmpdir) for line in r_script_contents] r_script_data.to_csv(datafile, sep='\t', index=input_df_has_index) with tempfile.NamedTemporaryFile() as scriptfile: scriptfile.write('\n'.join(r_script_contents).encode('utf-8')) scriptfile.flush() try: sh.Rscript(scriptfile.name) except Exception as e: datestr = str(datetime.now()).split()[0] # quick n dirty way to get just the date without time with open(f'{errorfile}','a') as fil: fil.write(f'===================\n{datestr}\n\n{e}\n\n{str(e.stderr)}\n-----------------------\n') raise 
e with open(datafile.name, "r") as f: out = f.read().split('\n') out = [o.split('\t')[1:] for o in out[1:] if len(o)>0] # skip empty rows script_output_df = pd.DataFrame(data = out) script_output_df = script_output_df.replace('NA',np.nan).replace('',np.nan).astype(float) script_output_df.columns = r_script_data.columns # Restore columns and index in case R renames anything from either. script_output_df.index = r_script_data.index return script_output_df
def impute_qrilc(dataframe: pd.DataFrame, random_seed: int, errorfile: str) -> pd.DataFrame:
    """Fill in missing values with QRILC from the R package imputeLCMD.

    Builds a short R script that reads the data, runs ``impute.QRILC`` with
    ``tune.sigma=1`` and writes the first element of its result back to the
    same file, then hands the script to ``run_rscript``.

    :param dataframe: Input DataFrame with missing values.
    :param random_seed: Seed passed to R's ``set.seed`` for reproducibility.
    :param errorfile: Base path for error log on failure.
    :returns: DataFrame with imputed values.
    """
    # Unique token; run_rscript swaps it for the real temp file path.
    placeholder: str = str(uuid.uuid4())

    load_packages = 'library("imputeLCMD")'
    seed_rng = f'set.seed({random_seed})'
    read_input = f'df <- read.csv("{placeholder}",sep="\\t",row.names=1)'
    write_output = (
        f'write.table(data.frame(impute.QRILC(df,tune.sigma=1)[1]),file="{placeholder}",sep="\\t")'
    )

    r_lines: list = [load_packages, seed_rng, read_input, write_output]
    return run_rscript(r_lines, dataframe, placeholder, errorfile)
def impute_random_forest(dataframe: pd.DataFrame, random_seed: int, rev_sample_groups: dict,
                         errorfile: str) -> pd.DataFrame:
    """Fill in missing values with randomForest::rfImpute, supervised by sample group.

    The sample -> group mapping is written to a temporary TSV file that the
    R script reads alongside the data. If the data contains no NAs, the R
    script writes the input back unchanged and exits early.

    :param dataframe: Input DataFrame with missing values (rows=ids; cols=samples).
    :param random_seed: Seed passed to R's ``set.seed`` for reproducibility.
    :param rev_sample_groups: Mapping sample -> group for supervised imputation.
    :param errorfile: Base path for error log on failure.
    :returns: DataFrame with imputed values.
    """
    # Unique token; run_rscript swaps it for the real temp file path.
    placeholder: str = str(uuid.uuid4())

    group_rows = '\n'.join(f'{sample}\t{group}' for sample, group in rev_sample_groups.items())
    groups_text = 'sample\tgroup\n' + group_rows

    # The groups file must stay alive until R has finished, so the call to
    # run_rscript happens inside this context manager.
    with tempfile.NamedTemporaryFile() as groupsfile:
        groupsfile.write(groups_text.encode('utf-8'))
        groupsfile.flush()

        setup_lines = [
            'suppressPackageStartupMessages({',
            '  library(readr)',
            '  library(randomForest)',
            '})',
            f'INPUT_TSV <- "{placeholder}" # col1 = protein IDs, rest = numeric samples',
            f'GROUPS_TSV <- "{groupsfile.name}" # sample, group',
            f'OUTPUT_TSV <- "{placeholder}"',
            f'SEED <- {random_seed}',
            'NTREE <- 300',
            'ITER <- 5',
            'set.seed(SEED)',
        ]
        load_lines = [
            'dat <- read_tsv(INPUT_TSV, col_types = cols(.default = col_guess()))',
            'id_colname <- names(dat)[1]',
            'protein_ids <- as.character(dat[[1]])',
            'expr_df <- dat[, -1, drop = FALSE]',
            'X <- as.matrix(expr_df)',
            'rownames(X) <- protein_ids',
            'groups <- read_tsv(GROUPS_TSV,',
            '  col_types = cols(sample = col_character(), group = col_character()))',
            'groups <- groups[match(colnames(X), groups$sample), , drop = FALSE]',
            'y <- factor(groups$group)',
        ]
        # The "\n" below is interpreted by Python, so R receives a string
        # literal that contains an actual line break.
        passthrough_lines = [
            'if (!any(is.na(X))) {',
            '  out_df <- data.frame(protein_ids, X, check.names = FALSE)',
            '  names(out_df)[1] <- id_colname',
            '  write_tsv(out_df, OUTPUT_TSV)',
            '  cat("No NAs detected. Wrote input as final output.\n")',
            '  quit(save = "no", status = 0)',
            '}',
        ]
        impute_lines = [
            'X_df <- as.data.frame(t(X))',
            'tmp <- tempfile()',
            'invisible(capture.output({',
            '  imp <- rfImpute(x = X_df, y = y, iter = ITER, ntree = NTREE)',
            '}, file = tmp))',
            'X_final <- t(as.matrix(imp[, -1, drop = FALSE]))',
            'out_df <- data.frame(protein_ids, X_final, check.names = FALSE)',
            'names(out_df)[1] <- id_colname',
            'write_tsv(out_df, OUTPUT_TSV)',
        ]

        r_lines: list = setup_lines + load_lines + passthrough_lines + impute_lines
        return run_rscript(r_lines, dataframe, placeholder, errorfile)