import pandas as pd
import numpy as np
import os
from multiprocessing import Pool
## init
mySpecie='Homo_sapiens'
## change this path to point to the updated CSV
full_meta_dir="/cellar/users/btsui/Project/METAMAP/notebook/Parsing/sra_dump.csv"
inSrrDir='/nrnb/users/btsui/Data/all_seq/snp/'
tmp_dir='/nrnb/users/btsui/Data/all_seq/tmp/'
inAllFames=pd.Series(os.listdir(inSrrDir))
#os.system('rm '+tmp_dir+'*')
perFaStatS=inAllFames[inAllFames.str.contains('per_fa_record_stat.txt.gz')].values
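# Each input is expected to be named '<run accession>_per_fa_record_stat.txt.gz'
# (e.g. 'ERR1497972_per_fa_record_stat.txt.gz'); the accession is recovered at
# the end by stripping this suffix from the merged column names.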
TEST=False
if TEST:
    toRunSrrs=perFaStatS[:10]
    chunkSize=5
    nThread=1
else:
    toRunSrrs=perFaStatS
    chunkSize=1000
    nThread=64
## get the microbial FA record names
# read one representative stat file to enumerate all reference sequence names
tmpDf=pd.read_csv(inSrrDir+'ERR1497972_per_fa_record_stat.txt.gz',sep='\t',header=None)
# human chromosomes, the mitochondrial genome and the unmapped bin ('*')
ignoreList=['1', '2', '3', '4', '5', '6', '7',
            '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18',
            '19', '20', '21', '22', 'X', 'Y', 'MT', '*']
microbes_fa=tmpDf[0][~tmpDf[0].isin(ignoreList)].values
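# The stat files are TAB-delimited, samtools-idxstats style (see the note in
# parseFile below): reference name, sequence length, # mapped reads,
# # unmapped reads. Any reference name not in ignoreList (human chromosomes
# plus '*', the unmapped bin) is treated as a microbial FA record.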
#inFname=toRunSrrs[0]
def parseFile(inFname):
    """
    input: inFname - a single '<accession>_per_fa_record_stat.txt.gz' file name
    output: Series of n_mapped_reads for the microbial FA records, named after the file
    """
    # Each input file is TAB-delimited with each line consisting of reference
    # sequence name, sequence length, # mapped reads and # unmapped reads.
    fDir=inSrrDir+inFname
    tmpDf2=pd.read_csv(fDir,sep='\t',header=None)
    tmpDf2.columns=['fa_name','length','n_mapped_reads','n_unmapped_reads']
    tmpS=tmpDf2[~tmpDf2['fa_name'].isin(ignoreList)].set_index('fa_name')['n_mapped_reads']
    tmpS.name=inFname
    return tmpS
#tmpS=parseFile(inFname)
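# Minimal usage sketch (assumes at least one stat file is present):
# tmpS=parseFile(toRunSrrs[0])
# tmpS.head()  # n_mapped_reads indexed by microbial fa_name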
def mergeSrrsL(i):
    """Parse the chunk toRunSrrs[i:i+chunkSize] and pickle the merged matrix."""
    tmpL=[]
    failedSrrsL=[]
    for srr in toRunSrrs[i:(i+chunkSize)]:
        try:
            tmpL.append(parseFile(srr))
        except Exception:
            print('failed: '+srr)
            failedSrrsL.append(srr)
    # one column per run, one row per microbial fa_name
    tmpMergedDf=pd.DataFrame(tmpL).T
    reorderedDf=tmpMergedDf.sort_index()
    if TEST:
        print(tmp_dir+str(i)+'.pickle.gz')
    reorderedDf.to_pickle(tmp_dir+str(i)+'.pickle.gz',compression='gzip')
    return failedSrrsL
Chunks=np.arange(0, len(toRunSrrs),chunkSize)
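# Chunks holds the start offsets into toRunSrrs: mergeSrrsL(i) covers
# toRunSrrs[i:i+chunkSize] and writes one pickle per chunk, so the parse can
# be parallelized and re-run chunk by chunk if some chunks fail.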
if TEST:
    failed_srr_l=list(map(mergeSrrsL,Chunks.tolist()))
else:
    p=Pool(nThread)
    ### sweep for uncompleted chunks
    failed_srr_l=p.map(mergeSrrsL,Chunks.tolist())
    p.close()
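# failed_srr_l is a list of per-chunk failure lists; if needed, a flat list of
# failed accessions can be recovered with, e.g.:
# failedFlat=[srr for chunkL in failed_srr_l for srr in chunkL]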
#testDf3=pd.read_pickle('/nrnb/users/btsui/Data/all_seq/tmp/0.pickle.gz')
myL=[]
for fname in os.listdir(tmp_dir):
    tmpDf10=pd.read_pickle(tmp_dir+fname).astype(np.float32)
    myL.append(tmpDf10)
mergedDf=pd.concat(myL,axis=1)
# strip the shared suffix so columns are bare run accessions
mergedDf.columns=mergedDf.columns.str.replace('_per_fa_record_stat.txt.gz','',regex=False)
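# Persisting the merged matrix is not shown in the original; one option
# (hypothetical output path) would be:
# mergedDf.to_pickle(tmp_dir+'merged_microbe_counts.pickle.gz',compression='gzip')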