import os
import sys
from typing import Optional, Dict
import re
import unidecode
from . import parse_thermo
from . import parse_timstof
import traceback
import pandas as pd
from plotly import io as pio
from plotly import graph_objects as go
import json
from scipy.ndimage import gaussian_filter1d
import tomlkit
[docs]
def remove_accent_characters(text: str) -> str:
    """Transliterate a string with ``unidecode``.

    :param text: Input string, possibly containing accented characters.
    :returns: The result of ``unidecode.unidecode(text)``, i.e. the text with
        accented characters replaced by unaccented equivalents.
    """
    transliterated = unidecode.unidecode(text)
    return transliterated
[docs]
def replace_special_characters(
text: str,
replacewith: str = '.',
dict_and_re: bool = False,
replacement_dict: Optional[Dict[str, str]] = None,
stripresult: bool = True,
remove_duplicates: bool = False,
make_lowercase: bool = True,
allow_numbers: bool = True,
allow_space: bool = False,
mask_first_digit: str|None = None
) -> str:
"""Replace special characters in a string with specified replacements.
:param text: Input string containing special characters.
:param replacewith: Character to use for replacement.
:param dict_and_re: Whether to apply both dictionary replacements and regex.
:param replacement_dict: Mapping of specific substrings to replacements.
:param stripresult: Strip whitespace and replacement characters from result.
:param remove_duplicates: Collapse consecutive replacement characters.
:param make_lowercase: Convert result to lowercase.
:param allow_numbers: Allow numbers in the result.
:param allow_space: Allow spaces in the result.
:param mask_first_digit: Character to prefix when first char is a digit.
:returns: String with special characters replaced.
"""
ret: str
regex_pat = r'[^a-zA-Z0-9]'
if allow_space:
regex_pat = r'[^a-zA-Z0-9 ]'
if not allow_numbers:
regex_pat = regex_pat.replace('0-9', '')
if not replacement_dict:
ret = re.sub(regex_pat, replacewith, text)
else:
# Sort replacement keys by length (longest first) to handle overlapping patterns
for key in sorted(list(replacement_dict.keys()), key=lambda x: len(x), reverse=True):
if key in text:
text = text.replace(key, replacement_dict[key])
if dict_and_re:
ret = re.sub(regex_pat, replacewith, text)
else:
new_text: list[str] = []
for character in text:
if not character.isalnum():
new_text.append(replacewith)
else:
new_text.append(character)
ret = ''.join(new_text)
if stripresult:
curlen: int = -1
while len(ret) != curlen:
curlen = len(ret)
ret = ret.strip()
ret = ret.strip(replacewith)
if remove_duplicates:
curlen: int = -1
while len(ret) != curlen:
curlen = len(ret)
ret = ret.replace(f'{replacewith}{replacewith}', replacewith)
if make_lowercase:
ret = ret.lower()
if mask_first_digit:
if ret[0].isdigit():
ret = mask_first_digit + ret
return ret
[docs]
def remove_rawfile_ending(column_name: str) -> str:
    """Remove a raw file ending from a column name.

    For example, ``'run1.raw'`` becomes ``'run1'``. The ending is matched
    case-insensitively and at most one ending is removed.

    :param column_name: Column name that may end in a raw file extension.
    :returns: ``column_name`` without the first matching ending, or unchanged
        if none of the known endings match.
    """
    raw_file_endings: tuple[str, ...] = ('.raw', '.d', '.wiff', '.scan', '.mzml', '.dia')
    # The loop variable is deliberately not called ``re``: that name is the
    # regex module imported at the top of this file.
    for ending in raw_file_endings:
        if column_name[-len(ending):].lower() == ending:
            return column_name[:-len(ending)]
    return column_name
[docs]
def read_toml(toml_file):
    """Load a TOML document from disk.

    :param toml_file: Path of the TOML file to read.
    :returns: The object returned by ``tomlkit.load`` for that file.
    """
    # TOML documents are UTF-8 by specification; do not rely on the platform
    # default encoding.
    with open(toml_file, 'r', encoding='utf-8') as tf:
        return tomlkit.load(tf)
[docs]
def count_intercepts(xydata):
    """Count how many times a trace crosses its own mean.

    A crossing is registered at a sample when the mean lies strictly between
    the previous intensity and the current one. Samples exactly equal to the
    mean never register a crossing.

    :param xydata: Series-like object providing ``mean()`` and ``items()``,
        where ``items()`` yields ``(time, intensity)`` pairs.
    :returns: Tuple ``(intercepts, intercept_times)``: the number of crossings
        and the list of times at which they were registered.
    """
    mean = xydata.mean()
    intercept_times = []
    # The walk starts from a sentinel "previous intensity" of -1, so a first
    # sample above the mean is counted as a crossing (when the mean is > -1).
    prev = -1
    for time, intensity in xydata.items():
        if prev < mean < intensity or intensity < mean < prev:
            intercept_times.append(time)
        prev = intensity
    return (len(intercept_times), intercept_times)
[docs]
def calculate_auc(ser):
    """Compute the area under a trace with the trapezoidal rule.

    The area is accumulated between consecutive samples in iteration order,
    so it depends only on the spacing between time points, not on their
    absolute values.

    :param ser: Series-like object whose ``items()`` yields
        ``(time, intensity)`` pairs in time order.
    :returns: The summed trapezoid areas; ``0.0`` for fewer than two samples.
    """
    auc = 0.0
    prev_time = None
    prev_intensity = None
    for time, intensity in ser.items():
        if prev_time is not None:
            # Trapezoid between the previous sample and this one.
            auc += (time - prev_time) * (intensity + prev_intensity) / 2
        prev_time, prev_intensity = time, intensity
    return auc
[docs]
def parse_raw(root, filename):
    """Parse a file by delegating to ``parse_thermo.parse_file``.

    :param root: Passed through as the first argument of the parser.
    :param filename: Passed through as the second argument of the parser.
    :returns: Whatever ``parse_thermo.parse_file(root, filename)`` returns.
    """
    return parse_thermo.parse_file(root, filename)
[docs]
def parse_d(root, filename, run_id_regex):
    """Parse a file by delegating to ``parse_timstof.parse_file``.

    :param root: Passed through as the first argument of the parser.
    :param filename: Passed through as the second argument of the parser.
    :param run_id_regex: Passed through as the third argument of the parser.
    :returns: Whatever
        ``parse_timstof.parse_file(root, filename, run_id_regex)`` returns.
    """
    return parse_timstof.parse_file(root, filename, run_id_regex)
[docs]
def handle_data(data_dict, oname):
    """Derive per-trace statistics and plots, then write everything as JSON.

    Reads ``data_dict['sample']['file_name']``, ``data_dict['traces']`` and
    ``data_dict['smooth_sigma'][tracetype]``. For every trace type the
    following keys are added to ``data_dict['traces']``: ``<type>_raw``,
    ``<type>_auc``, ``<type>_intercepts``, ``<type>_intercept_times``,
    ``<type>_maxtime``, ``<type>_mean_intensity``, ``<type>_max_intensity``
    and ``<type>_trace`` (a plotly Scatter serialised to JSON). The entry
    ``<type>`` itself is overwritten with the (optionally smoothed) trace.
    ``data_dict['file_name_clean']`` is set as well. ``data_dict`` is
    modified in place.

    :param data_dict: Parsed run data with the keys described above.
    :param oname: Path of the JSON file to write.
    """
    file_name = data_dict['sample']['file_name']
    # Last path component, whichever separator style the name uses.
    sample_id_number = file_name.rsplit('/', maxsplit=1)[-1].rsplit('\\', maxsplit=1)[-1]
    data_dict['file_name_clean'] = replace_special_characters(
        remove_accent_characters(remove_rawfile_ending(file_name)),
        replacewith='_',
        allow_numbers=True,
    )

    # Collected separately because data_dict['traces'] is being iterated.
    new_traces = {}
    for tracetype, tracedict in data_dict['traces'].items():
        ser = pd.Series(tracedict)
        intercepts, intercept_times = count_intercepts(ser)
        new_traces[f'{tracetype}_raw'] = ser.to_dict()
        new_traces[f'{tracetype}_auc'] = calculate_auc(ser)
        new_traces[f'{tracetype}_intercepts'] = intercepts
        new_traces[f'{tracetype}_intercept_times'] = intercept_times
        new_traces[f'{tracetype}_maxtime'] = int(ser.index.max())
        new_traces[f'{tracetype}_mean_intensity'] = float(ser.mean())
        new_traces[f'{tracetype}_max_intensity'] = int(ser.max())
        sigma = data_dict['smooth_sigma'][tracetype]  # This depends on the data, and should be manually figured out
        if sigma > 0:
            # Keep the original index: the filter returns a bare array, and
            # without it the smoothed trace would be keyed/plotted by sample
            # position instead of by the trace's own x values.
            ser = pd.Series(gaussian_filter1d(ser, sigma), index=ser.index)
        new_traces[tracetype] = ser.to_dict()
        new_traces[f'{tracetype}_trace'] = pio.to_json(
            go.Scatter(x=ser.index, y=ser.values, name=sample_id_number)
        )
    data_dict['traces'].update(new_traces)

    with open(oname, 'w', encoding='utf-8') as fil:
        json.dump(data_dict, fil, indent=2)
[docs]
def analyze(filename, outdir, errorfile):
    """Parse one raw data file and write its analysis as JSON into ``outdir``.

    The parser is chosen from the file name ending (case-insensitive):
    ``.raw`` -> ``parse_raw``, ``.d`` -> ``parse_d``, ``.mzml`` ->
    ``parse_mzml``. The output file is ``<outdir>/<lowercased name>.json``.
    Any exception raised while parsing or handling the data is appended to
    ``errorfile`` with a timestamp and traceback instead of propagating.

    :param filename: Path to the raw file or directory; trailing path
        separators are ignored.
    :param outdir: Output directory; created if it does not exist.
    :param errorfile: Path of the log file that error reports are appended to.
    :returns: ``0`` if the file was processed or no parser matched / the
        parser returned ``None``; ``1`` if an exception was logged.
    """
    filename = filename.rstrip(os.sep + (os.altsep or ''))
    root_dir, filename = os.path.split(filename)
    if not root_dir:
        root_dir = '.'
    tims_run_id_regex = '^(\\d+)(?:(_Tomppa))?'
    os.makedirs(outdir, exist_ok=True)
    try:
        lowered = filename.lower()
        oname = os.path.join(outdir, f'{lowered}.json')
        data = None
        if lowered.endswith('.raw'):
            data = parse_raw(root_dir, filename)
        elif lowered.endswith('.d'):
            data = parse_d(root_dir, filename, tims_run_id_regex)
        elif lowered.endswith('.mzml'):
            # NOTE(review): parse_mzml is neither defined nor imported in the
            # visible part of this file; if it is missing at runtime the
            # NameError is caught below and logged like any other failure.
            data = parse_mzml(root_dir, filename)
        if data is not None:
            handle_data(data, oname)
            print(filename, 'done')
        else:
            print(filename, 'data none')
        return 0
    except Exception as e:
        # Deliberate catch-all: one bad file is logged, not allowed to abort
        # the caller.
        from datetime import datetime
        name = type(e).__name__
        details = ''.join(traceback.format_exception(type(e), e, e.__traceback__))
        # Explicit encoding so file names with non-ASCII characters can
        # always be written to the log.
        with open(errorfile, 'a', encoding='utf-8') as fil:
            fil.write(f'{datetime.now()}:: {name}:\nFilename:{filename}\nDetails:{details}\n-=-=-=-=-=-=-=-=-=-=-=-=-=-\n')
        print(f'{filename} ERRORED')
        return 1
[docs]
def main():
    """Command-line entry point.

    Expects exactly three arguments: ``<filename> <outdir> <errorfile>``,
    which are forwarded to ``analyze``. With any other argument count a usage
    message is written to stderr and the process exits with status 1.
    """
    args = sys.argv[1:]
    if len(args) != 3:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit(f'usage: {sys.argv[0]} <filename> <outdir> <errorfile>')
    filename, outdir, errorfile = args
    analyze(filename, outdir, errorfile)


if __name__ == '__main__':
    main()