心ゆく日並

辛さを辛味に、苦しみを苦味に転じら れればこの日々も味わい深い

Python プログラム 投資

PythonとTensorFlowで株価予想:S&P500編

投稿日:

ども♪マコトです。

機械学習の勉強をしていたら、「Machine Learning with Financial Time Series Data on Google Cloud Platform」と言う、google先生がTensorFlowを使ってS&P500の株価予想をしているプログラムを見つけました。

google曰く、以下の結果が出たそうです。

Precision =  0.775862068966
Recall =  0.625
F1 Score =  0.692307692308
Accuracy =  0.722222222222

Accuracyが正解率なので、72%で当たり外れを当てれるんだそうです。

まあ、5年ぐらい前の結果ですがね。

元のプログラムはGoogle Cloud Platformで作っているようですが、登録するのも面倒くさかったので、解析してPythonでプログラムを組んでみました。

プログラムで使用するデータは米国yahooから取得しています。

データを取得するプログラムは米国yahooから情報を取得する方法【2019/08 】で書きましたので解析して見てください。

使うデータを変えたらもっと正解率が上がるかも?

以下がプログラムになります。

 -*- coding: utf-8 -*-
'''
googleの株価予想 S&P500
Code based on:
https://github.com/corrieelston/datalab/blob/master/FinancialTimeSeriesTensorFlow.ipynb
'''
from __future__ import print_function

import datetime
import urllib.request, urllib.parse, urllib.error
from os import path
import operator as op
from collections import namedtuple
import numpy as np
import pandas as pd
import tensorflow as tf
import argparse

import requests
import time
import datetime as dt
from datetime import datetime, date
import re
import os
import Cls_Connect as Conn

DAYS_BACK = 3
FROM_YEAR = '1991'

cookie = ''
crumb = ''
ROOT_DIR = r"C:\data"

EXCHANGES_DEFINE = [
    ['DOW', '^DJI','%5EDJI'],           #NYダウ平均株価
    ['FTSE', '^FTSE','%5EFTSE'],        #英国大型株100
    ['GDAXI', '^GDAXI','%5EGDAXI'],     #ドイツ30
    ['HSI', '^HSI','%5EHSI'],           #香港ハンセン株価指数
    ['N225', '^N225','%5EN225'],        #日経平均株価
    ['NASDAQ', '^IXIC','%5EIXIC'],      #NASDAQ
    ['SP500', '^GSPC','%5EGSPC'],       #S&P 500 (S&P)
    ['SSEC', '000001.SS','000001.SS'],  #上海総合指数 インデックス
]

EXCHANGES_LABEL = [exchange[0] for exchange in EXCHANGES_DEFINE]

Dataset = namedtuple(
    'Dataset',
    'training_predictors training_classes test_predictors test_classes')
Environ = namedtuple('Environ', 'sess model actual_classes training_step dataset feature_data')


#yahooのcrumbとcookieを取得する処理を追加
def get_yahoo_crumb_cookie():
    """Get Yahoo crumb cookie value."""
    res = requests.get('https://finance.yahoo.com/quote/SPY/history')
    yahoo_cookie = res.cookies['B']
    yahoo_crumb = None
    pattern = re.compile(r'.*"CrumbStore":\{"crumb":"(?P<crumb>[^"]+)"\}')
    for line in res.text.splitlines():
        m = pattern.match(line)
        if m is not None:
            yahoo_crumb = m.groupdict()['crumb']
    return yahoo_cookie, yahoo_crumb


def fetchYahooFinance(name, code):

    fileName = 'index_%s.csv' % name

    start = int(time.mktime(dt.datetime(int(FROM_YEAR),1,2).timetuple()))
    end = int(time.mktime(date.today().timetuple()))
    url = 'https://query1.finance.yahoo.com/v7/finance/download/%s?period1=%s&period2=%s&interval=1d&events=history&crumb=%s' % (code, start, end, crumb)
    data = requests.get(url, cookies={'B':cookie})

    saveFilePath = os.path.join(ROOT_DIR, fileName)
    with open(saveFilePath, 'wb') as saveFile:
        saveFile.write(data.content)


def fetchStockIndexes():
    '''株価指標のデータをダウンロードしファイルに保存
    '''
    global cookie
    global crumb
    cookie, crumb = get_yahoo_crumb_cookie()

    for exchange in EXCHANGES_DEFINE:
        fetchYahooFinance(exchange[0], exchange[2])


def load_exchange_dataframes():
    '''EXCHANGESに対応するCSVファイルをPandasのDataFrameとして読み込む。
    Returns:
        {EXCHANGES[n]: pd.DataFrame()}
    '''
    return {exchange: load_exchange_dataframe(exchange)
            for exchange in EXCHANGES_LABEL}


def load_exchange_dataframe(exchange):
    '''exchangeに対応するCSVファイルをPandasのDataFrameとして読み込む。
    Args:
        exchange: 指標名
    Returns:
        pd.DataFrame()
    '''
    return pd.read_csv(os.path.join(ROOT_DIR,'index_{}.csv'.format(exchange))).set_index('Date').sort_index()


def get_closing_data(dataframes):
    '''各指標の終値カラムをまとめて1つのDataFrameに詰める。
    Args:
        dataframes: {key: pd.DataFrame()}
    Returns:
        pd.DataFrame()
    '''
    closing_data = pd.DataFrame()
    for exchange, dataframe in dataframes.items():
        closing_data[exchange] = dataframe['Close']
    closing_data = closing_data.fillna(method='ffill')
    return closing_data


def get_log_return_data(closing_data):
    '''各指標について、終値を1日前との比率の対数をとって正規化する。
    Args:
        closing_data: pd.DataFrame()
    Returns:
        pd.DataFrame()
    '''
    log_return_data = pd.DataFrame()
    for exchange in closing_data:
        # np.log(当日終値 / 前日終値) で前日からの変化率を算出
        # 前日よりも上がっていればプラス、下がっていればマイナスになる
        log_return_data[exchange] = np.log(closing_data[exchange]/closing_data[exchange].shift())

    return log_return_data


def build_training_data(log_return_data, target_exchange, max_days_back=DAYS_BACK, use_subset=None):
    '''学習データを作る。分類クラスは、target_exchange指標の終値が前日に比べて上ったか下がったかの2つである。
    また全指標の終値の、当日から数えてmax_days_back日前までを含めて入力データとする。
    Args:
        log_return_data: pd.DataFrame()
        target_exchange: 学習目標とする指標名
        max_days_back: 何日前までの終値を学習データに含めるか
        use_subset (float): 短時間で動作を確認したい時用: log_return_dataのうち一部だけを学習データに含める
    Returns:
        pd.DataFrame()
    '''
    # 「上がる」「下がる」の結果を計算
    columns = []
    for colname, exchange, operator in iter_categories(target_exchange):
        columns.append(colname)
        # 全ての XXX_positive, XXX_negative を 0 に初期化
        log_return_data[colname] = 0
        # XXX_positive の場合は >=  0 の全てのインデックスを
        # XXX_negative の場合は < 0 の全てのインデックスを取得し、それらに 1 を設定する
        indices = operator(log_return_data[exchange], 0)
        log_return_data.ix[indices, colname] = 1

    num_categories = len(columns)

    # 各指標のカラム名を追加
    for colname, _, _ in iter_exchange_days_back(target_exchange, max_days_back):
        columns.append(colname)

    '''
    columns には計算対象の positive, negative と各指標の日数分のラベルが含まれる
    例:[
        'SP500_positive',
        'SP500_negative',
        'DOW_1',
        'DOW_2',
        'DOW_3',
        'FTSE_0',
        'FTSE_1',
        'FTSE_2',
        'GDAXI_0',
        'GDAXI_1',
        'GDAXI_2',
        'HSI_0',
        'HSI_1',
        'HSI_2',
        'N225_0',
        'N225_1',
        'N225_2',
        'NASDAQ_1',
        'NASDAQ_2',
        'NASDAQ_3',
        'SP500_1',
        'SP500_2',
        'SP500_3',
        'SSEC_0',
        'SSEC_1',
        'SSEC_2'
    ]
    計算対象の データ だけ当日のデータを含めたらダメなので1〜3が入る
    '''

    # データ数をもとめる
    max_index = len(log_return_data) - max_days_back
    if use_subset is not None:
        # データを少なくしたいとき
        max_index = int(max_index * use_subset)

    # 学習データを作る
    training_test_data = pd.DataFrame(columns=columns)
    for i in range(max_days_back + 10, max_index):
        # 先頭のデータを含めるとなぜか上手くいかないので max_days_back + 10 で少し省く
        values = {}
        # 「上がる」「下がる」の答を入れる
        for colname, _, _ in iter_categories(target_exchange):
            values[colname] = log_return_data[colname].ix[i]
        # 学習データを入れる
        for colname, exchange, days_back in iter_exchange_days_back(target_exchange, max_days_back):
            values[colname] = log_return_data[exchange].ix[i - days_back]
        training_test_data = training_test_data.append(values, ignore_index=True)

    return num_categories, training_test_data


def iter_categories(target_exchange):
    '''分類クラス名とその値を計算するためのオペレーター関数を列挙する。
    '''
    for polarity, operator in [
            ('positive', op.ge), # >=
            ('negative', op.lt), # <
    ]:
        colname = '{}_{}'.format(target_exchange, polarity)
        yield colname, target_exchange, operator


def iter_exchange_days_back(target_exchange, max_days_back):
    '''指標名、何日前のデータを読むか、カラム名を列挙する。
    '''
    for exchange in EXCHANGES_LABEL:
        # ターゲット の結果を予測するのに ターゲット の当日の値が含まれてはいけないので1日づらす
        #start_days_back = 1 if exchange == target_exchange else 0
        # start_days_back = 1 # N225 で行う場合は全て前日の指標を使うようにする
        if exchange == "DOW" or exchange == "NASDAQ" or exchange == "SP500":
            start_days_back = 1
        else :
            start_days_back = 0

        end_days_back = start_days_back + max_days_back
        for days_back in range(start_days_back, end_days_back):
            colname = '{}_{}'.format(exchange, days_back)
            yield colname, exchange, days_back


def split_training_test_data(num_categories, training_test_data):
    '''学習データをトレーニング用とテスト用に分割する。
    '''
    # 先頭2つより後ろが学習データ
    predictors_tf = training_test_data[training_test_data.columns[num_categories:]]
    # 先頭2つが「上がる」「下がる」の答えデータ
    classes_tf = training_test_data[training_test_data.columns[:num_categories]]

    # 学習用とテスト用のデータサイズを求める
    training_set_size = int(len(training_test_data) * 0.8)
    #test_set_size = len(training_test_data) - training_set_size

    # 古いデータ0.8を学習とし、新しいデータ0.2がテストとなる
    return Dataset(
        training_predictors=predictors_tf[:training_set_size],
        training_classes=classes_tf[:training_set_size],
        test_predictors=predictors_tf[training_set_size:],
        test_classes=classes_tf[training_set_size:],
    )


def tf_confusion_metrics(model, actual_classes, session, feed_dict):
    '''与えられたネットワークの正解率などを出力する。
    '''
    predictions = tf.argmax(model, 1)
    actuals = tf.argmax(actual_classes, 1)

    ones_like_actuals = tf.ones_like(actuals)
    zeros_like_actuals = tf.zeros_like(actuals)
    ones_like_predictions = tf.ones_like(predictions)
    zeros_like_predictions = tf.zeros_like(predictions)

    tp_op = tf.reduce_sum(
        tf.cast(
            tf.logical_and(
                tf.equal(actuals, ones_like_actuals),
                tf.equal(predictions, ones_like_predictions)
            ),
            "float"
        )
    )

    tn_op = tf.reduce_sum(
        tf.cast(
            tf.logical_and(
                tf.equal(actuals, zeros_like_actuals),
                tf.equal(predictions, zeros_like_predictions)
            ),
            "float"
        )
    )

    fp_op = tf.reduce_sum(
        tf.cast(
            tf.logical_and(
                tf.equal(actuals, zeros_like_actuals),
                tf.equal(predictions, ones_like_predictions)
            ),
            "float"
        )
    )

    fn_op = tf.reduce_sum(
        tf.cast(
            tf.logical_and(
                tf.equal(actuals, ones_like_actuals),
                tf.equal(predictions, zeros_like_predictions)
            ),
            "float"
        )
    )

    tp, tn, fp, fn = session.run(
        [tp_op, tn_op, fp_op, fn_op],
        feed_dict
    )

    tpr = float(tp)/(float(tp) + float(fn))
    #fpr = float(fp)/(float(tp) + float(fn))

    accuracy = (float(tp) + float(tn))/(float(tp) + float(fp) + float(fn) + float(tn))

    recall = tpr
    if (float(tp) + float(fp)):
        precision = float(tp)/(float(tp) + float(fp))
        f1_score = (2 * (precision * recall)) / (precision + recall)
    else:
        precision = 0
        f1_score = 0

    print('Precision = ', precision)
    print('Recall = ', recall)
    print('F1 Score = ', f1_score)
    print('Accuracy = ', accuracy)


def simple_network(dataset):
    '''単純な分類モデルを返す。
    '''
    sess = tf.Session()

    # Define variables for the number of predictors and number of classes to remove magic numbers from our code.
    num_predictors = len(dataset.training_predictors.columns)
    num_classes = len(dataset.training_classes.columns)

    # Define placeholders for the data we feed into the process - feature data and actual classes.
    feature_data = tf.placeholder("float", [None, num_predictors])
    actual_classes = tf.placeholder("float", [None, num_classes])

    # Define a matrix of weights and initialize it with some small random values.
    weights = tf.Variable(tf.truncated_normal([num_predictors, num_classes], stddev=0.0001))
    biases = tf.Variable(tf.ones([num_classes]))

    # Define our model...
    # Here we take a softmax regression of the product of our feature data and weights.
    model = tf.nn.softmax(tf.matmul(feature_data, weights) + biases)

    # Define a cost function (we're using the cross entropy).
    cost = -tf.reduce_sum(actual_classes * tf.log(model))

    # Define a training step...
    # Here we use gradient descent with a learning rate of 0.01 using the cost function we just defined.
    training_step = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)

    init = tf.initialize_all_variables()
    sess.run(init)

    return Environ(
        sess=sess,
        model=model,
        actual_classes=actual_classes,
        training_step=training_step,
        dataset=dataset,
        feature_data=feature_data,
    )


def smarter_network(dataset):
    '''隠しレイヤー入りのもうちょっと複雑な分類モデルを返す。
    '''
    sess = tf.compat.v1.Session()

    num_predictors = len(dataset.training_predictors.columns)
    num_classes = len(dataset.training_classes.columns)

    feature_data = tf.compat.v1.placeholder("float", [None, num_predictors])
    actual_classes = tf.compat.v1.placeholder("float", [None, num_classes])

    weights1 = tf.compat.v1.Variable(tf.random.truncated_normal([(DAYS_BACK * len(EXCHANGES_DEFINE)), 100], stddev=0.0001))
    biases1 = tf.compat.v1.Variable(tf.ones([100]))

    weights2 = tf.compat.v1.Variable(tf.truncated_normal([100, 50], stddev=0.0001))
    biases2 = tf.compat.v1.Variable(tf.ones([50]))

    weights3 = tf.compat.v1.Variable(tf.truncated_normal([50, 25], stddev=0.0001))
    biases3 = tf.compat.v1.Variable(tf.ones([25]))

    weights4 = tf.compat.v1.Variable(tf.truncated_normal([25, 2], stddev=0.0001))
    biases4 = tf.compat.v1.Variable(tf.ones([2]))

    hidden_layer_1 = tf.nn.relu(tf.matmul(feature_data, weights1) + biases1)
    hidden_layer_2 = tf.nn.relu(tf.matmul(hidden_layer_1, weights2) + biases2)
    hidden_layer_3 = tf.nn.relu(tf.matmul(hidden_layer_2, weights3) + biases3)
    model = tf.nn.softmax(tf.matmul(hidden_layer_3, weights4) + biases4)

    cost = -tf.reduce_sum(actual_classes*tf.math.log(model))

    training_step = tf.compat.v1.train.AdamOptimizer(learning_rate=0.0001).minimize(cost)

    init = tf.initialize_all_variables()
    sess.run(init)

    return Environ(
        sess=sess,
        model=model,
        actual_classes=actual_classes,
        training_step=training_step,
        dataset=dataset,
        feature_data=feature_data,
    )


def train(env, steps=30000, checkin_interval=5000):
    '''学習をsteps回おこなう。
    '''
    correct_prediction = tf.equal(
        tf.argmax(env.model, 1),
        tf.argmax(env.actual_classes, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))

    for i in range(1, 1 + steps):
        env.sess.run(
            env.training_step,
            feed_dict=feed_dict(env, test=False),
        )
        if i % checkin_interval == 0:
            print(i, env.sess.run(
                accuracy,
                feed_dict=feed_dict(env, test=False),
            ))

    tf_confusion_metrics(env.model, env.actual_classes, env.sess, feed_dict(env, True))


def feed_dict(env, test=False):
    '''学習/テストに使うデータを生成する。
    '''
    prefix = 'test' if test else 'training'
    predictors = getattr(env.dataset, '{}_predictors'.format(prefix))
    classes = getattr(env.dataset, '{}_classes'.format(prefix))
    return {
        env.feature_data: predictors.values,
        env.actual_classes: classes.values.reshape(len(classes.values), len(classes.columns))
    }


def main(args):
    print('株価指標データをダウンロードしcsvファイルに保存')
    fetchStockIndexes()
    print('株価指標データを読み込む')
    all_data  = load_exchange_dataframes()
    print('終値を取得')
    closing_data = get_closing_data(all_data)
    print('データを学習に使える形式に正規化')
    log_return_data = get_log_return_data(closing_data)
    print('答と学習データを作る')
    num_categories, training_test_data = build_training_data(
        log_return_data, args.target_exchange,
        use_subset=args.use_subset)

    #各列ごとに平均や標準偏差、最大値、最小値、最頻値などの要約統計量を取得
    print(training_test_data.describe())

    print('学習データをトレーニング用とテスト用に分割する')
    dataset = split_training_test_data(num_categories, training_test_data)

    #各列ごとに平均や標準偏差、最大値、最小値、最頻値などの要約統計量を取得
    print(dataset.test_predictors.describe())
    print(dataset.training_predictors.describe())

    print('機械学習のネットワークを作成')
    #env = simple_network(dataset)
    env = smarter_network(dataset)

    if args.inspect:
        import code
        print('Press Ctrl-d to proceed')
        code.interact(local=locals())

    print('学習')
    train(env, steps=args.steps, checkin_interval=args.checkin)


if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('target_exchange', choices=EXCHANGES_LABEL)
    parser.add_argument('--steps', type=int, default=10000)
    parser.add_argument('--checkin', type=int, default=1000)
    parser.add_argument('--use-subset', type=float, default=None)
    parser.add_argument('--inspect', type=bool, default=False)
    args = parser.parse_args()

    #追加
    # プロキシの設定
    Con = Conn.Connect()
    os.environ["http_proxy"] = Con.http_proxy()
    os.environ["https_proxy"] = Con.https_proxy()

    main(args)

 

去年の12月頃に作って動かしたときは大体正解率が69%ぐらいでした。

2015年のgoogleが発表したときに比べると若干正解率が下がっていました。







お・す・す・め・!

1

目次1 マコトのプロフィール1.1 自己紹介1.2 タイトルの由来1.3 サブタイトルの由来1.4 趣味1.5 ほしいも ...

-Python, プログラム, 投資

Copyright© 心ゆく日並 , 2020 AllRights Reserved.