Source code for utils.MSParser.parse_timstof

import xmltodict
import os
import pandas as pd
from datetime import datetime
from plotly import io as pio
import sqlite3
from plotly import graph_objects as go
import warnings
from . import ms_name_identifier

[docs] def get_directory_size(directory_path): """Calculate the total size of all files in a directory recursively.""" total_size = 0 try: for dirpath, dirnames, filenames in os.walk(directory_path): for filename in filenames: filepath = os.path.join(dirpath, filename) if os.path.exists(filepath): total_size += os.path.getsize(filepath) except (OSError, PermissionError): # Handle cases where we can't access some files pass return total_size
[docs] def get_traces(ms1_df, ms2_df): """Get traces from a Timstof file. :param ms1_df: DataFrame containing the MS1 data. :param ms2_df: DataFrame containing the MS2 data. :returns: Dictionary containing the traces. """ bpc_df = ms1_df[['Time','MaxIntensity','SummedIntensities']] msn_df = ms2_df[['Time','MaxIntensity','SummedIntensities']] tic_data = [] bpc_data = [] msn_data = [] for i in range(1, int(bpc_df['Time'].max()+2)): i_df = bpc_df[(bpc_df['Time']<i) & ((i-1)<bpc_df['Time'])] i_df_ms2 = msn_df[(msn_df['Time']<i) & ((i-1)<msn_df['Time'])] try: bpc_data.append(int(i_df['MaxIntensity'].max())) except ValueError: bpc_data.append(0) try: tic_data.append(int(i_df['SummedIntensities'].sum())) except ValueError: tic_data.append(0) try: msn_data.append(int(i_df_ms2['SummedIntensities'].sum())) except ValueError: msn_data.append(0) bpc = pd.Series(bpc_data,name='BPC').to_dict() tic = pd.Series(tic_data,name='TIC').to_dict() MSn = pd.Series(msn_data,name='MSn').to_dict() return {'TIC': tic, 'BPC': bpc, 'MS2': MSn}
[docs] def parse_file(root, run_name, run_id_regex): """Parse a Timstof file. :param root: Root directory of the Timstof file. :param run_name: Name of the run. :param run_id_regex: Regular expression to match the run ID. :returns: Dictionary containing the parsed data. """ warnings.simplefilter(action="ignore", category=FutureWarning) timsfile = os.path.join(root, run_name) with sqlite3.connect(os.path.join(timsfile, 'analysis.tdf')) as conn: df_ms = pd.read_sql_query('SELECT * FROM Frames', conn) df_MS1 = df_ms[df_ms['MsMsType']==0] df_MS2 = df_ms[df_ms['MsMsType'] == 8] if df_MS2.shape[0] > 0: datatype = 'ddaPASEF' else: datatype = 'diaPASEF' df_MS2 = df_ms[df_ms['MsMsType'] == 9] metadata = {} for _, row in pd.read_sql_query('SELECT * FROM GlobalMetadata', conn).iterrows(): metadata[row['Key']] = row['Value'] if os.path.isdir(timsfile): if 'SampleInfo.xml' in os.listdir(timsfile): try: with open(os.path.join(timsfile,'SampleInfo.xml'),encoding='utf-16') as fil: sampleinfo = fil.read() sampleinfo = xmltodict.parse(sampleinfo) except: sampleinfo = {'SampleTable': {}, 'Sample': {}, 'No_sampletable': 'no sampletable'} fdic = {'file_name': run_name, 'file_path': timsfile} fdic['file_size'] = get_directory_size(timsfile) fdic['files'] = {} for root, dirs, files in os.walk(fdic['file_path']): for f in files: file_stats = os.stat(os.path.join(root,f)) fp = root.replace(fdic['file_path'],'') fdic['files'][fp] = { 'size': file_stats.st_size, 'atime': file_stats.st_atime, 'mtime': file_stats.st_mtime, 'ctime': file_stats.st_ctime, } try: fdic['files'][fp]['birthtime'] = file_stats.st_birthtime except AttributeError: fdic['files'][fp]['birthtime'] = '' fdic['instrument'] = { 'inst_name': metadata['InstrumentName'], 'inst_model': f'{metadata["InstrumentName"]}_{metadata["InstrumentFamily"]}_{metadata["InstrumentRevision"]}', 'inst_serial_no': metadata['InstrumentSerialNumber'], 'software_version': metadata['AcquisitionSoftwareVersion'], 'firmware_version': metadata['AcquisitionFirmwareVersion'], 'extras': '', 'name': ms_name_identifier.identify(fdic['file_path']) } if 'DigitizerType' in metadata: fdic['instrument']['digitizer'] = metadata['DigitizerType'] fdic['run'] = { 'first_scan_number': int(df_ms['Id'].min()), 'last_scan_number': int(df_ms['Id'].max()), 'start_time': float(df_ms['Time'].min())/60, 'end_time': float(df_ms['Time'].max())/60, 'run_date': datetime.fromisoformat(metadata['AcquisitionDateTime']).strftime("%Y-%m-%d %H:%M:%S"), 'number_of_scans': int(df_ms['Id'].max()), 'method_name': metadata['MethodName'], } infodict = sampleinfo['SampleTable']['Sample'] if '@MS_Method' in infodict: fdic['run']['ms_method'] = infodict['@MS_Method'].rsplit('\\')[-1] fdic['run']['full_ms_method'] = infodict['@MS_Method'] if '@Method' in infodict: fdic['run']['full_method'] = infodict['@Method'] if '@SampleID' in infodict: fdic['sample'] = { 'sample_id': infodict['@SampleID'], 'sample_name': infodict['@SampleID'], } fdic['sample_id'] = fdic['sample']['sample_id'] else: fdic['sample'] = {} if '@DataPath' in infodict: fdic['sample']['file_name'] = infodict['@DataPath'].rsplit('\\',maxsplit=1)[-1] if '@Position' in infodict: fdic['sample']['vial'] = infodict['@Position'] if '@Volume' in infodict: fdic['sample']['injection_volume'] = infodict['@Volume'] if '@Line' in infodict: fdic['sample']['row_number'] = infodict['@Line'] if '@Dilution' in infodict: fdic['sample']['dilution_factor'] = infodict['@Dilution'] if '@DataPath' in infodict: fdic['sample']['original_file_path'] = infodict['@DataPath'] fdic['metadata'] = metadata fdic['data_type'] = datatype fdic['parsed_date'] = datetime.now().strftime('%Y.%m.%d') fdic['traces'] = get_traces(df_MS1, df_MS2) fdic['smooth_sigma'] = { 'TIC': 2, 'BPC': 2, 'MSn': 2 } return fdic