"""
Module contains functions regarding the computation and plotting of WIW networks for transmission
trees and transmission CCDs.
"""
[docs]
def find_infector(node, indirect: bool = False):
"""
Takes a node in a transmission annotated phylogenetic tree and returns its infector.
:param indirect: Infer indirect transmissions, i.e. find last known infector
:param node: Node of which to find the infector.
:return: Infector of node
"""
if node.name != node.transm_ancest:
if not (indirect and node.transm_ancest.startswith("Unknown")):
return node.transm_ancest
parent = node.up
while (parent.transm_ancest == node.name or
(indirect and parent.transm_ancest.startswith("Unknown"))):
if parent.is_root():
break
parent = parent.up
if indirect and (not parent.is_root()) and parent.transm_ancest.startswith("Unknown"):
raise ValueError("This should not happen?")
return parent.transm_ancest
[docs]
def find_infector_with_data(node, root_node, indirect: bool = False):
"""
Finds the infector with extra data: distance to root of start and end of infection
:param node: The node to find the infector of, in this case assumes to be a leaf
:param root_node: The root node object of a tree (tree.get_tree_root())
:param indirect: Currently not supported?
:return:
"""
if node.name != node.transm_ancest:
if not (indirect and node.transm_ancest.startswith("Unknown")):
# regular infection from one taxon to the next or Unknown/Block
# node_root_dist = node.get_distance(root_node)
parent_root_dist = node.up.get_distance(root_node)
# Using block end due to blocks, if no block this is the same as start
event_length_from_parent = node.dist * node.blockend
infection_start_dist_root = parent_root_dist + event_length_from_parent
if node.blockcount > 0:
block_name = f"block_{node.name}"
data = [
block_name,
node.name,
infection_start_dist_root,
None,
]
else:
data = [
node.transm_ancest,
node.name,
infection_start_dist_root,
None
]
unknown_out_node = None
if node.transm_ancest.startswith("Unknown"):
unknown_out_node = node
return node.transm_ancest, [data], unknown_out_node
parent = node.up
while (parent.transm_ancest == node.name or
(indirect and parent.transm_ancest.startswith("Unknown"))):
if parent.is_root():
break
parent = parent.up
if indirect and (not parent.is_root()) and parent.transm_ancest.startswith("Unknown"):
raise ValueError("This should not happen?")
if parent.is_root():
# parent is the root is the leaf, og infection
assert node.name == parent.transm_ancest, "The parent is the root and node.name is OG?"
assert node.blockcount == -1, "Something wrong?"
data = [
parent.transm_ancest,
node.name,
0.0,
None,
]
return parent.transm_ancest, [data], None
# assert parent.blockstart == parent.blockend, "Assuming this is one infection atm!"
# node_root_dist = node.get_distance(root_node) # assumes node is a leaf!
# blockend so that if its a block we have the correct start...
length_to_add_from_cur_edge = parent.dist * parent.blockend
infection_start_dist_root = parent.up.get_distance(root_node) + length_to_add_from_cur_edge
if parent.blockcount > 0:
block_name = f"block_{node.name}"
data = [
block_name,
node.name,
infection_start_dist_root,
None,
]
else:
data = [
parent.transm_ancest,
node.name,
infection_start_dist_root,
None,
]
unknown_out_node = None
if parent.transm_ancest.startswith("Unknown"):
unknown_out_node = parent
return parent.transm_ancest, [data], unknown_out_node
[docs]
def find_infector_unknown(cur_u_node, root_node):
"""
Finds the infector of nodes that are unknown, not leafs
:param cur_u_node: A node whose transm_ancest is unknown
:param root_node: The root node of a tree (tree.get_tree_root())
:return:
"""
if cur_u_node.transm_ancest != cur_u_node.up.transm_ancest:
if cur_u_node.up.is_root():
parent_root_dist = 0.0
else:
parent_root_dist = cur_u_node.up.up.get_distance(root_node)
if cur_u_node.blockcount > 0:
infection_start_dist_root = parent_root_dist + (cur_u_node.blockstart * cur_u_node.dist)
block_name = f"block_{cur_u_node.name}"
if cur_u_node.up.blockcount > 0:
node_sibling = next(x for x in cur_u_node.up.children if x != cur_u_node)
if node_sibling.blockcount > 0:
# infecting a block from a block with a block as sibling will result in a newly
# named unknown sample
# block can not infect another block directly!
infection_block_unknown_start = parent_root_dist + (cur_u_node.up.dist *
cur_u_node.up.blockend)
data = [
[
f"Unknown-block_{cur_u_node.up.name}",
f"Unknown_{cur_u_node.up.name}",
infection_block_unknown_start,
None
],
[
f"Unknown_{cur_u_node.up.name}",
block_name,
infection_start_dist_root,
cur_u_node.blockcount
]
]
else:
# infecting a block from a block with a sibling having some label we know that
# the sibling sample persists and infects this block
data = [
node_sibling.transm_ancest,
block_name,
infection_start_dist_root,
cur_u_node.blockcount,
]
else:
data = [
cur_u_node.up.transm_ancest,
block_name,
infection_start_dist_root,
cur_u_node.blockcount,
]
else:
if hasattr(cur_u_node.up, "blockend"):
event_start_length_from_parent = cur_u_node.up.dist * cur_u_node.up.blockend
else:
event_start_length_from_parent = cur_u_node.up.dist
infection_start_dist_root = parent_root_dist + event_start_length_from_parent
if cur_u_node.up.blockcount > 0:
block_name = f"block_{cur_u_node.up.name}"
data = [
block_name,
cur_u_node.transm_ancest,
infection_start_dist_root,
None,
]
else:
data = [
cur_u_node.up.transm_ancest,
cur_u_node.transm_ancest,
infection_start_dist_root,
None,
]
result = data if isinstance(data[0], list) else [data]
if cur_u_node.up.transm_ancest.startswith("Unknown"):
if not cur_u_node.up.is_root():
more_data = find_infector_unknown(cur_u_node.up, root_node)
result.extend(more_data)
return result
parent = cur_u_node.up
while parent.transm_ancest == cur_u_node.transm_ancest:
if parent.is_root():
break
parent = parent.up
if parent.is_root():
data = [
parent.transm_ancest,
cur_u_node.transm_ancest,
0.0,
None,
]
return [data]
dist_start_infection_to_root = (parent.up.get_distance(root_node) +
(parent.dist * parent.blockend))
if cur_u_node.blockcount > 0:
block_name = f"block_{cur_u_node.name}"
if parent.blockcount > 0:
node_sibling = next(x for x in cur_u_node.up.children if x != cur_u_node)
if node_sibling.blockcount > 0:
# Block can not infect a block directly
infection_block_unknown_start = (parent.up.get_distance(root_node)
+ (parent.dist * parent.blockend))
data = [
[
f"Unknown-block_{parent.name}",
f"Unknown_{parent.name}",
infection_block_unknown_start,
None,
],
[
f"Unknown_{parent.name}",
block_name,
dist_start_infection_to_root,
cur_u_node.blockcount,
]
]
else:
data = [
node_sibling.transm_ancest,
block_name,
dist_start_infection_to_root,
cur_u_node.blockcount,
]
else:
data = [
parent.transm_ancest,
block_name,
dist_start_infection_to_root,
cur_u_node.blockcount
]
else:
if parent.blockcount > 0:
block_name = f"block_{parent.name}"
data = [
block_name,
cur_u_node.transm_ancest,
dist_start_infection_to_root,
None
]
else:
data = [
parent.transm_ancest,
cur_u_node.transm_ancest,
dist_start_infection_to_root,
None
]
result = data if isinstance(data[0], list) else [data]
if parent.transm_ancest.startswith("Unknown"):
more_data = find_infector_unknown(parent, root_node)
result.extend(more_data)
return result