import subprocess
import numpy as np
import pandas as pd
from scipy.sparse import diags, csr_matrix
import cooler
import pathlib
from concurrent.futures import ProcessPoolExecutor, as_completed
import xarray as xr


def get_cpg_profile(fasta_path, hdf_output_path, cell_url=None, chrom_size_path=None, resolution=100000):
    """Compute the CpG ratio of each genomic bin with bedtools nuc and save it as an HDF table."""
    temp_bed_path = f'{hdf_output_path}_bins.bed'
    temp_cpg_path = f'{hdf_output_path}_cpg_profile.txt'

    # get the bin table either from an existing cool file or by binnifying the chrom sizes
    if cell_url is not None:
        cool = cooler.Cooler(cell_url)
        bins = cool.bins()[:]
    elif chrom_size_path is not None:
        chrom_sizes = pd.read_csv(chrom_size_path, sep='\t', index_col=0, header=None).squeeze(axis=1)
        bins = cooler.binnify(chrom_sizes, resolution)
    else:
        print('ERROR : Must provide either cell_url or chrom_size_path')
        return
    bins.to_csv(temp_bed_path, index=False, header=False, sep='\t')

    # count CpG occurrences per bin
    subprocess.run(
        f'bedtools nuc -fi {fasta_path} -bed {temp_bed_path} -pattern CG -C > {temp_cpg_path}',
        shell=True, check=True)

    # CpG ratio for each bin: pattern count divided by the non-N sequence length
    result = pd.read_csv(temp_cpg_path, sep='\t')
    result['ratio'] = (result['13_user_patt_count'] /
                       (result['12_seq_len'] - result['10_num_N'])).fillna(0)
    # keep chrom, start, end, CpG ratio
    result = result.iloc[:, [0, 1, 2, -1]]
    result.columns = ['chrom', 'start', 'end', 'cpg_ratio']
    result.to_hdf(hdf_output_path, key='data')

    # cleanup temporary files
    subprocess.run(f'rm -f {temp_bed_path} {temp_cpg_path}', shell=True)
    return
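
# Illustrative usage sketch (not part of the module): precompute the CpG profile
# once per genome and resolution. The paths below are hypothetical placeholders;
# any FASTA plus a matching chrom-size file (or a cool file) works, and bedtools
# must be available on PATH.
#
# get_cpg_profile(fasta_path='genome.fa',
#                 hdf_output_path='cpg_ratio_100k.hdf',
#                 chrom_size_path='genome.chrom.sizes',
#                 resolution=100000)
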
def compartment_strength(matrix, comp, cpg_ratio):
    """Sum distance-normalized contacts within and between the A (high comp) and B (low comp) bins."""
    # only consider bins with non-zero CpG density
    bin_filter = cpg_ratio > 0
    tmp = comp[bin_filter]
    # A bins: top 20% of compartment scores; B bins: bottom 20%
    a_pos = (tmp > np.percentile(tmp, 80))
    b_pos = (tmp < np.percentile(tmp, 20))
    # normalize each contact by the mean value of its diagonal (distance decay)
    E = matrix.tocoo()
    decay = np.array([E.diagonal(i).mean() for i in range(E.shape[0])])
    E.data = E.data / decay[np.abs(E.col - E.row)]
    E = E.tocsr()[np.ix_(bin_filter, bin_filter)]
    # summed normalized contacts within A, within B, and between A and B
    aa = E[np.ix_(a_pos, a_pos)].sum()
    bb = E[np.ix_(b_pos, b_pos)].sum()
    ab = E[np.ix_(a_pos, b_pos)].sum()
    scores = np.array([aa, bb, ab])
    return scores
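
# Note: the three sums returned here are accumulated over chromosomes in
# single_cell_compartment and end up as the 'AA', 'BB', 'AB' columns of the
# per-cell 'summary' variable written by aggregate_compartment. Turning them
# into a single strength value (e.g. a within- vs. between-compartment ratio)
# is left to downstream analysis and is not done in this module.
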
def single_chrom_compartment(matrix, cpg_ratio, calc_strength=False):
    """Compute the CpG-weighted compartment score of each bin on one chromosome."""
    matrix = matrix - diags(matrix.diagonal())  # remove the main diagonal
    # matrix = matrix + matrix.T  # make full matrix
    # put a 1 on the diagonal of empty rows so the normalization below is well defined
    matrix = matrix + diags((matrix.sum(axis=0).A.ravel() == 0).astype(int))
    # divide each stored contact by its row total (the matrix is symmetric, so column sums equal row sums)
    matrix.data = matrix.data / np.repeat(matrix.sum(axis=0).A1, matrix.getnnz(axis=1))
    # compartment score: contact-weighted average of CpG density
    comp = matrix.dot(cpg_ratio.values[:, None])[:, 0]
    if calc_strength:
        scores = compartment_strength(matrix, comp, cpg_ratio)
    else:
        scores = None
    return comp, scores
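
# In matrix notation, with M the row-normalized contact matrix and c the CpG
# ratio vector, the score of bin i is comp[i] = sum_j M[i, j] * c[j], i.e. the
# contact-weighted average CpG density of the bins that bin i touches. Bins
# embedded in CpG-rich (A-like) neighborhoods therefore score higher than bins
# in CpG-poor (B-like) neighborhoods.
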
def single_cell_compartment(cell_url, cpg_profile, calc_strength, output_prefix, mode,
                            resolution, chrom_sizes, chrom1, pos1, chrom2, pos2):
    """Compute per-chromosome compartment scores for one cell and save them as {output_prefix}.comp.npz."""
    if mode == 'cool':
        cool = cooler.Cooler(cell_url)
        chroms = pd.Index(cool.chromnames).intersection(cpg_profile['chrom'])
    elif mode == 'tsv':
        chroms = chrom_sizes.index
        # keep only intra-chromosomal contacts on the selected chromosomes
        data = pd.read_csv(cell_url, sep='\t', index_col=None, header=None, comment='#')
        data = data.loc[(data[chrom1] == data[chrom2]) & data[chrom1].isin(chroms)]

    all_comp = []
    total_scores = np.zeros(3)
    for chrom in chroms:
        cpg_ratio = cpg_profile.loc[cpg_profile['chrom'] == chrom, 'cpg_ratio']
        if mode == 'cool':
            matrix = cool.matrix(balance=False, sparse=True).fetch(chrom)
        else:
            # build a sparse contact matrix for this chromosome from the contact table
            n_bins = (chrom_sizes.loc[chrom] // resolution) + 1
            chrom_filter = (data[chrom1] == chrom)
            if chrom_filter.sum() == 0:
                matrix = csr_matrix((n_bins, n_bins))
            else:
                matrix = data.loc[chrom_filter].copy()
                # convert 1-based genomic positions to bin indices
                matrix[[pos1, pos2]] = (matrix[[pos1, pos2]] - 1) // resolution
                matrix = matrix.groupby(by=[pos1, pos2])[chrom1].count().reset_index()
                matrix = csr_matrix((matrix[chrom1].astype(np.int32),
                                     (matrix[pos1], matrix[pos2])),
                                    (n_bins, n_bins))
                matrix = matrix + matrix.T
        if cpg_ratio.shape[0] != matrix.shape[0]:
            print(f'ERROR : cpg_ratio and chrom_size have different shapes at {chrom}')
            return
        comp, scores = single_chrom_compartment(matrix.tocsr(),
                                                cpg_ratio,
                                                calc_strength=calc_strength)
        if calc_strength:
            # accumulate the AA, BB, AB sums across chromosomes
            total_scores += scores
        all_comp.append(comp)
    all_comp = np.concatenate(all_comp)
    if calc_strength:
        np.savez(f'{output_prefix}.comp.npz', all_comp, total_scores)
    else:
        np.savez(f'{output_prefix}.comp.npz', all_comp)
    return
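
# Illustrative sketch of reading one per-cell result (file name hypothetical).
# np.savez stores positional arrays, so 'arr_0' holds the concatenated per-bin
# compartment scores and, when calc_strength=True, 'arr_1' holds the
# chromosome-summed [AA, BB, AB] contact sums.
#
# npz = np.load('cell_0.comp.npz')
# comp_scores = npz['arr_0']
# aa_bb_ab = npz['arr_1']  # only present if calc_strength was True
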
def aggregate_compartment(cell_table, temp_dir, bins, output_path, calc_strength):
    """Collect per-cell compartment results into one cell-by-bin xarray Dataset and write it as netCDF."""
    total_comp = []
    total_scores = []
    for cell_id, cell_url in cell_table.items():
        comp_path = f'{temp_dir}/{cell_id}.comp.npz'
        npz_data = np.load(comp_path)
        total_comp.append(npz_data['arr_0'])
        if calc_strength:
            total_scores.append(npz_data['arr_1'])

    # cell-by-bin compartment score matrix
    total_comp = np.vstack(total_comp)
    total_comp = pd.DataFrame(total_comp,
                              index=cell_table.index,
                              columns=bins.index)
    total_comp.index.name = 'cell'
    total_comp.columns.name = 'bin'
    total_comp = xr.Dataset({'compartment': total_comp})
    total_comp.coords['bin_chrom'] = bins['chrom']
    total_comp.coords['bin_start'] = bins['start']
    total_comp.coords['bin_end'] = bins['end']

    if calc_strength:
        # cell-by-[AA, BB, AB] summed contact strengths
        total_scores = np.vstack(total_scores)
        total_scores = pd.DataFrame(total_scores,
                                    index=cell_table.index,
                                    columns=['AA', 'BB', 'AB'])
        total_scores.index.name = 'cell'
        total_scores.columns.name = 'sum'
        total_comp['summary'] = xr.DataArray(total_scores)
    total_comp.to_netcdf(output_path)
    return
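
# Illustrative sketch for inspecting the aggregated output (path hypothetical):
#
# ds = xr.open_dataset('study.compartment.nc')
# ds['compartment']        # cell x bin compartment scores
# ds['summary']            # cell x [AA, BB, AB] sums, only if calc_strength=True
# ds.coords['bin_chrom']   # chromosome of each bin
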
def multiple_cell_compartment(cell_table_path,
                              output_prefix,
                              cpg_profile_path,
                              cpu=10,
                              calc_strength=False,
                              mode='cool',
                              chrom_size_path=None,
                              resolution=100000,
                              chrom1=1,
                              pos1=2,
                              chrom2=5,
                              pos2=6):
    """Calculate compartment scores for every cell in cell_table_path and aggregate them into one netCDF file.

    For mode='tsv', chrom1/pos1/chrom2/pos2 are the 0-based column indices of the two
    chromosome and position fields in each contact file.
    """
    cell_table = pd.read_csv(cell_table_path,
                             sep='\t',
                             index_col=0,
                             header=None).squeeze(axis=1)
    print(cell_table.shape[0], 'cells to calculate.')
    temp_dir = pathlib.Path(f'{output_prefix}_compartment_temp')
    temp_dir.mkdir(exist_ok=True)

    # load the precomputed CpG profile and determine the bin table
    cpg_profile = pd.read_hdf(cpg_profile_path)
    if mode == 'tsv':
        if chrom_size_path is not None:
            chrom_sizes = pd.read_csv(chrom_size_path, sep='\t', index_col=0, header=None).squeeze(axis=1)
            bins = cooler.binnify(chrom_sizes, resolution)
        else:
            print('ERROR : need to provide chrom_size_path for tsv mode')
            return
    elif mode == 'cool':
        chrom_sizes = None
        cell_cool = cooler.Cooler(cell_table.iloc[0])
        bins = cell_cool.bins()[:]
        bins = bins.loc[bins['chrom'].isin(cpg_profile['chrom'])]
    else:
        print('ERROR : mode needs to be cool or tsv')
        return

    # calculate individual cells in parallel
    with ProcessPoolExecutor(cpu) as exe:
        future_dict = {}
        for cell_id, cell_url in cell_table.items():
            cell_prefix = f'{temp_dir}/{cell_id}'
            if pathlib.Path(f'{cell_prefix}.comp.npz').exists():
                # skip cells that already finished in a previous run
                continue
            future = exe.submit(single_cell_compartment,
                                cell_url=cell_url,
                                cpg_profile=cpg_profile,
                                output_prefix=cell_prefix,
                                calc_strength=calc_strength,
                                mode=mode,
                                chrom_sizes=chrom_sizes,
                                resolution=resolution,
                                chrom1=chrom1,
                                pos1=pos1,
                                chrom2=chrom2,
                                pos2=pos2)
            future_dict[future] = cell_id
        for future in as_completed(future_dict):
            cell_id = future_dict[future]
            future.result()
            print(f'{cell_id} finished.')

    # aggregate per-cell compartments
    aggregate_compartment(cell_table=cell_table,
                          temp_dir=temp_dir,
                          bins=bins,
                          output_path=f'{output_prefix}.compartment.nc',
                          calc_strength=calc_strength)

    # cleanup
    subprocess.run(f'rm -rf {temp_dir}', shell=True)
    return
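
# Illustrative end-to-end sketch (all paths hypothetical). The cell table is a
# two-column, header-less TSV mapping cell IDs to cool URLs (or contact files in
# tsv mode); the CpG profile comes from get_cpg_profile above.
#
# multiple_cell_compartment(cell_table_path='cell_table.tsv',
#                           output_prefix='study',
#                           cpg_profile_path='cpg_ratio_100k.hdf',
#                           cpu=10,
#                           calc_strength=True,
#                           mode='cool')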