def test(realization_of_tee): a, b = realization_of_tee(iter(xrange(10))) b.next() return zip(a, b) from collections import deque def tee1(iterable, n=2): buffs = [deque() for _ in xrange(n)] def pull(): x = iterable.next() for b in buffs: b.appendleft(x) def worker(buff): while True: if not buff: pull() yield buff.pop() return map(worker, buffs) print test(tee1) def tee2(iterable, n=2): # индексы воркеров используются постоянно и достаточно маленький, # поэтому лучше его сразу сохранить indexes = tuple(range(n)) # лучше всегда именовать переменные хорошими именами, # даже в "черновом" коде source = iter(iterable) buff = [] # буфер список, т.к. обращение по индексу в списках работет быстро positions = [0] * n # c immutable-ячейками можно так делать def cleanup(): """ Отбрасывает из хвоста буфера элементы, пройденные всеми курсорами """ min_pos = min(positions) if min_pos > 0: buff.pop(0) for n in indexes: positions[n] -= 1 def pullup(): """ Дополненяет буфер, если какой либо из курсоров достиг его головы """ max_pos = max(positions) if max_pos >= len(buff): buff.append(source.next()) def worker(pos): while True: pullup() yield buff[positions[pos]] positions[pos] += 1 cleanup() return map(worker, indexes) print test(tee2) from itertools import tee test(tee) == test(tee1) == test(tee2) def test_peekable(realization, skip_tests=()): """ Тест реализаций peekable @skip_tests - список проверок, которые нужно пропустить """ class Source(object): u""" Тестовый итератор, регистрирующий собственные запуск, итерацию и остановку """ def __init__(self): self.started = False self.iterated = False self.stopped = False self._value = 0 def __iter__(self): self.started = True return self def next(self): self._value += 1 if self._value >= 10: self.stopped = True raise StopIteration() self.iterated = True return self._value src = Source() p = realization(src) if not 'SKIP_START' in skip_tests and ( src.started or src.iterated ): # До запуска итерации обёртки # не должна запускаться и итерация источника print u"Итерация источника запущена до начала итерации обёртки!" return p = iter(p) if not 'SKIP_NEXT' in skip_tests and ( src.iterated ): # источник данных не должен итерироваться, # пока данные из него не потребуются print u"Данные из источника запрошены до затребования их у обёртки!" return p.peek() if not 'SKIP_STOP' in skip_tests and ( src.stopped ): # Эта проверка добавлена для отсечения таких возможных реализаций, # которые загружают весь исходный итератор в буфер, и итерируются по буферу print u"Источник загружен целиком до окончания итерации по обёртке!" return result = [] for i in p: result.append((i, p.peek(None))) if result != zip(range(1, 10), range(2, 10) + [None]): print u"Результат итерации обёртки отичается от ожидаемого!" return return True class MyPeekable(object): # значение, обозначающее отсутствие значения по умолчанию (ЗПУ), # т.к. None, это допустимое ЗПУ __NODEFAULT = (((),),) def __init__(self, iterable): self._iterable = iterable self._iterated = False self._peeked = False def __iter__(self): # источник итерируется ровно один раз при итерации обёртки if not self._iterated: self._iterated = True self._iterable = iter(self._iterable) return self def peek(self, default=__NODEFAULT): """ Возвращает следующее значение исходного итератора, не меняя при этом исходный набор значений, т.е. не влияет на результаты последующих вызовов next() Если итератор больше не содержит значений, то возвращается значение по умолчанию @default, елси оно указано, в противном случае возбуждается исключение StopIteration """ if not self._peeked: iter(self) # итерируем себя на всякий случай try: self._value = self._iterable.next() except StopIteration: if default is self.__NODEFAULT: raise return default self._peeked = True return self._value def next(self): if not self._peeked: self.peek() self._peeked = False return self._value test_peekable(MyPeekable) from more_itertools import peekable test_peekable(peekable) test_peekable(peekable, ('SKIP_START',))