from gzip import GzipFile from StringIO import StringIO from urllib import urlopen fagz = urlopen('ftp://ftp.ncbi.nlm.nih.gov/genbank/genomes/Eukaryotes/vertebrates_mammals/Homo_sapiens/GRCh38/non-nuclear/assembled_chromosomes/FASTA/chrMT.fa.gz').read() print GzipFile(fileobj=StringIO(fagz)).read() def parse_fasta(fh): fa = {} current_short_name = None # Part 1: compile list of lines per sequence for ln in fh: if ln[0] == '>': # new name line; remember current sequence's short name long_name = ln[1:].rstrip() current_short_name = long_name.split()[0] fa[current_short_name] = [] else: # append nucleotides to current sequence fa[current_short_name].append(ln.rstrip()) # Part 2: join lists into strings for short_name, nuc_list in fa.iteritems(): # join this sequence's lines into one long string fa[short_name] = ''.join(nuc_list) return fa fasta_example = StringIO( '''>sequence1_short_name with optional additional info after whitespace ACATCACCCCATAAACAAATAGGTTTGGTCCTAGCCTTTCTATTAGCTCTTAGTAAGATTACACATGCAA GCATCCCCGTTCCAGTGAGTTCACCCTCTAAATCACCACGATCAAAAGGAACAAGCATCAAGCACGCAGC AATGCAGCTCAAAACGCTTAGCCTAGCCACACCCCCACGGGAAACAGCAGTGAT >sequence2_short_name with optional additional info after whitespace GCCCCAAACCCACTCCACCTTACTACCAGACAACCTTAGCCAAACCATTTACCCAAATAAAGTATAGGCG ATAGAAATTGAAACCTGGCGCAATAGATATAGTACCGCAAGGGAAAGATGAAAAATTATAACCAAGCATA ATATAG''') parsed_fa = parse_fasta(fasta_example) parsed_fa parsed_fa['sequence2_short_name'][100:130] def index_fasta(fh): index = [] current_short_name = None current_byte_offset, running_seq_length, running_byte_offset = 0, 0, 0 line_length_including_ws, line_length_excluding_ws = 0, 0 for ln in fh: ln_stripped = ln.rstrip() running_byte_offset += len(ln) if ln[0] == '>': if current_short_name is not None: index.append((current_short_name, running_seq_length, current_byte_offset, line_length_excluding_ws, line_length_including_ws)) long_name = ln_stripped[1:] current_short_name = long_name.split()[0] current_byte_offset = running_byte_offset running_seq_length = 0 else: line_length_including_ws = max(line_length_including_ws, len(ln)) line_length_excluding_ws = max(line_length_excluding_ws, len(ln_stripped)) running_seq_length += len(ln_stripped) if current_short_name is not None: index.append((current_short_name, running_seq_length, current_byte_offset, line_length_excluding_ws, line_length_including_ws)) return index fasta_example = StringIO( '''>sequence1_short_name with optional additional info after whitespace ACATCACCCCATAAACAAATAGGTTTGGTCCTAGCCTTTCTATTAGCTCTTAGTAAGATTACACATGCAA GCATCCCCGTTCCAGTGAGTTCACCCTCTAAATCACCACGATCAAAAGGAACAAGCATCAAGCACGCAGC AATGCAGCTCAAAACGCTTAGCCTAGCCACACCCCCACGGGAAACAGCAGTGAT >sequence2_short_name with optional additional info after whitespace GCCCCAAACCCACTCCACCTTACTACCAGACAACCTTAGCCAAACCATTTACCCAAATAAAGTATAGGCG ATAGAAATTGAAACCTGGCGCAATAGATATAGTACCGCAAGGGAAAGATGAAAAATTATAACCAAGCATA ATATAG''') idx = index_fasta(fasta_example) idx import re class FastaOOB(Exception): """ Out-of-bounds exception for FASTA sequences """ def __init__(self, value): self.value = value def __str__(self): return repr(self.value) class FastaIndexed(object): """ Encapsulates a set of indexed FASTA files. Does not load the FASTA files into memory but still allows the user to extract arbitrary substrings, with the help of the index. """ __removeWs = re.compile(r'\s+') def __init__(self, fafns): self.fafhs = {} self.faidxs = {} self.chr2fh = {} self.offset = {} self.lens = {} self.charsPerLine = {} self.bytesPerLine = {} for fafn in fafns: # Open FASTA file self.fafhs[fafn] = fh = open(fafn, 'r') # Parse corresponding .fai file with open(fafn + '.fai') as idxfh: for ln in idxfh: toks = ln.rstrip().split() if len(toks) == 0: continue assert len(toks) == 5 # Parse and save the index line chr, ln, offset, charsPerLine, bytesPerLine = toks self.chr2fh[chr] = fh self.offset[chr] = int(offset) # 0-based self.lens[chr] = int(ln) self.charsPerLine[chr] = int(charsPerLine) self.bytesPerLine[chr] = int(bytesPerLine) def __enter__(self): return self def __exit__(self, type, value, traceback): # Close all the open FASTA files for fafh in self.fafhs.itervalues(): fafh.close() def has_name(self, refid): return refid in self.offset def name_iter(self): return self.offset.iterkeys() def length_of_ref(self, refid): return self.lens[refid] def get(self, refid, start, ln): ''' Return the specified substring of the reference. ''' assert refid in self.offset if start + ln > self.lens[refid]: raise ReferenceOOB('"%s" has length %d; tried to get [%d, %d)' % (refid, self.lens[refid], start, start + ln)) fh, offset, charsPerLine, bytesPerLine = \ self.chr2fh[refid], self.offset[refid], \ self.charsPerLine[refid], self.bytesPerLine[refid] byteOff = offset byteOff += (start // charsPerLine) * bytesPerLine into = start % charsPerLine byteOff += into fh.seek(byteOff) left = charsPerLine - into # Count the number of line breaks interrupting the rest of the # string we're trying to read if ln < left: return fh.read(ln) else: nbreaks = 1 + (ln - left) // charsPerLine res = fh.read(ln + nbreaks * (bytesPerLine - charsPerLine)) res = re.sub(self.__removeWs, '', res) return res # first we'll write a new FASTA file with open('tmp.fa', 'w') as fh: fh.write('''>sequence1_short_name with optional additional info after whitespace ACATCACCCCATAAACAAATAGGTTTGGTCCTAGCCTTTCTATTAGCTCTTAGTAAGATTACACATGCAA GCATCCCCGTTCCAGTGAGTTCACCCTCTAAATCACCACGATCAAAAGGAACAAGCATCAAGCACGCAGC AATGCAGCTCAAAACGCTTAGCCTAGCCACACCCCCACGGGAAACAGCAGTGAT >sequence2_short_name with optional additional info after whitespace GCCCCAAACCCACTCCACCTTACTACCAGACAACCTTAGCCAAACCATTTACCCAAATAAAGTATAGGCG ATAGAAATTGAAACCTGGCGCAATAGATATAGTACCGCAAGGGAAAGATGAAAAATTATAACCAAGCATA ATATAG''') with open('tmp.fa') as fh: idx = index_fasta(fh) with open('tmp.fa.fai', 'w') as fh: fh.write('\n'.join(['\t'.join(map(str, x)) for x in idx])) with FastaIndexed(['tmp.fa']) as fa_idx: print fa_idx.get('sequence2_short_name', 100, 30)