Saturday, August 21, 2021

Deploy Python container on Kubernetes cluster on GCP

 Source

https://www.youtube.com/watch?v=GKuk-TBmNcI


  • Purpose

The goal of this post is to show how a Docker container can be deployed on a Kubernetes cluster. The motivation for deploying containers is as follows:

Once you build a container, you may want to scale it up and down according to changing demand. To do that, Kubernetes provides a LoadBalancer service which distributes HTTP requests over a set of nodes.


Kubernetes architecture

A Kubernetes cluster is formed of nodes. A node can be either a worker node or a master node.



Within each node there are multiple pods. A container is deployed in a pod, and the load balancer distributes requests across the nodes and pods.



Build a container

  • On Cloud Shell, create a folder and place the following files (a sketch of the layout is shown below)
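The original screenshot is not available; here is a minimal sketch of the layout, assuming the files referenced later in this post (the folder name is a placeholder):

mkdir k8s-demo && cd k8s-demo
# Files to place in the folder:
#   main.py          - the Python app to containerize
#   Dockerfile       - instructions to build the image
#   deployment.yaml  - Kubernetes Deployment definition
#   services.yaml    - Kubernetes LoadBalancer Service definition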




  • This is how the Dockerfile looks (a sketch is shown below)
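The screenshot is missing here; a minimal sketch modeled on the Dockerfile from the Cloud Run post further down (the base image, packages, and exposed port are assumptions):

FROM python:3.6.5-slim
WORKDIR /app
COPY . /app
RUN pip install --upgrade pip && pip install --no-cache-dir Flask
EXPOSE 8080
CMD ["python", "main.py"]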




  • Build the container using the gcloud command; for example:
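The exact command is not shown in this post; a sketch, where PROJECT_ID and k8s-demo are placeholders for your own project ID and image name:

gcloud builds submit --tag gcr.io/PROJECT_ID/k8s-demo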




  • Create a Kubernetes cluster using this command; change the number of nodes as you wish:
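A sketch of the cluster-creation command; the cluster name, zone, and node count are assumptions to adapt:

gcloud container clusters create demo-cluster --num-nodes=2 --zone=us-central1-a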




  • You can change the number of pod replicas by modifying the deployment.yaml file as follows:
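The original file is not shown; a minimal Deployment sketch, assuming the image built above (all names are placeholders). The replicas field controls how many pods run:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-demo
spec:
  replicas: 3                  # number of pods; change as needed
  selector:
    matchLabels:
      app: k8s-demo
  template:
    metadata:
      labels:
        app: k8s-demo
    spec:
      containers:
      - name: k8s-demo
        image: gcr.io/PROJECT_ID/k8s-demo
        ports:
        - containerPort: 8080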


  • Then apply it as follows:
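Assuming kubectl is already configured to point at the cluster created above:

kubectl apply -f deployment.yaml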


  • To modify the ports of the LoadBalancer, edit the services.yaml file as follows:
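Again, the file itself is not shown; a sketch of a LoadBalancer Service that exposes port 80 and forwards traffic to the container's port 8080 (names and ports are assumptions), applied with kubectl apply -f services.yaml:

apiVersion: v1
kind: Service
metadata:
  name: k8s-demo-service
spec:
  type: LoadBalancer
  selector:
    app: k8s-demo
  ports:
  - port: 80              # port exposed by the load balancer
    targetPort: 8080      # port the container listens on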



  • To get the IP to call, type the following command:
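The external IP shows up in the EXTERNAL-IP column once the load balancer has been provisioned:

kubectl get services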




  • Just place this IP in the browser and the container will respond.













Friday, August 20, 2021

Run python containers on GCP

 This guide shows how to run a Python job from a Docker container that can be triggered by calling a URL. I used it for the following project:

1. The container is invoked every 15 minutes to refresh the stock prices

2. The prices are dumped into a gcp storage bucket

3. The prices are visualized on a datastudio dashboard


Step 1: Build a container with Python code

1. On GCP, open the Cloud Shell console. It has Docker prebuilt, so it's the easiest way to build containers.

2. Create a folder with the project name:

mkdir project

cd project

3. Create a Dockerfile with the following contents. It is inspired by the following GitHub repository:

https://github.com/docker-for-data-science/docker-for-data-science-tutorial

Filename: Dockerfile

FROM python:3.6.5-slim

# Meta-data
#LABEL maintainer="Aly Sivji <alysivji@gmail.com>" \
#      description="Data Science Workflow #1: Self-Contained Container\
#      Libraries, data, and code in one image"

# Set the working directory to /app
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . /app

# Install the required libraries
RUN pip install --upgrade pip
RUN pip --no-cache-dir install pandas==0.24.2 google-cloud-storage simplejson pyarrow yfinance joblib Flask==0.10.1
#seaborn sklearn jupyter

# Make port 8080 available to the world outside this container
EXPOSE 8080

# Run the Flask app (main.py) when the container launches; earlier Jupyter/bash commands kept commented for reference
#CMD ["jupyter", "notebook", "--ip='*'", "--port=8888", "--no-browser", "--allow-root"]
#CMD python
#CMD ["bash"]
CMD ["python","main.py"]

4. In the folder, create the Python file (main.py) to run:
#!/usr/bin/env python
# coding: utf-8

# In[1]:


#from tiingo import TiingoClient
import pandas as pd
from google.cloud import storage
import os
from datetime import datetime, timedelta
import joblib
import yfinance as yf
import re
import simplejson  # used below to catch simplejson.errors.JSONDecodeError
from flask import Flask
app = Flask(__name__)


# In[2]:


#!pip install simplejson
#!pip install google.cloud
#!pip install google-cloud
#!pip install google-cloud-storage


# In[3]:


#yf.__version__
#import numpy as np
#np.__version__


# In[4]:


#=========== Set directory ==============
PROJECT_DIR = '/app/TimeSeries/'
LIVE_PRICE_DIR = '/app/TimeSeries/04_live/spy500/price/'
LIVE_FEATURE_DIR = '/app/TimeSeries/04_live/spy500/feature/'
LIVE_PREDICTION_DIR = '/app/TimeSeries/04_live/spy500/prediction/'
LIVE_UTILS_DIR = '/app/TimeSeries/04_live/spy500/utils/'
LIVE_MODELS_DIR = '/app/TimeSeries/04_live/spy500/models/'
LIVE_REPORTS_DIR = '/app/TimeSeries/04_live/spy500/reports/'
os.chdir(PROJECT_DIR)


# In[5]:


# Upload a local file to a GCS bucket using a service-account JSON key
def upload_to_gcp(FILEPATH, UPLOAD_FILEPATH, BUCKET_NAME, CRED_PATH = '04_live/spy500/utils/sheets_api.json'):
    client = storage.Client.from_service_account_json(json_credentials_path=CRED_PATH)
    bucket = client.get_bucket(BUCKET_NAME)

    object_name_in_gcs_bucket = bucket.blob(UPLOAD_FILEPATH)
    object_name_in_gcs_bucket.upload_from_filename(FILEPATH)

# Download a single file from a GCS bucket to a local path
def download_from_gcp(FILEPATH, GCP_FILEPATH, BUCKET_NAME, CRED_PATH = '04_live/spy500/utils/sheets_api.json'):
    client = storage.Client.from_service_account_json(json_credentials_path=CRED_PATH)
    bucket = client.get_bucket(BUCKET_NAME)

    #object_name_in_gcs_bucket = bucket.blob(UPLOAD_FILEPATH)

    blob = bucket.blob(GCP_FILEPATH)
    blob.download_to_filename(FILEPATH)
# In[6]:
# Download every file under a GCS folder prefix into a local folder
def download_folder_from_gcp(LOCAL_FOLDER, GCP_FOLDER, BUCKET_NAME, CRED_PATH = '04_live/spy500/utils/sheets_api.json'):
    client = storage.Client.from_service_account_json(json_credentials_path=CRED_PATH)
    bucket = client.get_bucket(BUCKET_NAME)
    blobs = bucket.list_blobs(prefix=GCP_FOLDER) # Get list of files
    print(blobs)
    for blob in blobs:
        filename = blob.name.replace('/', '_')
        filename = re.sub(":","_",filename)
        blob.download_to_filename(LOCAL_FOLDER + filename)  # Download

# Pull price history from Yahoo Finance for a single symbol or a list of symbols
def pullYahooPrice(symbol, first_date="2002-01-01"):
        if type(symbol) == list:
            n = 100  #chunk row size
            chunk_df = [symbol[i:i+n] for i in range(0, len(symbol), n)]        
            concat_dfs = []
            for df in chunk_df:
                 try:
                     # set threads = True for faster performance, but some tickers will fail and the script may hang
                     # set threads = False for slower performance, but more tickers will succeed

                     #temp_df = yfinance.download(df.str.cat(sep=' '), start=first_date, threads=False)
                     temp_df = yf.download(' '.join(df), start=first_date, threads=True)

                     temp_df = temp_df.stack().reset_index()
                     concat_dfs.append(temp_df)
                 except simplejson.errors.JSONDecodeError:
                     pass
            df_stk = pd.concat(concat_dfs)        
            df_stk.columns = ['date','ticker_yahoo','close_adj','close','high','low','open','volume']     
            
        else:
            
            df_stk = yf.download(symbol, start=first_date, threads=False).reset_index()
            df_stk.columns = [re.sub(" ","_",f.lower()) for f in df_stk.columns]
            df_stk["symbol"] = symbol
        return df_stk

    

def getLatestPrice(sym_vec = 'SPY', file_path = "/app/TimeSeries/04_live/spy500/price/", filename = "yfinance_spy_price.parquet.gzip"):
    
    if filename not in os.listdir(file_path):
        #-- File not generated yet
        
        
        first_date = "2002-01-01"
        df_stk = pullYahooPrice(sym_vec, first_date)
        
        
    else:
        #--- file available
        df_stk_curr = pd.read_parquet(file_path+filename)
        latest_available_day = df_stk_curr.date.max()
        print("Latest date is "+str(latest_available_day))
        first_date = pd.to_datetime(latest_available_day)+ timedelta(days=1)
        if first_date>datetime.now():
            print("Prices are updated, latest is "+str(latest_available_day))
            df_stk = pd.read_parquet(file_path+filename)
            
        else:
            print("Pulling price data starting "+str(first_date))
            df_stk_latest = pullYahooPrice(sym_vec, first_date)
            df_stk = df_stk_curr.append(df_stk_latest).sort_values('date').reset_index(drop=True)
            df_stk = df_stk.drop_duplicates().reset_index(drop=True)

    return df_stk
    
# Average the returns of the top-N predictions per date for each model column
def getHistReturns(df_pos, TOP_N = 5):
    model_cols = [f for f in df_pos if "pred_" in f]
    for k, md in enumerate(model_cols):
        df_pos_sel = df_pos[~df_pos[md].isna()]
        if len(df_pos_sel)>500:
            df_sel = df_pos.sort_values(md, ascending = False).groupby("date").head(TOP_N).sort_values("date").reset_index(drop=True)
            df_sel = df_sel[df_sel["returns"].notna()]
            df_sel = df_sel[["date","returns"]]
            df_sel["model_id"] = md
            if k==0:
                df_all = df_sel
            else:
                df_all = df_all.append(df_sel)
    df_all_performance = df_all.groupby(["model_id","date"])["returns"].mean().reset_index()
    df_all_performance.rename(columns = {"returns":"returns_"+str(TOP_N)}, inplace = True)
    return df_all_performance
    #print(df_all_performance)


def run_update():

    print("Downloading prices")
    # Download prices
    fls_vec = {
        LIVE_PRICE_DIR+"yfinance_spy_price.parquet.gzip": "price/yfinance_spy_price.parquet.gzip",
        LIVE_PRICE_DIR+"yfinance_nmr_live.parquet.gzip": "price/yfinance_nmr_live.parquet.gzip"
    }

    for fl in fls_vec.keys():
        print("Downloading "+fl)
        download_from_gcp(FILEPATH = fl, GCP_FILEPATH = fls_vec[fl], BUCKET_NAME = "abolfadl-sp500-live", CRED_PATH = '04_live/spy500/utils/sheets_api.json')



    # Download predictions
    print("Downloading model predictions")
    download_folder_from_gcp(LOCAL_FOLDER=LIVE_PREDICTION_DIR, GCP_FOLDER="prediction/", BUCKET_NAME = "abolfadl-sp500-live", CRED_PATH = '04_live/spy500/utils/sheets_api.json')


    # # Update SPY

    # In[7]:


    filename = "yfinance_spy_price.parquet.gzip"
    file_path = "/app/TimeSeries/04_live/spy500/price/"
    df_spy = getLatestPrice(sym_vec = 'SPY', file_path = file_path, filename = filename)


    # In[8]:


    df_spy = df_spy.drop_duplicates(subset=['date',"symbol"], keep="last")


    # In[9]:


    print("Updating SPY prices")
    df_spy.to_parquet(file_path+filename)


    # # Update all prices

    # In[10]:


    PREDICTION_DIRECTORY = "/app/TimeSeries/04_live/spy500/prediction/"
    all_filename = "yfinance_nmr_live.parquet.gzip"
    file_path = "/app/TimeSeries/04_live/spy500/price/"
    sp500 = list(pd.read_parquet(file_path+all_filename).ticker_yahoo.unique())
    df_equity = getLatestPrice(sym_vec = sp500, file_path = file_path, filename = all_filename)


    # In[11]:


    df_equity = df_equity.drop_duplicates(subset=['date',"ticker_yahoo"], keep="last")
    df_equity.to_parquet(file_path+all_filename)
    all_filename = "yfinance_nmr_live.parquet.gzip"
    file_path = "/app/TimeSeries/04_live/spy500/price/"
    df_equity = pd.read_parquet(file_path+all_filename)


    # # Model predictions

    # In[12]:


    PREDICTION_DIRECTORY = "/app/TimeSeries/04_live/spy500/prediction/"
    pred_fls = [f for f in os.listdir(PREDICTION_DIRECTORY) if "csv" in f]
    for k, fl in enumerate(pred_fls):
        print(fl)
        df_pred_curr = pd.read_csv(PREDICTION_DIRECTORY+fl).drop_duplicates()
        print(df_pred_curr.shape)
        if k == 0:
            df_pred = df_pred_curr
        else:
            df_pred = df_pred.append(df_pred_curr)
                
        
    df_pred = df_pred.reset_index(drop=True)       


    df_pred["date"] = pd.to_datetime(df_pred["date"])


    # In[13]:


    PREDICTION_WINDOW = 14
    df_pred["target_date"] = (pd.to_datetime(df_pred["price_date"]) + timedelta(days=PREDICTION_WINDOW)).astype(str)

    df_equity["date_jn"] = df_equity["date"].astype(str)

    df_pos = df_pred.merge(df_equity[["date_jn","ticker_yahoo","close_adj"]] , left_on = ["target_date","ticker_yahoo"] , right_on = ["date_jn","ticker_yahoo"]  ,  how = "left").sort_values("date")
    df_pos.rename(columns = {"close_adj":"position_price"}, inplace = True)

    df_latest_price = df_equity.loc[df_equity["date"]==df_equity["date"].max(),["ticker_yahoo","close_adj"]].rename(columns = {"close_adj":"latest_price"})

    df_pos = df_pos.merge(df_latest_price[["ticker_yahoo","latest_price"]], left_on = ["ticker_yahoo"], right_on =["ticker_yahoo"] , how = "left")

    df_pos.loc[(df_pos["position_price"].isna()) & (~df_pos["latest_price"].isna()),"position_price"] = df_pos.loc[(df_pos["position_price"].isna()) & (~df_pos["latest_price"].isna()),"latest_price"]

    df_pos[["date","entry_price","target_date","position_price","latest_price"]]


    df_pos["returns"] = 100*(df_pos["position_price"]-df_pos["entry_price"])/df_pos["entry_price"]




    for k, top_n in enumerate([5,10]):
        df_c = getHistReturns(df_pos, TOP_N = top_n)
        if k == 0:
            df_all = df_c
        else:
            df_all = df_all.merge(df_c,on = ["model_id","date"] ,how = "outer").reset_index(drop=True)


    #df_all.groupby("model_id").mean().reset_index().sort_values("returns_10", ascending = False)



    # In[14]:


    df_aggregate_model_performance = df_all.groupby("model_id").mean().reset_index().sort_values("returns_10", ascending = False)


    # # Dump performance tables

    # In[15]:


    MODEL_PERF_DIR = "/app/TimeSeries/04_live/spy500/model_performance/"
    #df_all.to_csv(MODEL_PERF_DIR+"live_detailed_model_performance.csv", index = False)
    #df_pos.to_csv(MODEL_PERF_DIR+"raw_master_model.csv", index = False)


    # In[16]:


    df_pos.to_csv(MODEL_PERF_DIR+"raw_master_model.csv", index = False)


    # In[17]:


    upload_to_gcp(MODEL_PERF_DIR+'raw_master_model.csv', 'model_performance/'+'raw_master_model.csv', 'abolfadl-sp500-live', CRED_PATH = '04_live/spy500/utils/sheets_api.json')


    # In[18]:


    df_all.head()


    # In[19]:


    df_aggregate_model_performance.head()


    # In[20]:


    df_pos[[f for f in df_pos.columns if "pred" not in f]].head()


    # In[21]:


    df_pos_spy = df_pos[[f for f in df_pos.columns if "pred" not in f]].merge(df_spy[["date","adj_close"]], on = "date", how = "left").rename(columns = {"adj_close":"entry_price_spy"})


    # In[22]:


    df_pos_spy["target_date"] = pd.to_datetime(df_pos_spy["target_date"])


    # In[23]:


    df_pos_spy = df_pos_spy.merge(df_spy[["date","adj_close"]], left_on = "target_date", right_on = "date", how = "left").rename(columns = {"adj_close":"position_price_spy"})


    # In[24]:


    #df_pos_spy = df_pos_spy.drop(["date_x"], axis =1).rename(columns = {"date_y":"date"})


    # In[25]:


    latest_spy_price = df_spy.tail(1).loc[:,"adj_close"].values[0]


    # In[26]:


    df_pos_spy.loc[  df_pos_spy["position_price_spy"].isna() ,"position_price_spy"] = latest_spy_price


    # In[27]:


    df_pos_spy["returns_spy"] = 100*(df_pos_spy["position_price_spy"]-df_pos_spy["entry_price_spy"])/df_pos_spy["entry_price_spy"]


    # In[28]:


    df_spy_perf = df_pos_spy.groupby("price_date")["returns_spy"].first().reset_index()
    df_spy_perf["date"] = pd.to_datetime(df_spy_perf["price_date"])
    df_spy_perf.drop("price_date"axis =1inplace = True)
    df_spy_perf


    # In[29]:


    df_all_spy = df_all.merge(df_spy_perf, on = "date", how = "left").sort_values("date")


    # In[30]:


    df_all_spy["update_timestamp"] = datetime.now()
    df_all_spy.tail(4)


    # In[31]:


    df_all_spy.to_csv(MODEL_PERF_DIR+"live_historized_model_performance_spy.csv", index = False)


    # In[32]:


    upload_to_gcp(MODEL_PERF_DIR+'live_historized_model_performance_spy.csv', 'model_performance/'+'live_historized_model_performance_spy.csv', 'abolfadl-sp500-live', CRED_PATH = '04_live/spy500/utils/sheets_api.json')


    # In[33]:


    df_all_spy_agg = df_all_spy.groupby("model_id")[["returns_5","returns_10","returns_spy"]].mean().reset_index().sort_values("returns_10", ascending = False)
    df_all_spy_agg["update_timestamp"] = datetime.now()
    df_all_spy_agg.head(10)


    # In[34]:


    df_all_spy_agg.to_csv(MODEL_PERF_DIR+"live_historized_model_performance_spy_agg.csv", index = False)


    # In[35]:


    upload_to_gcp(MODEL_PERF_DIR+'live_historized_model_performance_spy_agg.csv', 'model_performance/'+'live_historized_model_performance_spy_agg.csv', 'abolfadl-sp500-live', CRED_PATH = '04_live/spy500/utils/sheets_api.json')


    # ---

    # In[36]:


    import sys
    print(sys.executable)
    print(sys.version)
    print(sys.version_info)


    # In[ ]:



@app.route('/')
def entry():
    run_update()    
    return 'Hey, we have Flask in a Docker container!'


if __name__ == '__main__':
    #app.run(debug=True, host='0.0.0.0')
    app.run(host ='0.0.0.0', port = 8080)

5. Add any supporting folders/files you need to include. The project should look as follows (a sketch is shown below):
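The original screenshot is missing; here is a rough layout reconstructed from the paths used in main.py (everything beyond Dockerfile and main.py is an assumption based on those paths):

project/
  Dockerfile
  main.py
  TimeSeries/
    04_live/
      spy500/
        price/
        prediction/
        model_performance/
        utils/
          sheets_api.json   # service-account key used by the GCS helper functions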
















6. Build the image using the gcloud command line.
Source: https://www.youtube.com/watch?v=LxHiCZCKwa8&t=670s

gcloud builds submit --tag gcr.io/stock-288218/mabolfadl_spy500_report

In this command, stock-288218 is the project ID, which you can get from the home screen of the project, and mabolfadl_spy500_report is the name of the container image.

7. Deploy the container

gcloud run deploy --image gcr.io/stock-288218/mabolfadl_spy500_report --memory 4G --cpu 2.0

It's important to choose the correct memory and CPU depending on the workload of the container.

8. Answer the prompts (region, etc.); a link will be generated which, when invoked, will run the container.


9. The problem at this point is that when the URL is called, you need to wait a long time for the container to finish running. This causes browsers to time out, and calling it from Google Cloud times out as well.

10. Solution: Create a small VM on Scaleway that invokes the URL using a cron job. After creating the VM, add the following to the crontab (it runs every 15 minutes, Monday through Friday, with a 300-second timeout on the curl call):

*/15 * * * 1-5 curl -m 300 https://mabolfadlspy500report-i6ibrrbliq-ue.a.run.app/

DONE!!!







Wednesday, August 4, 2021

Create a GitLab repo from an existing folder

 1. Make sure no Git repo is already there:

cd project_folder

rm -rf .git


2. Initialize repository


git init


3. Add the remote from GitLab. Get the URL by copying the HTTPS clone link of the repo from GitLab:


git remote add origin https://gitlab.com/mabolfadl/frx.git


4. Add files to staging


git add .


5. Commit it:


git commit -m 'Initial commit'


6. Set origin as the upstream and push:


git push --set-upstream origin master


7. Push 


git push


8. Enter your username and password.


Loud fan of desktop

 Upon restart, the fan of the desktop got loud again. I cleaned the dust out of the desktop but it was still loud (quieter than the first time) ...