import logging
import pathlib
import cooler
import numpy as np
import pandas as pd
import h5py
from scipy.sparse import csr_matrix , load_npz
def get_chrom_offsets(bins_df):
    """Return {chrom: index of the first bin of that chromosome} for a cooler bins table.

    Parameters
    ----------
    bins_df
        DataFrame with a categorical 'chrom' column (as produced by cooler.binnify).

    Returns
    -------
    dict mapping each chromosome category to the integer index of its first row.
    """
    offsets = {}
    # iterate the declared categories (not just observed values) to match the
    # chromosome order of the bins table
    for chrom in bins_df['chrom'].cat.categories:
        offsets[chrom] = bins_df[bins_df['chrom'] == chrom].index[0]
    return offsets
def write_coo(path, matrix, chunk_size=5000000):
    """
    Write chromosome contacts as chunked pixel df (cooler input).

    Parameters
    ----------
    path
        Output HDF5 path.
    matrix
        scipy sparse matrix of contacts; converted to COO without copying.
    chunk_size
        Rows per stored chunk; None stores the whole table under one key.
    """
    coo = matrix.tocoo(copy=False)
    pixels = pd.DataFrame({'bin1_id': coo.row,
                           'bin2_id': coo.col,
                           'count': coo.data})
    # zlib level 3 keeps the store small without slowing writes much
    with pd.HDFStore(path, mode='w', complib='zlib', complevel=3) as store:
        if chunk_size is None:
            # single key holds everything
            store['c0'] = pixels
        else:
            # keys c0, c1, ... each hold up to chunk_size rows
            for idx, start in enumerate(range(0, pixels.shape[0], chunk_size)):
                store[f'c{idx}'] = pixels.iloc[start:start + chunk_size]
    return
def chrom_iterator(input_dir, chrom_order, chrom_offset, chrom_wildcard='{chrom}.hdf', csr=False):
    """Yield pixel DataFrame chunks for each chromosome, with bin ids shifted
    to genome-wide coordinates (cooler.create_cooler input).

    Parameters
    ----------
    input_dir
        Directory containing one contact file per chromosome.
    chrom_order
        Iterable of chromosome names; missing files are skipped silently.
    chrom_offset
        Dict from chromosome name to its first genome-wide bin index
        (see get_chrom_offsets).
    chrom_wildcard
        File-name template with a '{chrom}' placeholder; the suffix
        ('hdf' or 'npz') selects the loader.
    csr
        If True, '.hdf' files are read as h5py CSR dumps (group 'Matrix'
        with data/indices/indptr datasets) instead of pandas HDFStores.

    Yields
    ------
    DataFrame chunks with columns bin1_id, bin2_id, count.

    Raises
    ------
    ValueError
        If the wildcard suffix matches no supported loader (the original
        code hit an unbound-variable NameError here).
    """
    chunk_size = 5000000
    for chrom in chrom_order:
        logging.debug(chrom)
        chrom_file = chrom_wildcard.format(chrom=chrom)
        output_path = f'{input_dir}/{chrom_file}'
        if not pathlib.Path(output_path).exists():
            # chromosome has no contact file; skip it
            continue
        if (not csr) and chrom_wildcard[-3:] == 'hdf':
            # pandas HDFStore written by write_coo: chunks stored under
            # keys '/c0', '/c1', ... — replay them in numeric order
            with pd.HDFStore(output_path, 'r') as hdf:
                keys = {int(k[2:]): k for k in hdf.keys()}
                for i in sorted(keys):
                    chunk = hdf[keys[i]]
                    # shift bin1_id/bin2_id from per-chrom to genome-wide indices
                    chunk.iloc[:, :2] += chrom_offset[chrom]
                    yield chunk
        else:
            if chrom_wildcard[-3:] == 'npz':
                matrix = load_npz(output_path).tocoo()
            elif csr and chrom_wildcard[-3:] == 'hdf':
                # 'with' guarantees the handle is closed even if the
                # generator is abandoned mid-iteration (original leaked it)
                with h5py.File(output_path, 'r') as f:
                    g = f['Matrix']
                    matrix = csr_matrix(
                        (g['data'][()], g['indices'][()], g['indptr'][()]),
                        g.attrs['shape']).tocoo()
            else:
                # original code fell through with 'Q' unbound -> NameError
                raise ValueError(f'Unsupported chrom_wildcard: {chrom_wildcard}')
            df = pd.DataFrame({'bin1_id': matrix.row,
                               'bin2_id': matrix.col,
                               'count': matrix.data})
            # keep upper triangle only (cooler expects bin1_id <= bin2_id)
            df = df[df['bin1_id'] <= df['bin2_id']]
            for chunk_start in range(0, df.shape[0], chunk_size):
                # copy() avoids chained assignment on a view of df
                chunk = df.iloc[chunk_start:chunk_start + chunk_size].copy()
                chunk.iloc[:, :2] += chrom_offset[chrom]
                yield chunk
def aggregate_chromosomes(chrom_size_path,
                          resolution,
                          input_dir,
                          output_path,
                          chrom_wildcard='{chrom}.hdf',
                          csr=False):
    """Merge per-chromosome contact files from input_dir into one cooler file.

    Parameters
    ----------
    chrom_size_path
        Path to a chrom-sizes file readable by cooler.read_chromsizes.
    resolution
        Bin size in bp for cooler.binnify.
    input_dir
        Directory holding per-chromosome files named by chrom_wildcard.
    output_path
        URI of the cooler file to create.
    chrom_wildcard, csr
        Passed through to chrom_iterator to select the file loader.
    """
    chrom_sizes = cooler.read_chromsizes(chrom_size_path, all_names=True)
    bins_df = cooler.binnify(chrom_sizes, resolution)
    offsets = get_chrom_offsets(bins_df)
    # pixels arrive already sorted by genome-wide bin id, hence ordered=True
    pixel_iter = chrom_iterator(input_dir=input_dir,
                                chrom_order=bins_df['chrom'].unique(),
                                chrom_offset=offsets,
                                chrom_wildcard=chrom_wildcard,
                                csr=csr)
    cooler.create_cooler(cool_uri=output_path,
                         bins=bins_df,
                         pixels=pixel_iter,
                         ordered=True,
                         dtypes={'count': np.float32})
    return
def cell_chunk(cell_url, chrom_sizes, chunk=50000000):
    """Yield pixel tables from a single-cell cooler in genomic chunks.

    Parameters
    ----------
    cell_url
        URI of the single-cell cooler file.
    chrom_sizes
        Chromosome sizes series used to partition the genome.
    chunk
        Chunk length in bp for cooler.binnify.

    Yields
    ------
    Pixel DataFrames, one per genomic chunk.
    """
    cell_cool = cooler.Cooler(cell_url)
    for _, (chrom, start, end) in cooler.binnify(chrom_sizes, chunk).iterrows():
        region = f'{chrom}:{start}-{end}'
        # this fetch selects a rectangular region: rows from the chunk region,
        # columns over the whole chromosome
        yield cell_cool.matrix(balance=False, as_pixels=True).fetch(region, chrom)
def aggregate_cells(output_path, cell_dir, chrom_size_path, resolution):
    """Bundle the per-cell .cool files found in cell_dir into one scool file.

    Parameters
    ----------
    output_path
        Path of the scool file to create.
    cell_dir
        Directory globbed for '*cool' files; the cell name is the file
        name up to the first '.'.
    chrom_size_path
        Chrom-sizes file readable by cooler.read_chromsizes.
    resolution
        Bin size in bp for cooler.binnify.
    """
    chrom_sizes = cooler.read_chromsizes(chrom_size_path, all_names=True)
    bins_df = cooler.binnify(chrom_sizes, resolution)
    # lazily stream each cell's pixels so only one chunk is in memory at a time
    cell_pixel_dict = {}
    for cell_url in pathlib.Path(cell_dir).glob('*cool'):
        cell_name = cell_url.name.split('.')[0]
        cell_pixel_dict[cell_name] = cell_chunk(str(cell_url), chrom_sizes=chrom_sizes)
    cooler.create_scool(output_path,
                        bins=bins_df,
                        cell_name_pixels_dict=cell_pixel_dict,
                        ordered=True,
                        mode='a')
    return