교재 http://www.greenteapress.com/thinkbayes/html/thinkbayes009.html#fig.redline0
그래프는 redline.py 를 실행시키면 생성된다.
케임브리지에서 일하던 시절, 레드라인(지하철)을 타고 갈때,
플랫폼에서 기다리는 승객 수를 기초로, 다음 기차가 도착할 때까지 걸리는 시간을 추정하곤 했다.
승객수가 별로 없으면 출발한지 얼마 안된 것이고,
승객수가 어느정도 있다면 출발한지 꽤 되서 다음열차가 곧 올 것이고,
승객수가 많다면 이것은 장기열차 지연원인이 생긴 것이라 추측하고 바로 택시타러 간다
베이지안 추정을 통해 대기시간 예측한 다음
기차를 탈지, 택시를 탈지 결정하는데 얼마나 도움이 되었을까?
import thinkbayes
import thinkplot
import numpy
UPPER_BOUND = 1200
gap_times = [
428.0, 705.0, 407.0, 465.0, 433.0, 425.0, 204.0, 506.0, 143.0, 351.0,
450.0, 598.0, 464.0, 749.0, 341.0, 586.0, 754.0, 256.0, 378.0, 435.0,
176.0, 405.0, 360.0, 519.0, 648.0, 374.0, 483.0, 537.0, 578.0, 534.0,
577.0, 619.0, 538.0, 331.0, 186.0, 629.0, 193.0, 360.0, 660.0, 484.0,
512.0, 315.0, 457.0, 404.0, 740.0, 388.0, 357.0, 485.0, 567.0, 160.0,
428.0, 387.0, 901.0, 187.0, 622.0, 616.0, 585.0, 474.0, 442.0, 499.0,
437.0, 620.0, 351.0, 286.0, 373.0, 232.0, 393.0, 745.0, 636.0, 758.0,
]
def MakeRange(low, high=UPPER_BOUND, skip=10):
return range(low, high+skip, skip)
cdf_z = thinkbayes.MakeCdfFromList(gap_times).Scale(1.0/60)
xs = MakeRange(low=0, high=20)
pdf_z = thinkbayes.EstimatedPdf(gap_times)
pmf_z = pdf_z.MakePmf(xs, name="z")
문제 풀기에 앞서, 몇가지 모델링 원칙을 정하자.
승객은 어떤 시간이건 동일한 확률로 도착한다.
즉, 분당 승객수가 임의비율 lambda 인 포아송 프로세스를 따른다.
매일 동일한 시간에 짧은 일수동안 확인한 결과, lambda 를 상수로 가정했다.
(추가로 기차 공간은 충분히 커서 플랫폼의 승객들은 모두 탑승한다.)
반면, 기차 도착은 포아송 프로세스를 따르지 않는다.
노선 끝에서 7~8분마다 출발하지만, '켄달 스퀘어' 역에서는 3분 ~ 12분으로 다양하다.
http://www.mbta.com/rider_tools/developers/ 에서 데이터를 받아,
redline_data.py 스크립트를 통해 5일간 매일 4시~6시 사이 실행해서 약 15개씩 도착내역 기록
이것을 그림8-1 의 z 로 나타내었다.
(일단은 z 분포 그래프만 보자)
그림 8-1의 zb 는 승객들이 보는 (편향된) 기차시간 분포이다.
그런데 승객들이 보는 기차시간의 분포는 z 와 다르다.
데이터를 보면, 승객들의 데이터 (zb)가 실제 기차간격보다 더 길게 체감하고 있다고 볼 수 있다.
왜냐하면, 대기시간이 길때 탑승한 승객수가 짧은때보다 많기 때문이다.
예를들어,
기차가 5분만에 1대, 10분만에 1대가 왔다면, 평균은 7.5분 된다.
하지만, 승객들은 10분 기차에 2배가 더 많이 탔기 때문에 승객들 데이터로는 평균 8.33 된다.
=> 관측차 편향 실제보다 '오버 샘플링' 됬다고 볼 수 있다.
# pmf => biased pmf
def BiasPmf(pmf):
new_pmf = pmf.Copy()
for x, p in pmf.Items():
new_pmf.Mult(x, x) # 자신의 시간 비율만큼 곱해준다.
new_pmf.Normalize()
return new_pmf
편향된 PMF 를 간단하게 만들 수 있다.
기존 PMF 를 가지고
각 분포값에 자신의 기차시간 비율만큼 곱한다음 정규화(normalize) 하면 된다.
# 앞 기차 도착 시간에서 승객도착시간을 x 라고 하고,
# 승객도착시간에서 뒤 기차 도착시간까지 시간을 y(대기시간) 라고 하여,
# zb = x + y 라고 정의하자.
# 예를들어, 앞에 5분짜리 1대와 10분짜리 1대가 있을 경우라면,
# zb 를 구성하는 x, y 는 각각 1/3, 2/3 확률(가중치?)이다.
# 5분 간격일때 y 는 0 ~ 5분 사이 균등분포를 따르고,
# 10분 간격일때 y 는 0 ~ 10분 사이 균등분포를 따른다.
# 따라서 전체분포는 각 확률(가중치)이 곱해진 균등분포의 '혼합'형태이다.
# (* 메타 Pmf 를 통한 혼합은 75페이지 5.6 혼합을 참조)
# 이렇게 zb 를 구하면, 이를 통해 y 를 구할 수 있다.
# 정리해보자면,
# 1) 각 기차의 분포마다 균등분포 그래프를 구하고,
# 2) 각 분포의 가중치 만큼 곱해서 혼합해서 전체 y 분포를 구한다.
# 대기시간 분포 pmf_y
def PmfOfWaitTime(pmf_zb):
metapmf = thinkbayes.Pmf()
for gap, prob in pmf_zb.Items():
uniform = MakeUniformPmf(0, gap) # 각 열차의 균등분포
metapmf.Set(uniform, prob) # 에 대한 확률(가중치) 메타pmf
pmf_y = thinkbayes.MakeMixture(metapmf) # 그래프를 혼합
return pmf_y
# 균등분포
def MakeUniformPmf(low, high):
pmf = thinkbayes.Pmf()
for x in MakeRange(low=low, high=high):
pmf.Set(x, 1)
pmf.Normalize()
return pmf
def MakeRange(low, high=UPPER_BOUND, skip=10):
return range(low, high+skip, skip)
# 이런 과정을 캡슐화해보자.
# < 대기시간 계산 >
class WaitTimeCalculator(object):
def __init__(self, pmf_z):
self.pmf_z = pmf_z
self.pmf_zb = BiasPmf(pmf_z)
self.pmf_y = PmfOfWaitTime(self.pmf_zb)
self.pmf_x = self.pmf_y
# 참고로 y 분포와 x 분포는 같다.
# y 가 0 ~ zp 까지 균등분포라는 것을 생각하면,
# x = zp - y
# x 분포도 0 ~ zp 까지 균등분포이다.
#(????? 왜 갑자기 zb 에서 zp 로 바꾼거지?? -_-; 다른건가... 오타인가...;)
# 각 분포를 Cdf 로 변환해서 그래프로 그려보자.
# 동일한 축에 여러 분포를 그려야하는 경우 Cdf 가 용이하며, 평균찾기도 쉽다.
# z 평균 7.8
# zb 평균 8.8 (z 보다 13% 더 높다.)
# y 평균 4.4 (zb 평균의 반)
# 생각해보면 평균 8.8분 열차에서 기다리는 평균시간 4.4분이 맞는거 같다.
# 다시 시작했던 문제로 돌아가자.
# 내가 플랫폼 도착시, 10명이 대기중이였다.
# 다음기차까지 얼마나 기다려야 할까??
가장 단순한 케이스부터 시작해서 확장해가자.
실제분포 z가 있고, 승객도착비율 lambda=2 라는 것을 알고 있다고 가정하자.
(정리하자면,
내가 현재 기다리는 열차의 대기시간을 추정하는 것이 목표이고,
바로 대기시간 계산은 어려우니, 경과시간 x 분포에 베이지안 적용해서 푼다.
x 분포 베이지안 적용
prior 이 계산해서 나온 x 분포를 사용하고,
data = (승객수 10, lambda 분당승객수 2) 로 업뎃해서
posterior 을 구한다.
zb - x = y 를 통해 대기시간 분포 y 를 구한다.)
def RemoveNegatives(pmf):
for val in pmf.Values():
if val < 0:
pmf.Remove(val)
pmf.Normalize()
# 대기시간 추정
def PredictWaitTime(pmf_zb, pmf_x):
pmf_y = pmf_zb - pmf_x # 분포의 차를 계산
RemoveNegatives(pmf_y) # 기차 5분인데 대기시간이 5분보다 긴것들은 쳐낸다.
return pmf_y
# 경과시간
class Elapsed(thinkbayes.Suite):
def Likelihood(self, data, hypo):
x = hypo
lam, k = data
# lam * x = x분당 평균 승객수
like = thinkbayes.EvalPoissonPmf(lam * x, k) # 평균수, 관측수
return like
# 경과시간 추정 => 대기시간 추정
class ElapsedTimeEstimator(object):
# wtc = 대기시간추정을 통해 pmf_zb, pmf_x, pmf_y 계산해놓은 객체
def __init__(self, wtc, lam, num_passengers):
self.prior_x = Elapsed(wtc.pmf_x)
# 경과시간 pmf_x 사전분포를 업뎃해서 사후분포 pmf_x 를 구하자.
self.post_x = self.prior_x.Copy()
self.post_x.Update((lam, num_passengers))
# 대기시간 추정 (pmf_zb - pmf_x)
self.pmf_y = PredictWaitTime(wtc.pmf_zb, self.post_x)
# wtc = 대기시간추정을 통해 pmf_zb, pmf_x, pmf_y 계산해놓은 객체
wtc = WaitTimeCalculator(pmf_z)
ete = ElapsedTimeEstimator(wtc,
lam=2.0/60, # 초당 승객수
num_passengers=15)
# x 사전분포보다 x 사후분포가 시간이 더 길어지고 있다.
# 생각해보면, 승객수 15명이면, 분당 2명 기준으로 약 7.5분이 지났다고 예측할 수 있다.
# 그렇다면 경과시간이 7.5분 이전에 대한 확률이 '팍' 주는 것이 당연하다고 볼 수 있다.
# 지금까지 (1)실제간격분포 와 (2)승객도착비율 을 안다고 가정했었다.
# 두번째 가정을 바꿔보자.
# 승객도착비율을 전혀 모른다고 가정하자.
# 몇일 측정을 통해 lambda 를 정량적 추정할 수 있다.
# 5일간 다음과 같은 데이터가 나온다.
# k1 y k2
# -- --- --
# 17 4.6 9
# 22 1.0 0
# 23 1.4 4
# 18 5.4 12
# 4 5.8 11
passenger_data = [(17, 4.6, 9),
(22, 1.0, 0),
(23, 1.4, 4),
(18, 5.4, 12),
(4, 5.8, 11)]
# k1 도착시 승객수
# y 분단위 대기시간
# k2 대기동안 도착한 승객수
# y 와 k2 를 종합해보면, 18분 기다렸고, 36명 승객이 도착했으니, 분당 2명을 추정할 수 있다.
# 하지만, 이 추정을 ***그대로 쓰지 않고*** ,
# lambda 의 사후분포를 계산해서 이후 분석에서 (좀 더 간지나게) 이 분포를 사용해보겠다.
# 승객 도착 비율 pmf
class ArrivalRate(thinkbayes.Suite):
def Likelihood(self, data, hypo):
lam = hypo
y, k = data
# 분당 도착 비율이 lam 라고 가정할때 (가설), k 명이 있을 확률
like = thinkbayes.EvalPoissonPmf(lam * y, k) # lam, k/y
return like
# (승객) 도착비율 추정
class ArrivalRateEstimator(object):
def __init__(self, passenger_data):
low, high = 0, 5
n = 51
# 가설들 분당속도 0~5 사이라고 하고 0.1 분 단위로 짜른다.
# [0.0, 0.1, 0.2, ... ,4.9, 5.0]
hypos = numpy.linspace(low, high, n) / 60
self.prior_lam = ArrivalRate(hypos)
self.post_lam = self.prior_lam.Copy()
# 승객 도착 데이터로 모든 가설들 업데이트
for k1, y, k2 in passenger_data:
self.post_lam.Update((y, k2))
이제 승객도착비율을 상수 (lam=2) 에서 분포로 변경되었다.
이처럼, 분석시 입력값 중 하나라도 불확실한 것이 있다면, 다음과 같은 프로세스를 고려할 수 있다.
1,2 단계는 앞에서 이미 실행했다.
3,4 단계를 위해서 아래 코드를 사용하자.
class WaitMixtureEstimator(object):
def __init__(self, wtc, are, num_passengers=15):
self.metapmf = thinkbayes.Pmf()
# 설명은 없지만, are = ArrivalRateEstimator 승객도착비율 분포
# 라고 자연스럽게 추정하자...
# 각 lam (가설)에 대해서 대기시간추정, 메타pmf 혼합을 모두 적용한다.
for lam, prob in sorted(are.post_lam.Items()):
ete = ElapsedTimeEstimator(wtc, lam, num_passengers)
self.metapmf.Set(ete.pmf_y, prob)
self.mixture = thinkbayes.MakeMixture(self.metapmf)
그럼 지금까지 분석을 토대로 볼때,
언제 기차를 기다리는 것을 멈추고 택시타러 가야 할까??
문제 시나리오는, '사우스 스테이션' 에 가서 통근선을 타야한다. 현재 여유 15분이 있다.
이 경우, num_passengers 함수로 y 가 15분 이상일 확률을 구하면 된다.
그런데 문제가 있다. 대기시간이 긴 경우가 드물다.
이 경우 빈도에 민감해져서 빈도 추정하기 어렵다.
관측 데이터는 '1주일'뿐이고, 내가 관측한 가장 긴 대기시간이 '15분' 이다.
이것으로 이것보다 대기시간이 더 긴 경우를 정확하게 추정하기 어렵다.
하지만, 기존의 관측데이터를 가지고 대략적 추정은 가능하다.
1년간 레드라인 통근하면서, 신호문제, 전원노후, 경찰활동 등으로 3번의 대기시간이 긴 경우가 있었다. 따라서 '1년간 3번의 주요 장기 대기시간' 이 있다고 추정하자.
하지만, 이 관측은 편향적임을 기억하자. 따라서 내 관측은 zb 샘플로 사용해야 한다.
통근하던 해에, 집으로 가는 레드라인 220번 탔었고, 그에따른 시간차 샘플을 만들었다.
(?? 언제 사용할 줄 알고 미리 만들어 둔 것인가... 아니면 기억을 더듬어 급조한것인가;...
베이지안은 주관적 데이터도 사전분포로 사용가능하니 문제는 없겠군;)
거기에 장기 대기시간 30,40,50분 (주관적인 데이터인듯;) 를 추가하자.
# gap_times = 열차 간격 데이터
n = 220
cdf_z = thinkbayes.MakeCdfFromList(gap_times)
# ?? 책에 나온대로라면,
# '내가 관측한 시간 차 gap_times 에서 220 개의 시간차 샘플을 만들고'
# ... 내가 관측한 시간차 데이터를 다 쓰지 않고 샘플링 해서 사용하는 이유는?
# Cdf.Sample 함수가 더 효율적이니 바꿔서 사용한다.
sample_z = cdf_z.Sample(n)
pmf_z = thinkbayes.MakePmfFromList(sample_z)
cdf_zp = BiasPmf(pmf_z).MakeCdf()
sample_zb = cdf_zp.Sample(n) + [1800, 2400, 3000]
# 샘플(고르지 않은 이산형)을 KDE 작업으로 pdf(연속형) 를 만들고 pmf(고른 이산형) 를 만든다.
pdf_zb = thinkbayes.EstimatedPdf(sample_zb)
xs = MakeRange(low=60)
pmf_zb = pdf_zb.MakePmf(xs)
def UnbiasPmf(pmf_zb):
new_pmf = pmf_zb.Copy()
for x, p in pmf_zb.Items():
new_pmf.Mult(x, 1.0/x)
new_pmf.Normalize()
return new_pmf
# 계산을 통해 비편향 분포 pmf_z 를 구할 수 있다.
pmf_z = UnbiasPmf(pmf_zb)
wtc = WaitTimeCalculator(pmf_z)
def ProbLongWait(num_passengers, minutes): # 승객수, 한계시간
# 대기시간 분포 y 계산
ete = ElapsedTimeEstimator(wtc, lam, num_passengers)
# cdf(연속형) 으로 만들어서 그래프로 그리자.
cdf_y = ete.pmf_y.MakeCdf()
prob = 1 - cdf_y.Prob(minutes * 60)
결과를 보면,
승객수 20 이하일때 지하철 시스템이 일반적으로 돌아간다고 볼 수 있다. 장기 대기 시간 확률은 작다.
만약 30명 승객이 있다면 마지막 기차가 떠난지 15분 됬다고 추정할 수 있다...??
만약 '사우스 스테이션' 에서 통근선 놓칠 확률이 10% 이라면,
플랫폼에 대기하는 승객이 30명 미만이면 기다리고, 더 많다면 택시를 타야한다.
아니면, 분석을 한단계 더 진행해서, 통근선을 놓쳤을때 비용과 택시 타는 비용을 정량화 해서 예상 비용 최소화 지점을 찾을 수 있다.
지금까지는 항상 승객도착비율이 매일 동일하다고 가정하고 했다.
만약 근처에 특별한 행사가 있을 경우, 많은 사람들이 같은 시간에 도착할 것이다.
이 경우, lam 의 추정치는 너무 낮아지고, x,y 추정치는 너무 높아질 것이다...?
특별한 행사가 주요 장기 대기만큼 일어난다면 이를 모델에 추가해야 한다.
lam 분포에 간헐적 발생하는 큰값을 포함하여 확장하는 식으로 할 수 있다.
z 분포를 알고 있다고 가정하고 분석을 시작했는데,
이 대신 승객이 z를 추정할 수도 있지만, 이는 쉽지 않다.
승객은 자신의 대기시간 y 만 관측하기에 기차간 간격 z 를 정확히 알 수 없다.
하지만, zb 는 추론할 수 있다.
도착했을 때 대기 승객 수를 기록해 두면, 마지막 기차로부터 경과시간 x 를 추정할 수 있다.
그럼 y 를 관측할 수 있다. 관측된 y 에 x 사후분포를 더하면 zb 관측값에 대한 사후신뢰를 나타내는 분포가 된다...?
이 분포를 사용해서 zb 분포에 대한 신뢰를 갱신할 수 있다.
이를 역계산을 통해 z 분포를 구할 수 있다.
이 연습문제는 독자에게 남겨두겠다.
한가지 제안을 하자면, 15장을 먼저 읽어두자.