Source code for schicluster.impute.merge_cell_to_group
import time
import numpy as np
from scipy.sparse import csr_matrix, triu
import cooler
from ..cool.utilities import write_coo
import pandas as pd
import logging
"""
Matrix names
Q: The imputed matrix, before normalization.
"""
[docs]
def read_chrom(cell_url, chrom):
    """Load the upper triangle of one chromosome's matrix from a cooler.

    Parameters
    ----------
    cell_url
        Location handed to ``cooler.Cooler`` to open the cell's contact data.
    chrom
        Region handed to the matrix selector's ``fetch`` call.

    Returns
    -------
    scipy sparse matrix
        ``scipy.sparse.triu`` applied to the unbalanced, sparse matrix fetched
        for ``chrom``.
    """
    clr = cooler.Cooler(cell_url)
    # Raw (unbalanced) values, returned in sparse form.
    selector = clr.matrix(balance=False, sparse=True)
    chrom_matrix = selector.fetch(chrom)
    # Keep only the upper triangle (diagonal included).
    return triu(chrom_matrix)
[docs]
def merge_cells_for_single_chromosome(cell_urls_path,
                                      chrom,
                                      output_prefix,
                                      square=False):
    """Sum one chromosome's Q matrix over a list of cells and write the result.

    Every cell listed in ``cell_urls_path`` is read with ``read_chrom`` and the
    matrices are added together element-wise. The sum is NOT divided by the
    number of cells here.

    Parameters
    ----------
    cell_urls_path
        Path to a header-less table readable by ``pandas.read_csv``. The first
        column is used as the index and the column labelled ``1`` is taken as
        the list of cell URLs.
    chrom
        Chromosome passed to ``read_chrom`` for every cell.
    output_prefix
        Prefix of the output files: ``{output_prefix}.Q.hdf`` is always
        written, ``{output_prefix}.Q2.hdf`` only when ``square`` is true.
    square
        If true, also accumulate and write the sum of the element-wise
        squared matrices.

    Returns
    -------
    None
        Results are written through ``write_coo``.
    """
    cell_urls = pd.read_csv(cell_urls_path, index_col=0, header=None)[1].tolist()
    n_cells = len(cell_urls)

    # Start the clock before the first read so the reported duration covers
    # reading every cell, including the first one.
    start_time = time.time()

    # The first cell provides the matrix dimension. Keep the loaded matrix and
    # reuse it in the loop below instead of reading the same cell twice.
    first_matrix = read_chrom(cell_urls[0], chrom)
    n_dims = first_matrix.shape[0]

    print('Merging Q (imputed, before normalization) matrix.')
    # Accumulators start as empty float32 sparse matrices; the squared
    # accumulator is only needed when ``square`` is requested.
    q_sum = csr_matrix((n_dims, n_dims), dtype=np.float32)
    q2_sum = csr_matrix((n_dims, n_dims), dtype=np.float32) if square else None
    for i, path in enumerate(cell_urls):
        data = first_matrix if i == 0 else read_chrom(path, chrom)
        q_sum += data
        if square:
            q2_sum += data.multiply(data)

    # We do not normalize by total cell number here; instead, it is normalized
    # in merge_group_chunks_to_group_cools.
    write_coo(f'{output_prefix}.Q.hdf', q_sum, chunk_size=None)
    if square:
        write_coo(f'{output_prefix}.Q2.hdf', q2_sum, chunk_size=None)
    logging.debug(f'Merge {n_cells} cells took {time.time() - start_time:.0f} seconds')