#!/usr/bin/env python
# coding: utf-8
# In[1]:
#from tiingo import TiingoClient
import pandas as pd
from google.cloud import storage
import os
from datetime import datetime, timedelta
import joblib
import yfinance as yf
from datetime import datetime, timedelta
import re
from flask import Flask
# Flask application object; the '/' route near the bottom of this file runs the live update.
app = Flask(__name__)

# ---------------------------------------------------------------------------
# Directory layout inside the container image
# ---------------------------------------------------------------------------
# Project root.
PROJECT_DIR = '/app/TimeSeries/'
# Folders of the live S&P 500 pipeline.
LIVE_PRICE_DIR = '/app/TimeSeries/04_live/spy500/price/'
LIVE_FEATURE_DIR = '/app/TimeSeries/04_live/spy500/feature/'
LIVE_PREDICTION_DIR = '/app/TimeSeries/04_live/spy500/prediction/'
LIVE_UTILS_DIR = '/app/TimeSeries/04_live/spy500/utils/'
LIVE_MODELS_DIR = '/app/TimeSeries/04_live/spy500/models/'
LIVE_REPORTS_DIR = '/app/TimeSeries/04_live/spy500/reports/'

# Relative paths used elsewhere, such as the default service-account key
# '04_live/spy500/utils/sheets_api.json', are resolved against PROJECT_DIR.
# So PROJECT_DIR becomes the working directory as soon as the module is imported.
os.chdir(PROJECT_DIR)
# In[5]:
def upload_to_gcp(FILEPATH, UPLOAD_FILEPATH, BUCKET_NAME, CRED_PATH = '04_live/spy500/utils/sheets_api.json'):
    """Upload the local file FILEPATH to object UPLOAD_FILEPATH in bucket BUCKET_NAME.

    Authenticates with the service-account JSON key found at CRED_PATH.
    """
    bucket = storage.Client.from_service_account_json(
        json_credentials_path=CRED_PATH
    ).get_bucket(BUCKET_NAME)
    bucket.blob(UPLOAD_FILEPATH).upload_from_filename(FILEPATH)
def download_from_gcp(FILEPATH, GCP_FILEPATH, BUCKET_NAME, CRED_PATH = '04_live/spy500/utils/sheets_api.json'):
    """Download object GCP_FILEPATH from bucket BUCKET_NAME to the local path FILEPATH.

    Authenticates with the service-account JSON key found at CRED_PATH.
    """
    client = storage.Client.from_service_account_json(json_credentials_path=CRED_PATH)
    source = client.get_bucket(BUCKET_NAME).blob(GCP_FILEPATH)
    source.download_to_filename(FILEPATH)
# In[6]:
def download_folder_from_gcp(LOCAL_FOLDER, GCP_FOLDER, BUCKET_NAME, CRED_PATH = '04_live/spy500/utils/sheets_api.json'):
    """Download every object under the prefix GCP_FOLDER into LOCAL_FOLDER.

    The bucket hierarchy is flattened. Each object name has '/' and ':' replaced
    by '_' to form the local file name, e.g. ``prediction/model_1.csv`` becomes
    ``prediction_model_1.csv``. LOCAL_FOLDER is created if it does not exist.

    Parameters
    ----------
    LOCAL_FOLDER : str
        Destination directory.
    GCP_FOLDER : str
        Object-name prefix to list (e.g. ``"prediction/"``).
    BUCKET_NAME : str
        Source bucket.
    CRED_PATH : str
        Service-account JSON key used to authenticate.
    """
    client = storage.Client.from_service_account_json(json_credentials_path=CRED_PATH)
    bucket = client.get_bucket(BUCKET_NAME)
    os.makedirs(LOCAL_FOLDER, exist_ok=True)
    for blob in bucket.list_blobs(prefix=GCP_FOLDER):
        if blob.name.endswith('/'):
            # Names ending in '/' are folder placeholder objects, not files.
            # Writing them out would only leave an empty file such as "prediction_".
            continue
        local_name = blob.name.replace('/', '_').replace(':', '_')
        blob.download_to_filename(os.path.join(LOCAL_FOLDER, local_name))
def pullYahooPrice(symbol, first_date="2002-01-01", chunk_size=100):
    """Download daily prices from Yahoo Finance via yfinance.

    Parameters
    ----------
    symbol : str or list of str
        One ticker, or a list of tickers (downloaded ``chunk_size`` at a time).
    first_date : str or datetime-like
        Passed to ``yf.download`` as ``start``.
    chunk_size : int
        Number of tickers per ``yf.download`` call when *symbol* is a list.

    Returns
    -------
    pandas.DataFrame
        * List input: long format with columns ``date, ticker_yahoo, close_adj,
          close, high, low, open, volume``, assigned by position to the stacked
          yfinance output. Empty (with those columns) when the list is empty or
          every chunk failed.
        * Single ticker: yfinance's columns lower-cased with spaces replaced by
          underscores, plus a ``symbol`` column.
    """
    if isinstance(symbol, list):
        columns = ['date', 'ticker_yahoo', 'close_adj', 'close', 'high', 'low', 'open', 'volume']
        frames = []
        for start in range(0, len(symbol), chunk_size):
            tickers = symbol[start:start + chunk_size]
            try:
                # threads=True is faster, but some tickers may fail and the script may hang.
                # threads=False is slower, but more tickers succeed.
                raw = yf.download(' '.join(tickers), start=first_date, threads=True)
            except ValueError:
                # Intent: skip chunks whose response cannot be JSON-decoded.
                # Both json's and simplejson's JSONDecodeError subclass ValueError.
                # The previous `except simplejson.errors.JSONDecodeError` named a
                # module that was never imported, so any failure raised NameError.
                continue
            frames.append(raw.stack().reset_index())
        if not frames:
            return pd.DataFrame(columns=columns)
        df_stk = pd.concat(frames)
        # NOTE(review): positional renaming assumes yfinance's stacked column order
        # (Adj Close, Close, High, Low, Open, Volume); confirm for the installed version.
        df_stk.columns = columns
        return df_stk

    df_stk = yf.download(symbol, start=first_date, threads=False).reset_index()
    df_stk.columns = [c.lower().replace(" ", "_") for c in df_stk.columns]
    df_stk["symbol"] = symbol
    return df_stk
def getLatestPrice(sym_vec = 'SPY', file_path = "/app/TimeSeries/04_live/spy500/price/", filename = "yfinance_spy_price.parquet.gzip"):
    """Return the cached price history for *sym_vec*, extended with newer Yahoo data.

    * No cache file at ``file_path/filename``: the full history since 2002-01-01
      is downloaded.
    * Cache file exists: it is read once. Unless the day after its latest ``date``
      is still in the future, prices from that day onward are downloaded and
      appended.

    The cache file itself is not rewritten here; callers persist the result.

    Parameters
    ----------
    sym_vec : str or list of str
        Ticker(s), forwarded to ``pullYahooPrice``.
    file_path : str
        Directory holding the parquet cache.
    filename : str
        Cache file name.

    Returns
    -------
    pandas.DataFrame
        The combined frame with exact duplicate rows removed and a fresh
        RangeIndex. It is sorted by ``date`` whenever new data was appended.
    """
    cache_path = os.path.join(file_path, filename)
    if not os.path.isfile(cache_path):
        # Cache not generated yet: pull the full history.
        df_stk = pullYahooPrice(sym_vec, "2002-01-01")
    else:
        df_stk_curr = pd.read_parquet(cache_path)
        latest_available_day = df_stk_curr.date.max()
        print("Latest date is "+str(latest_available_day))
        first_date = pd.to_datetime(latest_available_day) + timedelta(days=1)
        if first_date > datetime.now():
            print("Prices are updated, latest is "+str(latest_available_day))
            # Reuse the frame already in memory instead of reading the file again.
            df_stk = df_stk_curr
        else:
            print("Pulling price data starting "+str(first_date))
            df_stk_latest = pullYahooPrice(sym_vec, first_date)
            # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement.
            df_stk = pd.concat([df_stk_curr, df_stk_latest]).sort_values('date').reset_index(drop=True)
    return df_stk.drop_duplicates().reset_index(drop=True)
def getHistReturns(df_pos, TOP_N = 5, min_rows = 500):
    """Average the realised return of each model's top-N picks per date.

    Every column whose name contains ``"pred_"`` is treated as one model's score.
    For each model:
    1. Keep only the rows it scored (non-null).
    2. Take the TOP_N highest-scored rows per ``date``.
    3. Average their non-null ``returns``.

    Parameters
    ----------
    df_pos : pandas.DataFrame
        Must contain ``date``, ``returns`` and one or more ``pred_*`` columns.
    TOP_N : int
        Number of highest-scored rows kept per date.
    min_rows : int
        Models with ``min_rows`` or fewer non-null scores are skipped.

    Returns
    -------
    pandas.DataFrame
        Columns ``model_id``, ``date`` and ``returns_<TOP_N>``. Empty, with those
        columns and ``date`` keeping df_pos's dtype, when no model qualifies.
    """
    return_col = "returns_" + str(TOP_N)
    model_cols = [c for c in df_pos if "pred_" in c]
    frames = []
    for md in model_cols:
        df_pos_sel = df_pos[df_pos[md].notna()]
        if len(df_pos_sel) <= min_rows:
            continue
        # Rank only rows this model actually scored. Ranking the full frame would
        # let unscored (NaN) rows fill the top-N on dates with few or no scores.
        df_sel = df_pos_sel.sort_values(md, ascending=False).groupby("date").head(TOP_N)
        df_sel = df_sel.loc[df_sel["returns"].notna(), ["date", "returns"]].copy()
        df_sel["model_id"] = md
        frames.append(df_sel)
    if not frames:
        # Keep the usual schema (and the dtype of ``date``) so downstream merges still work.
        return pd.DataFrame({
            "model_id": pd.Series(dtype=object),
            "date": df_pos["date"].iloc[:0].reset_index(drop=True),
            return_col: pd.Series(dtype=float),
        })
    df_all = pd.concat(frames)
    df_all_performance = df_all.groupby(["model_id", "date"])["returns"].mean().reset_index()
    return df_all_performance.rename(columns={"returns": return_col})
#print(df_all_performance)
# GCS bucket and service-account key shared by every transfer in the live update.
# The key path is relative, so it is resolved against the working directory.
LIVE_BUCKET_NAME = "abolfadl-sp500-live"
GCP_CRED_PATH = '04_live/spy500/utils/sheets_api.json'
MODEL_PERF_DIR = "/app/TimeSeries/04_live/spy500/model_performance/"
SPY_PRICE_FILENAME = "yfinance_spy_price.parquet.gzip"
EQUITY_PRICE_FILENAME = "yfinance_nmr_live.parquet.gzip"
# Calendar days between a prediction's price_date and the date its outcome is measured.
PREDICTION_WINDOW = 14


def _download_inputs():
    """Fetch the cached price files and every model-prediction file from GCS."""
    print("Downloading prices")
    price_files = {
        LIVE_PRICE_DIR + SPY_PRICE_FILENAME: "price/" + SPY_PRICE_FILENAME,
        LIVE_PRICE_DIR + EQUITY_PRICE_FILENAME: "price/" + EQUITY_PRICE_FILENAME,
    }
    for local_path, gcs_path in price_files.items():
        print("Downloading " + local_path)
        download_from_gcp(FILEPATH=local_path, GCP_FILEPATH=gcs_path,
                          BUCKET_NAME=LIVE_BUCKET_NAME, CRED_PATH=GCP_CRED_PATH)
    print("Downloading model predictions")
    download_folder_from_gcp(LOCAL_FOLDER=LIVE_PREDICTION_DIR, GCP_FOLDER="prediction/",
                             BUCKET_NAME=LIVE_BUCKET_NAME, CRED_PATH=GCP_CRED_PATH)


def _update_price_caches():
    """Extend the SPY and equity price caches with new Yahoo data and write them back.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(df_spy, df_equity)`` as written to disk.
    """
    df_spy = getLatestPrice(sym_vec='SPY', file_path=LIVE_PRICE_DIR, filename=SPY_PRICE_FILENAME)
    df_spy = df_spy.drop_duplicates(subset=["date", "symbol"], keep="last")
    print("Updating SPY prices")
    df_spy.to_parquet(LIVE_PRICE_DIR + SPY_PRICE_FILENAME)

    # The ticker universe is whatever the cached equity file already contains.
    tickers = list(pd.read_parquet(LIVE_PRICE_DIR + EQUITY_PRICE_FILENAME).ticker_yahoo.unique())
    df_equity = getLatestPrice(sym_vec=tickers, file_path=LIVE_PRICE_DIR, filename=EQUITY_PRICE_FILENAME)
    df_equity = df_equity.drop_duplicates(subset=["date", "ticker_yahoo"], keep="last")
    df_equity.to_parquet(LIVE_PRICE_DIR + EQUITY_PRICE_FILENAME)
    return df_spy, df_equity


def _load_predictions(prediction_dir):
    """Stack every prediction CSV in *prediction_dir* into one DataFrame.

    Raises
    ------
    FileNotFoundError
        If the directory holds no file whose name contains "csv".
    """
    pred_files = [f for f in os.listdir(prediction_dir) if "csv" in f]
    if not pred_files:
        raise FileNotFoundError("No prediction CSV files found in " + prediction_dir)
    frames = []
    for fl in pred_files:
        print(fl)
        df_pred_curr = pd.read_csv(prediction_dir + fl).drop_duplicates()
        print(df_pred_curr.shape)
        frames.append(df_pred_curr)
    df_pred = pd.concat(frames).reset_index(drop=True)
    df_pred["date"] = pd.to_datetime(df_pred["date"])
    return df_pred


def _attach_position_prices(df_pred, df_equity, prediction_window=PREDICTION_WINDOW):
    """Price each prediction ``prediction_window`` days after its ``price_date``.

    Adds the following columns:
    * ``target_date`` (str)
    * ``date_jn``
    * ``position_price``
    * ``latest_price``
    * ``returns`` = 100 * (position_price - entry_price) / entry_price
    """
    df_pred = df_pred.assign(
        target_date=(pd.to_datetime(df_pred["price_date"]) + timedelta(days=prediction_window)).astype(str)
    )
    # Join on string dates so the key matches target_date's representation exactly.
    equity_px = df_equity.assign(date_jn=df_equity["date"].astype(str))[["date_jn", "ticker_yahoo", "close_adj"]]
    df_pos = (
        df_pred.merge(equity_px, left_on=["target_date", "ticker_yahoo"],
                      right_on=["date_jn", "ticker_yahoo"], how="left")
        .sort_values("date")
        .rename(columns={"close_adj": "position_price"})
    )
    latest = (
        df_equity.loc[df_equity["date"] == df_equity["date"].max(), ["ticker_yahoo", "close_adj"]]
        .rename(columns={"close_adj": "latest_price"})
    )
    df_pos = df_pos.merge(latest, on="ticker_yahoo", how="left")
    # Rows without a price on target_date are marked to the latest close.
    # NOTE(review): this also covers target dates that are not trading days, not only
    # ones still in the future; confirm that is intended.
    fill = df_pos["position_price"].isna() & df_pos["latest_price"].notna()
    df_pos.loc[fill, "position_price"] = df_pos.loc[fill, "latest_price"]
    df_pos["returns"] = 100 * (df_pos["position_price"] - df_pos["entry_price"]) / df_pos["entry_price"]
    return df_pos


def _spy_benchmark(df_pos, df_spy):
    """SPY return over each prediction's holding period, one row per ``price_date``.

    Returns a DataFrame with ``returns_spy`` and ``date`` (``price_date`` as datetime).
    """
    spy_px = df_spy[["date", "adj_close"]]
    non_pred_cols = [c for c in df_pos.columns if "pred" not in c]
    df_pos_spy = (
        df_pos[non_pred_cols]
        .merge(spy_px, on="date", how="left")
        .rename(columns={"adj_close": "entry_price_spy"})
    )
    df_pos_spy["target_date"] = pd.to_datetime(df_pos_spy["target_date"])
    df_pos_spy = (
        df_pos_spy
        .merge(spy_px, left_on="target_date", right_on="date", how="left")
        .rename(columns={"adj_close": "position_price_spy"})
    )
    # Target dates without an SPY row get the most recent SPY close.
    # NOTE(review): that includes non-trading days; confirm this is intended.
    latest_spy_price = df_spy.tail(1).loc[:, "adj_close"].values[0]
    df_pos_spy.loc[df_pos_spy["position_price_spy"].isna(), "position_price_spy"] = latest_spy_price
    df_pos_spy["returns_spy"] = (
        100 * (df_pos_spy["position_price_spy"] - df_pos_spy["entry_price_spy"])
        / df_pos_spy["entry_price_spy"]
    )
    df_spy_perf = df_pos_spy.groupby("price_date")["returns_spy"].first().reset_index()
    df_spy_perf["date"] = pd.to_datetime(df_spy_perf["price_date"])
    return df_spy_perf.drop(columns="price_date")


def _write_and_upload(df, name):
    """Write *df* to MODEL_PERF_DIR/<name> (no index) and upload it as model_performance/<name>."""
    local_path = MODEL_PERF_DIR + name
    df.to_csv(local_path, index=False)
    upload_to_gcp(local_path, 'model_performance/' + name, LIVE_BUCKET_NAME, CRED_PATH=GCP_CRED_PATH)


def run_update():
    """Run the live update end to end.

    1. Download cached prices and model predictions from GCS.
    2. Extend the SPY and equity price caches with new Yahoo data.
    3. Price each prediction PREDICTION_WINDOW days after its price_date.
    4. Compute per-model top-5 and top-10 returns and compare them with SPY.
    5. Upload the raw positions and the detailed and aggregated performance tables.
    """
    _download_inputs()
    df_spy, df_equity = _update_price_caches()

    df_pred = _load_predictions(LIVE_PREDICTION_DIR)
    df_pos = _attach_position_prices(df_pred, df_equity)

    df_all = None
    for top_n in (5, 10):
        df_c = getHistReturns(df_pos, TOP_N=top_n)
        if df_all is None:
            df_all = df_c
        else:
            df_all = df_all.merge(df_c, on=["model_id", "date"], how="outer").reset_index(drop=True)

    _write_and_upload(df_pos, "raw_master_model.csv")

    df_all_spy = df_all.merge(_spy_benchmark(df_pos, df_spy), on="date", how="left").sort_values("date")
    df_all_spy["update_timestamp"] = datetime.now()
    _write_and_upload(df_all_spy, "live_historized_model_performance_spy.csv")

    df_all_spy_agg = (
        df_all_spy.groupby("model_id")[["returns_5", "returns_10", "returns_spy"]]
        .mean()
        .reset_index()
        .sort_values("returns_10", ascending=False)
    )
    df_all_spy_agg["update_timestamp"] = datetime.now()
    _write_and_upload(df_all_spy_agg, "live_historized_model_performance_spy_agg.csv")
# In[ ]:
@app.route('/')
def entry():
    """Handle GET / by running run_update(), then return a fixed confirmation string."""
    run_update()
    reply = 'Hey, we have Flask in a Docker container!'
    return reply
if __name__ == '__main__':
    # Listen on all interfaces so the server is reachable from outside the container.
    # Use $PORT when it is set (Cloud Run injects it); otherwise keep the old default of 8080.
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
# 5. Add any supporting folders/files you need to package. The project should look as follows: