Source code for brokilon.majority_rule_consensus.majority_consensus

from collections import defaultdict

from brokilon.ccd.domain.phylogeography import DemeClade
from brokilon.core import Tree


[docs] def regular_mrc(trees): print(len(trees)) raise NotImplementedError("WIP")
# def majority_consensus_annoated(trees): # # m1, m2, blockcount_map, branch_lengths_map = get_transmission_maps(trees, # type_str="Ancestry") # major_thr = 0.5 # clades_majority = [k for k, v in m1.items() if v / len(trees) > major_thr] # if len(clades_majority) < len(trees[0]) / 4: # print(f"No clades found with threshold: {major_thr}, will divide by 2 and try again") # major_thr = major_thr / 2 # clades_majority = [k for k, v in m1.items() if v / len(trees) > major_thr] # # # todo m1 does not contain leaf clades, but they are now important so we need to keep that in # # mind # # print(len(clades_majority)) # # print(clades_majority) # for c in clades_majority: # print(c) # return consensus
[docs] def phygeo_mrc(trees, geo_ann_str, major_thr=0.5): """ Computes MRC tree for annotated trees (type is input value). Assumes leaves are fixed states. :param trees: list of trees :param geo_ann_str: annotation that should be used for clades :return: MRC tree with annotations """ if not 0.0 < major_thr < 1.0: raise ValueError(f"Major threshold must be between 0 and 1, but got {major_thr}") # todo with very low values this might not be able to construct at tree... catch that. clade_count_map = defaultdict(int) for node in (node for t in trees for node in t.traverse("levelorder")): # if geo_ann_str not in node.features: if len(node.children) == 1: # the structured approaches have internal nodes, skip them for now, might be # interesting to think about them later... continue if not hasattr(node, geo_ann_str): raise ValueError(f"Node does not have a geographic annotation of name '{geo_ann_str}'." f"Available are {node.features}, maybe change the input value...") if len(node) > 1: parent_leaves = frozenset(int(leaf.name) for leaf in node) parent_clade = DemeClade(parent_leaves, deme=getattr(node, geo_ann_str)) clade_count_map[parent_clade] += 1 # clades_majority = [k for k, v in clade_count_map.items() if v / len(trees) > major_thr] clades_majority = [] clade_support = {} for k, v in clade_count_map.items(): if v / len(trees) > major_thr: clades_majority.append(k) clade_support[k] = v / len(trees) working_subtrees = {} for l in trees[0]: t = Tree(support=1.0, dist=1.0, name=l.name) t.add_feature(geo_ann_str, getattr(l, geo_ann_str)) working_subtrees[l.name] = t for i, c in enumerate(sorted(clades_majority, key=len)): new_node = Tree( name=f"clade_{i}", support=clade_support[c], dist=1.0, ) new_node.add_feature(geo_ann_str, c.deme) for taxon in c.clade: skip = False node = working_subtrees.pop(str(taxon)) if isinstance(node, str) and node.startswith("clade_"): if node in working_subtrees: if ( isinstance(working_subtrees[node], str) and working_subtrees[node].startswith("clade_") ): working_subtrees.pop(node) skip = True else: node = working_subtrees.pop(node) else: skip = True if not skip: new_node.add_child(node) working_subtrees[node.name] = f"clade_{i}" working_subtrees[str(taxon)] = f"clade_{i}" working_subtrees[f"clade_{i}"] = new_node try: output = working_subtrees[f"clade_{len(clades_majority) - 1}"] except KeyError: raise ValueError("Something went wrong when constructing the MRC tree.") return output
[docs] def transmission_mrc(trees): print(len(trees)) raise NotImplementedError("WIP")
if __name__ == '__main__': from pathlib import Path from brokilon.core.read_nexus import read_nexus_trees # trees_file = (f"{Path(__file__).parent.absolute().parent.parent}" # f"/examples/data/breath32sim.trees") tree_file = (f"{Path(__file__).parent.absolute().parent.parent}" f"/examples/data/" # f"roetzer40.trees" f"h3n2-bdmm.h3n2_2deme.trees") # todo burnin missing... trees, taxon_map = read_nexus_trees( tree_file, breath_trees=False, label_transm_history=False, parse_taxon_map=True ) phygeo_mrc(trees, geo_ann_str="type") # majority_consensus_annoated(trees)