"""Perform a bidirectional blastp using DIAMOND on an input and a template (annotated genomes)."""
__author__ = "Carolin Brune"
################################################################################
# requirements
################################################################################
import pandas as pd
pd.options.mode.chained_assignment = None # editing on copy and saving is elsewhere
import logging
import subprocess
import time
import os.path
from Bio import SeqIO
from pathlib import Path
from typing import Literal
################################################################################
# setup logging
################################################################################
# general logging
genlogger = logging.getLogger(__name__)
# internal logger with logging file
logger = logging.getLogger(__name__ + "-intern")
logger.setLevel(logging.DEBUG)
logger.propagate = False
################################################################################
# functions
################################################################################
[docs]
def create_diamond_db(dir: str, name: str, path: str, threads: int):
"""Create a DIAMOND database for a given protein FASTA file.
Args:
- dir (str):
Path to the data directory.
- name (str):
Name of the genome/database.
- path (str):
Path to the FASTA-file.
- threads (int):
Number of threads to use.
"""
# get logger
logger = logging.getLogger(__name__ + "-intern")
# check if database already exists
if os.path.isfile(Path(dir, "db", name + ".dmnd")):
logger.info(f"database for {name} already exists")
else:
# generate new database using diamond makedb
logger.info(f"create DIAMOND database for {name} using:")
logger.info(
f'diamond makedb --in {path} --db {str(Path(dir,"db",name+".dmnd"))} --threads {int(threads)}'
)
start = time.time()
subprocess.run(
[
"diamond",
"makedb",
"--in",
path,
"--db",
str(Path(dir, "db", name + ".dmnd")),
"--threads",
str(threads),
]
)
end = time.time()
logger.info(f"\t time: {end - start}s")
[docs]
def run_diamond_blastp(
dir: str, db: str, query: str, fasta_path: str, sensitivity: str, threads: int
):
"""Run DIAMOND blastp for a given database name and FASTA - relies on the structure
created by :py:mod:`~specimen.hqtb.core.bidirectional_blast`.
Args:
- dir (str):
Parent directory of the place to save the files to.
- db (str):
Name of the genome/database used as the database.
- query (str):
Name of the genome used as the query.
- fasta_path (str):
Path to the FASTA-file containing the CDS.
- sensitivity (str):
Sensitivity mode to use for DIAMOND blastp.
- threads (int):
Number of threads that will be used for running DIAMOND
"""
# get logger
logger = logging.getLogger(__name__ + "-intern")
outname = Path(dir, "DIAMONDblastp", f"{query}_vs_{db}.tsv")
# check if file already exists
if os.path.isfile(outname):
logger.info(f"file or filename for {query} vs. {db} blastp already exists")
else:
# blast file
logger.info(f"blast {query} against {db} using:")
logger.info(
f'diamond blastp -d {str(Path(dir,"db/",db+".dmnd"))} -q {F"{fasta_path}"} --{sensitivity} -p {int(threads)} -o {outname} --outfmt 6 qseqid sseqid pident length mismatch gapopen qstart qend sstart send evalue bitscore qlen '
)
start = time.time()
subprocess.run(
[
"diamond",
"blastp",
"-d",
str(Path(dir, "db", db + ".dmnd")),
"-q",
fasta_path,
"--" + sensitivity,
"-p",
str(threads),
"-o",
outname,
"--outfmt",
"6",
"qseqid",
"sseqid",
"pident",
"length",
"mismatch",
"gapopen",
"qstart",
"qend",
"sstart",
"send",
"evalue",
"bitscore",
"qlen",
]
)
end = time.time()
logger.info(f"\ttime: {end - start}s")
[docs]
def bdbp_diamond(
dir: str,
template_name: str,
input_name: str,
template_path: str,
input_path: str,
sensitivity="sensitive",
threads=2,
):
"""Perform bidirectional blastp using DIAMOND.
Args:
- dir (str):
Path to the directory parent to in/out.
- template_name (str):
Name of the template genome.
- input_name (str):
Name of the input genome.
- template_path (str):
Path to the CDS FASTA-file of the template.
- input_path (str):
Path to the CDS FASTA-file of the input.
- sensitivity (str, optional):
Sensitivity mode for DIAMOND.
Defaults to 'sensitive'.
- threads (int, optional):
Number of threads to use when running DIAMOND.
Defaults to 2.
"""
# get logger
logger = logging.getLogger(__name__ + "-intern")
# -----------------------
# create output directory
# -----------------------
try:
Path(dir, "db").mkdir(parents=True, exist_ok=False)
logger.info(f'Creating new directory {str(Path(dir,"db"))}')
except FileExistsError:
logger.info("Given directory already has required structure.")
try:
Path(dir, "DIAMONDblastp").mkdir(parents=True, exist_ok=False)
logger.info(f'Creating new directory {str(Path(dir,"DIAMONDblastp"))}')
except FileExistsError:
logger.info("DIAMONDblastp: Given directory already has required structure.")
# ---------------------
# make DIAMOND database
# ---------------------
create_diamond_db(dir, template_name, template_path, threads)
create_diamond_db(dir, input_name, input_path, threads)
# ----------------------
# perform diamond blastp
# ----------------------
run_diamond_blastp(dir, template_name, input_name, input_path, sensitivity, threads)
run_diamond_blastp(
dir, input_name, template_name, template_path, sensitivity, threads
)
[docs]
def run(
template: str,
input: str,
dir: str,
template_name: str = None,
input_name: str = None,
temp_header: str = None,
in_header: str = None,
threads: int = 2,
extra_info: list[str] = ["locus_tag", "product", "protein_id"],
sensitivity: Literal[
"sensitive", "more-sensitive", "very-sensitive", "ultra-sensitive"
] = "more-sensitive",
):
"""Run the bidirectional blast on a template and input genome (annotated).
Args:
- template (str):
Path to the annotated genome file used as a template.
- input (str):
Path to the annotated genome file used as a input.
- dir (str):
Path to the output directory.
- template_name (str, optional):
Name of the annotated genome file used as a template..
Defaults to None.
- input_name (str, optional):
Name of the annotated genome file used as input..
Defaults to None.
- temp_header (str, optional):
Feature qualifier of the gbff (NCBI) / faa (PROKKA) of the template to use as header for the FASTA files.
If None is given, sets it based on file extension (currently only implemented for gbff and faa).
Defaults to 'protein_id'.
- in_header (str, optional):
Feature qualifier of the gbff (NCBI) / faa (PROKKA) of the input to use as header for the FASTA files.
If None is given, sets it based on file extension (currently only implememted for gbff and faa).
Defaults to 'locus_tag'.
- threads (int, optional):
Number of threads to be used for DIAMOND.
Defaults to 2.
- extra_info (list[str], optional):
List of feature qualifiers to be extracted from the annotated genome files as additional information.
Defaults to ['locus_tag', 'product', 'protein_id'].
- sensitivity (Literal['sensitive', 'more-sensitive', 'very-sensitive', 'ultra-sensitive'], optional):
Sensitivity mode for DIAMOND blastp run..
Defaults to 'more-sensitive'.
Raises:
- ValueError: Unknown file extension. Please set value for temp_header manually or check file.
- ValueError: Unknown file extension. Please set value for in_header manually or check file.
- ValueError: Unknown sensitive mode
"""
# create output directory
try:
Path(dir, "FASTA").mkdir(parents=True, exist_ok=False)
genlogger.info(f'Creating new directory {str(Path(dir,"FASTA"))}')
except FileExistsError:
genlogger.info("Given directory already has required structure: FASTA")
# set path for logging file
Path(dir, "FASTA", "fasta.log").unlink(missing_ok=True)
handler = logging.handlers.RotatingFileHandler(
str(Path(dir, "FASTA", "fasta.log")),
mode="w",
# maxBytes=1000,
backupCount=10,
encoding="utf-8",
delay=0,
)
handler.setFormatter(
logging.Formatter(
"{levelname} \t {name} \t {message}",
style="{",
)
)
logger.addHandler(handler)
total_time_s = time.time()
# -----------
# check input
# -----------
if not template_name:
template_name = os.path.splitext(os.path.basename(template))[0]
if not input_name:
input_name = os.path.splitext(os.path.basename(input))[0]
if not temp_header:
if ".gbff" == os.path.splitext(template)[1]:
temp_header = "protein_id"
elif ".faa" == os.path.splitext(template)[1]:
temp_header = "locus_tag"
else:
raise ValueError(
"Unknown file extension. Please set value for temp_header manually or check file."
)
if not in_header:
if ".gbff" == os.path.splitext(input)[1]:
in_header = "protein_id"
elif ".faa" == os.path.splitext(input)[1]:
in_header = "locus_tag"
else:
raise ValueError(
"Unknown file extension. Please set value for in_header manually or check file."
)
if sensitivity not in [
"sensitive",
"more-sensitive",
"very-sensitive",
"ultra-sensitive",
]:
raise ValueError(
f"Unknown sensitive mode {sensitivity}. Please choose one of the following: sensitive, more-sensitive, very-sensitive, ultra-sensitive"
)
# -------------
# start program
# -------------
logger.info(
"\nbidirectional blast\n################################################################################\n"
)
# ---------------------
# extract CDS sequences
# ---------------------
logger.info(
"\n# ----------------------\n# extract CDS into FASTA\n# ----------------------\n"
)
logger.info("processing template genome...")
start = time.time()
temp_fasta = extract_cds(template, template_name, dir, extra_info, temp_header)
end = time.time()
logger.info(f"\ttotal time: {end - start}s")
logger.info("processing genome of interest...")
start = time.time()
inpu_fasta = extract_cds(input, input_name, dir, extra_info, in_header)
end = time.time()
logger.info(f"\ttotal time: {end - start}s")
# ----------------------------
# perform bidirectional blastp
# ----------------------------
logger.info(
"\n# ----------------------------\n# perform bidirectional blastp\n# ----------------------------\n"
)
# reciprical blast - diamond version
bdbp_diamond(
dir, template_name, input_name, temp_fasta, inpu_fasta, sensitivity, threads
)
# ----------------------------
# find best bidirectional hits
# ----------------------------
logger.info(
"\n# ----------------------------\n# find best bidirectional hits\n# ----------------------------\n"
)
start = time.time()
extract_bestbdbp_hits(
Path(dir, "DIAMONDblastp", f"{template_name}_vs_{input_name}.tsv"),
Path(dir, "DIAMONDblastp", f"{input_name}_vs_{template_name}.tsv"),
Path(dir, f"{input_name}_{template_name}_bbh"),
)
end = time.time()
logger.info(f"\ttotal time: {end - start}s")
total_time_e = time.time()
logger.info(f"total runtime: {total_time_e-total_time_s}s")