Source code for itrails.run_markov_chain_AB

import numpy as np
from joblib import Parallel, delayed

import itrails.ncpu as ncpu
from itrails.expm import expm
from itrails.helper_omegas import translate_to_omega


[docs] def compute_matrix_end(prob_mat, exponential_time, omega_end_mask): """ Helper function that computes the first matrix multiplication, then slices the matrix to get the columns corresponding to the end state. :param prob_mat: Matrix of probabilities :type prob_mat: Numpy array :param exponential_time: Exponential matrix of transition matrix multiplied by time. :type exponential_time: Numpy array :param omega_end_mask: Vector of booleans that masks the columns of the matrix. :type omega_end_mask: Numpy array of booleans. :return: Sliced matrix :rtype: Numpy array """ mat_mult_result = prob_mat @ exponential_time result = mat_mult_result * omega_end_mask return result
[docs] def compute_matrix_start_end( prob_mat, exponential_time, omega_start_mask, omega_end_mask ): """ Helper function that computes all matrix multiplications but the first, then slices the matrix to get the rows corresponding to the starting state and columns corresponding to the end state. :param prob_mat: Matrix of probabilities :type prob_mat: Numpy array :param exponential_time: Exponential matrix of transition matrix multiplied by time. :type exponential_time: Numpy array :param omega_start_mask: Vector of booleans that masks the rows of the matrix. :type omega_start_mask: Numpy array of booleans. :param omega_end_mask: Vector of booleans that masks the columns of the matrix. :type omega_end_mask: Numpy array of booleans. :return: Sliced matrix :rtype: Numpy array """ sliced_mat = (omega_start_mask) @ (exponential_time) @ (omega_end_mask) result = (prob_mat) @ (sliced_mat) return result
[docs] def compute_matrices_end_wrapper( prob_mats, exponential_time, omega_end_masks, num_combinations, ): """Parallel wrapper that computes the sliced matrix result for each combination by invoking compute_matrix_end on each set of inputs; for every index from 0 to num_combinations-1, it multiplies the corresponding probability matrix (from prob_mats) with the result of slicing the exponential_time matrix using the corresponding omega_end_mask, and aggregates all results into a single numpy array; the computations are performed in parallel using joblib.Parallel with ncpu.N_CPU_GLOBAL workers. :param prob_mats: List or array of probability matrices where each element is a numpy array. :type prob_mats: list or np.ndarray. :param exponential_time: Numpy array representing the matrix computed by applying the exponential function to the transition matrix multiplied by time. :type exponential_time: np.ndarray. :param omega_end_masks: List or array of boolean vectors used to mask the columns of the exponential_time matrix for each combination. :type omega_end_masks: list or np.ndarray of bool. :param num_combinations: Total number of combinations (iterations) over which to compute the sliced matrix. :type num_combinations: int. :return: Numpy array containing the resulting matrices computed for each combination. :rtype: np.ndarray.""" results = Parallel(n_jobs=ncpu.N_CPU_GLOBAL)( delayed(compute_matrix_end)(prob_mats[i], exponential_time, omega_end_masks[i]) for i in range(num_combinations) ) return np.array(results)
[docs] def compute_matrices_start_end_wrapper( prob_mats, exponential_time, omega_start_masks, omega_end_masks, num_combinations, ): """Parallel wrapper that computes the sliced matrix result for each combination by invoking compute_matrix_start_end on each set of inputs; for every index from 0 to num_combinations-1, it multiplies the corresponding probability matrix (from prob_mats) with the result of slicing the exponential_time matrix using the corresponding omega_start_mask and omega_end_mask, and aggregates all results into a single numpy array; the computations are performed in parallel using joblib.Parallel with ncpu.N_CPU_GLOBAL workers. :param prob_mats: List or array of probability matrices where each element is a numpy array. :type prob_mats: list or np.ndarray. :param exponential_time: Numpy array representing the matrix computed by applying the exponential function to the transition matrix multiplied by time. :type exponential_time: np.ndarray. :param omega_start_masks: List or array of boolean vectors used to mask the rows of the exponential_time matrix for each combination. :type omega_start_masks: list or np.ndarray of bool. :param omega_end_masks: List or array of boolean vectors used to mask the columns of the exponential_time matrix for each combination. :type omega_end_masks: list or np.ndarray of bool. :param num_combinations: Total number of combinations (iterations) over which to compute the sliced matrix. :type num_combinations: int. :return: Numpy array containing the resulting matrices computed for each combination. :rtype: np.ndarray.""" results = Parallel(n_jobs=ncpu.N_CPU_GLOBAL)( delayed(compute_matrix_start_end)( prob_mats[i], exponential_time, omega_start_masks[i], omega_end_masks[i] ) for i in range(num_combinations) ) return np.array(results)
[docs] def run_markov_chain_AB( trans_mat, times, omega_dict, prob_dict, n_int_AB, ): """ Function that runs the Discrete Time Markov chain for species A and B. :param trans_mat: Transition matrix. :type trans_mat: Numpy array :param times: Array of cut times. :type times: Numpy array :param omega_dict: Dictionary of omega indices (key) and vector of booleans where each key has the states (value). :type omega_dict: Numba typed dictionary :param prob_dict: Dictionary of each path (keys) and probabilities for each state at the start of the first time interval (values). :type prob_dict: Numba typed dictionary :param n_int_AB: Number of intervals for species A and B. :type n_int_AB: int :return: Updated dictionary of each path (keys) and probabilities for each state at the end of the last time interval (values). :rtype: Numba typed dictionary """ step = 0 exponential_time_0 = expm(trans_mat * times[step]) exponential_time_0 = exponential_time_0.copy() og_keys = list(prob_dict.keys()) for path in og_keys: prob_mats = np.zeros((4, 1, 15), dtype=np.float64) omega_masks_end = np.zeros((4, 1, 15), dtype=np.float64) keys = np.zeros((4, 6), dtype=np.int64) result_idx = 0 prob_mat = prob_dict[path] l_path, r_path = path[0], path[1] l_results = np.full((2, 3), -1, dtype=np.int64) r_results = np.full((2, 3), -1, dtype=np.int64) l_results[0] = l_path r_results[0] = r_path l_results[1] = (0, step, l_path[2]) if l_path[0] == -1 else l_path r_results[1] = (0, step, r_path[2]) if r_path[0] == -1 else r_path for l_row in l_results: l_tuple = (int(l_row[0]), int(l_row[1]), int(l_row[2])) for r_row in r_results: r_tuple = (int(r_row[0]), int(r_row[1]), int(r_row[2])) if (l_tuple, r_tuple) in og_keys and not ( np.array_equal(l_row, l_path) and np.array_equal(r_row, r_path) ): continue else: new_key = ( l_tuple, r_tuple, ) omega_end_mask = omega_dict[translate_to_omega(new_key)].reshape( 1, 15 ) new_row = np.array( [l_row[0], l_row[1], l_row[2], r_row[0], r_row[1], r_row[2]], dtype=np.int64, ) prob_mats[result_idx] = np.ascontiguousarray(prob_mat) omega_masks_end[result_idx] = np.ascontiguousarray(omega_end_mask) keys[result_idx] = np.ascontiguousarray(new_row) result_idx += 1 results = compute_matrices_end_wrapper( prob_mats, exponential_time_0, omega_masks_end, result_idx ) step += 1 for i in range(result_idx): prob_dict[ ( (keys[i][0], keys[i][1], keys[i][2]), ( keys[i][3], keys[i][4], keys[i][5], ), ) ] = results[i] for _ in range(step, n_int_AB): exponential_time = expm(trans_mat * times[step]) exponential_time = exponential_time.copy() og_keys = list(prob_dict.keys()) for path in og_keys: prob_mats = np.zeros((4, 1, 15), dtype=np.float64) omega_masks_start = np.zeros((4, 15, 15), dtype=np.float64) omega_masks_end = np.zeros((4, 15, 15), dtype=np.float64) keys = np.zeros((4, 6), dtype=np.int64) result_idx = 0 prob_mat = prob_dict[path] l_path, r_path = path[0], path[1] l_results = np.full((2, 3), -1, dtype=np.int64) r_results = np.full((2, 3), -1, dtype=np.int64) l_results[0] = l_path r_results[0] = r_path l_results[1] = (0, step, l_path[2]) if l_path[0] == -1 else l_path r_results[1] = (0, step, r_path[2]) if r_path[0] == -1 else r_path for l_row in l_results: l_tuple = (int(l_row[0]), int(l_row[1]), int(l_row[2])) for r_row in r_results: r_tuple = (int(r_row[0]), int(r_row[1]), int(r_row[2])) if (l_tuple, r_tuple) in og_keys and not ( np.array_equal(l_row, l_path) and np.array_equal(r_row, r_path) ): continue else: new_key = ( l_tuple, r_tuple, ) omega_start = translate_to_omega(path) omega_end = translate_to_omega(new_key) omega_start_mask = omega_dict[omega_start] omega_end_mask = omega_dict[omega_end] new_row = np.array( [ l_row[0], l_row[1], l_row[2], r_row[0], r_row[1], r_row[2], ], dtype=np.int64, ) prob_mats[result_idx] = np.ascontiguousarray(prob_mat) omega_masks_start[result_idx] = np.diag(omega_start_mask) omega_masks_end[result_idx] = np.diag(omega_end_mask) keys[result_idx] = np.ascontiguousarray(new_row) result_idx += 1 results = compute_matrices_start_end_wrapper( prob_mats, exponential_time, omega_masks_start, omega_masks_end, result_idx, ) for i in range(result_idx): prob_dict[ ( (keys[i][0], keys[i][1], keys[i][2]), ( keys[i][3], keys[i][4], keys[i][5], ), ) ] = results[i] step += 1 return prob_dict