The code here is somewhat involved, mostly because it needs to parallelize the data-reading process; otherwise it could be a lot simpler.
parseSrr is probably the function you most need to read.
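At a high level the cells below do the following (a rough sketch; the names match the functions defined later):
# 1. parseSrr    - read one <srr>.txt.snp.gz pileup into a tidy (Chr, Pos, base) frame
# 2. mergeSrrsL  - parse runs in chunks of chunkSize and pickle each chunk, fanned out with Pool
# 3. splitChunk  - re-split every chunk pickle into 100 kb Pos blocks
# 4. mergeChunks - concatenate the per-run pieces of each block into one file per block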
import pandas as pd
import numpy as np
import re
import os
import math
from multiprocessing import Pool
## init
mySpecie='Homo_sapiens'
outMergedDir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.h5'
##change this dir to point to the updated csv
full_meta_dir="/cellar/users/btsui/Project/METAMAP/notebook/Parsing/sra_dump.csv"
inSrrDir='/nrnb/users/btsui/Data/all_seq/snp/'
tmp_dir='/nrnb/users/btsui/Data/all_seq/tmp/'
#!df -h /data/cellardata/
%%time
#%matplotlib inline
##machine: 5-1
full_meta_df=pd.read_csv(full_meta_dir)
#inSrrDir='/cellar/users/btsui/Project/METAMAP/notebook/RapMapTest/XGS_WGS/'
#existingMergedDf=pd.read_pickle(outMergedDir)
mySpecieDf=full_meta_df[full_meta_df['ScientificName']==mySpecie]
#find the chromosomes
tmpBedDf=pd.read_csv('/data/cellardata/users/btsui/dbsnp/snp_beds/'+mySpecie+'.bed',header=None,sep='\t')
unique_chroms=tmpBedDf[0].astype(np.str).unique()
### start merging one by one
if os.path.exists(tmp_dir):
    os.system('rm -r '+tmp_dir)
os.system('mkdir -p '+tmp_dir)
os.chdir(tmp_dir)
#identify non empty files
os.system('ls -la '+inSrrDir+' > ls_out.txt ')
ls_df=pd.read_csv('ls_out.txt',sep='\s+',header=None,names=np.arange(9)).iloc[1:]
#ls_df=
size_S=ls_df[4]
m4=size_S.astype(np.int)>1000
m5=ls_df[8].str.contains('.txt.snp.gz$')
non_empty_files=ls_df[m4&m5][8].str.split('/').str[-1].str.split('.').str[0].values
CPU times: user 0 ns, sys: 0 ns, total: 0 ns Wall time: 11.7 µs
/cellar/users/btsui/anaconda3/lib/python3.6/site-packages/IPython/core/interactiveshell.py:2785: DtypeWarning: Columns (5,6,25,26) have mixed types (and, in a second warning, Columns (0)). Specify dtype option on import or set low_memory=False.
print ('# of files to merge:',len(non_empty_files))
# of files to merge: 253005
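The same non-empty-file filter could be done without shelling out to ls; a sketch with os.scandir, assuming the same >1000 byte size cutoff:
#non_empty_files=np.array([e.name.split('.')[0] for e in os.scandir(inSrrDir)
#                          if e.name.endswith('.txt.snp.gz') and e.stat().st_size>1000])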
%%time
"""
given: srr id
return: the merged file
"""
def parseSrr(inSrr):
    #print inSrr
    fname=inSrrDir+inSrr+'.txt.snp.gz'
    ## pileup-style table; only the first len(myCols) columns are used
    tmpDf_all=pd.read_csv(fname,sep='\s+',header=None,names=np.arange(50),index_col=None,error_bad_lines=False)
    myCols=['Chr','Pos','Ref','rd_all','','A','C','G','T','N']
    tmpDf=tmpDf_all.iloc[:,:len(myCols)]
    tmpDf.columns=myCols
    tmpDf2=tmpDf.set_index(['Chr','Pos'])
    myBases=['A','C','G','T']
    myL=[]
    for base in myBases:
        splitL=tmpDf2[base].str.split(':',expand=True)
        ### extract the read count and base quality (fields 1 and 3 of the colon-separated stats)
        tmpDf5=splitL[[1,3]].astype(np.float)
        tmpDf5.columns=['ReadDepth','AverageBaseQuality']
        myL.append(tmpDf5)
    tmpDf6=pd.concat(myL,keys=myBases,axis=0,names=['base'])
    tmpDf6.columns.name='features'
    mergedDf=tmpDf6.astype(np.uint16)
    non_zero_df=mergedDf[mergedDf['ReadDepth']>0]
    tmpDf7=non_zero_df.reset_index()
    ## split the run accession into its database prefix and numeric part
    Run_digits=re.search('[DES]RR(\d+)', inSrr)
    Run_Db=re.search('([DES]RR)\d+', inSrr)
    tmpDf7['Run_digits']=Run_digits.group(1)
    tmpDf7['Run_db']=Run_Db.group(1)
    ###convert the datatypes
    tmpDf7['Pos']=tmpDf7['Pos'].astype(np.uint32)
    tmpDf7['Run_digits']=tmpDf7['Run_digits'].astype(np.uint64)
    tmpDf7['Chr']=tmpDf7['Chr'].astype(np.str).astype('category',
                                                      categories=unique_chroms,ordered=True)
    tmpDf7['Run_db']=tmpDf7['Run_db'].astype(np.str).astype('category',
                                                            categories=['DRR','ERR','SRR'],ordered=True)
    tmpDf7['base']=tmpDf7['base'].astype('category',
                                         categories=myBases,ordered=True)
    srr_pickle_df=tmpDf7.set_index(['Run_db','Run_digits',u'Chr', u'Pos',u'base']).sort_index()
    return srr_pickle_df
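A minimal usage sketch (the accession is hypothetical; any [DES]RR id with a pileup under inSrrDir works):
#exampleDf=parseSrr('SRR1274306') ## hypothetical run id
#exampleDf.head() ## index: (Run_db, Run_digits, Chr, Pos, base); columns: ReadDepth, AverageBaseQuality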
### identify files to be merged
fnames=pd.Series(os.listdir(inSrrDir))
snpFnames=fnames[fnames.str.contains('.snp.gz$')]
srrsWithData=snpFnames.str.split('.').str[0]
#mergedSrrs=existingMergedDf.index.get_level_values('Run')
#m1=~srrsWithData.isin(mergedSrrs)
m2=srrsWithData.isin(mySpecieDf['Run'])
m3=srrsWithData.isin(non_empty_files)
toMergeSrrs=srrsWithData[m2&m3].values
CPU times: user 2.86 s, sys: 540 ms, total: 3.4 s Wall time: 3.93 s
TEST=True
if TEST:
    toRunSrrs=toMergeSrrs[:10]
    chunkSize=5
    nThread=1
else:
    toRunSrrs=toMergeSrrs
    chunkSize=1000
    nThread=64
#optional: free up the memory
if not TEST:
    del mySpecieDf, full_meta_df
def mergeSrrsL(i):
    tmpL=[]
    failedSrrsL=[]
    for srr in toRunSrrs[i:(i+chunkSize)]:
        try:
            tmpL.append(parseSrr(srr))
        except:
            print ('failed: '+srr)
            failedSrrsL.append(srr)
    tmpMergedDf=pd.concat(tmpL)
    #tmpMergedDf=pd.concat([parseSrr(srr) for srr in toRunSrrs[i:(i+chunkSize)]])
    reorderedDf=tmpMergedDf.sort_index()
    reorderedDf.to_pickle(tmp_dir+str(i)+'.pickle.gz',compression='gzip')
    return failedSrrsL
Chunks=np.arange(0, len(toRunSrrs),chunkSize)
if TEST:
    failed_srr_l=list(map(mergeSrrsL,Chunks.tolist())) ## list() forces the lazy Python 3 map to run now
else:
    from multiprocessing import Pool
    p=Pool(nThread)
    ### sweep for uncompleted chunks
    failed_srr_l=p.map(mergeSrrsL,Chunks.tolist())
    p.close()
#!ls /cellar/users/btsui/Data/dbsnp/
snpBed='/cellar/users/btsui/Data/dbsnp/snp_beds/'+mySpecie+'.bed'
### find the data that overlap exactly
tmpDf_ref_all_snps_df=pd.read_csv(snpBed,
sep='\s+',header=None,names=np.arange(50),index_col=None,error_bad_lines=False)
myCols=['Chr','Pos','Ref','rd_all','','A','C','G','T','N']
tmpDf=tmpDf_ref_all_snps_df.iloc[:,:len(myCols)]
tmpDf.columns=myCols
myReindexedDf=tmpDf.set_index(['Chr','Pos'])
refIndex=myReindexedDf.index
/cellar/users/btsui/anaconda3/lib/python3.6/site-packages/IPython/core/interactiveshell.py:2785: DtypeWarning: Columns (0) have mixed types. Specify dtype option on import or set low_memory=False.
"""
changes: need to do the split for different bins in here
Map the regions into the right bin
"""
#myInDir=''
#feature=''
from multiprocessing import Pool
import pandas as pd
import numpy as np
import os
tmp_dir='/nrnb/users/btsui/Data/all_seq/tmp/'
TEST=False
"""
change directory and identify intermediate pickle objects
"""
os.chdir(tmp_dir)
my_ls_S=pd.Series(os.listdir('./'))
ChunksFnames=my_ls_S[my_ls_S.str.contains('.pickle.gz$')].values
### create another directory for the split chunks
tmp_dir_2='/nrnb/users/btsui/Data/all_seq/tmp_2/'
os.system('rm -r '+tmp_dir_2)
os.system('mkdir '+tmp_dir_2)
"""
input: pickle name
output: split into chunks
"""
feature_for_hash='Pos'
#2483 blocks per file
def splitChunk(inChunkFname):
    tmpDf3=pd.read_pickle(inChunkFname).reset_index()
    my_base_shift=int(10**5)#int(10**(max_size_order-n))
    my_block_S=(tmpDf3[feature_for_hash]/my_base_shift).astype(np.int)*my_base_shift
    ##add one for the loop at the end
    tmpDf3['block']=my_block_S #6.5s
    tmpDf3=tmpDf3.sort_values(feature_for_hash) # might improve performance of the group by
    ### export chunks
    g=tmpDf3.groupby('block')
    for myBlock,tmpDf9 in g:
        myKeyName='Chunk_'+str(inChunkFname.split('.')[0])+'.'+feature_for_hash+'_block_'+str(myBlock)
        #print (tmp_dir_2+myKeyName)
        tmpDf9.to_pickle(tmp_dir_2+myKeyName)
#splitChunk('60700.pickle.gz')
"""
test on a small number of files first
"""
p=Pool(64)
if TEST:
    p.map(splitChunk,ChunksFnames[:10])
else:
    p.map(splitChunk,ChunksFnames)
p.close()
"""
"""
tmp_dir_3='/nrnb/users/btsui/Data/all_seq/tmp_3/'
os.system('rm -r '+tmp_dir_3)
os.system('mkdir '+tmp_dir_3)
my_chunked_pickles=pd.Series(os.listdir(tmp_dir_2))
tmpDf6=pd.DataFrame({"chunk":my_chunked_pickles.str.split('.').str[1],"fname":my_chunked_pickles})
g=tmpDf6.groupby('chunk')['fname']
myChunks=g.groups.keys()
def mergeChunks(inputChunk):
    myFiles=g.get_group(inputChunk)
    myL=[]
    for myFile in myFiles:
        myL.append(pd.read_pickle(tmp_dir_2+myFile))
    myMergedDf=pd.concat(myL,axis=0)
    myMergedDf.set_index(['Chr','base','Run_db']).to_hdf(tmp_dir_3+inputChunk,'chunk',mode='w',format='fixed')
p=Pool(64)
p.map(mergeChunks,myChunks)
p.close()
"""
for each chunks
"""
"""
for each chunk, merge into a hdf5 file
merge all the tiny chunks into
"""
myDfDf=pd.read_hdf('Pos_block_140700000',mode='r')
#subDf=myDfDf.loc['7']
#subDf[subDf['Pos']==140753336].reset_index()#.index.get_level_values('base')
"""
changes: need to do the split for different bins in here
Map the regions into the right bin
"""
#myInDir=''
#feature=''
import pandas as pd
from multiprocessing import Pool
import numpy as np
import os
"""
change directory and identify intermediate pickle objects
"""
os.chdir(tmp_dir)
my_ls_S=pd.Series(os.listdir('./'))
ChunksFnames=my_ls_S[my_ls_S.str.contains('.pickle.gz$')].values
### create another directory for the split chunks
tmp_dir_2='/nrnb/users/btsui/Data/all_seq/tmp_2/'
os.system('rm -r '+tmp_dir_2)
os.system('mkdir '+tmp_dir_2)
"""
input: pickle name
output: split into chunks
"""
feature_for_hash='Pos'
inChunkFname=ChunksFnames[0]
tmpDf3=pd.read_pickle(inChunkFname).reset_index()
my_max=tmpDf3[feature_for_hash].max()
print (my_max)# 6121752
#max_size_order=math.ceil(np.log10(my_max))
#print ('order of max',max_size_order)
#n=4
my_base_shift=int(10**5)#int(10**(max_size_order-n))
my_block_S=(tmpDf3[feature_for_hash]/my_base_shift).astype(np.int)*my_base_shift
nBlocks=my_block_S.max()
print ('# of blocks',nBlocks)
blocks_VC=my_block_S.value_counts()
##add one for the loop at the end
my_range=blocks_VC.index#np.arange(0,nBlocks+1,my_base_shift)
print ('range size: ',len(my_range))
outH5Name=outMergedDir.replace('.h5','.'+feature_for_hash+'.'+str(my_base_shift)+'.chunked.h5')
print ('output name:',outH5Name)
os.system('rm '+outH5Name)
%time tmpDf3['block']=my_block_S #6.5s
%time tmpDf3=tmpDf3.sort_values(feature_for_hash) # 30 mins
"""
export chunks
"""
#myOrderDict={'Pos':[u'Chr', u'Pos',u'base','Run_digits','Run_db'],
# 'Run_digits':['Run_digits','Run_db',u'Chr', u'Pos',u'base']}
g=tmpDf3.groupby('block')
for myBlock,tmpDf9 in g:
    #tmpDf9=tmpDf3[tmpDf3['block']==myBlock]## refiltering per block: for Run, 27 s; for Pos, 2 min 30 s (the groupby above avoids the rescans)
    #my_index_order=myOrderDict[feature_for_hash] ## myOrderDict is defined in a later cell
    #tmpDf10=tmpDf9.set_index(my_index_order)
    myKeyName=feature_for_hash+'_'+str(myBlock)+'.block_'+str(myBlock)
    print (tmp_dir_2+myKeyName)
    tmpDf9.to_pickle(tmp_dir_2+myKeyName)
    #print (myKeyName,tmpDf9.shape)
    #myKeyName
array(['tmp.pickle.gz', '0.pickle.gz', '1000.pickle.gz', 'tmp2.pickle.gz'], dtype=object)
### merge the files
"""
my_max=tmpDf3[feature_for_hash].max()
print (my_max)# 6121752
max_size_order=math.ceil(np.log10(my_max))
print ('order of max',max_size_order)
n=4
my_base_shift=int(10**(max_size_order-n))
my_block_S=(tmpDf3[feature_for_hash]/my_base_shift).astype(np.int)*my_base_shift
##check the data type of my_block_S and Run_digits
#my_block_S=my_block_S
### make the data become int
nBlocks=my_block_S.max()
print ('# of blocks',)
blocks_VC=my_block_S.value_counts()
##add one for the loop at the end
my_range=blocks_VC.index#np.arange(0,nBlocks+1,my_base_shift)
print ('range size: ',len(my_range))
outH5Name=outMergedDir.replace('.h5','.'+feature_for_hash+'.'+str(my_base_shift)+'.chunked.h5')
print ('output name:',outH5Name)
os.system('rm '+outH5Name)
%time tmpDf3['block']=my_block_S #6.5s
%time tmpDf3=tmpDf3.sort_values(feature_for_hash) # 30 mins
"""
"""
changes: need to do the split for different bins in here
"""
"""
myTmpL=[]
my_ls_S=pd.Series(os.listdir('./'))
ChunksFnames=my_ls_S[my_ls_S.str.contains('.pickle.gz$')].values
for chunkFname in ChunksFnames:
tmpDf=pd.read_pickle(chunkFname)
tmpDf2=tmpDf.reset_index()
#tmpDf3=tmpDf2.set_index(['Chr','Pos'])
#m=tmpDf3.index.isin(refIndex)
subTmpDf2=tmpDf2#[m]
print '% sites in targets:', (m.mean())
print subTmpDf2.shape
myTmpL.append(subTmpDf2)
mergedDf=pd.concat(myTmpL)
#mergedS
all_mergedDf=mergedDf.set_index(
['Run_db','Run_digits']).sort_index()
#all_mergedDf.to_pickle(outMergedDir)
"""
all_mergedDf.to_hdf(outMergedDir,key='master',mode="w")
from functools import reduce ## Python 3: reduce lives in functools
allFailedSrrs=pd.Series(reduce(lambda a,x:a+x,failed_srr_l,[]))
allFailedSrrs.to_csv('/nrnb/users/btsui/Data/all_seq/'+mySpecie+'.failed.srrs.txt')
###change coordinates to chromosome base for slicing
#mySpecie='Homo_sapiens'
#skymap_snp_dir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.h5'
sorted_snp_dir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.chrom_pos_sorted.h5'
tmpDf=pd.read_hdf(outMergedDir,key='master',mode='r')
tmpDf4=tmpDf.reset_index()
sortedDf=tmpDf4.set_index( [u'Chr', u'Pos',u'base']).sort_index()
sortedDf.to_hdf(sorted_snp_dir,key='master',mode='w')
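With the frame indexed and sorted by (Chr, Pos, base), a genomic window can be sliced straight off it; a sketch with illustrative coordinates:
#regionDf=sortedDf.loc[('7',140700000):('7',140800000)] ## every base call on chr7 in that 100 kb window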
#sorted_snp_dir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.chrom_pos_sorted.h5'
tmpDf=pd.read_hdf(outMergedDir,key='master',mode='r')
tmpDf3=tmpDf.reset_index()
tmpDf3['base']=tmpDf3['base'].astype('category')
tmpDf3['Chr']=tmpDf3['Chr'].astype('category')
tmpDf3['Run_db']=tmpDf3['Run_db'].astype('category')
tmpDf3['Pos']=tmpDf3['Pos'].astype(np.uint32)
tmpDf3['Run_digits']=tmpDf3['Run_digits'].astype(np.uint32)
#tmpDf4=tmpDf3.set_index(['Run_digits','Run_db',u'Chr', u'Pos',u'base'])
#tmpDf5=tmpDf4.sort_index()
outRunSortedOutDir=outMergedDir.replace('.h5','.pickle')
os.system('rm '+outRunSortedOutDir)
tmpDf3.to_pickle(outRunSortedOutDir)
#tmpDf3.to_hdf(outRunSortedOutDir,key='master',mode='w')
import pandas as pd
import numpy as np
import re
import os
from multiprocessing import Pool
## init
mySpecie='Homo_sapiens'
outMergedDir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.h5'
%time tmpDf=pd.read_hdf(outMergedDir,key='master',mode='r')
%time tmpDf3=tmpDf.reset_index()
"""
CPU times: user 8.21 s, sys: 19.9 s, total: 28.2 s
Wall time: 1min 18s
CPU times: user 1min 35s, sys: 57.8 s, total: 2min 33s
Wall time: 2min 33s
"""
%time tmpDf5=tmpDf3.set_index(['Run_digits','Run_db'])
"""
#CPU times: user 2min 7s, sys: 1min 54s, total: 4min 1s
Wall time: 4min 1s
"""
%time tmpDf5.to_hdf('./test_wo_chroms.hdf','master',mode='w',append=False,format='table')
#Exception: cannot find the correct atom type -> [dtype->object,items->Index(['Chr', 'base'], dtype='object')]
#myCol=np.array(['Run_digits','Run_db',u'Chr', u'Pos',u'base'])
### what are the datatypes of those index columns?
import pandas as pd
import numpy as np
import re
import os
import math
from multiprocessing import Pool
## init
mySpecie='Homo_sapiens'
outMergedDir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.h5'
%time tmpDf=pd.read_hdf(outMergedDir,key='master',mode='r')
%time tmpDf3=tmpDf.reset_index()
#feature_for_hash='Run_digits'
feature_for_hash='Pos'
myOrderDict={'Pos':[u'Chr', u'Pos',u'base','Run_digits','Run_db'],
'Run_digits':['Run_digits','Run_db',u'Chr', u'Pos',u'base']}
my_max=tmpDf3[feature_for_hash].max()
print (my_max)# 6121752
max_size_order=math.ceil(np.log10(my_max))
print ('order of max',max_size_order)
n=4
my_base_shift=int(10**(max_size_order-n))
my_block_S=(tmpDf3[feature_for_hash]/my_base_shift).astype(np.int)*my_base_shift
##check the data type of my_block_S and Run_digits
#my_block_S=my_block_S
### make the data become int
nBlocks=my_block_S.max()
print ('# of blocks',nBlocks)
blocks_VC=my_block_S.value_counts()
##add one for the loop at the end
my_range=blocks_VC.index#np.arange(0,nBlocks+1,my_base_shift)
print ('range size: ',len(my_range))
outH5Name=outMergedDir.replace('.h5','.'+feature_for_hash+'.'+str(my_base_shift)+'.chunked.h5')
print ('output name:',outH5Name)
os.system('rm '+outH5Name)
%time tmpDf3['block']=my_block_S #6.5s
%time tmpDf3=tmpDf3.sort_values(feature_for_hash) # 30 mins
#%time g=tmpDf3.groupby('block') #1min 35s
#
for myBlock in my_range:
#for myBlock,tmpDf9 in g:
    %time tmpDf9=tmpDf3[tmpDf3['block']==myBlock]## per block: for Run, 27 s; for Pos, 2 min 30 s
    my_index_order=myOrderDict[feature_for_hash]
    #tmpDf10=tmpDf9.set_index(my_index_order)
    myKeyName=feature_for_hash+'_'+str(myBlock)
    print (myKeyName,tmpDf9.shape)
    tmpDf9.to_hdf(outH5Name,myKeyName,mode='a',format='fixed')
    #if myBlock >3000:
    #    break
#('range size: ', 1201)
#0.0015764881876101904
### make sure the result is chunked as expected
#!ls /cellar/users/btsui/all_seq_snp/
#2 seconds per data push
(directory listing trimmed: one <species>_all_merged_snp.pickle per species/metagenome, plus the Homo_sapiens variants: Homo_sapiens_all_merged_snp.h5, Homo_sapiens_all_merged_snp.chrom_pos_sorted.h5, Homo_sapiens_all_merged_snp.run_sorted.h5, Homo_sapiens_all_merged_snp.pickle, Homo_sapiens_all_merged_snp.pickle.gz, Homo_sapiens_all_merged_snp.run_sorted.pickle)
#7 digits,
# 18gb to 0.1
#create an hdf5 object
%time tmpDf.to_hdf('./test.h5','master',mode='w',append=False,format='table')
testDf=tmpDf.head()
testDf.to_hdf('./test.h5','master',mode='w',format='table')
#export succeeded, but the slicing operation takes forever; the HDF5 path only leads from /master/ to a PyTables table
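For reference, a minimal sketch of the fixed-vs-table trade-off on a toy frame (the toy names are made up; only format='table' supports where= queries):
import pandas as pd
import numpy as np
toyDf=pd.DataFrame({'Run_digits':np.random.randint(0,10,10**5),
                    'ReadDepth':np.random.randint(0,100,10**5)})
toyDf.to_hdf('./toy_fixed.h5','master',mode='w',format='fixed') ## fast, but whole-file reads only
toyDf.to_hdf('./toy_table.h5','master',mode='w',format='table',data_columns=['Run_digits'])
subDf=pd.read_hdf('./toy_table.h5','master',where='Run_digits=3') ## disk-level query, slower to write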
import pandas as pd
import numpy as np
#v6
#14.9s
%time testDf20=pd.read_hdf('/cellar/users/btsui/test.pos.chunked.h5','Run_digits_790000',mode='r')
CPU times: user 3.34 s, sys: 11.4 s, total: 14.7 s Wall time: 14.9 s
np.log10(testDf20.shape[0])
7.7110316966216308
(51408117, 7)
#dataset_name = '/Configure:0000/Run:0000/CalibCycle:0000/Camera::FrameV1/XppSb4Pim.1:Tm6740.1/image'
import h5py
import numpy as np
import matplotlib.pyplot as plt
hdf5_file_name='/cellar/users/btsui/test.h5'
f = h5py.File(hdf5_file_name, 'r')
table=f['/master/table']
#obj.iteritems()
table[1000:2000]
table.dtype
dtype([('index', '<i8'), ('values_block_0', '<u2', (2,)), ('base', 'S1'), ('Pos', '<i8'), ('Chr', 'S2'), ('Run_digits', '<i8'), ('Run_db', 'S3')])
table.shape
(1456652195,)
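Individual fields can be sliced out of the structured array without loading the whole table (a sketch; mapping values_block_0 to ReadDepth/AverageBaseQuality is inferred from the dtype above):
chunk=table[1000:2000] ## reads only this slice off disk
pos=chunk['Pos'] ## int64 positions
vals=chunk['values_block_0'] ## (n, 2) uint16 block, presumably ReadDepth and AverageBaseQuality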
obj=f['/master']
for i,myObj in enumerate(obj.iteritems()):
    print myObj
    if i>10:
        break
(u'_i_table', None) (u'table', <HDF5 dataset "table": shape (1456652195,), type "|V34">)
%time posDf=pd.read_hdf('/cellar/users/btsui/test.h5','master',where='Run_digits=15999')
(traceback trimmed: the where= query ran long and was interrupted with a KeyboardInterrupt; IPython's traceback formatter then failed with an internal IndexError in ultratb.py)
"""
the multindex doesn't work no matter what
just use pickle for now:
"""
"""
[u'Chr', u'Pos',u'base','Run']
-rw-r--r-- 1 btsui users 1.8M Jan 2 16:42 0.pickle.gz
-rw-r--r-- 1 btsui users 2.0M Jan 2 16:42 10.pickle.gz
['Run', u'Chr', u'Pos',u'base']
-rw-r--r-- 1 btsui users 1.9M Jan 2 16:43 0.pickle.gz
-rw-r--r-- 1 btsui users 2.2M Jan 2 16:44 10.pickle.gz
"""
"""
### where did 1000.pickle.gz go from the first iteration
### out of 82, only 60 processed, what about the rest?
###
"""
#!rm /tmp/btsui/snp_merged/0.pickle
#18G
myDf=pd.read_hdf('/cellar/users/btsui/all_seq_snp/Homo_sapiens_all_merged_snp.h5',key='master')
#myDf
### keep everything in the same place?
##
### read in all the human runs
### take only those in human and merge them all together; be purely additive: add only runs that have not been added yet
#srr_pickle_df.iloc[:0].to_pickle('/data/cellardata/users/btsui/dbsnp/empty_base_snp.pickle')
## merge all the data in bulk,
### generate an empty pickle for each species?
#original pickle, 2.7M
#!ls -lah ./tmp.pickle
#!ls -lah tmp2.pickle.gz
#!rm ./tmp2.pickle
### merge all the files, by 1000 steps
srr_pickle_df.to_pickle('./tmp2.pickle')
#srr_pickle_df.to_pickle('./tmp.pickle')
!ls -lah ./tmp.pickle ./tmp2.pickle
!rm ./tmp2.pickle.gz
!rm ./tmp.pickle.gz
!gzip ./tmp2.pickle
!gzip ./tmp.pickle
!ls -lah tmp2.pickle.gz
!ls -lah tmp.pickle.gz
### merge the pickle
#0.4 mb
np.log10(0.4*(4*(10**6)))
### the multi-index doesn't increase the file size much
#no filtering on variants
countDf=tmpDf6[tmpDf6.rd>0]#.rd.value_counts()
##at a position, tell what's the value
subDf.head()
from scipy import sparse ## needed for csc_matrix / hstack below
my_data_L=[]
my_key_L=[]
g=countDf.stack().groupby(['base','features'])
for myTuple, subS in g:
    sparse_Ids=subS.index.get_level_values(index_name)
    tmp_row_col=(sparse_Ids,np.zeros(len(sparse_Ids)))
    tmp_array=sparse.csc_matrix((subS ,tmp_row_col),shape=tmp_shape)
    my_data_L.append(tmp_array)
    my_key_L.append(myTuple)
sparseM=sparse.hstack(my_data_L)
index_name='Chr_Pos_Id'
baseDf.groupby(index_name)
#csc_matrix((data, (row, col)), shape=(3, 3)).toarray()
attrib='rd'
tmp_shape=(Chr_Pos_to_ID_S.max(),1)
for my_base, subDf in countDf.groupby(level='base'):
    sparse_Ids=subDf.index.get_level_values(index_name)
    tmp_row_col=(sparse_Ids,np.zeros(len(sparse_Ids)))
    tmpS=sparse.csc_matrix((subDf[attrib] ,tmp_row_col),shape=tmp_shape)
countDf.shape
#countDf
#full_meta_df['Run']
import pandas as pd
import numpy as np
import re
import os
from multiprocessing import Pool
## init
mySpecie='Homo_sapiens'
outMergedDir='/cellar/users/btsui/all_seq_snp/'+mySpecie+'_all_merged_snp.h5'
full_meta_dir="/cellar/users/btsui/Project/METAMAP/notebook/Parsing/sra_dump.csv"
full_meta_df=pd.read_csv(full_meta_dir)
#inSrrDir='/cellar/users/btsui/Project/METAMAP/notebook/RapMapTest/XGS_WGS/'
inSrrDir='/nrnb/users/btsui/Data/all_seq/snp/'
existingMergedDf=pd.read_pickle(outMergedDir)
existingMergedDf.groupby(['Run_db','Run_digits']).size()
(traceback trimmed) KeyError: 'Run_db', raised from inside pandas groupby by the call above
import pandas as pd
import numpy as np
s = pd.Series(np.random.randn(5), index=['a', 'b', 'c', 'd', 'e'])
s.flags
C_CONTIGUOUS : True
F_CONTIGUOUS : True
OWNDATA : True
WRITEABLE : True
ALIGNED : True
WRITEBACKIFCOPY : False
UPDATEIFCOPY : False
s.index.flags
C_CONTIGUOUS : True
F_CONTIGUOUS : True
OWNDATA : False
WRITEABLE : True
ALIGNED : True
WRITEBACKIFCOPY : False
UPDATEIFCOPY : False
pd.__path__
['/cellar/users/btsui/anaconda2/lib/python2.7/site-packages/pandas']
store = pd.HDFStore('test.h5','w')
### check compression ratio with at least ten reads:
### select by variants, for select by