Source code for schicluster.loop.merge_cell_to_group

import time
import h5py
import cooler
import pathlib
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, load_npz, triu
from ..cool import write_coo, get_chrom_offsets
from concurrent.futures import ProcessPoolExecutor, as_completed

"""
Matrix names

"""


[docs] def merge_cells_for_single_chromosome(output_dir, output_prefix, merge_type='E'): """ Merge cell's E and T matrix to group matrices, sum only, not normalized by n_cells yet E: Matrix normalized by global diagonal backgrounds, calculated from loop_bkg E2: Sum of square of E, used to calculate global t statistics T: Matrix normalized by global diagonal and local backgrounds, then minus E (T is the delta matrix), calculated from loop_bkg T2: Sum of square of T, used to calculate local t statistics """ start_time = time.time() # get cell paths cell_paths = [str(p) for p in pathlib.Path(output_dir).glob(f'*.{merge_type}.npz')] n_cells = len(cell_paths) # get n_dims matrix = load_npz(cell_paths[0]) n_dims = matrix.shape[0] # initialize e_sum = csr_matrix((n_dims, n_dims), dtype=np.float32) e2_sum = csr_matrix((n_dims, n_dims), dtype=np.float32) for i, chrom_path in enumerate(cell_paths): matrix = load_npz(chrom_path) e_sum += matrix e2_sum += matrix.multiply(matrix) write_coo(f'{output_prefix}.{merge_type}.hdf', e_sum, chunk_size=None) write_coo(f'{output_prefix}.{merge_type}2.hdf', e2_sum, chunk_size=None) print(f'Merge {n_cells} cells took {time.time() - start_time:.0f} seconds') return
[docs] def read_single_cool_chrom(cool_path, chrom, chrom2=None): # Used in chrom_sum_iterator, return the sum according to group_n_cells # Also used in merge_raw_matrix and merge_group # Output chrom matrix cool = cooler.Cooler(str(cool_path)) selector = cool.matrix(balance=False, sparse=True) if chrom2 is None: matrix = triu(selector.fetch(chrom)) else: if chrom == chrom2: matrix = triu(selector.fetch(chrom, chrom2)) else: matrix = selector.fetch(chrom, chrom2) with h5py.File(cool_path, 'r') as f: if 'group_n_cells' in f.attrs: matrix.data *= f.attrs['group_n_cells'] return matrix
[docs] def chrom_sum_iterator(input_cool_list, chrom_sizes, chrom_offset, total_cells): # Used in save_single_matrix_type, return the average over cells # total_cells need to be provided # Output chrom df for chrom in chrom_sizes.keys(): cool_path = input_cool_list[0] matrix = read_single_cool_chrom(cool_path, chrom) for cool_path in input_cool_list[1:]: matrix += read_single_cool_chrom(cool_path, chrom) matrix = matrix.tocoo() pixel_df = pd.DataFrame({ 'bin1_id': matrix.row, 'bin2_id': matrix.col, 'count': matrix.data }) pixel_df.iloc[:, :2] += chrom_offset[chrom] pixel_df.iloc[:, -1] /= total_cells yield pixel_df
[docs] def save_single_matrix_type(input_cool_list, output_cool, bins_df, chrom_sizes, chrom_offset, total_cells): # Used by merge_group_chunks_to_group_cools and merge_cool # total_cells need to be provided # Output cool chrom_iter = chrom_sum_iterator(input_cool_list, chrom_sizes, chrom_offset, total_cells) cooler.create_cooler(cool_uri=output_cool, bins=bins_df, pixels=chrom_iter, ordered=True, dtypes={'count': np.float32}) with h5py.File(output_cool, 'a') as f: f.attrs['group_n_cells'] = total_cells return
[docs] def merge_cool(input_cool_tsv_file, output_cool): # Input could be cool files of single cell or average over cells # Output is average over cells # total_cell is counted over cools according to group_n_cells, otherwise 1 input_cool_list = pd.read_csv(input_cool_tsv_file).squeeze().tolist() input_cool_list = [str(pathlib.Path(cool).absolute()) for cool in input_cool_list] cool = cooler.Cooler(input_cool_list[0]) bins_df = cool.bins()[["chrom", "start", "end"]][:] chrom_sizes = cool.chromsizes[:] chrom_offset = get_chrom_offsets(bins_df) total_cells = 0 for cool_path in input_cool_list: with h5py.File(cool_path, 'r') as f: if 'group_n_cells' in f.attrs: total_cells += f.attrs['group_n_cells'] else: total_cells += 1 save_single_matrix_type(input_cool_list, output_cool, bins_df, chrom_sizes, chrom_offset, total_cells) return
[docs] def merge_group_chunks_to_group_cools(chrom_size_path, resolution, group, output_dir, matrix_types=('E', 'E2', 'T', 'T2', 'Q', 'Q2')): # Input is sum over cells per chunk # Output is average over cells of all chunks # total_cell is counted over cell_table.csv per chunk # determine chunk dirs for the group: output_dir = pathlib.Path(output_dir).absolute() group_dir = output_dir / group group_dir.mkdir(exist_ok=True) chunk_dirs = list(output_dir.glob(f'{group}_chunk*')) # count total cells total_cells = 0 for chunk_dir in chunk_dirs: total_cells += pd.read_csv(chunk_dir / 'cell_table.csv', index_col=0, header=None).shape[0] chrom_sizes = cooler.read_chromsizes(chrom_size_path, all_names=True) bins_df = cooler.binnify(chrom_sizes, resolution) chrom_offset = get_chrom_offsets(bins_df) with ProcessPoolExecutor(6) as exe: futures = {} for matrix_type in matrix_types: output_cool = str(group_dir / f'{group}.{matrix_type}.cool') input_cool_list = [chunk_dir / f'{matrix_type}.cool' for chunk_dir in chunk_dirs] future = exe.submit(save_single_matrix_type, input_cool_list=input_cool_list, output_cool=output_cool, bins_df=bins_df, chrom_sizes=chrom_sizes, chrom_offset=chrom_offset, total_cells=total_cells) futures[future] = matrix_type for future in as_completed(futures): matrix_type = futures[future] print(f'Matrix {matrix_type} generated') future.result() return
[docs] def merge_group_to_bigger_group_cools(chrom_size_path, resolution, group, output_dir, group_list, shuffle, matrix_types=('E', 'E2', 'T', 'T2', 'Q', 'Q2')): """ Sum all the group average cool files, and finally divide the total number of cells to get a group cell number normalized cool file in the end. """ # determine chunk dirs for the group: output_dir = pathlib.Path(output_dir).absolute() group_dir = output_dir / group group_dir.mkdir(exist_ok=True, parents=True) group_list = [pathlib.Path(xx) for xx in group_list] # count total cells total_cells = 0 for chunk_dir in group_list: total_cells += pd.read_csv(chunk_dir / 'cell_table.csv', index_col=0).shape[0] if shuffle: group_list = [xx / 'shuffle/' for xx in group_list] chrom_sizes = cooler.read_chromsizes(chrom_size_path, all_names=True) bins_df = cooler.binnify(chrom_sizes, resolution) chrom_offset = get_chrom_offsets(bins_df) with ProcessPoolExecutor(6) as exe: futures = {} for matrix_type in matrix_types: output_cool = str(group_dir / f'{group}.{matrix_type}.cool') input_cool_list = [list(group_dir.glob(f'*/*.{matrix_type}.cool'))[0] for group_dir in group_list] future = exe.submit(save_single_matrix_type, input_cool_list=input_cool_list, output_cool=output_cool, bins_df=bins_df, chrom_sizes=chrom_sizes, chrom_offset=chrom_offset, total_cells=total_cells) futures[future] = matrix_type for future in as_completed(futures): matrix_type = futures[future] print(f'Matrix {matrix_type} generated') future.result() return