import pathlib
import time
import numpy as np
from scipy.sparse import csr_matrix, load_npz, triu
from ..cool import write_coo, get_chrom_offsets
from .merge_cell_to_group import read_single_cool_chrom
import cooler
import pandas as pd
import h5py
from concurrent.futures import ProcessPoolExecutor, as_completed
def chrom_ave_iterator(chunk_dirs,
                       chrom_sizes,
                       chrom_offset,
                       matrix_type,
                       total_cells):
    """Yield cooler pixel tables one chromosome at a time, normalized by total cell number."""
    print(f'Reading matrix {matrix_type}')
    for chrom in chrom_sizes.keys():
        # sum this chromosome's matrix across all chunk-level cool files,
        # taking the single matching cool file in each chunk directory
        cool_path = list(chunk_dirs[0].glob(f'*/*.{matrix_type}.cool'))[0]
        matrix = read_single_cool_chrom(cool_path, chrom)
        for chunk_dir in chunk_dirs[1:]:
            cool_path = list(chunk_dir.glob(f'*/*.{matrix_type}.cool'))[0]
            matrix += read_single_cool_chrom(cool_path, chrom)
        matrix = matrix.tocoo()
        pixel_df = pd.DataFrame({
            'bin1_id': matrix.row,
            'bin2_id': matrix.col,
            'count': matrix.data
        })
        # shift chromosome-local bin ids to genome-wide ids, then average over cells
        pixel_df.iloc[:, :2] += chrom_offset[chrom]
        pixel_df.iloc[:, -1] /= total_cells
        yield pixel_df
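

# Illustrative sketch (not part of the original module): materialize the per-chromosome
# pixel tables yielded by chrom_ave_iterator into one DataFrame so the normalized counts
# can be inspected before writing a cooler. The arguments mirror the function above and
# the inputs are assumed to follow the same chunk-directory layout.
def _preview_group_pixels(chunk_dirs, chrom_sizes, chrom_offset, matrix_type, total_cells):
    pixel_iter = chrom_ave_iterator(chunk_dirs,
                                    chrom_sizes,
                                    chrom_offset,
                                    matrix_type,
                                    total_cells)
    # columns follow the cooler pixel schema: bin1_id, bin2_id, count
    return pd.concat(pixel_iter, ignore_index=True)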
def save_single_matrix_type(cooler_path,
                            bins_df,
                            chunk_dirs,
                            chrom_sizes,
                            chrom_offset,
                            matrix_type,
                            total_cells):
    """Write one matrix type into a single cooler file and record the group cell number."""
    chrom_iter = chrom_ave_iterator(chunk_dirs,
                                    chrom_sizes,
                                    chrom_offset,
                                    matrix_type,
                                    total_cells)
    cooler.create_cooler(cool_uri=cooler_path,
                         bins=bins_df,
                         pixels=chrom_iter,
                         ordered=True,
                         dtypes={'count': np.float32})
    # store the number of cells used for normalization as an HDF5 attribute
    with h5py.File(cooler_path, 'a') as f:
        f.attrs['group_n_cells'] = total_cells
    return
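

# Illustrative helper (an assumption, not part of the original module): read back the
# 'group_n_cells' attribute that save_single_matrix_type stores on the cooler file,
# e.g. to recover raw summed counts by multiplying 'count' with this number.
def _read_group_n_cells(cooler_path):
    with h5py.File(cooler_path, 'r') as f:
        return int(f.attrs['group_n_cells'])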
def merge_group_to_bigger_group_cools(chrom_size_path,
                                      resolution,
                                      group,
                                      output_dir,
                                      group_list,
                                      shuffle,
                                      matrix_types=('E', 'E2', 'T', 'T2', 'Q', 'Q2')):
    """
    Sum the chunk-level sum cool files for each matrix type, then divide by the total
    number of cells to produce a cell-number-normalized group cool file.
    """
    # determine chunk dirs for the group
    output_dir = pathlib.Path(output_dir).absolute()
    group_dir = output_dir / group
    group_dir.mkdir(exist_ok=True, parents=True)
    group_list = [pathlib.Path(xx) for xx in group_list]

    # count total cells across all chunks
    total_cells = 0
    for chunk_dir in group_list:
        total_cells += pd.read_csv(chunk_dir / 'cell_table.tsv', index_col=0).shape[0]
    if shuffle:
        group_list = [xx / 'shuffle/' for xx in group_list]

    # genome bins shared by all matrix types
    chrom_sizes = cooler.read_chromsizes(chrom_size_path, all_names=True)
    bins_df = cooler.binnify(chrom_sizes, resolution)
    chrom_offset = get_chrom_offsets(bins_df)

    # write each matrix type in a separate process
    with ProcessPoolExecutor(5) as exe:
        futures = {}
        for matrix_type in matrix_types:
            cooler_path = str(group_dir / f'{group}.{matrix_type}.cool')
            future = exe.submit(save_single_matrix_type,
                                cooler_path=cooler_path,
                                bins_df=bins_df,
                                chunk_dirs=group_list,
                                chrom_sizes=chrom_sizes,
                                chrom_offset=chrom_offset,
                                matrix_type=matrix_type,
                                total_cells=total_cells)
            futures[future] = matrix_type
        for future in as_completed(futures):
            matrix_type = futures[future]
            # raise any worker exception before reporting success
            future.result()
            print(f'Matrix {matrix_type} generated')
    return
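

# Example invocation (a sketch with hypothetical paths; each chunk directory is assumed
# to contain a cell_table.tsv and per-chunk *.Q.cool files as expected above):
if __name__ == '__main__':
    merge_group_to_bigger_group_cools(
        chrom_size_path='hg38.chrom.sizes',   # hypothetical chromosome sizes file
        resolution=100000,                    # 100 kb bins
        group='CellTypeA',
        output_dir='group_cools',
        group_list=['chunk_0', 'chunk_1'],    # hypothetical chunk sum directories
        shuffle=False,
        matrix_types=('Q',),
    )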