ども♪マコトです。
機械学習の勉強をしていたら、「Machine Learning with Financial Time Series Data on Google Cloud Platform」と言う、google先生がTensorFlowを使ってS&P500の株価予想をしているプログラムを見つけました。
google曰く、以下の結果が出たそうです。
Precision = 0.775862068966 Recall = 0.625 F1 Score = 0.692307692308 Accuracy = 0.722222222222
Accuracyが正解率なので、72%で当たり外れを当てれるんだそうです。
まあ、5年ぐらい前の結果ですがね。
元のプログラムはGoogle Cloud Platformで作っているようですが、登録するのも面倒くさかったので、解析してPythonでプログラムを組んでみました。
プログラムで使用するデータは米国yahooから取得しています。
データを取得するプログラムは米国yahooから情報を取得する方法【2019/08 】で書きましたので解析して見てください。
使うデータを変えたらもっと正解率が上がるかも?
以下がプログラムになります。
-*- coding: utf-8 -*- ''' googleの株価予想 S&P500 Code based on: https://github.com/corrieelston/datalab/blob/master/FinancialTimeSeriesTensorFlow.ipynb ''' from __future__ import print_function import datetime import urllib.request, urllib.parse, urllib.error from os import path import operator as op from collections import namedtuple import numpy as np import pandas as pd import tensorflow as tf import argparse import requests import time import datetime as dt from datetime import datetime, date import re import os import Cls_Connect as Conn DAYS_BACK = 3 FROM_YEAR = '1991' cookie = '' crumb = '' ROOT_DIR = r"C:\data" EXCHANGES_DEFINE = [ ['DOW', '^DJI','%5EDJI'], #NYダウ平均株価 ['FTSE', '^FTSE','%5EFTSE'], #英国大型株100 ['GDAXI', '^GDAXI','%5EGDAXI'], #ドイツ30 ['HSI', '^HSI','%5EHSI'], #香港ハンセン株価指数 ['N225', '^N225','%5EN225'], #日経平均株価 ['NASDAQ', '^IXIC','%5EIXIC'], #NASDAQ ['SP500', '^GSPC','%5EGSPC'], #S&P 500 (S&P) ['SSEC', '000001.SS','000001.SS'], #上海総合指数 インデックス ] EXCHANGES_LABEL = [exchange[0] for exchange in EXCHANGES_DEFINE] Dataset = namedtuple( 'Dataset', 'training_predictors training_classes test_predictors test_classes') Environ = namedtuple('Environ', 'sess model actual_classes training_step dataset feature_data') #yahooのcrumbとcookieを取得する処理を追加 def get_yahoo_crumb_cookie(): """Get Yahoo crumb cookie value.""" res = requests.get('https://finance.yahoo.com/quote/SPY/history') yahoo_cookie = res.cookies['B'] yahoo_crumb = None pattern = re.compile(r'.*"CrumbStore":\{"crumb":"(?P<crumb>[^"]+)"\}') for line in res.text.splitlines(): m = pattern.match(line) if m is not None: yahoo_crumb = m.groupdict()['crumb'] return yahoo_cookie, yahoo_crumb def fetchYahooFinance(name, code): fileName = 'index_%s.csv' % name start = int(time.mktime(dt.datetime(int(FROM_YEAR),1,2).timetuple())) end = int(time.mktime(date.today().timetuple())) url = 'https://query1.finance.yahoo.com/v7/finance/download/%s?period1=%s&period2=%s&interval=1d&events=history&crumb=%s' % (code, start, end, crumb) data = requests.get(url, cookies={'B':cookie}) saveFilePath = os.path.join(ROOT_DIR, fileName) with open(saveFilePath, 'wb') as saveFile: saveFile.write(data.content) def fetchStockIndexes(): '''株価指標のデータをダウンロードしファイルに保存 ''' global cookie global crumb cookie, crumb = get_yahoo_crumb_cookie() for exchange in EXCHANGES_DEFINE: fetchYahooFinance(exchange[0], exchange[2]) def load_exchange_dataframes(): '''EXCHANGESに対応するCSVファイルをPandasのDataFrameとして読み込む。 Returns: {EXCHANGES[n]: pd.DataFrame()} ''' return {exchange: load_exchange_dataframe(exchange) for exchange in EXCHANGES_LABEL} def load_exchange_dataframe(exchange): '''exchangeに対応するCSVファイルをPandasのDataFrameとして読み込む。 Args: exchange: 指標名 Returns: pd.DataFrame() ''' return pd.read_csv(os.path.join(ROOT_DIR,'index_{}.csv'.format(exchange))).set_index('Date').sort_index() def get_closing_data(dataframes): '''各指標の終値カラムをまとめて1つのDataFrameに詰める。 Args: dataframes: {key: pd.DataFrame()} Returns: pd.DataFrame() ''' closing_data = pd.DataFrame() for exchange, dataframe in dataframes.items(): closing_data[exchange] = dataframe['Close'] closing_data = closing_data.fillna(method='ffill') return closing_data def get_log_return_data(closing_data): '''各指標について、終値を1日前との比率の対数をとって正規化する。 Args: closing_data: pd.DataFrame() Returns: pd.DataFrame() ''' log_return_data = pd.DataFrame() for exchange in closing_data: # np.log(当日終値 / 前日終値) で前日からの変化率を算出 # 前日よりも上がっていればプラス、下がっていればマイナスになる log_return_data[exchange] = np.log(closing_data[exchange]/closing_data[exchange].shift()) return log_return_data def build_training_data(log_return_data, target_exchange, max_days_back=DAYS_BACK, use_subset=None): '''学習データを作る。分類クラスは、target_exchange指標の終値が前日に比べて上ったか下がったかの2つである。 また全指標の終値の、当日から数えてmax_days_back日前までを含めて入力データとする。 Args: log_return_data: pd.DataFrame() target_exchange: 学習目標とする指標名 max_days_back: 何日前までの終値を学習データに含めるか use_subset (float): 短時間で動作を確認したい時用: log_return_dataのうち一部だけを学習データに含める Returns: pd.DataFrame() ''' # 「上がる」「下がる」の結果を計算 columns = [] for colname, exchange, operator in iter_categories(target_exchange): columns.append(colname) # 全ての XXX_positive, XXX_negative を 0 に初期化 log_return_data[colname] = 0 # XXX_positive の場合は >= 0 の全てのインデックスを # XXX_negative の場合は < 0 の全てのインデックスを取得し、それらに 1 を設定する indices = operator(log_return_data[exchange], 0) log_return_data.ix[indices, colname] = 1 num_categories = len(columns) # 各指標のカラム名を追加 for colname, _, _ in iter_exchange_days_back(target_exchange, max_days_back): columns.append(colname) ''' columns には計算対象の positive, negative と各指標の日数分のラベルが含まれる 例:[ 'SP500_positive', 'SP500_negative', 'DOW_1', 'DOW_2', 'DOW_3', 'FTSE_0', 'FTSE_1', 'FTSE_2', 'GDAXI_0', 'GDAXI_1', 'GDAXI_2', 'HSI_0', 'HSI_1', 'HSI_2', 'N225_0', 'N225_1', 'N225_2', 'NASDAQ_1', 'NASDAQ_2', 'NASDAQ_3', 'SP500_1', 'SP500_2', 'SP500_3', 'SSEC_0', 'SSEC_1', 'SSEC_2' ] 計算対象の データ だけ当日のデータを含めたらダメなので1〜3が入る ''' # データ数をもとめる max_index = len(log_return_data) - max_days_back if use_subset is not None: # データを少なくしたいとき max_index = int(max_index * use_subset) # 学習データを作る training_test_data = pd.DataFrame(columns=columns) for i in range(max_days_back + 10, max_index): # 先頭のデータを含めるとなぜか上手くいかないので max_days_back + 10 で少し省く values = {} # 「上がる」「下がる」の答を入れる for colname, _, _ in iter_categories(target_exchange): values[colname] = log_return_data[colname].ix[i] # 学習データを入れる for colname, exchange, days_back in iter_exchange_days_back(target_exchange, max_days_back): values[colname] = log_return_data[exchange].ix[i - days_back] training_test_data = training_test_data.append(values, ignore_index=True) return num_categories, training_test_data def iter_categories(target_exchange): '''分類クラス名とその値を計算するためのオペレーター関数を列挙する。 ''' for polarity, operator in [ ('positive', op.ge), # >= ('negative', op.lt), # < ]: colname = '{}_{}'.format(target_exchange, polarity) yield colname, target_exchange, operator def iter_exchange_days_back(target_exchange, max_days_back): '''指標名、何日前のデータを読むか、カラム名を列挙する。 ''' for exchange in EXCHANGES_LABEL: # ターゲット の結果を予測するのに ターゲット の当日の値が含まれてはいけないので1日づらす #start_days_back = 1 if exchange == target_exchange else 0 # start_days_back = 1 # N225 で行う場合は全て前日の指標を使うようにする if exchange == "DOW" or exchange == "NASDAQ" or exchange == "SP500": start_days_back = 1 else : start_days_back = 0 end_days_back = start_days_back + max_days_back for days_back in range(start_days_back, end_days_back): colname = '{}_{}'.format(exchange, days_back) yield colname, exchange, days_back def split_training_test_data(num_categories, training_test_data): '''学習データをトレーニング用とテスト用に分割する。 ''' # 先頭2つより後ろが学習データ predictors_tf = training_test_data[training_test_data.columns[num_categories:]] # 先頭2つが「上がる」「下がる」の答えデータ classes_tf = training_test_data[training_test_data.columns[:num_categories]] # 学習用とテスト用のデータサイズを求める training_set_size = int(len(training_test_data) * 0.8) #test_set_size = len(training_test_data) - training_set_size # 古いデータ0.8を学習とし、新しいデータ0.2がテストとなる return Dataset( training_predictors=predictors_tf[:training_set_size], training_classes=classes_tf[:training_set_size], test_predictors=predictors_tf[training_set_size:], test_classes=classes_tf[training_set_size:], ) def tf_confusion_metrics(model, actual_classes, session, feed_dict): '''与えられたネットワークの正解率などを出力する。 ''' predictions = tf.argmax(model, 1) actuals = tf.argmax(actual_classes, 1) ones_like_actuals = tf.ones_like(actuals) zeros_like_actuals = tf.zeros_like(actuals) ones_like_predictions = tf.ones_like(predictions) zeros_like_predictions = tf.zeros_like(predictions) tp_op = tf.reduce_sum( tf.cast( tf.logical_and( tf.equal(actuals, ones_like_actuals), tf.equal(predictions, ones_like_predictions) ), "float" ) ) tn_op = tf.reduce_sum( tf.cast( tf.logical_and( tf.equal(actuals, zeros_like_actuals), tf.equal(predictions, zeros_like_predictions) ), "float" ) ) fp_op = tf.reduce_sum( tf.cast( tf.logical_and( tf.equal(actuals, zeros_like_actuals), tf.equal(predictions, ones_like_predictions) ), "float" ) ) fn_op = tf.reduce_sum( tf.cast( tf.logical_and( tf.equal(actuals, ones_like_actuals), tf.equal(predictions, zeros_like_predictions) ), "float" ) ) tp, tn, fp, fn = session.run( [tp_op, tn_op, fp_op, fn_op], feed_dict ) tpr = float(tp)/(float(tp) + float(fn)) #fpr = float(fp)/(float(tp) + float(fn)) accuracy = (float(tp) + float(tn))/(float(tp) + float(fp) + float(fn) + float(tn)) recall = tpr if (float(tp) + float(fp)): precision = float(tp)/(float(tp) + float(fp)) f1_score = (2 * (precision * recall)) / (precision + recall) else: precision = 0 f1_score = 0 print('Precision = ', precision) print('Recall = ', recall) print('F1 Score = ', f1_score) print('Accuracy = ', accuracy) def simple_network(dataset): '''単純な分類モデルを返す。 ''' sess = tf.Session() # Define variables for the number of predictors and number of classes to remove magic numbers from our code. num_predictors = len(dataset.training_predictors.columns) num_classes = len(dataset.training_classes.columns) # Define placeholders for the data we feed into the process - feature data and actual classes. feature_data = tf.placeholder("float", [None, num_predictors]) actual_classes = tf.placeholder("float", [None, num_classes]) # Define a matrix of weights and initialize it with some small random values. weights = tf.Variable(tf.truncated_normal([num_predictors, num_classes], stddev=0.0001)) biases = tf.Variable(tf.ones([num_classes])) # Define our model... # Here we take a softmax regression of the product of our feature data and weights. model = tf.nn.softmax(tf.matmul(feature_data, weights) + biases) # Define a cost function (we're using the cross entropy). cost = -tf.reduce_sum(actual_classes * tf.log(model)) # Define a training step... # Here we use gradient descent with a learning rate of 0.01 using the cost function we just defined. training_step = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(cost) init = tf.initialize_all_variables() sess.run(init) return Environ( sess=sess, model=model, actual_classes=actual_classes, training_step=training_step, dataset=dataset, feature_data=feature_data, ) def smarter_network(dataset): '''隠しレイヤー入りのもうちょっと複雑な分類モデルを返す。 ''' sess = tf.compat.v1.Session() num_predictors = len(dataset.training_predictors.columns) num_classes = len(dataset.training_classes.columns) feature_data = tf.compat.v1.placeholder("float", [None, num_predictors]) actual_classes = tf.compat.v1.placeholder("float", [None, num_classes]) weights1 = tf.compat.v1.Variable(tf.random.truncated_normal([(DAYS_BACK * len(EXCHANGES_DEFINE)), 100], stddev=0.0001)) biases1 = tf.compat.v1.Variable(tf.ones([100])) weights2 = tf.compat.v1.Variable(tf.truncated_normal([100, 50], stddev=0.0001)) biases2 = tf.compat.v1.Variable(tf.ones([50])) weights3 = tf.compat.v1.Variable(tf.truncated_normal([50, 25], stddev=0.0001)) biases3 = tf.compat.v1.Variable(tf.ones([25])) weights4 = tf.compat.v1.Variable(tf.truncated_normal([25, 2], stddev=0.0001)) biases4 = tf.compat.v1.Variable(tf.ones([2])) hidden_layer_1 = tf.nn.relu(tf.matmul(feature_data, weights1) + biases1) hidden_layer_2 = tf.nn.relu(tf.matmul(hidden_layer_1, weights2) + biases2) hidden_layer_3 = tf.nn.relu(tf.matmul(hidden_layer_2, weights3) + biases3) model = tf.nn.softmax(tf.matmul(hidden_layer_3, weights4) + biases4) cost = -tf.reduce_sum(actual_classes*tf.math.log(model)) training_step = tf.compat.v1.train.AdamOptimizer(learning_rate=0.0001).minimize(cost) init = tf.initialize_all_variables() sess.run(init) return Environ( sess=sess, model=model, actual_classes=actual_classes, training_step=training_step, dataset=dataset, feature_data=feature_data, ) def train(env, steps=30000, checkin_interval=5000): '''学習をsteps回おこなう。 ''' correct_prediction = tf.equal( tf.argmax(env.model, 1), tf.argmax(env.actual_classes, 1)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float")) for i in range(1, 1 + steps): env.sess.run( env.training_step, feed_dict=feed_dict(env, test=False), ) if i % checkin_interval == 0: print(i, env.sess.run( accuracy, feed_dict=feed_dict(env, test=False), )) tf_confusion_metrics(env.model, env.actual_classes, env.sess, feed_dict(env, True)) def feed_dict(env, test=False): '''学習/テストに使うデータを生成する。 ''' prefix = 'test' if test else 'training' predictors = getattr(env.dataset, '{}_predictors'.format(prefix)) classes = getattr(env.dataset, '{}_classes'.format(prefix)) return { env.feature_data: predictors.values, env.actual_classes: classes.values.reshape(len(classes.values), len(classes.columns)) } def main(args): print('株価指標データをダウンロードしcsvファイルに保存') fetchStockIndexes() print('株価指標データを読み込む') all_data = load_exchange_dataframes() print('終値を取得') closing_data = get_closing_data(all_data) print('データを学習に使える形式に正規化') log_return_data = get_log_return_data(closing_data) print('答と学習データを作る') num_categories, training_test_data = build_training_data( log_return_data, args.target_exchange, use_subset=args.use_subset) #各列ごとに平均や標準偏差、最大値、最小値、最頻値などの要約統計量を取得 print(training_test_data.describe()) print('学習データをトレーニング用とテスト用に分割する') dataset = split_training_test_data(num_categories, training_test_data) #各列ごとに平均や標準偏差、最大値、最小値、最頻値などの要約統計量を取得 print(dataset.test_predictors.describe()) print(dataset.training_predictors.describe()) print('機械学習のネットワークを作成') #env = simple_network(dataset) env = smarter_network(dataset) if args.inspect: import code print('Press Ctrl-d to proceed') code.interact(local=locals()) print('学習') train(env, steps=args.steps, checkin_interval=args.checkin) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('target_exchange', choices=EXCHANGES_LABEL) parser.add_argument('--steps', type=int, default=10000) parser.add_argument('--checkin', type=int, default=1000) parser.add_argument('--use-subset', type=float, default=None) parser.add_argument('--inspect', type=bool, default=False) args = parser.parse_args() #追加 # プロキシの設定 Con = Conn.Connect() os.environ["http_proxy"] = Con.http_proxy() os.environ["https_proxy"] = Con.https_proxy() main(args)
去年の12月頃に作って動かしたときは大体正解率が69%ぐらいでした。
2015年のgoogleが発表したときに比べると若干正解率が下がっていました。
コメント