MVP / utils_app.py
yzhouchen001's picture
cpu support only
86f6f17
from matchms.importing import load_from_mgf
import yaml
import numpy as np
from mvp.subformula_assign.utils.spectra_utils import assign_subforms
import tempfile
import json
import os
from functools import partial
from pytorch_lightning import Trainer
from massspecgym.models.base import Stage
from mvp.data.data_module import TestDataModule
from mvp.data.datasets import ContrastiveDataset
from mvp.utils.data import get_spec_featurizer, get_mol_featurizer, get_test_ms_dataset
from mvp.utils.models import get_model
import pandas as pd
# check formspec requirements
def check_formspec_requirements(spectra):
for spec in spectra:
if 'formula' not in spec.metadata or 'adduct' not in spec.metadata:
return False
return True
# preprocess spectra
def preprocess_spectra(mgf_path, model_choice, mass_diff_thresh=20, dataset_pth=None, subformula_dir=None):
if dataset_pth is None:
dataset_pth = os.path.join(tempfile.gettempdir(), f"mvp_data.tsv")
if subformula_dir is None:
subformula_dir = os.path.join(tempfile.gettempdir(), f"mvp_subformulae")
os.makedirs(subformula_dir, exist_ok=True)
# load mgf file
spectra = list(load_from_mgf(mgf_path))
columns = ['identifier', 'formula', 'adduct', 'precursor_mz', 'precursor_formula', 'mzs', 'intensities', 'fold']
data = []
try:
for spec in spectra:
identifier = spec.metadata['title']
formula = spec.metadata.get('formula', None)
adduct = spec.metadata.get('adduct', None)
precursor_mz = spec.metadata['precursor_mz']
precursor_formula = spec.metadata['formula'] # technically incorrect, but we don't use it
mzs = spec.peaks.mz
intensities = spec.peaks.intensities
if model_choice == "formSpec":
if formula is None or adduct is None:
return None, None
ms = [(m, i) for m, i in zip(mzs, intensities)]
# annotate peaks
x = assign_subforms(formula, np.array(ms), adduct, mass_diff_thresh=mass_diff_thresh)
if x['output_tbl'] is None:
continue
# save json file
json_file = os.path.join(subformula_dir, f"{identifier}.json")
with open(json_file, 'w') as f:
json.dump(x['output_tbl'], f)
mzs = ','.join([str(m) for m in mzs])
intensities = ','.join([str(i) for i in intensities])
data.append([identifier, formula, adduct, precursor_mz, precursor_formula, mzs, intensities, 'test'])
df = pd.DataFrame(data, columns=columns)
df.to_csv(dataset_pth, sep='\t', index=False)
return dataset_pth, subformula_dir
except Exception as e:
return None, None
def setup_config(model_choice, dataset_pth, candidates_pth, subformula_dir):
if model_choice == "binnedSpec":
param_file = f"mvp/params_binnedSpec.yaml"
checkpoint_path = f"pretrained_models/msgym_binnedSpec.ckpt"
elif model_choice == "formSpec":
param_file = f"mvp/params_formSpec.yaml"
checkpoint_path = f"pretrained_models/msgym_formSpec.ckpt"
# load yaml
with open(param_file, 'r') as f:
params = yaml.safe_load(f)
params['dataset_pth'] = dataset_pth
params['candidates_pth'] = candidates_pth
params['subformula_dir_pth'] = subformula_dir
params['experiment_dir'] = tempfile.mkdtemp()
params['checkpoint_pth'] = checkpoint_path
params['df_test_path'] = os.path.join(params['experiment_dir'], f"results_{model_choice}.pkl")
return params
def run_inference(params):
# Load dataset
spec_featurizer = get_spec_featurizer(params['spectra_view'], params)
mol_featurizer = get_mol_featurizer(params['molecule_view'], params)
dataset = get_test_ms_dataset(params['spectra_view'], params['molecule_view'], spec_featurizer, mol_featurizer, params, external_test=True)
# Init data module
collate_fn = partial(ContrastiveDataset.collate_fn, spec_enc=params['spec_enc'], spectra_view=params['spectra_view'], stage=Stage.TEST)
data_module = TestDataModule(
dataset=dataset,
collate_fn=collate_fn,
split_pth=params['split_pth'],
batch_size=params['batch_size'],
num_workers=params['num_workers']
)
model = get_model(params['model'], params)
# print(model.hparams)
model.df_test_path = params['df_test_path']
model.external_test = True
model.hparams['use_fp'] = False
model.hparams["contr_views"] = [['spec_enc', 'mol_enc']]
model.hparams['use_cons_spec'] = False
# Init trainer
trainer = Trainer(
accelerator='cpu',
devices=1,
default_root_dir=params['experiment_dir'],
)
# Prepare data module to test
data_module.prepare_data()
data_module.setup(stage="test")
# Test
trainer.test(model, datamodule=data_module)
if __name__ == "__main__":
# test run
mgf_path = "data/app/data.mgf"
model_choice = "formSpec"
candidates_pth = "data/app/identifier_to_candidates.json"
mass_diff_thresh = 20
dataset_pth, subformula_dir = preprocess_spectra(mgf_path, model_choice, mass_diff_thresh=mass_diff_thresh)
params = setup_config(model_choice, dataset_pth, candidates_pth, subformula_dir)
run_inference(params)