Source code for itrails.int_get_joint_prob_mat

from ast import literal_eval

import numpy as np
from scipy.linalg import expm

from itrails.int_combine_states import combine_states
from itrails.int_get_tab import (
    get_tab_AB_introgression,
    get_tab_ABC_introgression,
    precomp,
)
from itrails.int_get_times import get_times
from itrails.int_load_trans_mat import load_trans_mat, trans_mat_num


[docs] def get_joint_prob_mat_introgression( t_A, t_B, t_AB, t_C, t_m, rho_A, rho_B, rho_AB, rho_C, rho_ABC, coal_A, coal_B, coal_AB, coal_BC, coal_C, coal_ABC, m, n_int_AB, n_int_ABC, cut_AB, cut_ABC, tmp_path="./", ): """ This is a wrapper function that unifies all the CTMCs to get a matrix of joint probabilities for the HMM. Parameters ---------- t_A : float Time for the one-sequence CTMC of the first sequence (A) t_B : float Time for the one-sequence CTMC of the second sequence (B) t_AB : float Time for the two-sequence CTMC (AB) t_C : float Time for the one-sequence CTMC of the third sequence (C) t_m : float Time from the migration event to the first speciation event rho_A : float Recombination rate for the one-sequence CTMC of the first sequence (A) rho_B : float Recombination rate for the one-sequence CTMC of the second sequence (B) rho_AB : float Recombination rate for the two-sequence CTMC (AB) rho_C : float Recombination rate for the one-sequence CTMC of the third sequence (C) rho_ABC : float Recombination rate for the three-sequence CTMC (ABC) coal_A : float Coalescent rate for the one-sequence CTMC of the first sequence (A) coal_B : float Coalescent rate for the one-sequence CTMC of the second sequence (B) coal_AB : float Coalescent rate for the two-sequence CTMC (AB) coal_BC : float Coalescent rate for the two-sequence CTMC (BC) coal_C : float Coalescent rate for the one-sequence CTMC of the third sequence (C) coal_ABC : float Coalescent rate for the three-sequence CTMC (ABC) m : float Probability that an individual migrates from B to C (backwards in time) n_int_AB : integer Number of intervals of the two-sequence CTMC (AB) n_int_ABC : integer Number of intervals of the three-sequence CTMC (ABC) """ # Calculate cutpoints intervals_AB = get_times(cut_AB, list(range(len(cut_AB)))) intervals_BC = [intervals_AB[0] + t_m] + intervals_AB[1:] #################################### ### Load state-space information ### #################################### # Load string transition rate matrix and convert string names to actual lists (trans_mat_1, state_space_1) = load_trans_mat(1) state_space_A = [literal_eval(i) for i in state_space_1] (trans_mat_2, state_space_2) = load_trans_mat(2) state_space_AB = [literal_eval(i) for i in state_space_2] (trans_mat_3, state_space_3) = load_trans_mat(3) state_space_ABC = [literal_eval(i) for i in state_space_3] state_space_B = [] for j in state_space_A: lst = [] for i in j: one = 2 if i[0] == 1 else i[0] two = 2 if i[1] == 1 else i[1] lst.append((one, two)) state_space_B.append(lst) state_space_C = [] for j in state_space_A: lst = [] for i in j: one = 4 if i[0] == 1 else i[0] two = 4 if i[1] == 1 else i[1] lst.append((one, two)) state_space_C.append(lst) trans_mat_A = trans_mat_num(trans_mat_1, coal_A, rho_A) trans_mat_B = trans_mat_num(trans_mat_1, coal_B, rho_B) trans_mat_C = trans_mat_num(trans_mat_1, coal_C, rho_C) trans_mat_AB = trans_mat_num(trans_mat_2, coal_AB, rho_AB) pr_AB = precomp(trans_mat_AB, intervals_AB) trans_mat_BC = trans_mat_num(trans_mat_2, coal_BC, rho_AB) pr_BC = precomp(trans_mat_BC, intervals_BC) ########################## ### One-sequence CTMCs ### ########################## # Species A, from present to first speciation event final_A = expm(trans_mat_A * t_A)[0] # Species B, from present to migration event final_B = expm(trans_mat_B * t_B)[0] # Species C, from present to migration event final_C = expm(trans_mat_C * t_C)[0] # Species A, from present to second speciation event final_A_bis = expm(trans_mat_A * (t_A + t_AB))[0] # Species C, from present to second speciation event final_C_bis = expm(trans_mat_C * (t_C + t_m + t_AB))[0] # Split probabilities for the left and the right path # Left path: lineages do not migrate and will later mix with species A (state_space_B_left, final_B_left) = split_migration( state_space_B, final_B, m, "left" ) # Right path: lineages migrate and are instantly mixed with species C (state_space_B_right, final_B_right) = split_migration( state_space_B, final_B, m, "right" ) ########################## ### Two-sequence CTMCs ### ########################## # -#-# Right path, from migration event to second speciation event #-#-# # # Full lineages # # # This is when both the left and the right sites have B lineages # Combine the state space of the migrated B lineages with the state space of C (comb_BC_name_full, comb_BC_value_full) = combine_states( state_space_A, state_space_B_right[0:2], final_C, final_B_right[0:2] ) # Re-order states pi_BC_full = [ comb_BC_value_full[comb_BC_name_full.index(i)] if i in comb_BC_name_full else 0 for i in state_space_2 ] # Obtain the correct state space for BC state_space_BC = [j.replace("1", "4") for j in state_space_2] state_space_BC = [j.replace("3", "6") for j in state_space_BC] state_space_BC = [literal_eval(i) for i in state_space_BC] state_space_BC = [sorted(i) for i in state_space_BC] # # Missing lineages # # # This is when at least one of either the left or the right sites are missing a B lineage # This is the transition matrix and state space for when lineages are missing (trans_mat_2_miss, state_space_2_miss) = load_trans_mat_miss() state_space_BC_miss = [literal_eval(i) for i in state_space_2_miss] trans_mat_BC_miss = trans_mat_num(trans_mat_2_miss, coal_BC, rho_AB) pr_BC_miss = precomp(trans_mat_BC_miss, intervals_BC) # Combine the state space of the migrated B lineages with the state space of C (comb_BC_name_miss, comb_BC_value_miss) = combine_states( state_space_C, state_space_B_right[2::], final_C, final_B_right[2::] ) # Re-order states pi_BC_miss = [ comb_BC_value_miss[comb_BC_name_miss.index(i)] if i in comb_BC_name_miss else 0 for i in state_space_2_miss ] # -#-# Left path, from migration event to second speciation event #-#-# # # Full lineages # # # This is when both the left and the right sites have B lineages # One-sequence CTMC from migration to first speciation event final_B_left_full = final_B_left[0:2] @ expm(trans_mat_B * t_m) # Combine the state space of the non-migrated B lineages with the state space of A (comb_AB_name_full, comb_AB_value_full) = combine_states( state_space_A, state_space_B_left[0:2], final_A, final_B_left_full ) # Re-order states pi_AB_full = [ comb_AB_value_full[comb_AB_name_full.index(i)] if i in comb_AB_name_full else 0 for i in state_space_2 ] state_space_AB = [literal_eval(i) for i in state_space_2] # # Missing lineages # # # This is when at least one of either the left or the right sites are missing a B lineage # This is the transition matrix and state space for when lineages are missing. # It is the same as for the right path, but with A instead of C. (trans_mat_4, state_space_4) = load_trans_mat_miss() trans_mat_AB_miss = trans_mat_num(trans_mat_4, coal_AB, rho_AB) pr_AB_miss = precomp(trans_mat_AB_miss, intervals_AB) state_space_4 = [i.replace("4", "1") for i in state_space_4] state_space_4 = [i.replace("6", "3") for i in state_space_4] state_space_AB_miss = [sorted(literal_eval(i)) for i in state_space_4] state_space_4 = [str(i) for i in state_space_AB_miss] # Combine the state space of the non-migrated B lineages with the state space of A (comb_AB_name_miss, comb_AB_value_miss) = combine_states( state_space_A, state_space_B_left[2::], final_A, final_B_left[2::] ) # Re-order states pi_AB_miss = [ comb_AB_value_miss[comb_AB_name_miss.index(i)] if i in comb_AB_name_miss else 0 for i in state_space_4 ] tab, tab_names = get_tab_AB_introgression( state_space_AB, state_space_AB_miss, state_space_BC, state_space_BC_miss, state_space_A, state_space_C, state_space_ABC, pi_AB_full, pi_AB_miss, pi_BC_full, pi_BC_miss, final_A_bis, final_C_bis, pr_AB, pr_AB_miss, pr_BC, pr_BC_miss, n_int_AB, ) trans_mat_ABC = trans_mat_num(trans_mat_3, coal_ABC, rho_ABC) joint_mat = get_tab_ABC_introgression( state_space_ABC, trans_mat_ABC, cut_ABC, tab, tab_names, n_int_AB, tmp_path ) return joint_mat
[docs] def split_migration(state_space, prob_vec, m, direction): """ This function splits the vector of final probabilities for species B given a certain migration rate m. Based on the direction of choice, the resulting probabilities will correspond to the left path (where lineages do not migrate and will later mix with species A), or the right path (where lineages migrate and are instantly mixed with species C). Parameters ---------- state_space : list of lists of tuples The state space for a one-sequence CTMC prob_vec : numpy array The end probabilities of the one-sequence CTMC at the time of migration m : float Probability that a sequence migrates direction : string Left or right path """ # Pr(O-- --O) = 1 - Pr(O--O) x = prob_vec[1] # Define new state space # O--O O-- --O O-- --O st = [state_space[0], state_space[1], [state_space[1][0]], [state_space[1][1]]] if direction == "left": pr = np.array( [ (1 - x) * (1 - m), (1 - m) ** 2 * x, 1 / 2 * (1 - m) * m * x, 1 / 2 * (1 - m) * m * x, ] ) if direction == "right": pr = np.array( [(1 - x) * m, x * m**2, 1 / 2 * (1 - m) * m * x, 1 / 2 * (1 - m) * m * x] ) return (st, pr)
[docs] def load_trans_mat_miss(): """ This function defines the state space and transition rate matrix for the right path when lineages are missing, i.e., when only the right or the left site of species B are mixed with the two sites of species C. """ mat = np.array( [ ["0", "R", "0", "0", "C", "0", "0", "0", "0", "0"], ["C", "0", "C", "C", "0", "0", "0", "0", "0", "0"], ["0", "R", "0", "0", "C", "0", "0", "0", "0", "0"], ["0", "0", "0", "0", "C", "0", "0", "0", "0", "0"], ["0", "0", "0", "R", "0", "0", "0", "0", "0", "0"], ["0", "0", "0", "0", "0", "0", "R", "0", "0", "C"], ["0", "0", "0", "0", "0", "C", "0", "C", "C", "0"], ["0", "0", "0", "0", "0", "0", "R", "0", "0", "C"], ["0", "0", "0", "0", "0", "0", "0", "0", "0", "C"], ["0", "0", "0", "0", "0", "0", "0", "0", "R", "0"], ], dtype=object, ) st = [ "[(2, 0), (4, 4)]", "[(0, 4), (2, 0), (4, 0)]", "[(2, 4), (4, 0)]", "[(0, 4), (6, 0)]", "[(6, 4)]", "[(0, 2), (4, 4)]", "[(0, 2), (0, 4), (4, 0)]", "[(0, 4), (4, 2)]", "[(0, 6), (4, 0)]", "[(4, 6)]", ] return (mat, st)
[docs] def divide_starting_probs(ordered_pi_ABC, state_space_ABC): """ This function divides the starting probabilities of the three-sequence CTMC according to the path taken, i.e., it identifies the states where sequences have already coalesced at the left and/or the right sites, and it divides their starting probabilities based on this. Parameters ---------- ordered_pi_ABC : list of floats The starting probabilities of the three-sequence CTMC state_space_ABC : list of lists of tuples The ordered state space of the three-sequence CTMC """ # Obtain all of the omegas om = {} flatten = [list(sum(i, ())) for i in state_space_ABC] for l in [0, 3, 5, 6, 7]: for r in [0, 3, 5, 6, 7]: if (l in [3, 5, 6, 7]) and (r in [3, 5, 6, 7]): om["%s%s" % (l, r)] = [ i for i in range(203) if (l in flatten[i][::2]) and (r in flatten[i][1::2]) ] elif (l == 0) and (r in [3, 5, 6, 7]): om["%s%s" % (l, r)] = [ i for i in range(203) if (all(x not in [3, 5, 6, 7] for x in flatten[i][::2])) and (r in flatten[i][1::2]) ] elif (l in [3, 5, 6, 7]) and (r == 0): om["%s%s" % (l, r)] = [ i for i in range(203) if (l in flatten[i][::2]) and (all(x not in [3, 5, 6, 7] for x in flatten[i][1::2])) ] elif l == r == 0: om["%s%s" % (l, r)] = [ i for i in range(203) if all(x not in [3, 5, 6, 7] for x in flatten[i]) ] omega_tot_ABC = [i for i in range(203)] om["71"] = sorted(om["73"] + om["75"] + om["76"]) om["17"] = sorted(om["37"] + om["57"] + om["67"]) om["10"] = sorted(om["30"] + om["50"] + om["60"]) om["13"] = sorted(om["33"] + om["53"] + om["63"]) om["15"] = sorted(om["35"] + om["55"] + om["65"]) om["16"] = sorted(om["36"] + om["56"] + om["66"]) om["11"] = sorted(om["13"] + om["15"] + om["16"]) # Create empty table for the joint probabilities tab = np.zeros((9, 203)) # Create empty vector for the names of the states tab_names = [] # Create accumulator for keeping track of the indices for the table acc = 0 # Uncoalesced states (destined for deep coalesce at both sites) tab_names.append(("D", "D")) tmp_lst = om["00"] tab[acc] = [ ordered_pi_ABC[i] if i in tmp_lst else 0 for i in range(len(state_space_ABC)) ] acc += 1 # Introgression states (already coalesced at both sites through right path) tab_names.append(((4, 0), (4, 0))) tmp_lst = om["66"] tab[acc] = [ ordered_pi_ABC[i] if i in tmp_lst else 0 for i in range(len(state_space_ABC)) ] acc += 1 # Shallow coalescence states (already coalesced at both sites through left path) tab_names.append(((0, 0), (0, 0))) tmp_lst = om["33"] tab[acc] = [ ordered_pi_ABC[i] if i in tmp_lst else 0 for i in range(len(state_space_ABC)) ] acc += 1 # Uncoalesced on left site, introgressed on right site tab_names.append(("D", (4, 0))) tmp_lst = om["06"] tab[acc] = [ ordered_pi_ABC[i] if i in tmp_lst else 0 for i in range(len(state_space_ABC)) ] acc += 1 # Introgressed on left site, uncoalesced on right site tab_names.append(((4, 0), "D")) tmp_lst = om["60"] tab[acc] = [ ordered_pi_ABC[i] if i in tmp_lst else 0 for i in range(len(state_space_ABC)) ] acc += 1 # Uncoalesced on left site, shallow coalescence on right site tab_names.append(("D", (0, 0))) tmp_lst = om["03"] tab[acc] = [ ordered_pi_ABC[i] if i in tmp_lst else 0 for i in range(len(state_space_ABC)) ] acc += 1 # Shallow coalescence on left site, uncoalesced on right site tab_names.append(((0, 0), "D")) tmp_lst = om["30"] tab[acc] = [ ordered_pi_ABC[i] if i in tmp_lst else 0 for i in range(len(state_space_ABC)) ] acc += 1 # Introgressed on left site, uncoalesced on right site tab_names.append(((4, 0), (0, 0))) tmp_lst = om["63"] tab[acc] = [ ordered_pi_ABC[i] if i in tmp_lst else 0 for i in range(len(state_space_ABC)) ] acc += 1 # Uncoalesced on left site, introgressed on right site tab_names.append(((0, 0), (4, 0))) tmp_lst = om["36"] tab[acc] = [ ordered_pi_ABC[i] if i in tmp_lst else 0 for i in range(len(state_space_ABC)) ] acc += 1 return (tab, tab_names)