【Python】numpyで指数平滑移動平均を計算する
前回 pandas を使って指数平滑移動平均(EMA)を修正しつつ計算したが、実行が遅い(?)ので numpy で高速化できないかと思ってスクリプトを書いてみた。
前回の内容は以下。
指数平滑移動平均を計算する
とりあえず今回作ったスクリプトを載せておく。いつも通りpython2で書いてます。
ちなみに前回と同様でデータは 2015年度の日経225 を使用してます。
#! /usr/bin/env python # coding: utf-8 import codecs import numpy as np import pandas as pd def np_ema(closeList=[], term=5): '''numpyで指数平滑移動平均の計算''' dataArr = np.array([sum(closeList[0: term]) / term] + closeList[term:]) alpha = 2 / (term + 1.) wtArr = (1 - alpha)**np.arange(len(dataArr)) xArr = (dataArr[1:] * alpha * wtArr[-2::-1]).cumsum() / wtArr[-2::-1] emaArr = dataArr[0] * wtArr + np.hstack((0, xArr)) return np.hstack((np.empty(term - 1) * np.nan, emaArr)).tolist() def main(): with codecs.open('./nikkei_225.csv', 'r', encoding='utf-8') as f: # header = [next(f).encode('utf-8') for _ in np.arange(2)] [next(f).encode('utf-8') for _ in np.arange(2)] closeList = [float(row.split(',')[4]) for row in f] df = pd.DataFrame(dict(close=closeList, np_ema=np_ema(closeList))) print df if __name__ == '__main__': main()
実行結果を見やすくするために pandas を使っています。
以下、実行結果。
close np_ema 0 17070.0 NaN 1 16700.0 NaN 2 17140.0 NaN 3 17350.0 NaN 4 16940.0 17040.000000 5 17040.0 17040.000000 ~省略~ 239 18820.0 18875.635525 240 18750.0 18833.757017 241 18820.0 18829.171345 242 19110.0 18922.780896 243 19010.0 18951.853931
問題なく計算できてますよね(?)
np_ema() 関数の解説をざっくりしたいと思います。
np_ema() 関数は引数に、終値のリスト(closeList) と 平均する期間(term)を取ります。
dataArr = np.array([sum(closeList[0: term]) / term] + closeList[term:])
受け取った closeList の先頭(term期間)を平均値にして、それ以降の終値を連結したものを ndarray型 にして 変数dataArr に代入。
alpha = 2 / (term + 1.)
平滑定数αの公式に則って、算出結果を 変数alpha に代入。小数点以下が出る可能性があるので float型 になるようにピリオドをつけている。
wtArr = (1 - alpha)**np.arange(len(dataArr))
xArr = (dataArr[1:] * alpha * wtArr[-2::-1]).cumsum() / wtArr[-2::-1]
emaArr = dataArr[0] * wtArr + np.hstack((0, xArr))
numpy を使用して forループ を使うとかえって遅くなる(と思う)。少なくともpythonの ループ処理は低速だ。そのため forループ を回避するために、指数平滑移動平均(EMA)の計算式を展開して計算することにした。
指数平滑移動平均の計算式は以下ですよね。
y0 = 単純移動平均(SMA0)
yt = (1 - α)yt-1 + αxt
展開して整理した数式は以下。
法則性があるのが分かりますね。ちなみに今回は
y0 = x0 = SMA0
ですね。また、
y0 = x0(1 - α)0 + 0
とすればいい感じに計算できそうです。
以上を踏まえてpythonのコードに戻ります。
wtArr = (1 - alpha)**np.arange(len(dataArr))
x0(1 - α)t の (1 - α)t の部分を 変数wtArr に代入しています。
xArr = (dataArr[1:] * alpha * wtArr[-2::-1]).cumsum() / wtArr[-2::-1]
変数wtArr を流用しているので少し分かりにくいが α[x1(1-α)t-1+x2(1-α)t-2+...+xt-2(1-α)1+xt-1(1-α)0] の部分(t>0)を 変数xArr に代入している。
wtArr[-2::-1] を cumsum() したものを wtArr[-2::-1] で割ると(1-α)が約分されるので階上的な部分が一行で再現できる。
emaArr = dataArr[0] * wtArr + np.hstack((0, xArr))
上記で用意したものを使って指数平滑移動平均を計算して 変数emaArr に代入。np.hstack((0, xArr)) にしているのは先程用意した 変数xArr の式では t=0 の場合(xArr[0]=0)を再現できなかったので、この段階で 0 を先頭に追加した。
return np.hstack((np.empty(term - 1) * np.nan, emaArr)).tolist()
関数の戻り値。リストの長さが元の終値のリストと異なると気持ち悪いので、最初の単純移動平均の計算時に消失した部分に Nan を追加している。ndarray のままでもいいがなんとなく tolist() でpythonの list型 に直して返すようにした。
以上で、解説終わり。
実行速度
で、実行時間はどうなったのか確認したいと思います。
前回 pandas で指数平滑移動平均を計算した関数と比較したいと思います。
#! /usr/bin/env python # coding: utf-8 import codecs import timeit import numpy as np import pandas as pd def ema(closeList=[], term=5): '''指数平滑移動平均の計算''' return list(pd.Series(closeList).ewm(span=term).mean()) def ema1(closeList=[], term=5): '''指数平滑移動平均の計算(修正版)''' s = pd.Series(closeList) sma = s.rolling(term).mean()[:term] return list(pd.concat([sma, s[term:]]).ewm(span=term, adjust=False).mean()) def np_ema(closeList=[], term=5): '''numpyで指数平滑移動平均の計算''' dataArr = np.array([sum(closeList[0: term]) / term] + closeList[term:]) alpha = 2 / (term + 1.) wtArr = (1 - alpha)**np.arange(len(dataArr)) xArr = (dataArr[1:] * alpha * wtArr[-2::-1]).cumsum() / wtArr[-2::-1] emaArr = dataArr[0] * wtArr + np.hstack((0, xArr)) return np.hstack((np.empty(term - 1) * np.nan, emaArr)).tolist() def main(): # 終値の用意 with codecs.open('./nikkei_225.csv', 'r', encoding='utf-8') as f: [next(f).encode('utf-8') for _ in range(2)] closeList = [float(row.split(',')[4]) for row in f] loop = 1000 result = timeit.timeit(lambda: np_ema(closeList), number=loop) print('np_ema(): {0}'.format(result / loop)) result = timeit.timeit(lambda: ema(closeList), number=loop) print('ema(): {0}'.format(result / loop)) result = timeit.timeit(lambda: ema1(closeList), number=loop) print('ema1(): {0}'.format(result / loop)) if __name__ == '__main__': main()
np_ema(): 0.000101397037506 ema(): 0.000755774974823 ema1(): 0.00177794122696
前回の ema関数 より7.45倍、ema1関数 より17.53倍ほど高速化されたみたいです。多分メモリは結構食ってると思います。
おわりに
こんな面倒くさいことをしなくても numba ライブラリというものを使えば forループ が高速になるみたいですね。私は @jit なんぞそれって感じです。
何かの参考になれば幸いです。