turtlechanのブログ

無知の私がLinuxをいじりながら書いていくブログ

株価データをダウンロードする Pythonスクリプト 書いたよ!

今年の GW ももう終わりですね。
時間があったので、株価データをダウンロードする Pythonスクリプト 書いてみました。
私はまともに GUI を作ったことがなかったので、苦戦しましたがとりあえず出来たのでここで紹介させて下さい。

※ Python2 で書いています。

はじめに

Python2 で書いているので Python3 で動かすには一部書き直す必要があります。
Linux でしか動作確認していません。そしてデバッグしていないので不具合がある可能性が高いです。

依存モジュール(サードパーティ)は以下。

requests は pip でも使ってインストールして下さい。
Tkinter は 'apt-get install tk-dev' とかでインストール出来たかと思います。

作ったやつ

ファイル: turtlechan_price_dl_v1.zip

もし興味のある人がいればダウンロードして使ってみて下さい。
zip で圧縮しているので解凍して使ってください。
解凍先に移動して以下のコマンドで起動できるはずです。

$ python downloader.py

とりあえず GUI は以下。

起動時

ウィンドウ01

チェックボックスにチェックを入れて「Download」を押すと確認ダイアログが表示される。
OK でダウンロードが開始される。

ウィンドウ02

ダウンロード中は赤線引いたところが更新されていくので動いているのが確認できると思う。

ウィンドウ03

保存先は指定できません、downloader.py と同じディレクトリに mujinzouディレクトリ等を作成して、その中に保存する感じです。
すでにダウンロードファイルが存在していた場合にはダウンロード処理はパスします。

営業日の判定は workday_txt ディレクトリ内にあるテキストファイルを読み込んで判定しています。

wokday_txt に置いておきます。
turtlechan_price_dl_v1.zip には含めてあるのでわざわざダウンロードする必要はないです。

ソースコード

誰もダウンロードして中を覗いてくれないと寂しいので、ソースコードを書いておきます。

workday_txt

workday_txt ディレクトリ内のファイルは営業日が書かれているだけ。
以下に例を書いておく、ファイル名は '西暦.txt' である必要がある。

2000.txt
2000-01-04
2000-01-05
2000-01-06
2000-01-07
2000-01-11
~ 省略 ~
2000-12-26
2000-12-27
2000-12-28
2000-12-29

Python スクリプト

dataservice.py は、各サイトのダウンロードURLとかを生成してもらうためのもの。
ちなみに株価データ倉庫の zip ファイル名の先頭文字が 'd' のやつには対応できてない。

dataservice.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import os
import datetime
import zipfile


class Mujinzou(object):
    NAME = 'mujinzou'
    DOMAIN = 'http://mujinzou.com'
    EXTEND = 'zip'
    ARCHIVE = 'T{0:%y%m%d}.{1}'
    PATH = '/k_data/{0:%Y}/{0:%y}_{0:%m}/{1}'
    DIR = os.path.join(os.path.dirname(__file__), NAME)

    def __init__(self, **kwargs):
        if 'date' in kwargs:
            self.date = kwargs['date']
            extend = self.__class__.EXTEND if self.date.year > 2014 else 'lzh'  # 2014年までlzh形式
            self.archive = self.__class__.ARCHIVE.format(self.date, extend)
        elif 'name' in kwargs:
            fn = kwargs['name'].split('.')
            self.date = datetime.datetime.strptime(fn[0][1:], '%y%m%d').date()
            extend = fn[-1]
            self.archive = kwargs['name']
        self.path = self.__class__.PATH.format(self.date, self.archive)
        self.domain = self.__class__.DOMAIN if self.date.year > 2018 else 'http://souba-data.com'  # 2019年からドメインが変更

    @property
    def url(self):
        return self.domain + self.path

    def exists(self):
        return os.path.isfile(os.path.join(self.__class__.DIR, self.archive))

    def load(self):
        with zipfile.ZipFile(os.path.join(self.__class__.DIR, self.archive), 'r') as zf:
            data = [row.split(',') for row in zf.open(zf.namelist()[0])]
        return {row[1]: tuple(row[4:9]) for row in data}


class Stock_databox(object):
    NAME = 'stock-databox'
    DOMAIN = 'http://stock-databox.net'
    EXTEND = 'zip'
    ARCHIVE = 'y{0:%y%m%d}.{1}'
    PATH = '/{1}'
    DIR = os.path.join(os.path.dirname(__file__), NAME)

    def __init__(self, **kwargs):
        if 'date' in kwargs:
            self.date = kwargs['date']
            self.archive = self.__class__.ARCHIVE.format(self.date, self.__class__.EXTEND)
        elif 'name' in kwargs:
            fn = kwargs['name'].split('.')
            self.date = datetime.datetime.strptime(fn[0][1:], '%y%m%d').date()
            self.archive = kwargs['name']
        self.path = self.__class__.PATH.format(self.date, self.archive)

    @property
    def url(self):
        return self.__class__.DOMAIN + self.path

    def exists(self):
        return os.path.isfile(os.path.join(self.__class__.DIR, self.archive))

    def load(self):
        with zipfile.ZipFile(os.path.join(self.__class__.DIR, self.archive), 'r') as zf:
            data = [row.strip().split('\t') for row in zf.open(zf.namelist()[0])]
        return {row[0]: (row[2], row[3], row[4], row[5], '{0:.0f}'.format(float(row[6])*100)) for row in data[1:]}

downloader.py は、営業日の確認処理担当の Workdayクラス、GUI生成とダウンロード処理担当の Downloaderクラス。

downloader.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import os
from time import sleep
import datetime
import threading
import requests
import Tkinter as tk
import tkMessageBox as tkmsg
from dataservice import Mujinzou, Stock_databox


SAVE_DIR = os.path.dirname(__file__)  # 保存先のディレクトリ
WAIT_TIME = 1  # サーバーへの負荷を考慮
USER_AGENT = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36'}  # User-Agent 多分なんでもいい


class Downloader(tk.Frame):

    def __init__(self, master):
        tk.Frame.__init__(self, master)
        master.tk.call('wm', 'iconphoto', master._w, tk.PhotoImage(file='favicon.gif'))
        master.title('Turtlechan Price Downloader(仮)')
        master.geometry('200x200+50+50')
        master.minsize(width=200, height=200)
        master.maxsize(width=200, height=200)
        self.pack()
        self.create_widget()
        self.ssn = requests.Session()
        self.ssn.headers.update(USER_AGENT)

    def create_widget(self):
        # ラベル
        self.lbl_start = tk.Label(self, text='開始日')
        self.lbl_end = tk.Label(self, text='終了日')
        self.strv_status = tk.StringVar()
        self.lbl_status = tk.Label(self, text='取得先を選択してください。', fg='#888', bd=1, relief=tk.SUNKEN, anchor=tk.E, textvariable=self.strv_status)
        # テキストボックス
        self.ent_start = tk.Entry(self)
        self.ent_end = tk.Entry(self)
        self.ent_start.insert(tk.END, str(datetime.date.today() - datetime.timedelta(days=20)))
        self.ent_end.insert(tk.END, str(datetime.date.today()))
        # チェックボックス
        self.frame_service = tk.LabelFrame(self, text='取得先')
        self.boolv_muji = tk.IntVar()
        self.boolv_stkdb = tk.IntVar()
        self.boolv_muji.set(0)
        self.boolv_stkdb.set(0)
        self.chk_muji = tk.Checkbutton(self.frame_service, text='無尽蔵', variable=self.boolv_muji)
        self.chk_stkdb = tk.Checkbutton(self.frame_service, text='株価データ倉庫', variable=self.boolv_stkdb)
        self.chk_muji.grid(row=0, column=0)
        self.chk_stkdb.grid(row=0, column=1)
        # ボタン
        self.btn_quit = tk.Button(self, text='Quit', command=self.quit)
        self.btn_dl = tk.Button(self, text='Download', command=self.push_dl)
        # 配置
        self.lbl_start.grid(row=0, column=0)
        self.ent_start.grid(row=0, column=1)
        self.ent_end.grid(row=1, column=1)
        self.lbl_end.grid(row=1, column=0)
        self.frame_service.grid(row=2, column=0, pady=10, columnspan=2)
        self.btn_dl.grid(row=100, column=1, sticky=tk.E)
        self.btn_quit.grid(row=101, column=1, sticky=tk.E)
        self.lbl_status.grid(row=102, column=0, columnspan=2, pady=5, sticky=tk.W + tk.E + tk.N + tk.S)

    def __dl(self, Dataservice_obj):
        if Dataservice_obj.exists():
            return 0
        if not os.path.isdir(Dataservice_obj.DIR):
            os.makedirs(Dataservice_obj.DIR)
        ds = Dataservice_obj
        res = self.ssn.get(ds.url)
        try:  # ダウンロード先が存在しなかった場合
            res.raise_for_status()
        except requests.exceptions.HTTPError:
            self.strv_status.set('{0} は存在しないみたい。'.format(ds.url))
            return 0
        self.strv_status.set('Downloading...\t{0} '.format(Dataservice_obj.archive))
        with open(os.path.join(ds.DIR, ds.archive), 'wb') as f:
            f.write(res.content)
        return 1

    def __dl_loop(self, start, end, muji, stkdb):
        ''' 並列処理のために作った関数、他にいい方法ないの? '''
        workday = Workday()
        wait = float(WAIT_TIME) / (muji + stkdb)
        for date in workday.xrange(start, end):
            tmp = 0
            tmp += self.__dl(Mujinzou(date=date)) if muji else 0
            tmp += self.__dl(Stock_databox(date=date)) if stkdb else 0
            sleep(wait * tmp)
        self.strv_status.set('ダウンロード完了。')

    def push_dl(self):
        start = datetime.datetime.strptime(self.ent_start.get(), '%Y-%m-%d').date()
        end = datetime.datetime.strptime(self.ent_end.get(), '%Y-%m-%d').date()
        muji = self.boolv_muji.get()
        stkdb = self.boolv_stkdb.get()
        if not muji and not stkdb:  # チェックボックスがからの場合何もしない
            return
        if not tkmsg.askokcancel('Download', '{0} 〜 {1}\nダウンロードを開始します。'.format(str(start), str(end))):  # 確認ダイアログ
            return
        # 時間の掛かる処理は並列処理しないとウィンドウが更新されない
        thread = threading.Thread(target=self.__dl_loop, args=(start, end, muji, stkdb))
        thread.start()


class Workday(object):
    TXT_DIR = os.path.join(os.path.dirname(__file__), 'workday_txt')

    def __init__(self):
        self.wd = set()
        for txtfile in os.listdir(self.__class__.TXT_DIR):
            with open(os.path.join(self.__class__.TXT_DIR, txtfile), 'rb') as f:
                self.wd.update(row.strip() for row in f.readlines())

    def work(self, Date_obj):
        return str(Date_obj) in self.wd

    def near(self, Date_obj, old=True):
        dt = Date_obj
        if not self.work(dt):
            while True:
                dt = dt - datetime.timedelta(days=1) if old else dt + datetime.timedelta(days=1)
                if self.work(dt):
                    break
        return dt

    def xrange(self, start, end):
        wd_sorted = sorted(self.wd)
        return (datetime.datetime.strptime(date, '%Y-%m-%d').date() for date in wd_sorted[wd_sorted.index(str(self.near(start, old=False))): wd_sorted.index(str(self.near(end, old=True))) + 1])


def main():
    root = tk.Tk()
    dlr = Downloader(root)
    dlr.mainloop()


if __name__ == '__main__':
    main()

おわりに

ソースコード見たけど、その書き方おかしい」とかあったら教えてください。
また、「使ってみたよ」とか「とりあえず読んだ」、「動かない」とか何でもいいんで教えてくださるとうれしいです。
Pythonプログラミング勉強中なので、誰かの干渉があると励みになります。よろしくお願いします。

何かの参考になれば幸いです。