Проверять реализации tee буду так
def test(realization_of_tee):
a, b = realization_of_tee(iter(xrange(10)))
b.next()
return zip(a, b)
Реализация tee с индивидуальными буферами для копий
from collections import deque
def tee1(iterable, n=2):
buffs = [deque() for _ in xrange(n)]
def pull():
x = iterable.next()
for b in buffs:
b.appendleft(x)
def worker(buff):
while True:
if not buff:
pull()
yield buff.pop()
return map(worker, buffs)
Проверим
print test(tee1)
[(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9)]
А теперь реализация tee с единым буфером
def tee2(iterable, n=2):
# индексы воркеров используются постоянно и достаточно маленький,
# поэтому лучше его сразу сохранить
indexes = tuple(range(n))
# лучше всегда именовать переменные хорошими именами,
# даже в "черновом" коде
source = iter(iterable)
buff = [] # буфер список, т.к. обращение по индексу в списках работет быстро
positions = [0] * n # c immutable-ячейками можно так делать
def cleanup():
"""
Отбрасывает из хвоста буфера элементы,
пройденные всеми курсорами
"""
min_pos = min(positions)
if min_pos > 0:
buff.pop(0)
for n in indexes:
positions[n] -= 1
def pullup():
"""
Дополненяет буфер, если какой либо из курсоров достиг его головы
"""
max_pos = max(positions)
if max_pos >= len(buff):
buff.append(source.next())
def worker(pos):
while True:
pullup()
yield buff[positions[pos]]
positions[pos] += 1
cleanup()
return map(worker, indexes)
Проверим
print test(tee2)
[(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9)]
Последняя проверка :)
from itertools import tee
test(tee) == test(tee1) == test(tee2)
True
Тестировать буду этим:
def test_peekable(realization, skip_tests=()):
"""
Тест реализаций peekable
@skip_tests - список проверок, которые нужно пропустить
"""
class Source(object):
u"""
Тестовый итератор, регистрирующий собственные
запуск, итерацию и остановку
"""
def __init__(self):
self.started = False
self.iterated = False
self.stopped = False
self._value = 0
def __iter__(self):
self.started = True
return self
def next(self):
self._value += 1
if self._value >= 10:
self.stopped = True
raise StopIteration()
self.iterated = True
return self._value
src = Source()
p = realization(src)
if not 'SKIP_START' in skip_tests and (
src.started or src.iterated
):
# До запуска итерации обёртки
# не должна запускаться и итерация источника
print u"Итерация источника запущена до начала итерации обёртки!"
return
p = iter(p)
if not 'SKIP_NEXT' in skip_tests and (
src.iterated
):
# источник данных не должен итерироваться,
# пока данные из него не потребуются
print u"Данные из источника запрошены до затребования их у обёртки!"
return
p.peek()
if not 'SKIP_STOP' in skip_tests and (
src.stopped
):
# Эта проверка добавлена для отсечения таких возможных реализаций,
# которые загружают весь исходный итератор в буфер, и итерируются по буферу
print u"Источник загружен целиком до окончания итерации по обёртке!"
return
result = []
for i in p:
result.append((i, p.peek(None)))
if result != zip(range(1, 10), range(2, 10) + [None]):
print u"Результат итерации обёртки отичается от ожидаемого!"
return
return True
Реализация peekable
class MyPeekable(object):
# значение, обозначающее отсутствие значения по умолчанию (ЗПУ),
# т.к. None, это допустимое ЗПУ
__NODEFAULT = (((),),)
def __init__(self, iterable):
self._iterable = iterable
self._iterated = False
self._peeked = False
def __iter__(self):
# источник итерируется ровно один раз при итерации обёртки
if not self._iterated:
self._iterated = True
self._iterable = iter(self._iterable)
return self
def peek(self, default=__NODEFAULT):
"""
Возвращает следующее значение исходного итератора,
не меняя при этом исходный набор значений, т.е. не влияет
на результаты последующих вызовов next()
Если итератор больше не содержит значений, то возвращается
значение по умолчанию @default, елси оно указано,
в противном случае возбуждается исключение StopIteration
"""
if not self._peeked:
iter(self) # итерируем себя на всякий случай
try:
self._value = self._iterable.next()
except StopIteration:
if default is self.__NODEFAULT:
raise
return default
self._peeked = True
return self._value
def next(self):
if not self._peeked:
self.peek()
self._peeked = False
return self._value
Проверим реализацию
test_peekable(MyPeekable)
True
from more_itertools import peekable
test_peekable(peekable)
Итерация источника запущена до начала итерации обёртки!
Эталон запускает итерацию источника при инстанцировании!
Ну что же, упростим задачу:
test_peekable(peekable, ('SKIP_START',))
True