ども♪マコトです。
以前、EDINETのAPIを使って企業の財務情報を取得する④:PythonとEDINETのAPIを使って書類を取得するでEDINETのAPIを使って書類を作るプログラムを作ってみました。
ただ、EDINETのAPIって日で取得することはできても銘柄で取得することができないんですよね。
不便だな〜と思っているところにUfocatchと言うサービスのAPIを使用すると、銘柄でEDINETのデータを取得できると知ったので作ってみました。
ただ、EDINETにあるデータが3年程度なので3年前までの有価証券などのデータしか取得できません。
以下がプログラム
""" UfocatchのAPIを使用してEDINETの有価証券報告書などを取得する https://resource.ufocatch.com/ """ # coding: utf-8 import requests import xml.etree.ElementTree as ET from collections import defaultdict import json import os import sys from zipfile import ZipFile from io import BytesIO import Cls_Connect as Conn import mysql.connector as sqlc import time import datetime #ダウンロードフォルダ DOWNLOAD_SAVE_DIR = r"C:\data" #zipを解凍しない 0、zipを解凍する 1 ZipFlg = 1 def get_link_info_str(ticker_symbol, base_url): try: url = base_url+ticker_symbol response = requests.get(url) data = response.text except Exception: e = sys.exc_info()[1] print("エラー",e.args) if e.args[0].reason.args[1].args[0] == 10054: time.sleep(10) data = get_link_info_str(ticker_symbol, base_url) finally: return data def get_link(tree, namespace): #print ET.tostring(tree) yuho_dict = defaultdict(dict) for el in tree.findall('.//'+namespace+'entry'): title = el.find(namespace+'title').text if not is_yuho(title): continue print('writing:',title[:30],'...') _id = el.find(namespace+'id').text link = el.find('./'+namespace+'link[@type="application/zip"]') url = link.attrib['href'] yuho_dict[_id] = {'id':_id,'title':title,'url':url} return yuho_dict def is_yuho(title): #if u'有価証券報告書' in title: if not (title.find('有価証券報告書') == -1 and title.find('半期報告書') == -1 and title.find('四半期報告書') == -1 ) : return True else: return False def write_download_info(ofname,dat_download): with open(ofname,'w') as of: json.dump(dat_download, of,ensure_ascii=False, indent=4) def download_all_xbrl_files(download_info_dict,directory_path): for ticker_symbol, info_dicts in download_info_dict.items(): save_path = os.path.join(directory_path, ticker_symbol) if not os.path.exists(save_path): os.mkdir(save_path) for _id, info_dict in info_dicts.items(): save_file = os.path.join(save_path, _id) if not os.path.exists(save_file): #フォルダが存在しなかったファイルのみ取得する download_xbrl_file(info_dict['url'],_id,save_path) def download_xbrl_file(url,_id,save_path): try: res = requests.get(url) if res.ok: path = save_path+'/'+_id if ZipFlg == 0 : with open(path +'.zip', 'wb') as f: #念の為、小まめに書き込み、メモリに優しくしておく for chunk in res.iter_content(chunk_size=1024): f.write(chunk) else: if not os.path.exists(path): os.mkdir(path) z = ZipFile(BytesIO(res.content)) z.extractall(path) except Exception: e = sys.exc_info()[1] print("エラー",e.args) if e.args[0].reason.args[1].args[0] == 10054: time.sleep(10) download_xbrl_file(url,_id,save_path) finally: print('終了' + str(datetime.datetime.today())) def main(t_symbols): for t_symbol in t_symbols: t_symbol = str(t_symbol[0]) response_string = get_link_info_str(t_symbol, base_url) ET_tree = ET.fromstring( response_string ) ET.register_namespace('',namespace[1:-1]) dat_download = defaultdict(dict) # get download file info info_dict = get_link(ET_tree,namespace) dat_download[t_symbol] = info_dict #保存するディレクトリ saveFileName = "dat_download_" + t_symbol + ".json" saveFilePath = os.path.join(DOWNLOAD_SAVE_DIR, saveFileName) #saveFilePath = os.getcwd()+'/downloaded_info/dat_download_'+t_symbol+'.json' write_download_info(saveFilePath,dat_download) #directory_path = os.getcwd()+'/xbrl_files/' directory_path = DOWNLOAD_SAVE_DIR download_all_xbrl_files(dat_download,directory_path) print(t_symbol + str(datetime.datetime.today())) def getmarketCode(): #ここに取得したい企業のコードを入力する t_symbols = ('1301','2432','7267') return t_symbols if __name__=='__main__': try: # プロキシの設定 Con = Conn.Connect() os.environ["http_proxy"] = Con.http_proxy() os.environ["https_proxy"] = Con.https_proxy() base_url = 'http://resource.ufocatch.com/atom/edinetx/query/' namespace = '{http://www.w3.org/2005/Atom}' print('開始' + str(datetime.datetime.today())) #マーケットのコードを取得する t_symbols = getmarketCode() #xbrlを取得するメイン処理 main(t_symbols) except Exception: e = sys.exc_info()[1] print("エラー",e.args) finally: print('終了' + str(datetime.datetime.today()))
以下のところに取得したい銘柄コードを入力すれば取得できます。
def getmarketCode(): #ここに取得したい企業のコードを入力する t_symbols = ('1301','2432','7267') return t_symbols
質問などがありましたら連絡ください。
お待ちしています!
コメント