import numpy as np
[docs]
def deep_identify(
current,
absorbing_state,
omega_nonrev_counts,
inverted_omega_nonrev_counts,
path,
all_paths_dict,
by_l=-1,
by_r=-1,
):
"""
Recursive function that identifies, in each iteration, the possible paths that can be taken from the current state to the final state. The function is called recursively until the final state is reached. The function stores the paths in the paths_array and all_paths_array arrays.
:param current: Current omega state in the recursion
:type current: Tuple of int
:param absorbing_state: Absorbing state
:type absorbing_state: Tuple of int
:param omega_nonrev_counts: Dictionary containing the number of non-reversible coalescents (value) for each omega state (key)
:type omega_nonrev_counts: Numba typed Dict
:param inverted_omega_nonrev_counts: Dictionary containing the omega states (value) for each number of non-reversible coalescents (key)
:type inverted_omega_nonrev_counts: Numba typed Dict
:param path: Current path in the recursion
:type path: Numpy array
:param all_paths_dict: Dictionary that recursively gets filled up with all the paths
:type all_paths_dict: Numpy array
:param by_l: Current omega left subpath, defaults to -1 (initial placeholder value)
:type by_l: int, optional
:param by_r: Current omega right subpath, defaults to -1 (initial placeholder value)
:type by_r: int, optional
"""
# Calculate differences
diff_l = omega_nonrev_counts[absorbing_state[0]] - omega_nonrev_counts[current[0]]
diff_r = omega_nonrev_counts[absorbing_state[1]] - omega_nonrev_counts[current[1]]
# Termination condition
if diff_l <= 1 and diff_r <= 1:
key = (by_l, by_r)
if key not in all_paths_dict:
all_paths_dict[key] = []
# Create a copy of the current path
path_copy = [tuple(p) for p in path]
all_paths_dict[key].append(path_copy)
return
start_l = omega_nonrev_counts[current[0]]
start_r = omega_nonrev_counts[current[1]]
end_l = omega_nonrev_counts[absorbing_state[0]]
end_r = omega_nonrev_counts[absorbing_state[1]]
# Explore next states for left
if start_l < end_l:
next_states_l = inverted_omega_nonrev_counts[start_l + 1]
for left in next_states_l:
new_state = (left, current[1])
new_by_l = (
by_l
if by_l != -1
else (
left
if omega_nonrev_counts[left] == 1 and start_l + 1 != end_l
else -1
)
)
path.append(new_state)
deep_identify(
new_state,
absorbing_state,
omega_nonrev_counts,
inverted_omega_nonrev_counts,
path,
all_paths_dict,
new_by_l,
by_r,
)
path.pop()
# Explore next states for right
if start_r < end_r:
next_states_r = inverted_omega_nonrev_counts[start_r + 1]
for right in next_states_r:
new_state = (current[0], right)
new_by_r = (
by_r
if by_r != -1
else (
right
if omega_nonrev_counts[right] == 1 and start_r + 1 != end_r
else -1
)
)
path.append(new_state)
deep_identify(
new_state,
absorbing_state,
omega_nonrev_counts,
inverted_omega_nonrev_counts,
path,
all_paths_dict,
by_l,
new_by_r,
)
path.pop()
# Explore next states for both left and right
if start_l < end_l and start_r < end_r:
next_states_l = inverted_omega_nonrev_counts[start_l + 1]
next_states_r = inverted_omega_nonrev_counts[start_r + 1]
for left in next_states_l:
for right in next_states_r:
if omega_nonrev_counts[right] > start_r:
new_state = (left, right)
new_by_l = (
by_l
if by_l != -1
else (
left
if omega_nonrev_counts[left] == 1 and start_l + 1 != end_l
else -1
)
)
new_by_r = (
by_r
if by_r != -1
else (
right
if omega_nonrev_counts[right] == 1 and start_r + 1 != end_r
else -1
)
)
path.append(new_state)
deep_identify(
new_state,
absorbing_state,
omega_nonrev_counts,
inverted_omega_nonrev_counts,
path,
all_paths_dict,
new_by_l,
new_by_r,
)
path.pop()
[docs]
def deep_identify_wrapper(
omega_init,
absorbing_state,
omega_nonrev_counts,
inverted_omega_nonrev_counts,
path_to_convert,
):
"""
Wrapper function for the deep_identify function. This function initializes the arrays and dictionaries needed for the recursion and calls the deep_identify function. In the end it returns the transformed keys and the paths.
:param omega_init: Initial omega state
:type omega_init: Tuple of int
:param absorbing_state: Absorbing state
:type absorbing_state: Tuple of int
:param omega_nonrev_counts: Dictionary containing the number of non-reversible coalescents (value) for each omega state (key)
:type omega_nonrev_counts: Numba typed Dict
:param inverted_omega_nonrev_counts: Dictionary containing the omega states (value) for each number of non-reversible coalescents (key)
:type inverted_omega_nonrev_counts: Numba typed Dict
:return: Resulting keys and paths
:rtype: Tuple(Array, Array, Array, int)
"""
all_paths_dict = {}
path = [omega_init]
deep_identify(
omega_init,
absorbing_state,
omega_nonrev_counts,
inverted_omega_nonrev_counts,
path,
all_paths_dict,
)
# Convert dictionary to keys_array and paths_array
keys_array = np.array(list(all_paths_dict.keys()))
keys_array_final = np.zeros((len(keys_array), 6))
path_to_convert_array = np.array(path_to_convert)
flattened_array = np.hstack(path_to_convert_array.ravel())
coded_dict = {3: 1, 5: 2, 6: 3}
for i, key in enumerate(keys_array):
keys_array_final[i] = flattened_array
if key[0] != -1 and keys_array_final[i][0] == -1:
keys_array_final[i][0] = coded_dict[key[0]]
if key[1] != -1 and keys_array_final[i][3] == -1:
keys_array_final[i][3] = coded_dict[key[1]]
# Find the maximum number of paths and subpaths for padding
max_paths = max(len(paths) for paths in all_paths_dict.values())
max_subpaths = max(
max(len(subpath) for subpath in paths) for paths in all_paths_dict.values()
)
# Initialize paths_array with zeros
paths_array = np.zeros((len(all_paths_dict), max_paths, max_subpaths, 2))
# Initialize path_lengths_array to store the length of each path
path_lengths_array = np.zeros((len(all_paths_dict), max_paths))
# Fill paths_array and path_lengths_array
for i, (key, paths) in enumerate(all_paths_dict.items()):
for j, subpath in enumerate(paths):
path_lengths_array[i, j] = len(subpath) # Store the length of the path
for k, point in enumerate(subpath):
paths_array[i, j, k] = point
return keys_array_final, paths_array, path_lengths_array, max_subpaths
[docs]
def deepest_ti(trans_mat_noabs, omega_dict_noabs, path):
"""
This function calculated the integral of matrix exponentials with an infinite time limit.
:param trans_mat_noabs: Transition matrix without absorbing states
:type trans_mat_noabs: Numpy array
:param omega_dict_noabs: Omega dictionary without absorbing states
:type omega_dict_noabs: Numpy array
:param path: Path of omega states
:type path: Numpy array
:return: Result of the integral of the series of multiplying matrix exponentials.
:rtype: Numpy array
"""
steps = len(path) - 1
n = trans_mat_noabs.shape[0]
if steps == 1:
C_mat = trans_mat_noabs
elif steps > 1:
C_mat = np.zeros((n * steps, n * steps))
C_mat[0:n, 0:n] = trans_mat_noabs
for idx in range(1, steps):
sub_om_init = (path[idx - 1, 0], path[idx - 1, 1])
sub_om_fin = (path[idx, 0], path[idx, 1])
A_mat = (
np.diag(omega_dict_noabs[sub_om_init].astype(np.float64))
@ trans_mat_noabs
@ np.diag(omega_dict_noabs[sub_om_fin].astype(np.float64))
)
C_mat[n * idx : n * (idx + 1), n * idx : n * (idx + 1)] = trans_mat_noabs
C_mat[n * (idx - 1) : n * idx, n * idx : n * (idx + 1)] = A_mat
sub_om_init = (path[-2, 0], path[-2, 1])
sub_om_fin = (path[-1, 0], path[-1, 1])
A_mat = np.ascontiguousarray(
np.diag(omega_dict_noabs[sub_om_init].astype(np.float64))
@ trans_mat_noabs
@ np.diag(omega_dict_noabs[sub_om_fin].astype(np.float64))
)
result = (-np.linalg.inv(C_mat))[:n, -n:] @ A_mat
return result