#!ls -lah /cellar/users/btsui/all_seq_snp/Homo_sapiens_all_merged_snp.pickle.gz
#!ls -lath /cellar/users/btsui/all_seq_snp/ | head
import pandas as pd
import numpy as np
import re
import os
import math
from multiprocessing import Pool
import tqdm  # module-style import; tqdm.tqdm is used for the progress bar below
## init
#/nrnb/users/btsui/Data/tcga_extracted_lgg_snp/
mySpecie='Homo_sapiens'
outMergedDir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.pickle'
#outMergedDir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.TCGA.pickle'
##change this dir to point to the updated csv
#full_meta_dir="/cellar/users/btsui/Project/METAMAP/notebook/Parsing/sra_dump.csv"
inSrrDir='/nrnb/users/btsui/Data/all_seq/snp/'
tmp_dir='/nrnb/users/btsui/Data/all_seq/tmp_chunks/'
#!ls -alh /nrnb/users/btsui/Data/tcga_extracted_lgg_snp/ |wc -l
tmpBedDf=pd.read_csv('/data/cellardata/users/btsui/dbsnp/snp_beds/'+mySpecie+'.bed',header=None,sep='\t')
unique_chroms=tmpBedDf[0].astype(str).unique()
### start merging one by one
if os.path.exists(tmp_dir):
    os.system('rm -r '+tmp_dir)
os.system('mkdir -p '+tmp_dir)
# (read_csv warns: "Columns (0) have mixed types" -- the astype(str) above normalizes the chromosome names)
snp_fname_postfix='.txt.snp.gz'
%%time
os.chdir(tmp_dir)
#identify non empty files
os.system('ls -la '+inSrrDir+' > ls_out.txt ')
ls_df=pd.read_csv('ls_out.txt',sep=r'\s+',header=None,names=np.arange(9)).iloc[1:]  # drop the `total` line
size_S=ls_df[4]  # file size column of `ls -la`
m4=size_S.astype(int)>1000  # keep only files with real content
m5=ls_df[8].str.contains(re.escape(snp_fname_postfix)+'$')
non_empty_files=ls_df[m4&m5][8].str.split('/').str[-1].str.split('.').str[0].values  # run accessions
# CPU times: user 5.35 s, sys: 312 ms, total: 5.66 s  Wall time: 3min 8s
print('# of files merging',len(non_empty_files))
# output: # of files merging 253056
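## the `ls` parsing above is brittle (column layout varies across ls builds);
## a pure-Python scan with os.scandir gives the same list -- a sketch, same >1000-byte filter:
non_empty_files_alt=np.array([e.name.split('.')[0]
                              for e in os.scandir(inSrrDir)
                              if e.name.endswith(snp_fname_postfix)
                              and e.stat().st_size>1000])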
from pandas.api.types import CategoricalDtype
chrom_type = CategoricalDtype(categories=unique_chroms,ordered=True)
Run_db_type=CategoricalDtype(categories=['DRR','ERR','SRR'],ordered=True)
myBases=['A','C','G','T']
myBases_type=CategoricalDtype(categories=myBases,ordered=True)
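## why CategoricalDtype: values are stored as small integer codes instead of
## Python strings, which keeps the merged frame compact; a rough illustration
## (not output from this run):
# s=pd.Series(['SRR']*10**6)
# s.memory_usage(deep=True)                       # object dtype: tens of MB
# s.astype(Run_db_type).memory_usage(deep=True)   # 3-category codes: ~1 MB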
def parseSrr(inSrr):
    """
    given: an SRA run accession (DRR/ERR/SRR)
    return: a tidy frame of ReadDepth and AverageBaseQuality per base,
            indexed by ['Run_db','Run_digits','Chr','Pos','base']
    """
    fname=inSrrDir+inSrr+snp_fname_postfix
    tmpDf_all=pd.read_csv(fname,sep=r'\s+',low_memory=False,header=None,
                          names=np.arange(50),index_col=None,
                          error_bad_lines=False)  # skip malformed rows
    #'' names an unused field between the depth and the per-base columns
    myCols=['Chr','Pos','Ref','rd_all','','A','C','G','T','N']
    tmpDf=tmpDf_all.iloc[:,:len(myCols)]
    tmpDf.columns=myCols
    tmpDf2=tmpDf.set_index(['Chr','Pos'])
    myL=[]
    for base in myBases:
        splitL=tmpDf2[base].str.split(':',expand=True)
        ### extract the read count and base quality
        tmpDf5=splitL[[1,3]].astype(float)
        tmpDf5.columns=['ReadDepth','AverageBaseQuality']
        myL.append(tmpDf5)
    tmpDf6=pd.concat(myL,keys=myBases,axis=0,names=['base'])
    tmpDf6.columns.name='features'
    mergedDf=tmpDf6.astype(np.uint16)
    non_zero_df=mergedDf[mergedDf['ReadDepth']>0]
    tmpDf7=non_zero_df.reset_index()
    Run_digits=re.search(r'[DES]RR(\d+)',inSrr)
    Run_Db=re.search(r'([DES]RR)\d+',inSrr)
    tmpDf7['Run_digits']=Run_digits.group(1)
    tmpDf7['Run_db']=Run_Db.group(1)
    ### convert the datatypes to compact representations
    tmpDf7['Pos']=tmpDf7['Pos'].astype(np.uint32)
    tmpDf7['Run_digits']=tmpDf7['Run_digits'].astype(np.uint64)
    tmpDf7['Chr']=tmpDf7['Chr'].astype(str).astype(dtype=chrom_type)
    tmpDf7['Run_db']=tmpDf7['Run_db'].astype(str).astype(dtype=Run_db_type)
    tmpDf7['base']=tmpDf7['base'].astype(dtype=myBases_type)
    myG=['Run_db','Run_digits','Chr','Pos','base']
    tmpDf7=tmpDf7.drop_duplicates(myG)
    srr_pickle_df=tmpDf7.set_index(myG).sort_index()
    return srr_pickle_df
"""
given: srr id
return: the merged file
"""
### identify files to be merged
fnames=pd.Series(os.listdir(inSrrDir))
snpFnames=fnames[fnames.str.contains(re.escape(snp_fname_postfix)+'$')]
srrsWithData=snpFnames.str.split('.').str[0]
m3=srrsWithData.isin(non_empty_files)
toMergeSrrs=srrsWithData[m3].values
print('n files to merge:',len(toMergeSrrs))
TEST=False
if TEST:
    toRunSrrs=toMergeSrrs[:10]
    chunkSize=5
    nThread=1
else:
    toRunSrrs=toMergeSrrs
    chunkSize=100
    nThread=64
# output: n files to merge: 253056
#optional: free up the memory
#if not TEST:
# del mySpecieDf, full_meta_df
def mergeSrrsL(i):
    ### parse one chunk of runs and persist it as a sorted, compressed pickle
    tmpL=[]
    failedSrrsL=[]
    for srr in toRunSrrs[i:(i+chunkSize)]:
        try:
            tmpL.append(parseSrr(srr))
        except Exception:
            print('failed: '+srr)
            failedSrrsL.append(srr)
    if len(tmpL)==0:  # every run in the chunk failed; nothing to write
        return failedSrrsL
    tmpMergedDf=pd.concat(tmpL)
    reorderedDf=tmpMergedDf.sort_index()
    reorderedDf.to_pickle(tmp_dir+str(i)+'.pickle.gz',compression='gzip')
    return failedSrrsL
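## e.g. mergeSrrsL(0) parses toRunSrrs[0:chunkSize], writes tmp_dir+'0.pickle.gz',
## and returns the accessions in that chunk that failed to parse.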
Chunks=np.arange(0, len(toRunSrrs),chunkSize)
if TEST:
    failed_srr_l=list(map(mergeSrrsL,Chunks.tolist()))
else:
    p=Pool(nThread)
    ### sweep over all chunks in parallel, one chunk per task
    Iter=tqdm.tqdm(p.imap(mergeSrrsL,Chunks.tolist()),total=len(Chunks))
    failed_srr_l=list(Iter)
    p.close()
# progress output (run truncated here):
#   0%|          | 0/2531 [00:00<?, ?it/s]
# failed: SRR527658
# failed: SRR527586
#   0%|          | 1/2531 [11:43<494:10:29, 703.17s/it]
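## failed_srr_l is a list of per-chunk failure lists; a minimal sketch to
## flatten it and count the runs that were skipped:
failed_srrs=[srr for chunkL in failed_srr_l for srr in chunkL]
print('n failed runs:',len(failed_srrs))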
"""from tqdm import tqdm
import os
#2531
len(os.listdir(tmp_dir))
myL=[]
for fname in tqdm(os.listdir(tmp_dir)):
if '.pickle.gz' in fname:
myL.append(pd.read_pickle(tmp_dir+fname))
#fail at this line all the time.
mergedDf=pd.concat(myL,axis=0,copy=False)
mergedDf.to_pickle(outMergedDir)"""
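## a minimal sketch of an incremental alternative, assuming the per-chunk
## pickles fit in memory one at a time: append each chunk to an HDF5 table
## instead of concatenating everything at once (out_h5 is a hypothetical
## output path; see also the pickle-vs-HDF5 link below):
# out_h5=outMergedDir.replace('.pickle','.h5')
# with pd.HDFStore(out_h5,mode='w',complib='blosc') as store:
#     for fname in tqdm.tqdm(sorted(os.listdir(tmp_dir))):
#         if fname.endswith('.pickle.gz'):
#             chunkDf=pd.read_pickle(tmp_dir+fname)
#             store.append('merged_snp',chunkDf.reset_index(),index=False)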
#!ls -lah /cellar/users/btsui/all_seq_snp/Homo_sapiens_all_merged_snp.TCGA.pickle
#!ls /nrnb/users/btsui/Data/all_seq/tmp_chunks/ |wc -l
#mergedDf
#https://stackoverflow.com/questions/37928794/which-is-faster-for-load-pickle-or-hdf5-in-python/37929007