open_api.py 9.58 KB
from PyQt5.QtWidgets import *
from PyQt5.QAxContainer import *
from PyQt5.QtCore import *
from PyQt5.QtTest import *
from pandas import DataFrame
import sys
import errCode

TR_REQ_TIME_INTERVAL = 3600

class OpenApi(QAxWidget):
    def __init__(self):
        super().__init__()

        # event_loop list
        self.tr_event_loop=QEventLoop()
        self.login_event_loop=QEventLoop()

        # variable list
        self.account_no=None            # 계좌번호
        self.ohlcv = None               # 거래정보(시가,종가,고가,저가,거래량)
        self.remained_data=None         # 다음페이지 존재여부

        # screen number list
        self.screen_data_req="0101"     # 거래 데이터

        self.create_instance()
        self._signal_slots()
        self.comm_connect()             # 로그인
        self.get_account_info()         # 계좌정보 가져오기

    # 주식일봉차트조회요청
    def _opt10081(self,sRQName,sTrCode):
        QTest.qWait(TR_REQ_TIME_INTERVAL)

        cnt=self._get_repeat_cnt(sTrCode,sRQName)

        for i in range(cnt):
            date=self._get_comm_data(sTrCode,sRQName,i,"일자")
            open=self._get_comm_data(sTrCode,sRQName,i,"시가")
            high=self._get_comm_data(sTrCode,sRQName,i,"고가")
            low=self._get_comm_data(sTrCode,sRQName,i,"저가")
            close=self._get_comm_data(sTrCode,sRQName,i,"현재가")
            volume=self._get_comm_data(sTrCode,sRQName,i,"거래량")

            self.ohlcv['date'].append(date)
            self.ohlcv['open'].append(int(open))
            self.ohlcv['high'].append(int(high))
            self.ohlcv['low'].append(int(low))
            self.ohlcv['close'].append(int(close))
            self.ohlcv['volumn'].append(int(volume))

    # TR 요청을 처리하는 slot
    # param sScrNo: 스크린번호
    #       sRQName: 요청했을 때 지은 이름
    #       sTrCode: 요청 id, tr코드
    #       sRecordName: 레코드 이름
    #       sPrevNext: 다음 페이지가 있는지 여부. "2" : 다음페이지 존재, "0" or "" : 다음페이지 없음
    def _receive_tr_data(self,sScrNo,sRQName,sTrCode,sRecordName,sPrevNext):
        if sPrevNext=='2':
            self.remained_data=True
        else:
            self.remained_data=False

        if sRQName=="opt10081":
            print("==============주식일봉차트조회요청================")
            self._opt10081(sRQName,sTrCode)
        elif sRQName=="opw0001_req":
            return
        elif sRQName=="opw00018_req":
            return
        elif sRQName=="opt10074_req":
            return
        elif sRQName=="opw00015_req":
            return
        elif sRQName=="opt10076_req":
            return
        elif sRQName=="opt10073_req":
            return

        try:
            self.tr_event_loop.exit()
        except AttributeError:
            pass

    # 특정 종목의 일자별 거래 데이터 조회 함수
    # param  : code      -   종목코드
    #          start     -   기준일자
    # return : df        -   특정종목의 일자별 거래 데이터 목록
    def get_total_data(self,code,start):
        self.ohlcv = {'date': [], 'open': [], 'high': [], 'low': [], 'close': [], 'volume': []}

        self._set_input_value("종목코드",code)
        self._set_input_value("기준일자",start)
        self._set_input_value("수정주가구분",1)
        self._comm_rq_data("opt10081_req","opt10081",0,self.screen_data_req)

        while self.remained_data:
            self._set_input_value("종목코드",code)
            self._set_input_value("기준일자",start)
            self._set_input_value("수정주가구분",1)
            self._comm_rq_data("opt10081_req","opt10081",2,self.screen_data_req)

        # 데이터가 없거나, date=''일 경우 empty list를 반환
        if len(self.ohlcv)==0:
            return []
        if self.ohlcv['date']=='':
            return []

        # 데이터를 DataFrame형태로 저장 및 반환
        df=DataFrame(self.ohlcv,columns=['open','high','low','close','volume'],index=self.ohlcv['date'])

        return df

    # 특정 종목의 특정 날짜의 시가,종가,고가,저가,거래량 중 특정한 데이터를 반환하는 함수
    # param     :   code    -   종목코드
    #           :   date    -   조회날짜
    #           :   option  -   open(시가)/close(종가)/high(고가)/low(저가)/volume(거래량)
    # return    :   조회한 데이터에 해당하는 값
    #               값이 없거나, option이 오류일 경우 return False
    def get_oneday_option_data(self,code,date,option):
        self.ohlcv = {'date': [], 'open': [], 'high': [], 'low': [], 'close': [], 'volume': []}

        self._set_input_value("종목코드",code)
        self._set_input_value("기준일자",date)
        self._set_input_value("수정주가구분",1)
        self._comm_rq_data("opt10081_req","opt10081",0,self.screen_data_req)

        if self.ohlcv['date']=="":
            return False

        df = DataFrame(self.ohlcv, columns=['open', 'high', 'low', 'close', 'volume'], index=self.ohlcv['date'])
        if option == 'open':
            return df.iloc[0, 0]
        elif option == 'high':
            return df.iloc[0, 1]
        elif option == 'low':
            return df.iloc[0, 2]
        elif option == 'close':
            return df.iloc[0, 3]
        elif option == 'volume':
            return df.iloc[0, 4]
        else:
            return False

    # 사용자의 계좌정보 저장 및 출력
    def get_account_info(self):
        account=self.get_login_info("ACCNO")
        self.account_no=account.split(";")[0]
        print("======== 계좌번호 :   ",self.account_no,"========")

    # 원하는 사용자 정보 반환
    # param     :   tag - ACCNO - 보유계좌리스트
    #                     ACCOUNT_CNT - 보유계좌 수
    #                     USER_ID - 사용자 ID
    #                     USER_NAME - 사용자 이름
    #                     KEY_BSECGB - 키보드 보안 해제 여부 (0 : 정상, 1: 해지)
    #                     FIREW_SECGB - 방화벽 설정여부 (0 : 미설정, 1: 설정, 2 : 해지)
    #                     GetServerGubun - 접속서버 구분 (1 : 모의투자, 나머지 : 실서버)
    # return    :   tag를 통해 요청한 정보(ret) 반환
    def get_login_info(self,tag):
        try:
            ret=self.dynamicCall("GetLoginInfo(QString)",tag)
            return ret
        except Exception as e:
            print(e)
            sys.exit()

    # 키움 api를 사용하기 위한 ocx controller 저장
    def create_instance(self):
        try:
            self.setControl("KHOPENAPI.KHOpenAPICtrl.1")
        except Exception as e:
            print(e)
            sys.exit()

    # event처리를 위한 slot
    def _signal_slots(self):
        try:
            self.OnEventConnect.connect(self._login_slot)
            self.OnReceiveTrData.connect(self._receive_tr_data)
        except Exception as e:
            print(e)
            sys.exit()

    # 수동 로그인설정인 경우 로그인창을 출력해서 로그인을 시도
    # 자동로그인 설정인 경우 로그인창 출력없이 로그인을 시도
    def comm_connect(self):
        try:
            self.dynamicCall("CommConnect()")
            self.login_event_loop.exec_()
        except Exception as e:
            print(e)
            sys.exit()

    # 로그인 이벤트 처리 slot
    # param : code - 로그인 성공 시 0
    #                      실패 시 에러코드 출력
    def _login_slot(self,code):
        try:
            result=errCode.errors(code)
            if code==0:
                print("Connected",result[1])
            else:
                print("Failed to connect",result[1])
            self.login_event_loop.exit()
        except Exception as e:
            print(e)
            sys.exit()

    # 조회요청시 TR의 Input값을 지정하는 함수
    # param : sId       -   TR에 명시된 Input이름
    #         svalue    -   Input이름으로 지정한 값
    def _set_input_value(self,sId,sValue):
        try:
            self.dynamicCall("SetInputValue(QString, QString)", sId, sValue)
        except Exception as e:
            print(e)
            sys.exit()

    # 조회요청함수
    # param : sRQName       -   사용자 구분명
    #         sTrCode       -   조회하려는 TR이름
    #         nPrevNext     -   연속조회여부
    #         sScreenNo     -   화면번호
    def _comm_rq_data(self,sRQName,sTrData,nPrevNext,sScrNo):
        self.dynamicCall("CommRqData(QString, QString, int, QString", sRQName, sTrData, nPrevNext, sScrNo)
        self.tr_event_loop.exec_()

    # OnReceiveTRData()이벤트가 호출될때 조회데이터를 얻어오는 함수
    # param : sTrCode       -   TR 이름
    #         sRecordName   -   레코드이름
    #         nIndex        -   TR반복부
    #         sItemName     -   TR에서 얻어오려는 출력항목이름
    def _get_comm_data(self,sTrCode,sRecordName,nIndex,sItemName):
        ret = self.dynamicCall("GetCommData(QString, QString, int, QString", sTrCode, sRecordName, nIndex, sItemName)
        return ret.strip()

    # 조회수신한 멀티데이터의 갯수(반복)을 얻는다
    # param : sTrCode       -   tr이름
    #         sRecordName   -   레코드 이름
    def _get_repeat_cnt(self,sTrCode,sRecordName):
        try:
            ret=self.dynamicCall("GetRepeatCnt(QString, QString)",sTrCode,sRecordName)
            return ret
        except Exception as e:
            print(e)
            sys.exit()

if __name__ == "__main__":
    app = QApplication(sys.argv)
    OpenApi()