"""This module reads nexus tree files that are created by BEAST2.The function read_nexus_trees() has options to make it compatible with treesthat were generated by the BREATH package and read transmission trees."""importrefrom.treeimportTreefrom.label_transmission_historyimportlabel_transmission_tree
[docs]defread_nexus_trees(file:str,breath_trees:bool=True,label_transm_history:bool=True)->list:""" Function to read a nexus file that contains transmission trees. This assumes that trees are generated by BREATH BEAST2 package. The necessary information is the blockcount for every node/edge. This function will fully label the transmission history onto the tree. By setting breath_trees to false the transmission history labeling is also disabled. :param file: Input file :param breath_trees: If true, will assume that the trees have the blockcount annotated (default) :param label_transm_history: If true, will label transmission ancestry (default) :returns: list of transmission trees """# re_tree returns nwk string without the root height and no ; in the endre_tree=re.compile("\t?tree .*=? (.*$)",flags=re.I|re.MULTILINE)# name_dict = get_mapping_dict(file) # Save tree label names in dicttrees=[]withopen(file,'r',encoding="utf-8")asf:forlineinf:ifre_tree.match(line):# tree_string = f'{re.split(re_tree, line)[1][:re.split(re_tree, line)[1]# .rfind(")") + 1]};'tree_string=re.split(re_tree,line)[1]pattern=r"\d*\[[^\]]*\]"# matches meta data annotationscounter=0# Initialize a counterdefreplace_match(match):nonlocalcountersplit_str=match.group(0).split("[&")iflen(split_str)==2:meta_list=split_str[1][:-1].split(",")# deleting the ] from string and splitting all the meta datamatching_element=next((sforsinmeta_listif"blockcount"ins),None)# variable hard code with string in line aboveblock_count=int(float(matching_element.split("=")[-1]))ifnotsplit_str[0]:counter+=1# making new string be pair taxa;block_countnode_name=split_str[0]ifsplit_str[0]elsef'internal{counter}'returnf"%{node_name}/{block_count}%"raiseNotImplementedError("Needs to be added or debugged.")# Replace all matchesnew_tree_string=re.sub(pattern,replace_match,tree_string)tree=Tree(new_tree_string,format=1)ifbreath_trees:# adjusting the tree to contain the blockcount label and correct node names_breath_label_nodes(tree)trees.append(tree)# if only label_transm_history is set to true this won't make sense anywayifbreath_treesandlabel_transm_history:fortreeintrees:label_transmission_tree(tree)returntrees
[docs]def_breath_label_nodes(tree):""" Annotating the node names and blockcount values to an ete3.Tree. This only works if the nodes names follow the name convention from above. That is %Node-label/blockcount% which is extracted in replace_match() above. :param tree: Tree that will get blockcount annotations from node names """fornodeintree.traverse("levelorder"):# this should technically never be the case...ifnothasattr(node,"blockcount"):# Assert node.name formatassertnode.nameand"/"innode.name,(f"Invalid node name format: "f"'{node.name}' "f"(expected 'name/blockcount')")# Extract node name and blockcount safely# Ensure only one splitcur_name,cur_blockcount=node.name.replace("%","").split("/",1)# Strip spaces & convert safelynode.add_feature("blockcount",int(cur_blockcount.strip()))# Remove unnecessary spacesnode.name=cur_name.strip()