Source code for itrails.int_get_trans_emiss

import numpy as np
import pandas as pd

from itrails.cutpoints import cutpoints_AB, cutpoints_ABC
from itrails.int_get_emission_prob_mat import get_emission_prob_mat_introgression
from itrails.int_get_joint_prob_mat import get_joint_prob_mat_introgression


[docs] def trans_emiss_calc_introgression( t_A, t_B, t_C, t_2, t_upper, t_out, t_m, N_AB, N_BC, N_ABC, r, m, n_int_AB, n_int_ABC, cut_AB, cut_ABC, tmp_path="./", ): """ This function calculates the emission and transition probabilities given a certain set of parameters. Parameters ---------- t_A : numeric Time in generations from present to the first speciation event for species A (times mutation rate) t_B : numeric Time in generations from present to the migration event for species B (times mutation rate) t_C : numeric Time in generations from present to the migration event for species C (times mutation rate) t_2 : numeric Time in generations from the first speciation event to the second speciation event (times mutation rate) t_upper : numeric Time in generations between the end of the second-to-last interval and the third speciation event (times mutation rate) t_out : numeric Time in generations from present to the third speciation event for species D, plus the divergence between the ancestor of D and the ancestor of A, B and C at the time of the third speciation event (times mutation rate) t_m : numeric Time in generagions from admixture time until first speciation time N_AB : numeric Effective population size between speciation events (times mutation rate) for AB N_BC : numeric Effective population size between speciation events (times mutation rate) for BC N_ABC : numeric Effective population size in deep coalescence, before the second speciation event (times mutation rate) r : numeric Recombination rate per site per generation (divided by mutation rate) m : numeric Migration rate (admixture proportion) n_int_AB : integer Number of discretized time intervals between speciation events n_int_ABC : integer Number of discretized time intervals in deep coalescent """ # Reference Ne (for normalization) N_ref = N_ABC # Speciation times (in coalescent units, i.e. number of generations / N_ref) t_A = t_A / N_ref t_B = t_B / N_ref t_AB = t_2 / N_ref t_C = t_C / N_ref t_upper = t_upper / N_ref t_out = t_out / N_ref t_m = t_m / N_ref # Recombination rates (r = rec. rate per site per generation) rho_A = N_ref * r rho_B = N_ref * r rho_AB = N_ref * r rho_C = N_ref * r rho_ABC = N_ref * r # Coalescent rates coal_A = N_ref / N_AB coal_B = N_ref / N_AB coal_AB = N_ref / N_AB coal_BC = N_ref / N_BC coal_C = N_ref / N_BC coal_ABC = N_ref / N_ABC # Mutation rates (mu = mut. rate per site per generation) mu_A = N_ref * (4 / 3) mu_B = N_ref * (4 / 3) mu_C = N_ref * (4 / 3) mu_D = N_ref * (4 / 3) mu_AB = N_ref * (4 / 3) mu_ABC = N_ref * (4 / 3) if isinstance(cut_AB, str): if cut_AB == "standard": cut_AB = cutpoints_AB(n_int_AB, t_AB, coal_AB) if isinstance(cut_ABC, str): if cut_ABC == "standard": cut_ABC = cutpoints_ABC(n_int_ABC, coal_ABC) cut_AB = np.asarray(cut_AB, dtype=float) cut_ABC = np.asarray(cut_ABC, dtype=float) tr = get_joint_prob_mat_introgression( t_A, t_B, t_AB, t_C, t_m, rho_A, rho_B, rho_AB, rho_C, rho_ABC, coal_A, coal_B, coal_AB, coal_BC, coal_C, coal_ABC, m, n_int_AB, n_int_ABC, cut_AB, cut_ABC, tmp_path, ) tr = pd.DataFrame(tr, columns=["From", "To", "Prob"]).pivot( index=["From"], columns=["To"], values=["Prob"] ) tr.columns = tr.columns.droplevel() hidden_names = list(tr.columns) hidden_names = dict(zip(range(len(hidden_names)), hidden_names)) arr = np.array(tr).astype(np.float64) pi = arr.sum(axis=1) a = arr / pi[:, None] em = get_emission_prob_mat_introgression( t_A, t_B, t_AB, t_C, t_upper, t_out, t_m, rho_A, rho_B, rho_AB, rho_C, rho_ABC, coal_A, coal_B, coal_AB, coal_BC, coal_C, coal_ABC, n_int_AB, n_int_ABC, mu_A, mu_B, mu_C, mu_D, mu_AB, mu_ABC, cut_AB, cut_ABC, ) em.hidden_state = em.hidden_state.astype("category") em.hidden_state.cat.set_categories(hidden_names) em = em.sort_values(["hidden_state"]) em = em.iloc[:, 1:] observed_names = list(em.columns) observed_names = dict(zip(range(len(observed_names)), observed_names)) b = np.array(em) return a, b, pi, hidden_names, observed_names