# Source code for pyccd.read_nexus

"""
This module reads nexus tree files that are created by BEAST2.
The function read_nexus_trees() has options to make it compatible with trees
that were generated by the BREATH package and read transmission trees.
"""
import re
from .tree import Tree

from .label_transmission_history import label_transmission_tree


[docs] def read_nexus_trees(file: str, breath_trees: bool = True, label_transm_history: bool = True) -> list: """ Function to read a nexus file that contains transmission trees. This assumes that trees are generated by BREATH BEAST2 package. The necessary information is the blockcount for every node/edge. This function will fully label the transmission history onto the tree. By setting breath_trees to false the transmission history labeling is also disabled. :param file: Input file :param breath_trees: If true, will assume that the trees have the blockcount annotated (default) :param label_transm_history: If true, will label transmission ancestry (default) :returns: list of transmission trees """ # re_tree returns nwk string without the root height and no ; in the end re_tree = re.compile("\t?tree .*=? (.*$)", flags=re.I | re.MULTILINE) # name_dict = get_mapping_dict(file) # Save tree label names in dict trees = [] with open(file, 'r', encoding="utf-8") as f: for line in f: if re_tree.match(line): # tree_string = f'{re.split(re_tree, line)[1][:re.split(re_tree, line)[1] # .rfind(")") + 1]};' tree_string = re.split(re_tree, line)[1] pattern = r"\d*\[[^\]]*\]" # matches meta data annotations counter = 0 # Initialize a counter def replace_match(match): nonlocal counter split_str = match.group(0).split("[&") if len(split_str) == 2: meta_list = split_str[1][:-1].split( ",") # deleting the ] from string and splitting all the meta data matching_element = next((s for s in meta_list if "blockcount" in s), None) # variable hard code with string in line above block_count = int( float(matching_element.split("=")[-1])) if not split_str[0]: counter += 1 # making new string be pair taxa;block_count node_name = split_str[0] if split_str[0] else f'internal{counter}' return f"%{node_name}/{block_count}%" raise NotImplementedError("Needs to be added or debugged.") # Replace all matches new_tree_string = re.sub(pattern, replace_match, tree_string) tree = Tree(new_tree_string, 
format=1) if breath_trees: # adjusting the tree to contain the blockcount label and correct node names _breath_label_nodes(tree) trees.append(tree) # if only label_transm_history is set to true this won't make sense anyway if breath_trees and label_transm_history: for tree in trees: label_transmission_tree(tree) return trees
def _breath_label_nodes(tree):
    """
    Move the blockcount encoded in each node name onto the node itself.

    Node names are expected to look like ``%<name>/<blockcount>%``. For every
    node that has no ``blockcount`` attribute yet, the name is split at the
    first ``/``: the right part is stored as the integer feature
    ``blockcount`` and the left part becomes the new ``node.name``. All ``%``
    characters and surrounding whitespace are removed.

    :param tree: Tree to annotate in place; it must provide
        ``traverse("levelorder")`` and its nodes ``name`` and
        ``add_feature(key, value)`` (ete3-style interface)
    :raises AssertionError: If a node name is empty or contains no ``/``
    :raises ValueError: If the blockcount part is not an integer
    """
    for node in tree.traverse("levelorder"):
        # Nodes that already carry a blockcount are left untouched.
        if hasattr(node, "blockcount"):
            continue
        # Raised explicitly rather than via `assert`, so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if not (node.name and "/" in node.name):
            raise AssertionError(f"Invalid node name format: "
                                 f"'{node.name}' "
                                 f"(expected 'name/blockcount')")
        # maxsplit=1 keeps any further '/' inside the blockcount part, where
        # int() will reject it instead of silently dropping data.
        cur_name, cur_blockcount = node.name.replace("%", "").split("/", 1)
        node.add_feature("blockcount", int(cur_blockcount.strip()))
        node.name = cur_name.strip()