Source code for specimen.util.util

#!/usr/bin/env python
"""Utility functions."""

__author__ = "Carolin Brune"

################################################################################
# requirements
################################################################################

import os
import os.path
import subprocess
import sys

from Bio import SeqIO
from pathlib import Path
from typing import Literal

from refinegems.utility.io import parse_gbff_for_cds


################################################################################
# variables
################################################################################

################################################################################
# functions
################################################################################

# create a DIAMOND reference database
# -----------------------------------


[docs] def create_DIAMOND_db_from_folder( dir: str, out: str, name: str = "database", extension: str = "faa", threads: int = 2 ): """Build a DIAMOND database from a folder containing FASTA files. Args: - dir (str): Path to the directory to search for FASTA files for the database (recursive file search). - out (str): Path of the directory of the output. - name (str, optional): Name of the created database. Defaults to 'database'. - extension (str, optional): File extension of the FASTA files (to determine which files to search for). Defaults to 'faa'. - threads (int, optional): Number of threads to use for DIAMOND. Defaults to 2. """ # get fasta file names fasta_files = Path(dir).rglob(f"*.{extension}") # ----------------------- # combine to 1 FASTA file # ----------------------- # check if folder already has a combined FASTA outname_fasta = Path(out, "combinded.faa") save = True if os.path.isfile(outname_fasta): print("A combined.faa files already exists in the given folder.") answer = None while answer not in ("overwrite", "use", "end"): answer = input( "Do you want to [overwrite] or [use] the file or [end] the programm?: " ) if answer == "use": save = False elif answer == "end": sys.exit() elif answer == "overwrite": save = True else: print("Enter [overwrite] or [use] or [end] (without brackets)") # save all in new (combined) FASTA if save: with open(outname_fasta, "w") as outfile: for f in fasta_files: SeqIO.write(SeqIO.parse(f, "fasta"), outfile, "fasta") # ------------------------- # generate DIAMOND database # ------------------------- outname_dnmd = Path(out, name) bl = "\\ " print( f'running the following command:\ndiamond makedb --in {str(outname_fasta).replace(" ",bl)} -d {str(outname_dnmd).replace(" ",bl)} -p {threads}' ) subprocess.run( [ f'diamond makedb --in {str(outname_fasta).replace(" ",bl)} -d {str(outname_dnmd).replace(" ",bl)} -p {threads}' ], shell=True, ) print("finished")
# create a NCBI mapping file for the database # -------------------------------------------
[docs] def create_NCBIinfo_mapping(dir: str, out: str, extension: Literal["gbff"] = "gbff"): """Create a NCBI information mapping file from a folder containing e.g. gbff files. Args: - dir (str): Path to the directory for the recursive file search for the mapping. - out (str): Path of the directory for the output. - extension (Literal['gbff'], optional): Name of the file extension to be searched. Default is gbff, and currently it is advised to leave it at that. Defaults to 'gbff'. """ # get gbff file names list_files = Path(dir).rglob(f"*.{extension}") file_no = len(list(Path(dir).rglob(f"*.{extension}"))) # ----------------------------------------------- # extract information and create the mapping file # ----------------------------------------------- with open(Path(out, "NCBI_mapping.csv"), "w") as out_file: file_counter = 1 for file_name in list_files: print(f"Parsing {file_counter}/{file_no}: {file_name}") # retrieve information match extension: case "gbff": info = parse_gbff_for_cds(file_name) case "gff": raise NotImplementedError( "This option is under discussion and has yet to be implemented." ) case _: raise ValueError(f"Unknown file extension {extension}") # save info.to_csv(out_file, header=True, index=False) # go to next file_counter += 1