AIModel.py 5.03 KB
import sys
from collections import deque
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import LSTM,Dense,Dropout
from tensorflow.keras.models import Sequential
from tensorflow.python.keras.callbacks import EarlyStopping


# 학습 함수
# params : n_epochs     -   모든 데이터셋을 몇 번 학습시킬 것인지
#          batch_size   -   데이터셋을 batch_size로 나누어서 학습
#          verbose      -   실행 과정을 콘솔에 띄울 것인지 여부 (0:끔/1:움직이는 실시간 그래프/2:정적메세지)
def train(data,model,n_epochs=400,batch_size=64,verbose=0):
    # 더이상 성능이 증가하지 않을 경우 학습 중지
    # monitor : 'val_loss' - validation_set's loss monitoring
    # patience : 성능이 증가하지 않는 epoch를 몇번 허용할 것인가
    early_stopping=EarlyStopping(monitor='val_loss',patience=50)
    # 데이터 학습
    history=model.fit(data["X_train"],data['y_train'],
                      batch_size=batch_size,
                      epochs=n_epochs,
                      validation_data=(data["X_test"],data["y_test"]),
                      callbacks=[early_stopping],
                      verbose=verbose)
    return history


# 에러 평가 함수
def evaluate(data,model):
    # correct : 정답률 / loss : 예측값과 실제값이 차이나는 정도를 나타내는 지표
    correct,loss=model.evaluate(data["X_test"],data["y_test"],verbose=0)
    mean_loss=data["column_scaler"]['close'].inverse_transform([[loss]])[0][0]
    return mean_loss


# 예측 주가 계산 함수
def predict(data,model,n_steps=100):
    last_sequence=data["last_sequence"][-n_steps:]
    column_scaler=data["column_scaler"]
    last_sequence=last_sequence.reshape(last_sequence.shape[1],last_sequence.shape[0])
    last_sequence=np.expand_dims(last_sequence,axis=0)
    prediction=model.predict(last_sequence)
    predicted_price=column_scaler['close'].inverse_transform(prediction)[0][0]
    return predicted_price

def load_data(df,n_steps=100,lookup_step=1,test_size=0.2,shuffle=True):
    result={}               # return 할 값들을 저장하는 변수
    column_scaler={}        # 각 행별 정규화된 값을 저장하는 변수

    # data 값을 0과 1사이 값으로 정규화
    for column in df.columns:
        scaler=preprocessing.MinMaxScaler()
        df[column]=scaler.fit_transform(np.expand_dims(df[column].to_numpy(),axis=1))
        column_scaler[column]=scaler

    result["column_scaler"]=column_scaler
    last_sequence=np.array(df.tail(lookup_step))
    df['future']=df['close'].shift(-lookup_step)
    # 결측치 삭제
    df.dropna(inplace=True)

    sequence_data=[]
    sequences=deque(maxlen=n_steps)
    for entry,target in zip(df.loc[:,df.columns!='future'].to_numpy(),df['future'].to_numpy()):
        sequences.append(entry)
        if len(sequences)==n_steps:
            sequence_data.append([np.array(sequences),target])

    if not sequence_data:
        pass

    last_sequence=list(sequences)+list(last_sequence)
    last_sequence=np.array(pd.DataFrame(last_sequence).shift(-1).dropna())
    result['last_sequence']=last_sequence
    X,y=[],[]

    for seq,target in sequence_data:
        X.append(seq)
        y.append(target)

    X=np.array(X)
    y=np.array(y)
    X=X.reshape((X.shape[0],X.shape[2],X.shape[1]))

    # dataset을 train/test용으로 분리
    result["X_train"],result["X_test"],result["y_train"],result["y_test"]=train_test_split(X,y,test_size=test_size,shuffle=shuffle)

    return result


# 모델 생성함수
# optimizer : 훈련과정 설정. 일반적으로 사용하는 adam 사용
# loss      : 최적화 과정에서 최소화될 손실함수 설정. 일반적으로 사용되며 데이터가 연속된 예측값이므로 mean_squared_error 사용
# cell      : 시계열 데이터이므로 LSTM
def create_model(units=50,dropout=0.3,n_steps=100,loss="mse",optimizer="adam",n_layers=4,cell=LSTM):
    model=Sequential()
    for i in range(n_layers):
        if i==0:
            model.add(cell(units,return_sequences=True,input_shape=(None,n_steps)))
        elif i==n_layers-1:
            model.add(cell(units))
        else:
            model.add(cell(units,return_sequences=True))

        # 매 layer마다 과적합을 방지하기 위해 dropout
        model.add(Dropout(dropout))
    # dense : 예측하고자 하는 target이 1개
    model.add(Dense(1))
    model.compile(loss=loss,metrics=[loss],optimizer=optimizer)

    return model

# 그래프 출력
def plot_graph(model,data):
    y_test=data["y_test"]
    X_test=data["X_test"]
    y_pred=model.predict(X_test)
    y_test=np.squeeze(data["column_scaler"]["close"].inverse__transform(np.expand_dims(y_test,axis=0)))
    y_pred=np.squeeze(data["column_scaler"]['close'].inverse_transform[y_pred])
    plt.plot(y_test[-200:],c="b")
    plt.plot(y_pred[-200:],c='r')
    plt.xlabel("Days")
    plt.ylabel("Price")
    plt.legend(["Actual_price","Predicted Price"])
    plt.show()