#!/usr/bin/env python
# coding: utf-8

# In[1]:

import ipyrad as ip
import ipyparallel as ipp

# stdlib and scientific libraries used by the cells below
import ipyrad
import os
import glob
import gzip
import shutil
import subprocess as sps
from collections import Counter

import numpy as np
import pandas as pd
import h5py

# In[2]:

ipyclient = ipp.Client()

# In[4]:

data = ip.load_json("6-tortas/test.json")
test2 = data.branch("test2")
#test2.run("5", force=True)
test2.remote_calculate_depths()
test2.remote_make_chunks()

# In[ ]:

# In[4]:

self = Step5(test2, True, ipyclient)
self.remote_calculate_depths()
self.remote_make_chunks()

# In[ ]:

# In[26]:

data = test2
sample = data.samples["PZ03concat"]
isref = True
for sample in [sample]:
    chunks = glob.glob(os.path.join(
        data.tmpdir,
        "{}.chunk.*".format(sample.name)))
    chunks.sort(key=lambda x: int(x.split('.')[-1]))
    chunkfile = chunks[1]
chunkfile

# In[27]:

p = Processor(data, sample, chunkfile, isref)
p.run()

# In[30]:

p.counters, p.filters

# In[9]:

class Processor:
    def __init__(self, data, sample, chunkfile, isref):
        self.data = data
        self.sample = sample
        self.chunkfile = chunkfile
        self.isref = isref

        # prepare the processor
        self.set_params()
        self.init_counters()
        self.init_arrays()
        self.chroms2ints()

        # run the processor
        #self.run()

    def run(self):
        self.process_chunk()
        self.write_chunk()

    def set_params(self):
        # set max limits
        self.tmpnum = int(self.chunkfile.split(".")[-1])
        self.optim = int(self.chunkfile.split(".")[-2])
        self.este = self.data.stats.error_est.mean()
        self.esth = self.data.stats.hetero_est.mean()
        self.maxlen = self.data._hackersonly["max_fragment_length"]
        if 'pair' in self.data.paramsdict["datatype"]:
            self.maxhet = sum(self.data.paramsdict["max_Hs_consens"])
            self.maxn = sum(self.data.paramsdict["max_Ns_consens"])
        else:
            self.maxhet = self.data.paramsdict["max_Hs_consens"][0]
            self.maxn = self.data.paramsdict["max_Ns_consens"][0]
        # not enforced for ref
        if self.isref:
            self.maxn = int(1e6)

    def init_counters(self):
        # store data for stats counters.
        self.counters = {
            "name": self.tmpnum,
            "heteros": 0,
            "nsites": 0,
            "nconsens": 0,
        }
        # store data for what got filtered
        self.filters = {
            "depth": 0,
            "maxh": 0,
            "maxn": 0,
        }
        # store data for writing
        self.storeseq = {}

    def init_arrays(self):
        # local copies to use to fill the arrays
        self.catarr = np.zeros((self.optim, self.maxlen, 4), dtype=np.uint32)
        self.nallel = np.zeros((self.optim, ), dtype=np.uint8)
        self.refarr = np.zeros((self.optim, 3), dtype=np.int64)

    def chroms2ints(self):
        # if reference-mapped then parse the fai to get index number of chroms
        if self.isref:
            fai = pd.read_csv(
                self.data.paramsdict["reference_sequence"] + ".fai",
                names=['scaffold', 'size', 'sumsize', 'a', 'b'],
                sep="\t")
            self.faidict = {j: i for i, j in enumerate(fai.scaffold)}
            self.revdict = {j: i for i, j in self.faidict.items()}

    # ---------------------------------------------
    def process_chunk(self):
        # stream through the clusters
        with open(self.chunkfile, 'rb') as inclust:
            pairdealer = izip(*[iter(inclust)] * 2)
            done = 0
            while not done:
                done, chunk = clustdealer(pairdealer, 1)
                if chunk:
                    self.parse_cluster(chunk)
                    if self.filter_mindepth():
                        self.build_consens_and_array()
                        self.get_heteros()
                        if self.filter_maxhetero():
                            if self.filter_maxN_minLen():
                                self.get_alleles()
                                self.store_data()

    def parse_cluster(self, chunk):
        # get names and seqs
        piece = chunk[0].decode().strip().split("\n")
        self.names = piece[0::2]
        self.seqs = piece[1::2]

        # pull replicate read info from seqs
        self.reps = [int(n.split(";")[-2][5:]) for n in self.names]

        # ref positions
        self.ref_position = (-1, 0, 0)
        if self.isref:
            # parse position from name string
            rname = self.names[0].rsplit(";", 2)[0]
            chrom, posish = rname.rsplit(":")
            pos0, pos1 = posish.split("-")

            # pull idx from .fai reference dict
            chromint = self.faidict[chrom] + 1
            self.ref_position = (int(chromint), int(pos0), int(pos1))

    def filter_mindepth(self):
        # exit if nreps is less than mindepth setting
        if not nfilter1(self.data, self.reps):
            self.filters['depth'] += 1
            return 0
        return 1

    def build_consens_and_array(self):
        # get stacks of base counts
        sseqs = [list(seq) for seq in self.seqs]
        arrayed = np.concatenate(
            [[seq] * rep for (seq, rep) in zip(sseqs, self.reps)]
        ).astype(bytes)

        # ! enforce maxlen limit !
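        # truncating reads at maxlen keeps the stacked array no wider than
        # the preallocated catg depth array (see init_arrays).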
        self.arrayed = arrayed[:, :self.maxlen]

        # get unphased consens sequence from arrayed
        self.consens = base_caller(
            self.arrayed,
            self.data.paramsdict["mindepth_majrule"],
            self.data.paramsdict["mindepth_statistical"],
            self.esth,
            self.este,
        )

        # trim Ns from the left and right ends
        self.consens[self.consens == b"-"] = b"N"
        trim = np.where(self.consens != b"N")[0]
        ltrim, rtrim = trim.min(), trim.max()
        self.consens = self.consens[ltrim:rtrim + 1]
        self.arrayed = self.arrayed[:, ltrim:rtrim + 1]

        # update position for trimming
        self.ref_position = (
            self.ref_position[0],
            self.ref_position[1] + ltrim,
            self.ref_position[1] + ltrim + rtrim + 1,
        )

    def get_heteros(self):
        self.hidx = [
            i for (i, j) in enumerate(self.consens)
            if j.decode() in list("RKSYWM")]
        self.nheteros = len(self.hidx)

    def filter_maxhetero(self):
        if not nfilter2(self.nheteros, self.maxhet):
            self.filters['maxh'] += 1
            return 0
        return 1

    def filter_maxN_minLen(self):
        if not nfilter3(self.consens, self.maxn):
            self.filters['maxn'] += 1
            return 0
        return 1

    def get_alleles(self):
        # if less than two Hs then there is only one allele
        if len(self.hidx) < 2:
            self.nalleles = 1
        else:
            # array of hetero sites
            harray = self.arrayed[:, self.hidx]

            # remove reads with - or N at variable site
            harray = harray[~np.any(harray == b"-", axis=1)]
            harray = harray[~np.any(harray == b"N", axis=1)]

            # get counts of each allele (e.g., AT:2, CG:2)
            ccx = Counter([tuple(i) for i in harray])

            # remove low freq alleles if more than 2, since they may reflect
            # seq errors at hetero sites, making a third allele, or a new
            # allelic combination that is not real.
            if len(ccx) > 2:
                totdepth = harray.shape[0]
                cutoff = max(1, totdepth // 10)
                alleles = [i for i in ccx if ccx[i] > cutoff]
            else:
                alleles = ccx.keys()

            self.nalleles = len(alleles)
            if self.nalleles == 2:
                try:
                    self.consens = storealleles(self.consens, self.hidx, alleles)
                except (IndexError, KeyError):
                    # the H sites do not form good alleles
                    ip.logger.info("failed at phasing loc, skipping")
                    ip.logger.info("""
    consens %s
    hidx %s
    alleles %s
    """, self.consens, self.hidx, alleles)
                    # todo return phase or no-phase and store for later
                    # ...

    def store_data(self):
        # current counter
        cidx = self.counters["nconsens"]
        self.nallel[cidx] = self.nalleles
        self.refarr[cidx] = self.ref_position

        # store a reduced array with only CATG
        catg = np.array(
            [np.sum(self.arrayed == i, axis=0)
             for i in [b'C', b'A', b'T', b'G']],
            dtype='uint16').T

        # do not allow ints larger than 65535 (uint16)
        self.catarr[cidx, :catg.shape[0], :] = catg

        # store the seqdata and advance counters
        self.storeseq[cidx] = b"".join(list(self.consens))
        self.counters["name"] += 1
        self.counters["nconsens"] += 1
        self.counters["heteros"] += self.nheteros

    # ----------------------------------------------
    def write_chunk(self):
        # find last empty size
        end = np.where(np.all(self.refarr == 0, axis=1))[0]
        if np.any(end):
            end = end.min()
        else:
            end = self.refarr.shape[0]

        # write final consens string chunk
        consenshandle = os.path.join(
            self.data.tmpdir,
            "{}_tmpcons.{}.{}".format(self.sample.name, end, self.tmpnum))

        # write chunk.
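        # denovo chunks are written as simple name/sequence pairs, while
        # reference chunks are written as tab-separated SAM-like lines that
        # are later concatenated and converted to BAM with samtools.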
        if self.storeseq:
            with open(consenshandle, 'wt') as outfile:

                # denovo just write the consens simple
                if not self.isref:
                    outfile.write(
                        "\n".join(
                            [">" + self.sample.name + "_" + str(key) + \
                             "\n" + self.storeseq[key].decode()
                             for key in self.storeseq]))

                # reference needs to store if the read is revcomp to reference
                else:
                    outfile.write(
                        "\n".join(
                            ["{}:{}:{}-{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}"
                             .format(
                                self.sample.name,
                                self.refarr[i][0],
                                self.refarr[i][1],
                                self.refarr[i][2],
                                0,
                                self.revdict[self.refarr[i][0] - 1],
                                self.refarr[i][1],
                                0,
                                #"{}M".format(refarr[i][2] - refarr[i][1]),
                                make_allele_cigar(
                                    "".join(['.' if i.islower() else i
                                             for i in self.storeseq[i].decode()]),
                                    ".",
                                    "S",
                                ),
                                #"{}M".format(len(self.storeseq[i].decode())),
                                "*",
                                0,
                                self.refarr[i][2] - self.refarr[i][1],
                                self.storeseq[i].decode(),
                                "*",
                                # "XT:Z:{}".format(
                                #     make_indel_cigar(storeseq[i].decode())
                                # ),
                            ) for i in self.storeseq.keys()]
                        ))

        # store reduced size arrays with indexes matching to keep indexes
        tmp5 = consenshandle.replace("_tmpcons.", "_tmpcats.")
        with h5py.File(tmp5, 'w') as io5:
            # reduce catarr to uint16 size for easier storage
            self.catarr[self.catarr > 65535] = 65535
            self.catarr = self.catarr.astype(np.uint16)
            io5.create_dataset(name="cats", data=self.catarr[:end])
            io5.create_dataset(name="alls", data=self.nallel[:end])
            io5.create_dataset(name="chroms", data=self.refarr[:end])
        del self.catarr
        del self.nallel
        del self.refarr

        # return stats
        self.counters['nsites'] = sum([len(i) for i in self.storeseq.values()])
        del self.storeseq

# In[ ]:

res = ipyclient.apply(process_chunks, *(data, sample, chunkfile, isref))
res.result()

# In[5]:

statsdicts = Processor(data, sample, chunkfile, isref)

# In[8]:

statsdicts.counters, statsdicts.filters

# In[11]:

aa = ipyclient[0].apply(Processor, )

# In[141]:

# write sequences to SAM file
combs1 = glob.glob(os.path.join(
    data.tmpdir,
    "{}_tmpcons.*".format(sample.name)))
combs1.sort(key=lambda x: int(x.split(".")[-1]))
combs1

# In[138]:

# write sequences to SAM file
combs1 = glob.glob(os.path.join(
    data.tmpdir,
    "{}_tmpcons.*".format(sample.name)))
combs1.sort(key=lambda x: int(x.split(".")[-1]))

# open sam handle for writing to bam
samfile = sample.files.consens.replace(".bam", ".sam")
with open(samfile, 'w') as outf:

    # parse fai file for writing headers
    fai = "{}.fai".format(data.paramsdict["reference_sequence"])
    fad = pd.read_csv(fai, sep="\t", names=["SN", "LN", "POS", "N1", "N2"])
    headers = ["@HD\tVN:1.0\tSO:coordinate"]
    headers += [
        "@SQ\tSN:{}\tLN:{}".format(i, j)
        for (i, j) in zip(fad["SN"], fad["LN"])
    ]
    outf.write("\n".join(headers) + "\n")

    # write to file with sample names imputed to line up with catg array
    counter = 0
    for fname in combs1:
        with open(fname) as infile:
            # impute catg ordered seqnames
            lines = infile.readlines()
            fdata = []
            for line in lines:
                name, chrom, rest = line.rsplit(":", 2)
                fdata.append(
                    "{}_{}:{}:{}".format(name, counter, chrom, rest)
                )
                counter += 1
            outf.write("".join(fdata) + "\n")

# In[14]:

class Processor:
    def __init__(self, data, sample, chunkfile, isref):
        self.data = data
        self.sample = sample
        self.chunkfile = chunkfile
        self.isref = isref

        # prepare the processor
        self.set_params()
        self.init_counters()
        self.init_arrays()
        self.chroms2ints()

    # run the processor
    def run(self):
        self.process_chunk()
        self.write_chunk()

    def set_params(self):
        # set max limits
        self.tmpnum = int(self.chunkfile.split(".")[-1])
        self.optim = int(self.chunkfile.split(".")[-2])
        self.este = self.data.stats.error_est.mean()
        self.esth = self.data.stats.hetero_est.mean()
        self.maxlen = self.data._hackersonly["max_fragment_length"]
        if 'pair' in self.data.paramsdict["datatype"]:
            self.maxhet = sum(self.data.paramsdict["max_Hs_consens"])
            self.maxn = sum(self.data.paramsdict["max_Ns_consens"])
        else:
            self.maxhet = self.data.paramsdict["max_Hs_consens"][0]
            self.maxn = self.data.paramsdict["max_Ns_consens"][0]
        # not enforced for ref
        if self.isref:
            self.maxn = int(1e6)

    def init_counters(self):
        # store data for stats counters.
        self.counters = {
            "name": self.tmpnum,
            "heteros": 0,
            "nsites": 0,
            "nconsens": 0,
        }
        # store data for what got filtered
        self.filters = {
            "depth": 0,
            "maxh": 0,
            "maxn": 0,
        }
        # store data for writing
        self.storeseq = {}

    def init_arrays(self):
        # local copies to use to fill the arrays
        self.catarr = np.zeros((self.optim, self.maxlen, 4), dtype=np.uint32)
        self.nallel = np.zeros((self.optim, ), dtype=np.uint8)
        self.refarr = np.zeros((self.optim, 3), dtype=np.int64)

    def chroms2ints(self):
        # if reference-mapped then parse the fai to get index number of chroms
        if self.isref:
            fai = pd.read_csv(
                self.data.paramsdict["reference_sequence"] + ".fai",
                names=['scaffold', 'size', 'sumsize', 'a', 'b'],
                sep="\t")
            self.faidict = {j: i for i, j in enumerate(fai.scaffold)}
            self.revdict = {j: i for i, j in self.faidict.items()}

    # ---------------------------------------------
    def process_chunk(self):
        # stream through the clusters
        with open(self.chunkfile, 'rb') as inclust:
            pairdealer = izip(*[iter(inclust)] * 2)
            done = 0
            while not done:
                done, chunk = clustdealer(pairdealer, 1)
                if chunk:
                    self.parse_cluster(chunk)
                    if self.filter_mindepth():
                        self.build_consens_and_array()
                        self.get_heteros()
                        if self.filter_maxhetero():
                            if self.filter_maxN_minLen():
                                self.get_alleles()
                                self.store_data()

    def parse_cluster(self, chunk):
        # get names and seqs
        piece = chunk[0].decode().strip().split("\n")
        self.names = piece[0::2]
        self.seqs = piece[1::2]

        # pull replicate read info from seqs
        self.reps = [int(n.split(";")[-2][5:]) for n in self.names]

        # ref positions
        self.ref_position = (-1, 0, 0)
        if self.isref:
            # parse position from name string
            rname = self.names[0].rsplit(";", 2)[0]
            chrom, posish = rname.rsplit(":")
            pos0, pos1 = posish.split("-")

            # pull idx from .fai reference dict
            chromint = self.faidict[chrom] + 1
            self.ref_position = (int(chromint), int(pos0), int(pos1))

    def filter_mindepth(self):
        # exit if nreps is less than mindepth setting
        if not nfilter1(self.data, self.reps):
            self.filters['depth'] += 1
            return 0
        return 1

    def build_consens_and_array(self):
        # get stacks of base counts
        sseqs = [list(seq) for seq in self.seqs]
        arrayed = np.concatenate(
            [[seq] * rep for (seq, rep) in zip(sseqs, self.reps)]
        ).astype(bytes)

        # ! enforce maxlen limit !
        self.arrayed = arrayed[:, :self.maxlen]

        # get unphased consens sequence from arrayed
        self.consens = base_caller(
            self.arrayed,
            self.data.paramsdict["mindepth_majrule"],
            self.data.paramsdict["mindepth_statistical"],
            self.esth,
            self.este,
        )

        # trim Ns from the left and right ends
        self.consens[self.consens == b"-"] = b"N"
        trim = np.where(self.consens != b"N")[0]
        ltrim, rtrim = trim.min(), trim.max()
        self.consens = self.consens[ltrim:rtrim + 1]
        self.arrayed = self.arrayed[:, ltrim:rtrim + 1]

        # update position for trimming
        self.ref_position = (
            self.ref_position[0],
            self.ref_position[1] + ltrim,
            self.ref_position[1] + ltrim + rtrim + 1,
        )

    def get_heteros(self):
        self.hidx = [
            i for (i, j) in enumerate(self.consens)
            if j.decode() in list("RKSYWM")]
        self.nheteros = len(self.hidx)

    def filter_maxhetero(self):
        if not nfilter2(self.nheteros, self.maxhet):
            self.filters['maxh'] += 1
            return 0
        return 1

    def filter_maxN_minLen(self):
        if not nfilter3(self.consens, self.maxn):
            self.filters['maxn'] += 1
            return 0
        return 1

    def get_alleles(self):
        # if less than two Hs then there is only one allele
        if len(self.hidx) < 2:
            self.nalleles = 1
        else:
            # array of hetero sites
            harray = self.arrayed[:, self.hidx]

            # remove reads with - or N at variable site
            harray = harray[~np.any(harray == b"-", axis=1)]
            harray = harray[~np.any(harray == b"N", axis=1)]

            # get counts of each allele (e.g., AT:2, CG:2)
            ccx = Counter([tuple(i) for i in harray])

            # remove low freq alleles if more than 2, since they may reflect
            # seq errors at hetero sites, making a third allele, or a new
            # allelic combination that is not real.
            if len(ccx) > 2:
                totdepth = harray.shape[0]
                cutoff = max(1, totdepth // 10)
                alleles = [i for i in ccx if ccx[i] > cutoff]
            else:
                alleles = ccx.keys()

            self.nalleles = len(alleles)
            if self.nalleles == 2:
                try:
                    self.consens = storealleles(self.consens, self.hidx, alleles)
                except (IndexError, KeyError):
                    # the H sites do not form good alleles
                    ip.logger.info("failed at phasing loc, skipping")
                    ip.logger.info("""
    consens %s
    hidx %s
    alleles %s
    """, self.consens, self.hidx, alleles)

    def store_data(self):
        # current counter
        cidx = self.counters["nconsens"]
        self.nallel[cidx] = self.nalleles
        self.refarr[cidx] = self.ref_position

        # store a reduced array with only CATG
        catg = np.array(
            [np.sum(self.arrayed == i, axis=0)
             for i in [b'C', b'A', b'T', b'G']],
            dtype='uint32').T
        self.catarr[cidx, :catg.shape[0], :] = catg

        # store the seqdata and advance counters
        self.storeseq[cidx] = b"".join(list(self.consens))
        self.counters["name"] += 1
        self.counters["nconsens"] += 1
        self.counters["heteros"] += self.nheteros

    # ----------------------------------------------
    def write_chunk(self):
        # find last empty size
        end = np.where(np.all(self.refarr == 0, axis=1))[0]
        if np.any(end):
            end = end.min()
        else:
            end = self.refarr.shape[0]

        # write final consens string chunk
        consenshandle = os.path.join(
            self.data.tmpdir,
            "{}_tmpcons.{}.{}".format(self.sample.name, end, self.tmpnum))

        # write chunk.
        if self.storeseq:
            with open(consenshandle, 'wt') as outfile:

                # denovo just write the consens simple
                if not self.isref:
                    outfile.write(
                        "\n".join(
                            [">" + self.sample.name + "_" + str(key) + \
                             "\n" + self.storeseq[key].decode()
                             for key in self.storeseq]))

                # reference needs to store if the read is revcomp to reference
                else:
                    outfile.write(
                        "\n".join(
                            ["{}:{}:{}-{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}"
                             .format(
                                self.sample.name,
                                self.refarr[i][0],
                                self.refarr[i][1],
                                self.refarr[i][2],
                                0,
                                self.revdict[self.refarr[i][0] - 1],
                                self.refarr[i][1],
                                0,
                                # why doesn't this line up with +2 ?
                                # make_indel_cigar(storeseq[i].decode()),
                                #"{}M".format(refarr[i][2] - refarr[i][1]),
                                "{}M".format(len(self.storeseq[i].decode())),
                                "*",
                                0,
                                ## + 2 here
                                self.refarr[i][2] - self.refarr[i][1],
                                #len(self.storeseq[i].decode()),
                                self.storeseq[i].decode(),
                                "*",
                                # "XT:Z:{}".format(
                                #     make_indel_cigar(storeseq[i].decode())
                                # ),
                            ) for i in self.storeseq.keys()]
                        ))

        # store reduced size arrays with indexes matching to keep indexes
        tmp5 = consenshandle.replace("_tmpcons.", "_tmpcats.")
        with h5py.File(tmp5, 'w') as io5:
            io5.create_dataset(name="cats", data=self.catarr[:end])
            io5.create_dataset(name="alls", data=self.nallel[:end])
            io5.create_dataset(name="chroms", data=self.refarr[:end])
        del self.catarr
        del self.nallel
        del self.refarr

        # return stats
        self.counters['nsites'] = sum([len(i) for i in self.storeseq.values()])
        del self.storeseq
        #return self.counters, self.filters

# In[87]:

with open(self.chunkfile, 'rb') as inclust:
    pairdealer = izip(*[iter(inclust)] * 2)
    for i in range(20):
        _, chunk = clustdealer(pairdealer, 1)
        if chunk:
            self.parse_cluster(chunk)
            if self.filter_mindepth():
                print(self.reps)
                self.build_consens_and_array()

                # get stacks of base counts
                sseqs = [list(seq) for seq in self.seqs]
                arrayed = np.concatenate(
                    [[seq] * rep for (seq, rep) in zip(sseqs, self.reps)]
                ).astype(bytes)
                self.arrayed = arrayed[:, :self.maxlen]

                # get consens call for each site, applies paralog-x-site filter
                self.consens = base_caller(
                    arrayed,
                    self.data.paramsdict["mindepth_majrule"],
                    self.data.paramsdict["mindepth_statistical"],
                    self.esth,
                    self.este,
                )
                self.consens[self.consens == b"-"] = b"N"
                trim = np.where(self.consens != b"N")[0]
                ltrim, rtrim = trim.min(), trim.max()
                self.consens = self.consens[ltrim:rtrim + 1]
                self.arrayed = self.arrayed[:, ltrim:rtrim + 1]
                print(self.ref_position)

                # update position for trimming
                self.ref_position = (
                    self.ref_position[0],
                    self.ref_position[1] + ltrim,
                    self.ref_position[1] + ltrim + rtrim + 1,
                )
                print(self.ref_position)

# In[39]:

# min where it is not N
np.argmin(np.where(arr != "N"))
np.where(arr != "N")

# In[40]:

consens = arr
trim = np.where(consens != "N")[0]
ltrim, rtrim = trim.min(), trim.max()
consens = consens[ltrim:rtrim + 1]

# In[71]:

print(consens)
print(pos[0] + ltrim, rtrim + 1)

# In[54]:

arr[7:17]

# In[4]:

test2.run("5", force=True)

# In[50]:

from ipyrad.assemble.consens_se import *
self = Step5(test2, True, ipyclient)
self.remote_calculate_depths()
self.remote_make_chunks()

# In[54]:

self.data.tmpdir
test2.tmpdir

# In[58]:

data = test2
sample = data.samples["PZ03concat"]
isref = True
for sample in [sample]:
    chunks = glob.glob(os.path.join(
        self.data.tmpdir,
        "{}.chunk.*".format(sample.name)))
    chunks.sort(key=lambda x: int(x.split('.')[-1]))
    chunkfile = chunks[0]
chunkfile

# In[125]:

# temporarily store the mean estimates to Assembly
este = data.stats.error_est.mean()
esth = data.stats.hetero_est.mean()

# get index number from tmp file name
tmpnum = int(chunkfile.split(".")[-1])
optim = int(chunkfile.split(".")[-2])

# prepare data for reading and use global maxfraglen
clusters = open(chunkfile, 'rb')
pairdealer = izip(*[iter(clusters)] * 2)
maxlen = data._hackersonly["max_fragment_length"]

# local copies to use to fill the arrays
catarr = np.zeros((optim, maxlen, 4), dtype=np.uint32)
nallel = np.zeros((optim, ), dtype=np.uint8)
refarr = np.zeros((optim, 3), dtype=np.int64)

# if reference-mapped then parse the fai to get index number of chroms
if isref:
    fai = pd.read_csv(
        data.paramsdict["reference_sequence"] + ".fai",
        names=['scaffold', 'size', 'sumsize', 'a', 'b'],
        sep="\t")
    faidict = {j: i for i, j in enumerate(fai.scaffold)}
    revdict = {j: i for i, j in faidict.items()}

# store data for stats counters.
counters = {
    "name": tmpnum,
    "heteros": 0,
    "nsites": 0,
    "nconsens": 0,
}

# store data for what got filtered
filters = {
    "depth": 0,
    "maxh": 0,
    "maxn": 0,
}

# store data for writing
storeseq = {}

## set max limits
if 'pair' in data.paramsdict["datatype"]:
    maxhet = sum(data.paramsdict["max_Hs_consens"])
    maxn = sum(data.paramsdict["max_Ns_consens"])
else:
    maxhet = data.paramsdict["max_Hs_consens"][0]
    maxn = data.paramsdict["max_Ns_consens"][0]

# In[126]:

# load the refmap dictionary if refmapping
done = 0
while counters["name"] < 20:
    done, chunk = clustdealer(pairdealer, 1)
    if chunk:

        # get names and seqs
        piece = chunk[0].decode().strip().split("\n")
        names = piece[0::2]
        seqs = piece[1::2]

        # pull replicate read info from seqs
        reps = [int(sname.split(";")[-2][5:]) for sname in names]

        ref_position = (-1, 0, 0)
        if isref:
            # parse position from name string
            rname = names[0].rsplit(";", 2)[0]
            chrom, posish = rname.rsplit(":")
            pos0, pos1 = posish.split("-")

            # pull idx from .fai reference dict
            chromint = faidict[chrom] + 1
            ref_position = (int(chromint), int(pos0), int(pos1))

        # apply filters and fill arrays
        if nfilter1(data, reps):

            # get stacks of base counts
            sseqs = [list(seq) for seq in seqs]
            arrayed = np.concatenate(
                [[seq] * rep for seq, rep in zip(sseqs, reps)]
            ).astype(bytes)
            arrayed = arrayed[:, :maxlen]

            # get consens call for each site, applies paralog-x-site filter
            consens = base_caller(
                arrayed,
                data.paramsdict["mindepth_majrule"],
                data.paramsdict["mindepth_statistical"],
                esth,
                este,
            )
            print(rname)
            print(len(consens), ref_position[2] - ref_position[1])

            # apply a filter to remove low coverage sites/Ns that
            # are likely sequence repeat errors. This is only applied to
            # clusters that already passed the read-depth filter (1)
            # and is not applied to ref aligned data which is expected to
            # have internal spacers that we leave in place here.
            if b"N" in consens:
                if not isref:
                    try:
                        consens, arrayed = removerepeats(consens, arrayed)
                    except ValueError:
                        ip.logger.info(
                            "Caught bad chunk w/ all Ns. Skip it.")
                        continue
                else:
                    pass
                    #print("here")

            # get hetero sites
            hidx = [
                i for (i, j) in enumerate(consens)
                if j.decode() in list("RKSYWM")]
            nheteros = len(hidx)

            # filter for max number of hetero sites
            if nfilter2(nheteros, maxhet):
                # filter for maxN, & minlen
                if nfilter3(consens, maxn):
                    # counter right now
                    current = counters["nconsens"]

                    # get N alleles and get lower case in consens
                    consens, nhaps = nfilter4(consens, hidx, arrayed)

                    # store the number of alleles observed
                    nallel[current] = nhaps

                    # store a reduced array with only CATG
                    catg = np.array(
                        [np.sum(arrayed == i, axis=0)
                         for i in [b'C', b'A', b'T', b'G']],
                        dtype='uint32').T
                    catarr[current, :catg.shape[0], :] = catg
                    refarr[current] = ref_position

                    # store the seqdata for tmpchunk
                    storeseq[current] = b"".join(list(consens))
                    counters["name"] += 1
                    counters["nconsens"] += 1
                    counters["heteros"] += nheteros
                else:
                    filters['maxn'] += 1
            else:
                filters['maxh'] += 1
        else:
            filters['depth'] += 1

# close infile io
clusters.close()

# In[127]:

67768-68840

# In[112]:

ttt = list("NNNNNNNNNNNAAAAAAAAAAAAAAAAAAANNNNNNNNNNNNNN")
ttt = np.array(ttt, dtype=bytes)
ttt

# In[102]:

ttt.nbytes

# In[107]:

ttt = np.array(list(b"NNNNNNNNNNNAAAAAAAAAAAAAAAAAAANNNNNNNNNNNNNN"))
ttt.nbytes

# In[83]:

np.argmin(consens == b"N"), np.argmax(consens == b"N")

# In[17]:

# find last empty size
end = np.where(np.all(refarr == 0, axis=1))[0]
if np.any(end):
    end = end.min()
else:
    end = refarr.shape[0]

# write final consens string chunk
consenshandle = os.path.join(
    data.tmpdir,
    "{}_tmpcons.{}.{}".format(sample.name, end, tmpnum))

# write chunk.
if storeseq:
    with open(consenshandle, 'wt') as outfile:

        # denovo just write the consens simple
        if not isref:
            outfile.write(
                "\n".join([">" + sample.name + "_" + str(key) + \
                           "\n" + storeseq[key].decode()
                           for key in storeseq]))

        # reference needs to store if the read is revcomp to reference
        else:
            outfile.write(
                "\n".join(
                    ["{}:{}:{}-{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}"
                     .format(
                        sample.name,
                        refarr[i][0],
                        refarr[i][1],
                        refarr[i][2],
                        0,
                        revdict[refarr[i][0] - 1],
                        refarr[i][1],
                        0,
                        # why doesn't this line up with +2 ?
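                        # the fields above fill the SAM columns QNAME, FLAG,
                        # RNAME, POS, MAPQ; CIGAR, RNEXT, PNEXT, TLEN, SEQ,
                        # and QUAL follow below.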
                        # make_indel_cigar(storeseq[i].decode()),
                        #"{}M".format(refarr[i][2] - refarr[i][1]),
                        "{}M".format(len(storeseq[i].decode())),
                        "*",
                        0,
                        ## + 2 here
                        refarr[i][2] - refarr[i][1],
                        storeseq[i].decode(),
                        "*",
                        # "XT:Z:{}".format(
                        #     make_indel_cigar(storeseq[i].decode())
                        # ),
                    ) for i in storeseq.keys()]
                ))

# store reduced size arrays with indexes matching to keep indexes
tmp5 = consenshandle.replace("_tmpcons.", "_tmpcats.")
with h5py.File(tmp5, 'w') as io5:
    io5.create_dataset(name="cats", data=catarr[:end])
    io5.create_dataset(name="alls", data=nallel[:end])
    io5.create_dataset(name="chroms", data=refarr[:end])
del catarr
del nallel
del refarr

# return stats
counters['nsites'] = sum([len(i) for i in storeseq.values()])
del storeseq
counters, filters

# In[ ]:

# In[ ]:

# In[ ]:

# In[98]:

data = ip.load_json("6-tortas/tortas.json")

# In[12]:

data = ip.Assembly("tortas")
data.set_params("project_dir", "6-tortas")
data.set_params("sorted_fastq_path",
                "/home/deren/Documents/Tortoise/fastq-concats-GG-d10-118/*.gz")
data.set_params("restriction_overhang", ("CATG", "AATT"))
data.set_params("datatype", "pairddrad")
data.set_params("assembly_method", "reference")
data.set_params("reference_sequence",
                "/home/deren/Documents/Tortoise/lgeorge.genome.fa")
data.get_params()

# In[5]:

data.ipcluster['threads'] = 4

# In[6]:

data.run("2")

# In[7]:

data.stats.head()

# In[12]:

from ipyrad.assemble.clustmap import *
self = Step3(data, 8, True, ipyclient)

# In[14]:

self.run()

# In[ ]:

# In[38]:

test = self.data.branch('test', [i.name for i in self.samples[:4]])

# In[40]:

test.run('45')

# In[3]:

test = ip.load_json("6-tortas/test.json")

# In[29]:

test.run("5", force=True)

# In[4]:

data = test
samples = list(data.samples.values())

# In[5]:

from ipyrad.assemble.cluster_across import *
self = Step6(data, True, ipyclient)

# In[6]:

self.remote_concat_bams()

# In[40]:

self.remote_build_ref_regions()

# In[41]:

# prepare i/o
bamfile = AlignmentFile(
    os.path.join(
        self.data.dirs.across,
        "{}.cat.sorted.bam".format(self.data.name)),
    'rb')
outfile = gzip.open(
    os.path.join(
        self.data.dirs.across,
        "{}.catcons.gz".format(self.data.name)),
    'wb')

# write header with sample names
outfile.write(
    b" ".join([i.name.encode() for i in self.samples]) + b"\n")

# In[35]:

def remote_build_ref_clusters(self):
    "build clusters and find variants/indels to store"

    # prepare i/o
    bamfile = AlignmentFile(
        os.path.join(
            self.data.dirs.across,
            "{}.cat.sorted.bam".format(self.data.name)),
        'rb')
    outfile = gzip.open(
        os.path.join(
            self.data.dirs.across,
            "{}.catcons.gz".format(self.data.name)),
        'wb')

    # write header with sample names
    snames = sorted([i.name for i in self.samples])
    nsamples = len(snames)
    outfile.write(
        b" ".join([i.encode() for i in snames]) + b"\n")

    # get clusters
    iregions = iter(self.regions)
    clusts = []
    while 1:

        # pull in the cluster
        try:
            region = next(iregions)
            reads = bamfile.fetch(*region)
        except StopIteration:
            break

        # pull in the reference for this region
        refn, refs = get_ref_region(
            data.paramsdict["reference_sequence"],
            region[0], region[1] + 1, region[2] + 1,
        )

        # build cluster dict for sorting
        rdict = {}
        for read in reads:
            rdict[read.qname] = read.seq
        keys = sorted(rdict.keys(), key=lambda x: x.rsplit(":", 2)[0])

        # make empty array
        arr = np.zeros((nsamples + 1, len(refs)), dtype=bytes)

        # fill it
        arr[0] = list(refs)
        for idx, key in enumerate(keys):

            # get start and stop from name
            sname = key.rsplit("_", 1)[0]
            rstart, rstop = key.rsplit(":", 2)[-1].split("-")
            sidx = snames.index(sname)

            # get start relative to ref
            start = int(rstart) - int(region[1]) - 1
            stop = start + int(rstop) - int(rstart)
            print(sidx + 1, start, stop, arr.shape[1])
            try:
                arr[sidx + 1, int(start): int(stop)] = list(rdict[key])
            except ValueError:
                print(rdict[key])

        # get consens seq and variant site index
        print("")
        clust = []
        avars = refvars(arr.view(np.uint8), PSEUDO_REF)
        dat = b"".join(avars.view("S1")[:, 0]).decode()
        snpstring = "".join(["*" if i else " " for i in avars[:, 1]])
        clust.append("ref_{}:{}-{}\n{}".format(*region, dat))

        # or, just build variant string (or don't...)
        # write all loci with [locids, nsnps, npis, nindels, ?]
        # for key in keys:
        #     clust.append("{}\n{}".format(key, rdict[key]))
        # clust.append("SNPs\n" + snpstring)
        # clusts.append("\n".join(clust))

    outfile.write(str.encode("\n//\n//\n".join(clusts) + "\n//\n//\n"))
    outfile.close()

# In[63]:

iregions = iter(self.regions[:50])

def build_ref_clusters2(self):
    "build clusters and find variants/indels to store"

    # prepare i/o
    bamfile = AlignmentFile(
        os.path.join(
            self.data.dirs.across,
            "{}.cat.sorted.bam".format(self.data.name)),
        'rb')
    outfile = gzip.open(
        os.path.join(
            self.data.dirs.across,
            "{}.catcons.gz".format(self.data.name)),
        'wb')

    # write header with sample names
    snames = sorted([i.name for i in self.samples])
    nsamples = len(snames)
    outfile.write(
        b" ".join([i.encode() for i in snames]) + b"\n")

    # get clusters
    clusts = []
    while 1:

        # pull in the cluster
        try:
            region = next(iregions)
            reads = bamfile.fetch(*region)
        except StopIteration:
            break

        # pull in the reference for this region
        refn, refs = get_ref_region(
            data.paramsdict["reference_sequence"],
            region[0], region[1] + 1, region[2] + 1,
        )

        # build cluster dict for sorting
        rdict = {}
        for read in reads:
            rdict[read.qname] = read.seq
        keys = sorted(rdict.keys(), key=lambda x: x.rsplit(":", 2)[0])

        # make empty array
        arr = np.zeros((len(rdict), len(refs)), dtype=bytes)

        # fill it
        arr[0] = list(refs)
        for idx, key in enumerate(keys):

            # get start and stop from name
            sname = key.rsplit("_", 1)[0]
            rstart, rstop = key.rsplit(":", 2)[-1].split("-")

            # get start relative to ref
            start = int(rstart) - int(region[1]) - 1
            stop = start + int(rstop) - int(rstart)
            try:
                sequence = list(rdict[key])
                arr[idx, int(start): int(stop)] = sequence
            except ValueError:
                print(key)
                print(sname, len(sequence), start, stop, arr.shape[1])
                print(rdict[key])
        return arr

build_ref_clusters2(self)

# In[ ]:

# get consens seq and variant site index
print("")
clust = []
avars = refvars(arr.view(np.uint8), PSEUDO_REF)
dat = b"".join(avars.view("S1")[:, 0]).decode()
snpstring = "".join(["*" if i else " " for i in avars[:, 1]])
clust.append("ref_{}:{}-{}\n{}".format(*region, dat))

# or, just build variant string (or don't...)
# write all loci with [locids, nsnps, npis, nindels, ?]
# for key in keys:
#     clust.append("{}\n{}".format(key, rdict[key]))
# clust.append("SNPs\n" + snpstring)
# clusts.append("\n".join(clust))

outfile.write(str.encode("\n//\n//\n".join(clusts) + "\n//\n//\n"))
outfile.close()

# In[34]:

remote_build_ref_clusters(self)

# In[12]:

"use bedtools to pull in clusters and match catgs with alignments"
cmd1 = [
    ipyrad.bins.bedtools,
    "bamtobed",
    "-i",
    os.path.join(
        data.dirs.across,
        "{}.cat.sorted.bam".format(self.data.name)
    )
]
" ".join(cmd1)

# In[13]:

cmd2 = [
    ipyrad.bins.bedtools,
    "merge",
    "-d", "0",
    "-i", "-",
]
" ".join(cmd2)

# In[35]:

self.remote_build_ref_clusters()

# In[ ]:

# In[5]:

test.run("6", ipyclient=ipyclient)

# In[4]:

test2 = test.branch("test2")

# In[5]:

from ipyrad.assemble.consens_se import *

# In[6]:

self = Step5(test2, True, ipyclient)

# In[7]:

self.remote_calculate_depths()
self.remote_make_chunks()

# In[8]:

statsdicts = self.remote_process_chunks()

# In[9]:

sample = self.samples[0]
data = self.data

# In[10]:

# write sequences to SAM file
combs1 = glob.glob(os.path.join(
    data.tmpdir,
    "{}_tmpcons.*".format(sample.name)))
combs1.sort(key=lambda x: int(x.split(".")[-1]))

# open sam handle for writing to bam
samfile = sample.files.consens.replace(".bam", ".sam")
with open(samfile, 'w') as outf:

    # parse fai file for writing headers
    fai = "{}.fai".format(data.paramsdict["reference_sequence"])
    fad = pd.read_csv(fai, sep="\t", names=["SN", "LN", "POS", "N1", "N2"])
    headers = ["@HD\tVN:1.0\tSO:coordinate"]
    headers += [
        "@SQ\tSN:{}\tLN:{}".format(i, j)
        for (i, j) in zip(fad["SN"], fad["LN"])
    ]
    outf.write("\n".join(headers) + "\n")

    # write to file with sample names imputed to line up with catg array
    counter = 0
    for fname in combs1:
        with open(fname) as infile:
            # impute catg ordered seqnames
            lines = infile.readlines()
            fdata = []
            for line in lines:
                name, chrom, rest = line.rsplit(":", 2)
                fdata.append(
                    "{}_{}:{}:{}".format(name, counter, chrom, rest)
                )
                counter += 1
            outf.write("".join(fdata) + "\n")

# In[12]:

cmd = [ip.bins.samtools, "view", "-Sb", samfile]
with open(sample.files.consens, 'w') as outbam:
    proc = sps.Popen(cmd, stderr=sps.PIPE, stdout=outbam)
    comm = proc.communicate()[1]
if proc.returncode:
    raise IPyradError("error in samtools: {}".format(comm))

# In[14]:

368852-370424

# In[8]:

self.remote_concatenate_chunks()
self.data_store(statsdicts)

# In[ ]:

shutil.rmtree(self.data.tmpdir)
self.data.save()

# In[ ]:

# In[5]:

from ipyrad.assemble.cluster_across import *

# In[7]:

self = Step6(test, True, ipyclient)

# In[40]:

self.remote_concat_bams()

# In[39]:

self.remote_build_ref_regions()

# In[41]:

self.regions[:10]

# In[95]:

# access reads from bam file using pysam
bamfile = AlignmentFile(
    os.path.join(
        self.data.dirs.across,
        "cat.sorted.bam"),
    'rb')

# catcons output file for raw clusters and db samples
outfile = gzip.open(
    os.path.join(
        self.data.dirs.across,
        "{}.catcons.gz".format(self.data.name)),
    'wb')

# write header line to catcons with sample names
snames = ['ref'] + sorted([i.name for i in self.samples])
outfile.write(
    b" ".join([i.encode() for i in snames]) + b"\n")

for region in self.regions[:100]:
    reads = bamfile.fetch(*region)

    # get reference sequence
    refn, refs = get_ref_region(
        self.data.paramsdict["reference_sequence"],
        region[0], region[1] + 1, region[2] + 1
    )

    # build cluster dict for sorting
    rdict = {}
    for read in reads:
        rdict[read.qname] = read.seq
    keys = sorted(rdict.keys(), key=lambda x: x.rsplit(":", 2)[0])

    # make an empty array
    arr = np.zeros((len(snames), len(refs)), dtype=bytes)

    # fill the array
    arr[0] = list(refs)
    for idx, key in enumerate(keys):

        # get start and stop from this seq
        sname = key.rsplit("_", 1)[0]
        rstart, rstop = key.rsplit(":", 2)[-1].split("-")
        sidx = snames.index(sname)

        # get start and stop relative to reference region
        start = int(rstart) - int(region[1]) - 1
        stop = start + int(rstop) - int(rstart)
        print(key)
        print(rstart, rstop, start, stop, len(rdict[key]), len(refs))
        arr[sidx, start: stop] = list(rdict[key])

    print("")
    arr[arr == b""] = b"N"
    clusts.append(b"\n".join([b"".join(line) for line in arr]))

# In[88]:

# avars = refvars(arr.view(np.uint8), PSEUDO_REF)
# dat = b"".join(avars.view("S1")[:, 0]).decode()
# snpstring = "".join(["*" if i else " " for i in avars[:, 1]])
# cname = "ref_{}:{}-{}\n{}".format(*region, dat)
# cname

# In[94]:

67768 -68840

# In[89]:

clusts.append(b"\n".join([b"".join(line) for line in arr]))

# In[15]:

outfile.close()

# In[4]:

test.run('6')

# In[26]:

## do cigar building again, this time with orientations
data = self.data
sample = list(data.samples.values())[0]
sample.name

# In[21]:

# get all regions with reads. Generator to yield (str, int, int)
fullregions = bedtools_merge(data, sample).strip().split("\n")
regions = (i.split("\t") for i in fullregions)
regions = [(i, int(j), int(k)) for (i, j, k) in regions]

# In[27]:

regs = regions[:20]

# In[35]:

# access reads from bam file using pysam
bamfile = AlignmentFile(
    os.path.join(
        data.dirs.refmapping,
        "{}-mapped-sorted.bam".format(sample.name)),
    'rb')

# iterate over all regions
opath = os.path.join(
    data.dirs.clusts,
    "{}.alt.clustS.gz".format(sample.name))
out = gzip.open(opath, 'wt')
idx = 0
clusters = []
for reg in regs:
    # uncomment and compare against ref sequence when testing
    ref = get_ref_region(data.paramsdict["reference_sequence"], *reg)
    reads = bamfile.fetch(*reg)

    # store reads in a dict
    rdict = {}

    # paired-end data cluster building
    if "pair" in data.paramsdict["datatype"]:

        # match paired reads together in a dictionary
        for read in reads:
            print("\t".join(
                str(i) for i in
                [read.is_read1, read.is_reverse,
                 read.seq[:6], read.seq[-6:], *reg]))
            if read.qname not in rdict:
                rdict[read.qname] = [read, None]
            else:
                rdict[read.qname][1] = read

        # sort keys by derep number
        keys = sorted(
            rdict.keys(),
            key=lambda x: int(x.split("=")[-1]), reverse=True)

        # build the cluster based on map positions, orientation, cigar
        clust = []
        clust.append("{}\n{}".format('ref', ref[1]))
        for key in keys:
            r1, r2 = rdict[key]
            if r1 and r2:

                #lref = len(ref[1])
                aref = np.array(list(ref[1]))
                lref = reg[2] - reg[1]
                arr1 = np.zeros(lref, dtype="U1")
                arr2 = np.zeros(lref, dtype="U1")
                arr1.fill("-")
                arr2.fill("-")

                # how far ahead of the start does this read begin
                seq = cigared(r1.seq, r1.cigar)
                start = r1.reference_start - reg[1]
                arr1[start:start + len(seq)] = list(seq)
                seq = cigared(r2.seq, r2.cigar)
                start = r2.reference_start - reg[1]
                arr2[start:start + len(seq)] = list(seq)
                arr3 = join_arrays(arr1, arr2)
                pairseq = "".join(arr3)

                ori = "+"
                if r1.is_reverse:
                    ori = "-"
                derep = r1.qname.split("=")[-1]
                rname = "{}:{}-{};size={};{}".format(*reg, derep, ori)
                clust.append("{}\n{}".format(rname, pairseq))

    # single-end data cluster building
    else:
        for read in reads:
            rdict[read.qname] = read

        # sort keys by derep number
        keys = sorted(
            rdict.keys(),
            key=lambda x: int(x.split("=")[-1]), reverse=True)

        # build the cluster based on map positions, orientation, cigar
        clust = []
        for key in keys:
            r1 = rdict[key]

            #aref = np.array(list(ref[1]))
            lref = reg[2] - reg[1]
            arr1 = np.zeros(lref, dtype="U1")
            arr1.fill("-")

            # how far ahead of the start does this read begin
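            # cigared() is expected to apply the CIGAR operations to the read
            # sequence so it can be placed on reference coordinates in the
            # gap-padded array below.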
            seq = cigared(r1.seq, r1.cigar)
            start = r1.reference_start - reg[1]
            arr1[start:start + len(seq)] = list(seq)
            aseq = "".join(arr1)

            ori = "+"
            if r1.is_reverse:
                ori = "-"
            derep = r1.qname.split("=")[-1]
            rname = "{}:{}-{};size={};{}".format(*reg, derep, ori)
            clust.append("{}\n{}".format(rname, aseq))

    # store this cluster
    if clust:
        clusters.append("\n".join(clust))
        idx += 1

    # if 10000 clusters stored then write to disk
    if not idx % 10000:
        if clusters:
            out.write("\n//\n//\n".join(clusters) + "\n//\n//\n")
            clusters = []

# write final remaining clusters to disk
if clusters:
    out.write("\n//\n//\n".join(clusters) + "\n//\n//\n")
out.close()

# In[16]:

## branch to reduce size

# In[17]:

## run step 6 until building clusters and check orientations

# In[ ]:

# In[ ]:

self.remote_index_refs()
self.remote_run(
    function=concat_multiple_edits,
    printstr=("concatenating ", "s3"),
    args=(),
)
self.remote_run(
    function=merge_end_to_end,
    printstr=("join unmerged pairs ", "s3"),
    args=(False, False,),
)
self.remote_run(
    function=dereplicate,
    printstr=("dereplicating ", "s3"),
    args=(self.nthreads,),
    threaded=True,
)
self.remote_run(
    function=split_endtoend_reads,
    printstr=("splitting dereps ", "s3"),
    args=(),
)
self.remote_run(
    function=mapping_reads,
    printstr=("mapping reads ", "s3"),
    args=(self.nthreads,),
    threaded=True,
)

# In[ ]:

# In[16]:

data.run("123")

# In[13]:

step.remote_concat_bams()

# In[14]:

step.remote_build_ref_regions()

# In[27]:

self = step

# In[29]:

regs = [next(self.regions) for i in range(10)]
regs

# In[139]:

# access reads from bam file using pysam
bamfile = AlignmentFile(
    os.path.join(
        self.data.dirs.across,
        "cat.sorted.bam"),
    'rb')

# catcons output file for raw clusters and db samples
outfile = gzip.open(
    os.path.join(
        self.data.dirs.across,
        "{}.catcons.gz".format(self.data.name)),
    'wb')

# write header line to catcons with sample names
outfile.write(
    b" ".join([i.name.encode() for i in self.samples]) + b"\n")

# get clusters
lidx = 0
clusts = []
# while 1:
#     try:
#         region = next(self.regions)
#         reads = bamfile.fetch(*region)
#     except StopIteration:
#         break
for region in regs:
    reads = bamfile.fetch(*region)

    # get reference
    refn, refs = get_ref_region(
        data.paramsdict["reference_sequence"],
        region[0], region[1]+1, region[2]+1)

    # build cluster dict for sorting
    rdict = {}
    for read in reads:
        print(read)
        rdict[read.qname] = read.seq
    keys = sorted(rdict.keys(), key=lambda x: x.rsplit(":", 2)[0])

    # build cluster based on map positions (reads are already oriented)
    arr = np.zeros((len(rdict) + 1, len(refs)), dtype=bytes)
    arr[0] = list(refs)
    for idx, key in enumerate(keys):
        rstart, rstop = key.rsplit(":", 1)[-1].split("-")
        start = max(0, region[1] - (int(rstart) - 1))
        stop = min(len(refs), int(rstop) - int(rstart))
        #print(start, stop, len(rdict[key]), refn)
        #print(rdict[key])
        arr[idx + 1, int(start): int(stop)] = list(rdict[key])

    arr[arr == b""] = b"N"
    for line in arr:
        outfile.write(line.tostring() + b"\n")
        #print(line.tostring(), file=outfile)
        #print(b"".join(line), file=outfile)
    outfile.write(b"\n")

outfile.close()

# In[60]:

region[1]

# In[141]:

revcomp("AATTCCATTCTTCCTTTCCCATACCTCCCGCCCTGCTCCTTTCCKCTCTTGATTTCTTCTTGAGGGAGGCAGAGGANNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNGGCACCAGATTTTCTCACTGTTCAGGTCAGGGTTTGACTTCAGCCCCATCTCTAATACAAGCCATGNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN")

# In[37]:

# get consens seq and variant site index
clust = []
avars = refvars(arr.view(np.uint8), PSEUDO_REF)
dat = b"".join(avars.view("S1")[:, 0]).decode()
snpstring = "".join(["*" if i else " " for i in avars[:, 1]])
clust.append("ref_{}:{}-{}\n{}".format(*region, dat))

# or, just build variant string (or don't...)
# write all loci with [locids, nsnps, npis, nindels, ?]
for key in keys:
    clust.append("{}\n{}".format(key, rdict[key]))
clust.append("SNPs\n" + snpstring)
clusts.append("\n".join(clust))

# advance locus counter
lidx += 1

# write chunks to file
if not lidx % 1000:
    outfile.write(
        str.encode("\n//\n//\n".join(clusts) + "\n//\n//\n"))
    clusts = []

# write remaining
if clusts:
    outfile.write(
        str.encode("\n//\n//\n".join(clusts) + "\n//\n//\n"))
outfile.close()

# In[16]:

step.remote_build_ref_clusters()

# In[4]:

from ipyrad.assemble.clustmap import *

# In[6]:

self = Step3(data, 8, True, ipyclient)

# In[7]:

self.run()

# In[8]:

self.data.run("45")

# In[13]:

self.remote_index_refs()
self.remote_run(
    function=concat_multiple_edits,
    printstr=("concatenating ", "s3"),
    args=(),
)
self.remote_run(
    function=merge_end_to_end,
    printstr=("join unmerged pairs ", "s3"),
    args=(False, False,),
)

# In[15]:

self.remote_run(
    function=dereplicate,
    printstr=("dereplicating ", "s3"),
    args=(self.nthreads,),
    threaded=True,
)

# In[16]:

self.remote_run(
    function=split_endtoend_reads,
    printstr=("splitting dereps ", "s3"),
    args=(),
)

# In[ ]:

self.remote_run(
    function=mapping_reads,
    printstr=("mapping reads ", "s3"),
    args=(self.nthreads,),
    threaded=True,
)

# In[ ]:

sample = list(self.data.samples.values())[0]
merge_end_to_end(self.data, sample, True, True)

# In[ ]:

# In[15]:

sample = list(data.samples.values())[0]
infiles = [
    os.path.join(data.dirs.edits,
                 "{}.trimmed_R1_.fastq.gz".format(sample.name)),
    os.path.join(data.dirs.edits,
                 "{}_R1_concatedit.fq.gz".format(sample.name)),
    os.path.join(data.tmpdir,
                 "{}_merged.fastq".format(sample.name)),
    os.path.join(data.tmpdir,
                 "{}_declone.fastq".format(sample.name)),
]
infiles = [i for i in infiles if os.path.exists(i)]
infile = infiles[-1]
infile

# In[19]:

strand = "plus"
if data.paramsdict["datatype"] in ('gbs', '2brad'):
    strand = "both"

nthreads = 2
cmd = [
    ip.bins.vsearch,
    "--derep_fulllength", infile,
    "--strand", strand,
    "--output", os.path.join(data.tmpdir, sample.name + "_derep.fastq"),
    "--threads", str(nthreads),
    "--fasta_width", str(0),
    "--fastq_qmax", "1000",
    "--sizeout",
    "--relabel_md5",
]
cmd

# In[20]:

proc = sps.Popen(cmd, stderr=sps.STDOUT, stdout=sps.PIPE, close_fds=True)
errmsg = proc.communicate()[0]
if proc.returncode:
    ip.logger.error("error inside dereplicate %s", errmsg)
    raise IPyradWarningExit(errmsg)

# In[10]:

s.remote_run(
    function=dereplicate,
    printstr=("dereplicating ", "s3"),
    args=(s.nthreads,),
    threaded=True,
)
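# In[ ]:

# A minimal sketch (not part of the original runs above) of how the per-chunk
# Processor jobs could be distributed over the ipyparallel engines and their
# stats aggregated, mirroring what remote_process_chunks() does. The helper
# name run_one_chunk is hypothetical; it assumes Processor and its
# dependencies are importable on the engines.

def run_one_chunk(data, sample, chunkfile, isref):
    "run a single chunkfile through Processor and return its stats dicts"
    proc = Processor(data, sample, chunkfile, isref)
    proc.run()
    return proc.counters, proc.filters


lbview = ipyclient.load_balanced_view()
chunks = sorted(
    glob.glob(os.path.join(data.tmpdir, "{}.chunk.*".format(sample.name))),
    key=lambda x: int(x.split('.')[-1]))

# submit one job per chunkfile and collect results as they finish
jobs = [lbview.apply_async(run_one_chunk, data, sample, c, isref) for c in chunks]
stats = [j.get() for j in jobs]

# sum the counters and filters across chunks ("name" is a chunk index, not a count)
total_counters = Counter()
total_filters = Counter()
for counters, filters in stats:
    total_counters.update({k: v for k, v in counters.items() if k != "name"})
    total_filters.update(filters)
total_counters, total_filters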