import pathlib
import subprocess
import pandas as pd
import inspect
import schicluster
from .loop_calling import filter_loops, call_loops
from .merge_raw_matrix import make_raw_matrix_cell_table
from .shuffle_fdr import compute_t, permute_fdr, update_fdr_qval
from .merge_cell_to_group import merge_group_to_bigger_group_cools
PACKAGE_DIR = pathlib.Path(schicluster.__path__[0])
with open(PACKAGE_DIR / 'loop/generate_matrix_chunk.Snakefile') as tmp:
    GENERATE_MATRIX_CHUNK_TEMPLATE = tmp.read()
with open(PACKAGE_DIR / 'loop/generate_matrix_group.Snakefile') as tmp:
    GENERATE_MATRIX_SCOOL_TEMPLATE = tmp.read()
def prepare_dir(output_dir, chunk_df, dist, cap, pad, gap, resolution,
min_cutoff, chrom_size_path, keep_cell_matrix, log_e_str, shuffle):
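    """Write a chunk-level cell table and a parameterized Snakefile into output_dir.

    The Snakefile consists of the given parameters rendered as Python
    assignments, followed by the generate_matrix_chunk template.
    """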
output_dir.mkdir(exist_ok=True)
cell_table_path = str((output_dir / 'cell_table.csv').absolute())
chunk_df[['cell_url']].to_csv(cell_table_path, header=False, index=True)
if shuffle:
shuffle_str = '--shuffle'
else:
shuffle_str = ''
parameters = dict(dist=dist,
cap=cap,
pad=pad,
gap=gap,
resolution=resolution,
min_cutoff=min_cutoff,
cell_table_path=f'"{cell_table_path}"',
chrom_size_path=f'"{chrom_size_path}"',
keep_cell_matrix=keep_cell_matrix,
log_e_str=f'"{log_e_str}"',
shuffle=shuffle,
shuffle_str=f'"{shuffle_str}"')
parameters_str = '\n'.join(f'{k} = {v}'
for k, v in parameters.items())
with open(output_dir / 'Snakefile', 'w') as f:
f.write(parameters_str + GENERATE_MATRIX_CHUNK_TEMPLATE)
return
def prepare_loop_snakemake(cell_table_path,
output_dir,
chrom_size_path,
chunk_size=100,
dist=5050000,
cap=5,
pad=5,
gap=2,
resolution=10000,
min_cutoff=1e-6,
keep_cell_matrix=False,
cpu_per_job=10,
log_e=True,
shuffle=False,
raw_resolution_str=None,
downsample_shuffle=None):
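    """Prepare the two-step Snakemake workflow for matrix generation.

    Step 1 imputes cells chunk by chunk (at most chunk_size cells per chunk,
    one working directory per chunk); step 2 merges the chunks into
    group-level files. The shell commands for each step are written to
    snakemake_cmd_step1.txt and snakemake_cmd_step2.txt under output_dir.
    """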
_cell_table_path = str(cell_table_path)
sep = '\t' if _cell_table_path.endswith('tsv') else ','
cell_table = pd.read_csv(cell_table_path, index_col=0, sep=sep, header=None,
names=['cell_id', 'cell_url', 'cell_group'])
if shuffle and (downsample_shuffle is not None):
        # for the shuffle background, downsample to downsample_shuffle cells to save time
if cell_table.shape[0] > downsample_shuffle:
cell_table = cell_table.sample(downsample_shuffle)
output_dir = pathlib.Path(output_dir).absolute()
output_dir.mkdir(exist_ok=True)
    # a single directory for the raw matrices
    if raw_resolution_str == '10K':
        raw_resolution = 10000
    elif raw_resolution_str is None:
        raw_resolution = None
    else:
        raise NotImplementedError(
            f'raw_resolution_str must be "10K" or None, got {raw_resolution_str}')
if (raw_resolution is not None) and (not shuffle):
cell_table_raw = make_raw_matrix_cell_table(cell_table, raw_resolution_str)
raw_dir = output_dir / 'raw'
raw_dir.mkdir(exist_ok=True)
raw_table_path = raw_dir / 'cell_table.tsv'
        cell_table_raw.to_csv(raw_table_path, sep='\t', header=False)
merge_raw_cmd = f'hic-internal merge-raw-scool ' \
f'--chrom_size_path {chrom_size_path} ' \
f'--resolution {raw_resolution} ' \
f'--cell_table_path {raw_table_path} ' \
f'--output_dir {raw_dir} ' \
f'--cpu {cpu_per_job}'
else:
merge_raw_cmd = ''
if log_e:
log_e_str = '--log_e'
else:
log_e_str = ''
chunk_parameters = dict(
dist=dist,
cap=cap,
pad=pad,
gap=gap,
resolution=resolution,
min_cutoff=min_cutoff,
chrom_size_path=chrom_size_path,
keep_cell_matrix=keep_cell_matrix,
log_e_str=log_e_str,
shuffle=shuffle
)
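    # split each cell group into chunks of at most chunk_size cells;
    # every chunk gets its own working directory and Snakefile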
total_chunk_dirs = []
group_chunks = {}
for group, group_df in cell_table.groupby('cell_group'):
group_chunks[group] = []
if group_df.shape[0] <= chunk_size:
this_dir = output_dir / f'{group}_chunk0'
prepare_dir(this_dir, group_df, **chunk_parameters)
total_chunk_dirs.append(this_dir)
group_chunks[group].append(this_dir)
else:
            group_df = group_df.copy()  # avoid mutating the groupby view (SettingWithCopyWarning)
            group_df['chunk'] = [i // chunk_size for i in range(group_df.shape[0])]
for chunk, chunk_df in group_df.groupby('chunk'):
this_dir = output_dir / f'{group}_chunk{chunk}'
prepare_dir(this_dir, chunk_df, **chunk_parameters)
total_chunk_dirs.append(this_dir)
group_chunks[group].append(this_dir)
with open(output_dir / 'snakemake_cmd_step1.txt', 'w') as f:
f.write(merge_raw_cmd + '\n')
for chunk_dir in total_chunk_dirs:
cmd = f'snakemake -d {chunk_dir} --snakefile {chunk_dir}/Snakefile -j {cpu_per_job} --scheduler greedy'
f.write(cmd + '\n')
    # prepare the second step, which merges cell chunks into groups
scool_parameters = dict(
output_dir=f'"{output_dir}"',
chrom_size_path=f'"{chrom_size_path}"',
resolution=resolution,
shuffle=shuffle
)
parameters_str = '\n'.join(f'{k} = {v}'
for k, v in scool_parameters.items())
with open(output_dir / 'Snakefile', 'w') as f:
f.write(parameters_str + GENERATE_MATRIX_SCOOL_TEMPLATE)
with open(output_dir / 'snakemake_cmd_step2.txt', 'w') as f:
cmd = f'snakemake -d {output_dir} --snakefile {output_dir}/Snakefile -j {cpu_per_job} --scheduler greedy'
f.write(cmd + '\n')
return
def check_chunk_dir_finish(output_dir):
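    """Check that every *_chunk* directory under output_dir contains a 'finish'
    flag file; raise ValueError listing the unfinished chunks, otherwise write
    a 'chunk_finished' flag so later calls can return immediately."""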
output_dir = pathlib.Path(output_dir)
flag_path = output_dir / 'chunk_finished'
if flag_path.exists():
return
path_not_finished = []
for chunk_dir in output_dir.glob('*_chunk*'):
if not (chunk_dir / 'finish').exists():
path_not_finished.append(str(chunk_dir))
if len(path_not_finished) > 0:
path_str = '\n'.join(path_not_finished)
        raise ValueError('These chunk dirs have not finished successfully:\n' + path_str)
else:
with open(flag_path, 'w') as _:
pass
return
def _run_snakemake(output_dir):
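    """Run the generated step-1 and step-2 command files sequentially with sh."""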
output_dir = pathlib.Path(output_dir).absolute()
step1 = f'{output_dir}/snakemake_cmd_step1.txt'
step2 = f'{output_dir}/snakemake_cmd_step2.txt'
subprocess.run(['sh', step1], check=True)
subprocess.run(['sh', step2], check=True)
def call_loop(cell_table_path,
output_dir,
chrom_size_path,
shuffle=True,
chunk_size=200,
dist=5050000,
cap=5,
pad=5,
gap=2,
resolution=10000,
min_cutoff=1e-6,
keep_cell_matrix=False,
cpu_per_job=10,
log_e=True,
raw_resolution_str=None,
downsample_shuffle=None,
black_list_path=None,
fdr_pad=7,
fdr_min_dist=5,
fdr_max_dist=500,
fdr_thres=0.1,
dist_thres=20000,
size_thres=1,
cleanup=True):
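    """Run the complete loop-calling pipeline on a cell table.

    When shuffle=True, the pipeline runs twice (on the real matrices and on
    shuffled backgrounds under output_dir/shuffle) and loops are re-filtered
    with a permutation-based FDR, in which case black_list_path is required.
    """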
if shuffle and (black_list_path is None):
raise ValueError('Please provide black_list_path when shuffle=True')
pathlib.Path(output_dir).mkdir(exist_ok=True)
    # deal with the cell table path; if it only has two columns, add a group column internally
_cell_table_path = str(cell_table_path)
sep = '\t' if _cell_table_path.endswith('tsv') else ','
cell_table = pd.read_csv(cell_table_path, index_col=0, sep=sep, header=None)
cell_table.index.name = 'cell_id'
if cell_table.shape[1] == 1:
cell_table.columns = ['cell_url']
cell_table['cell_group'] = 'group'
elif cell_table.shape[1] == 2:
cell_table.columns = ['cell_url', 'cell_group']
else:
raise ValueError(f'Expect cell_table_path to be '
f'two columns (cell_id, cell_url) or '
f'three columns (cell_id, cell_url, cell_group), '
f'got {cell_table.shape[1]} columns.')
cell_table_path = f'{output_dir}/cell_table.csv'
cell_table.to_csv(cell_table_path, header=False, index=True)
groups = cell_table['cell_group'].unique()
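    # snapshot the local namespace so arguments can be filtered against
    # downstream function signatures via inspect.signature below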
kwargs = locals()
shuffle = kwargs.pop('shuffle')
# prepare snakemake and execute
real_dir = None
shuffle_dir = None
_use_kwargs = {k: v
for k, v in kwargs.items()
if k in inspect.signature(prepare_loop_snakemake).parameters}
if shuffle:
output_dir = _use_kwargs.pop('output_dir')
real_dir = output_dir
shuffle_dir = f'{output_dir}/shuffle'
pathlib.Path(shuffle_dir).mkdir(exist_ok=True, parents=True)
prepare_loop_snakemake(shuffle=False, output_dir=real_dir, **_use_kwargs)
prepare_loop_snakemake(shuffle=True, output_dir=shuffle_dir, **_use_kwargs)
_run_snakemake(real_dir)
_run_snakemake(shuffle_dir)
else:
        # no shuffle, just use the normal loop pipeline
prepare_loop_snakemake(shuffle=False, **_use_kwargs)
output_dir = _use_kwargs.pop('output_dir')
_run_snakemake(output_dir)
    # final loop calling with permutation-based FDR when shuffle is used
if shuffle:
for group in groups:
real_group_prefix = f'{real_dir}/{group}/{group}'
shuffle_group_prefix = f'{shuffle_dir}/{group}/{group}'
tot = compute_t(real_group_prefix)
_ = compute_t(shuffle_group_prefix, tot)
permute_fdr(chrom_size_path=chrom_size_path,
black_list_path=black_list_path,
shuffle_group_prefix=shuffle_group_prefix,
real_group_prefix=real_group_prefix,
res=resolution,
pad=fdr_pad,
min_dist=fdr_min_dist,
max_dist=fdr_max_dist)
total_loops = update_fdr_qval(chrom_size_path,
real_group_prefix,
shuffle_group_prefix,
res=resolution,
min_dist=fdr_min_dist,
max_dist=fdr_max_dist)
            # re-filter loops because the FDR values changed
filter_loops(total_loops,
output_prefix=real_group_prefix,
fdr_thres=fdr_thres,
resolution=resolution,
dist_thres=dist_thres,
size_thres=size_thres)
if cleanup:
subprocess.run(f'rm -rf {output_dir}/shuffle/*/*.npz', shell=True)
subprocess.run(f'rm -rf {output_dir}/*/*.npz', shell=True)
with open(f'{output_dir}/Success', 'w') as f:
f.write('42')
return
def merge_loop(group,
output_dir,
chrom_size_path,
shuffle=True,
chunk_size=200,
dist=5050000,
cap=5,
pad=5,
gap=2,
resolution=10000,
min_cutoff=1e-6,
keep_cell_matrix=False,
cpu_per_job=10,
log_e=True,
raw_resolution_str=None,
downsample_shuffle=None,
black_list_path=None,
fdr_pad=7,
fdr_min_dist=5,
fdr_max_dist=500,
fdr_thres=0.1,
dist_thres=20000,
size_thres=1,
cleanup=True):
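    """Merge the loop-calling results of several groups (listed in
    output_dir/group_list.txt) into one bigger group and call loops again.
    With a single input group, its outputs are simply copied and renamed.
    """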
group_list = pd.read_csv(f'{output_dir}/group_list.txt', header=None, index_col=None)[0].values
    if len(group_list) == 1:
        # only one input group: copy its outputs over, renaming files to the new group name
tmp = group_list[0].split('/')[-1]
pathlib.Path(f'{output_dir}/{group}').mkdir(exist_ok=True, parents=True)
for xx in pathlib.Path(f'{group_list[0]}/{tmp}').iterdir():
subprocess.run(f'rsync -arv {group_list[0]}/{tmp}/{xx.name} {output_dir}/{group}/{xx.name.replace(tmp, group)}', shell=True)
pathlib.Path(f'{output_dir}/shuffle/{group}').mkdir(exist_ok=True, parents=True)
for xx in pathlib.Path(f'{group_list[0]}/shuffle/{tmp}').iterdir():
subprocess.run(f'rsync -arv {group_list[0]}/shuffle/{tmp}/{xx.name} {output_dir}/shuffle/{group}/{xx.name.replace(tmp, group)}', shell=True)
with open(f'{output_dir}/Success', 'w') as f:
f.write('42')
return
cell_table = pd.concat([pd.read_csv(f'{xx}/cell_table.csv', index_col=0, header=None,
names=['cell_id', 'cell_url', 'cell_group'])
for xx in group_list], axis=0)
cell_table['cell_group'] = group
cell_table.to_csv(f'{output_dir}/cell_table.csv', header=False, index=True)
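    # snapshot locals for signature-based argument filtering, as in call_loop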
kwargs = locals()
_merge_kwargs = {k: v for k, v in kwargs.items() if k in inspect.signature(merge_group_to_bigger_group_cools).parameters}
_merge_kwargs['shuffle'] = False
# print(_merge_kwargs)
merge_group_to_bigger_group_cools(**_merge_kwargs)
_loop_kwargs = {k: v for k, v in kwargs.items() if k in inspect.signature(call_loops).parameters}
# print(_loop_kwargs)
    call_loops(group_prefix=f'{output_dir}/{group}/{group}',
               output_prefix=f'{output_dir}/{group}/{group}',
               **_loop_kwargs)
# prepare snakemake and execute
if shuffle:
real_dir = output_dir
shuffle_dir = f'{output_dir}/shuffle'
pathlib.Path(shuffle_dir).mkdir(exist_ok=True, parents=True)
        # guard against downsample_shuffle=None, which would raise a TypeError here
        if (downsample_shuffle is not None) and (cell_table.shape[0] > downsample_shuffle):
_prep_kwargs = {k: v for k, v in kwargs.items() if k in inspect.signature(prepare_loop_snakemake).parameters}
_prep_kwargs['output_dir'] = shuffle_dir
prepare_loop_snakemake(cell_table_path=f'{output_dir}/cell_table.csv', **_prep_kwargs)
_run_snakemake(shuffle_dir)
else:
_merge_kwargs['shuffle'] = True
_merge_kwargs['output_dir'] = shuffle_dir
merge_group_to_bigger_group_cools(**_merge_kwargs)
real_group_prefix = f'{real_dir}/{group}/{group}'
shuffle_group_prefix = f'{shuffle_dir}/{group}/{group}'
tot = compute_t(real_group_prefix)
_ = compute_t(shuffle_group_prefix, tot)
permute_fdr(chrom_size_path=chrom_size_path,
black_list_path=black_list_path,
shuffle_group_prefix=shuffle_group_prefix,
real_group_prefix=real_group_prefix,
res=resolution,
pad=fdr_pad,
min_dist=fdr_min_dist,
max_dist=fdr_max_dist)
total_loops = update_fdr_qval(chrom_size_path,
real_group_prefix,
shuffle_group_prefix,
res=resolution,
min_dist=fdr_min_dist,
max_dist=fdr_max_dist)
        # re-filter loops because the FDR values changed
filter_loops(total_loops,
output_prefix=real_group_prefix,
fdr_thres=fdr_thres,
resolution=resolution,
dist_thres=dist_thres,
size_thres=size_thres)
if cleanup:
subprocess.run(f'rm -rf {output_dir}/*/*.npz', shell=True)
subprocess.run(f'rm -rf {output_dir}/shuffle/*/*.npz', shell=True)
with open(f'{output_dir}/Success', 'w') as f:
f.write('42')