ai_trader.py 6.27 KB
import datetime
import sys
import numpy as np
import pandas as pd
import pymysql
from sqlalchemy.event import listen
from sqlalchemy.pool import Pool
from sqlalchemy.exc import InternalError,ProgrammingError
from tensorflow.keras.callbacks import EarlyStopping

from library.AIModel import load_data,create_model,evaluate,predict
from library import cf
from library.open_api import setup_sql_mod

listen(Pool,'connect',setup_sql_mod)
listen(Pool,'first_connect',setup_sql_mod)


def filter_by_ai(db_name,simul_num):
    from library.simulator_api import simulator_api
    sf=simulator_api(simul_num,'real',db_name)
    try:
        ai_filter(sf.ai_num,sf.engine_simul)
    except AttributeError:
        sys.exit(1)


def lstm_algorithm(dataset,ai_setting):
    shuffled_data=load_data(df=dataset.copy(),n_steps=ai_setting['n_steps'],test_size=ai_setting['test_size'])
    model=create_model(n_steps=ai_setting['n_steps'],loss=ai_setting['loss'],units=ai_setting['units'],
                       n_layers=ai_setting['n_layers'],dropout=ai_setting['dropout'])
    early_stopping=EarlyStopping(monitor='val_loss',patience=50)
    model.fit(shuffled_data["X_train"],shuffled_data['y_train'],
              batch_size=ai_setting['batch_size'],
              epochs=ai_setting['epochs'],
              validation_data=(shuffled_data['X_test'],shuffled_data['y_test']),
              callbacks=[early_stopping],
              verbose=1)
    scaled_data=load_data(df=dataset.copy(),n_steps=ai_setting['n_steps'],test_size=ai_setting['test_size'],
                          shuffle=False)
    mse=evaluate(scaled_data,model)

    future_price=predict(scaled_data,model,n_steps=ai_setting['n_steps'])
    predicted_y=model.predict(scaled_data['X_test'])
    real_y=np.squeeze(scaled_data['column_scaler']['close'].inverse_transform(predicted_y))

    if ai_setting['is_used_predicted_close']:
        close=real_y[-1]
    else:
        close=dataset.iloc[-1]['close']

    ratio=(future_price-close)/close*100

    msg = f"After {ai_setting['lookup_step']}: {int(close)} -> {int(future_price)}"

    if ratio > 0:       # lookup_step(분, 일) 후 상승 예상일 경우 출력 메시지
        msg += f'    {ratio:.2f}% ⯅ '
    elif ratio < 0:     # lookup_step(분, 일) 후 하락 예상일 경우 출력 메시지
        msg += f'    {ratio:.2f}% ⯆ '
    print(msg, end=' ')

    return ai_setting['ratio_cut']>=ratio


def create_training_engine(db_name):
    return pymysql.connect(
        host=cf.db_ip,
        port=int(cf.db_port),
        user=cf.db_id,
        password=cf.db_pw,
        db=db_name,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )


def ai_filter(ai_filter_num,engine,until=datetime.datetime.today()):
    if ai_filter_num == 1:
        ai_setting = {
            "n_steps": 100,                         # 시퀀스 데이터를 몇개씩 담을지 설정
            "lookup_step": 1,                       # 단위 : 몇 일(분) 뒤의 종가를 예측 할 것 인지
            "test_size": 0.2,                       # train 범위
            "n_layers": 4,                          # LSTM layer 개수
            "units": 50,                            # LSTM neurons 개수
            "dropout": 0.2,                         # overfitting 방지를 위한 dropout
            "loss": "mse",                          # loss : 최적화 과정에서 최소화될 손실 함수
            "optimizer": "adam",                    # optimizer : 최적화 알고리즘 선택
            "batch_size": 64,                       # 각 학습 반복에 사용할 데이터 샘플 수
            "epochs": 400,                          # 몇 번 테스트 할지
            "ratio_cut": 2,                         # 단위:(%) lookup_step 기간 뒤 ratio_cut(%) 만큼 증가 할 것이 예측 된다면 매수
            "table": "daily_craw",                  # 분석 시 daily_craw(일별데이터)를 이용 할지 min_craw(분별데이터)를 이용 할지
            "is_used_predicted_close" : True        # ratio(예상 상승률) 계산 시 예측 그래프의 close 값을 이용 할 경우 True,
                                                    # 실제 close 값을 이용할 시 False
        }
        tr_engine = create_training_engine(ai_setting['table'])

        buy_list=None

        try:
            query="SELECT DISTINCT code_name FROM realtime_daily_buy_list"
            buy_list=engine.execute(query).fetchall()
        except (InternalError,ProgrammingError) as err:
            if 'Table' in str(err):
                print("realtime_daily_buy_list 테이블이 존재하지 않습니다.")
            else:
                print("autobot 데이터베이스가 존재하지 않습니다.")
            exit(1)

        feature_columns = ["close", "volume", "open", "high", "low"]
        filtered_list = []
        for code_name in buy_list:
            code_name=code_name[0]
            sql = """
                SELECT {} FROM `{}`
                WHERE STR_TO_DATE(date, '%Y%m%d%H%i') <= '{}'
            """.format(','.join(feature_columns), code_name, until)
            df = pd.read_sql(sql, tr_engine)

            # 데이터가 1000개(1000일 or 1000분)가 넘지 않으면 예측도가 떨어지기 때문에 필터링
            if len(df) < 1000:
                filtered_list.append(code_name)
                print(f"테스트 데이터가 적어서 realtime_daily_buy_list 에서 제외")
                continue
            try:
                filtered = lstm_algorithm(df, ai_setting)
            except ValueError:
                print(f"테스트 데이터가 적어서 realtime_daily_buy_list 에서 제외")
                filtered_list.append(code_name)
                continue

            print(code_name)

            # filtered가 True 이면 filtered_list(필터링 종목)에 해당 종목을 append
            if filtered:
                print(f"기준에 부합하지 않으므로 realtime_daily_buy_list 에서 제외")
                filtered_list.append(code_name)

        # filtered_list에 있는 종목들을 realtime_daily_buy_list(매수리스트)에서 제거
        if len(filtered_list) > 0:
            engine.execute(f"""
                    DELETE FROM realtime_daily_buy_list WHERE code_name in ({','.join(map('"{}"'.format, filtered_list))})
                """)