#!pwd
import os
# Work from the RNA-seq merge pipeline directory so relative paths resolve.
os.chdir('/cellar/users/btsui/Project/METAMAP/notebook/RapMapTest/Pipelines/RNAseq/merge')
import sys
# Make the parent directory importable (it provides the 'param' module below).
sys.path+=['../']
import pandas as pd
import numpy as np
from tqdm import tqdm
from multiprocessing import Pool
import param
#from IPython.utils import io
import gc
# Full SRA metadata table; later code reads its 'Run' and 'new_ScientificName'
# columns. NOTE(review): several columns have mixed dtypes (see the DtypeWarning
# emitted on load) — consider low_memory=False or explicit dtype= here.
sra_dump_df=pd.read_csv(param.full_meta_dir)
# (notebook output) /cellar/users/btsui/anaconda3/lib/python3.6/site-packages/IPython/core/interactiveshell.py:2785: DtypeWarning: Columns (5,6,25,26) have mixed types. Specify dtype option on import or set low_memory=False. interactivity=interactivity, compiler=compiler, result=result)
# Collect the filenames of already-processed kallisto outputs and derive the
# set of SRR run accessions that have a transcript-abundance file on disk.
# np.str was a deprecated alias for the builtin str (removed in NumPy 1.24);
# use str directly — identical behavior.
allProcessedFnames=pd.Series(os.listdir(param.count_out_dir),dtype=str)
# Escape the dots so the pattern matches the literal '.abundance.tsv.gz'
# suffix; an unescaped '.' is a regex wildcard and could match other names.
postfix_m=allProcessedFnames.str.contains(r'\.abundance\.tsv\.gz$')
# The SRR accession is everything before the first '.' of the filename.
processedRnaseqSrr=allProcessedFnames[postfix_m].str.split('.').str[0].unique()
print ('n SRR: ', len(processedRnaseqSrr))
# Flag each SRA run as processed if its transcript counts exist on disk.
sra_dump_df['Skymap_TranscriptCount_Processed']=sra_dump_df['Run'].isin(processedRnaseqSrr)
m_processed=sra_dump_df['Skymap_TranscriptCount_Processed']
# (notebook output) n SRR: 841085
outputFolder='/nrnb/users/btsui/Data/all_seq/rnaseq_merged_chunks/'
# Delete zero-byte merged chunk files so they get regenerated on this run.
# BUG FIX: the original command was 'find -size {} 0 -print0', which spliced
# the folder path into the -size argument and searched the current directory;
# the path belongs right after 'find' and the size test is '-size 0'.
os.system('find {} -size 0 -print0 |xargs -0 rm --'.format(outputFolder))
# (notebook output) 31488
refreshDb=False  # set True to wipe all previously merged chunks and redo them
if refreshDb:
    # Wipe the output folder, then recreate it so the listing below and the
    # later pickle writes both see an existing (empty) directory.
    os.system('rm -r {}'.format(outputFolder))
    os.makedirs(outputFolder,exist_ok=True)
# Snapshot of chunk pickles already on disk. BUG FIX: the original listed the
# folder *before* the optional 'rm -r', leaving processedFnames stale (and the
# folder missing) after a refresh; listing after the refresh keeps it accurate.
processedFnames=pd.Series(os.listdir(outputFolder))
#processedFnames
import itertools
import gc
# NOTE(review): the bare 'itertools' below is a leftover notebook cell that
# just echoed the module repr; it has no effect on execution.
itertools
# (notebook output) <module 'itertools' (built-in)>
# Base directory of the Ensembl cDNA FASTA references (not used in this view).
faBaseDir='/cellar/users/btsui/Data/ensembl/release/cdna/'
# Number of worker processes for the merge Pool.
nProcess=3
# Storage dtype per metric: raw counts fit in uint32, TPM needs floats.
metricToDtypeDict={'est_counts':np.uint32,'tpm':np.float32}
from itertools import product
# One (species, metric) merge job per supported species and per metric.
specieMetricPairs=list(product( param.supporting_species,metricToDtypeDict.keys()))
#selectedSpecies=
#%%capture Stdout
#selectedSpecies ,countMetric=('Canis_familiaris', 'tpm')
# Runs sharing all but the last N digits of the SRR accession form one chunk.
ignoreLastNDigitsForChunking=5 #less than 1000
for selectedSpecies ,countMetric in specieMetricPairs:
    # --- identify the processed samples belonging to this species ---
    print (selectedSpecies ,countMetric)
    m_specie=sra_dump_df['new_ScientificName']==selectedSpecies
    m=m_processed&m_specie
    sra_dump_df_sub=sra_dump_df[m]
    sra_dump_df_in=sra_dump_df_sub#.head(n=10)
    nTotal=sra_dump_df_sub.shape[0]
    print ('# of samples to merge:',nTotal)
    inSrrs=sra_dump_df_in['Run'].unique()
    ### dtype used to store this metric (uint32 counts, float32 TPM).
    myDtype=metricToDtypeDict[countMetric]
    # Parallelize at the chunk level: runs sharing an accession prefix
    # (all but the last N digits) form one chunk.
    inSrrS=pd.Series(inSrrs)
    groupRunDf=pd.DataFrame({'Prefix':inSrrS.str[:-ignoreLastNDigitsForChunking],'Run':inSrrS})
    print ("n chunks:",groupRunDf['Prefix'].nunique())
    """
    For each chunk, export to the folder directly after merging, no message passing
    """
    # Chunks already merged for this species/metric pair. regex=False makes
    # the '.' in the pattern a literal character rather than a wildcard.
    processedFnamesSub=processedFnames[processedFnames.str.contains(
        "{}.transcript.{}".format( selectedSpecies,countMetric),regex=False)]
    groupRunDf['Processed']=groupRunDf['Prefix'].isin(processedFnamesSub.str.split('.').str[-2].unique())
    # BUG FIX: 'x in someSeries' tests the Series INDEX (integers), not the
    # values, so the skip check in mergeChunk was always True. Use a set of
    # the actual filenames for a correct (and O(1)) membership test.
    processedFnameSet=set(processedFnames)
    def parseOne(inSrr):
        # Load one run's abundance table; return the requested metric indexed
        # by transcript id, down-cast to save memory in the merged frame.
        abundanceDir=param.count_out_dir+'{}.abundance.tsv.gz'.format(inSrr)
        tmpDf=pd.read_csv(abundanceDir,sep='\t')
        tmpDf2=tmpDf.set_index('target_id')[countMetric].astype(myDtype)
        return tmpDf2
    g=groupRunDf.groupby('Prefix')['Run']
    def mergeChunk(Prefix):
        # Merge every run of one chunk column-wise and pickle it to disk.
        outputFname='{}.transcript.{}.{}.pickle'.format(
            selectedSpecies,countMetric,Prefix)
        if outputFname not in processedFnameSet:
            outputDir=outputFolder+outputFname
            try :
                subRunS=g.get_group(Prefix)
                myL=list(map(parseOne,subRunS))
                mergedDf=pd.concat(myL,axis=1,keys=subRunS.values,copy=False,sort=False)
                mergedDf.to_pickle(outputDir)
                del (mergedDf, myL)
                gc.collect()
            except Exception as e:
                # Report the cause instead of silently swallowing everything
                # (a bare 'except:' would even hide KeyboardInterrupt).
                print ('Failed: {} ({})'.format(Prefix,e))
    ### chunks whose prefix was not seen among existing output files
    m=~groupRunDf['Processed']
    Prefixes=groupRunDf['Prefix'][m].unique()
    print ('n chunks remaining: {}'.format(len(Prefixes)))
    with Pool(nProcess) as p:
        # imap_unordered yields as workers finish, so tqdm shows real progress;
        # wrapping the already-completed p.map() result did nothing.
        for _ in tqdm( p.imap_unordered(mergeChunk,Prefixes), total=len(Prefixes)):
            pass
# (notebook output) Homo_sapiens est_counts # of samples to merge: 304908 n chunks: 109 n chunks remaining: 0
# (notebook output) 0it [00:00, ?it/s]
# (notebook output) Homo_sapiens tpm # of samples to merge: 304908 n chunks: 109 n chunks remaining: 0
# (notebook output) 0it [00:00, ?it/s]
# (notebook output) Mus_musculus est_counts # of samples to merge: 489541 n chunks: 110 n chunks remaining: 1
len(Prefixes)
#processedFnamesSub.str.split('.').str[-2]
Prefixes
# (notebook shell magic) !ls -laSh /nrnb/users/btsui/Data/all_seq/rnaseq_merged_chunks/ | wc -l
# (stray cell sentinel — undefined name, would raise NameError) asdfaf
#ERR17
groupRunDf[groupRunDf.Prefix=='ERR17'].shape
#Prefixes
#map( , g.get_group()
groupRunDf
#pd.__version__
# (notebook shell magic) !ls -laht /nrnb/users/btsui/Data/all_seq/rnaseq_merged_chunks/
"""#%%capture Stdout
#selectedSpecies ,countMetric=('Canis_familiaris', 'tpm')
ignoreLastNDigitsForChunking=4 #less than 1000
for selectedSpecies ,countMetric in specieMetricPairs:
#identify samples
print (selectedSpecies ,countMetric)
m_specie=sra_dump_df['new_ScientificName']==selectedSpecies
m=m_processed&m_specie
sra_dump_df_sub=sra_dump_df[m]
sra_dump_df_in=sra_dump_df_sub#.head(n=10)
nTotal=sra_dump_df_sub.shape[0]
print ('# of samples to merge:',nTotal)
inSrrs=sra_dump_df_in['Run'].unique()
### identify the datatype in consideration.
myDtype=metricToDtypeDict[countMetric]
#paralize on chunk level instead.
def parseOne(inSrr):
abundanceDir=param.count_out_dir+'{}.abundance.tsv.gz'.format(inSrr)
tmpDf=pd.read_csv(abundanceDir,sep='\t')
tmpDf2=tmpDf.set_index('target_id')[countMetric].astype(myDtype)
return tmpDf2
#do the merge
inSrrS=pd.Series(inSrrs)
#each prefix is a Chunk ID
groupRunDf=pd.DataFrame({'Prefix':inSrrS.str[:-ignoreLastNDigitsForChunking],'Run':inSrrS})
print ("n chunks:",groupRunDf['Prefix'].nunique())
"""
For each chunk, export to the folder directly after merging, no message passing
"""
for Prefix,subRunS in groupRunDf.groupby('Prefix')['Run']:
with Pool(nProcess) as p:
myL=list(tqdm( p.imap(parseOne,subRunS),total=len(subRunS)))
#3 mins per chunk
mergedDf=pd.concat(myL,axis=1,keys=subRunS.values,copy=False,sort=False)
outputDir='/nrnb/users/btsui/Data/all_seq/rnaseq_merged_chunks/{}.transcript.{}.{}.pickle'.format(
selectedSpecies,countMetric,Prefix)
mergedDf.to_pickle(outputDir)
del (mergedDf, myL)
gc.collect()
"""