DataProvider.py
3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from QuantDB import QuantDB
from typing import List
class DataProvider:
    """Build SELECT statements and run them through a ``QuantDB`` object.

    Every ``find*`` method assembles one query against a fixed table
    ("FinancialStatements", "StockPrice" or "Company"), hands it to
    ``self.DB.searchData(query, columns)`` and returns that call's result
    unchanged.  When ``self.DB`` is unset (for example after ``close()``)
    the ``find*`` methods return ``None`` without touching the database.

    NOTE(review): the SQL text is still assembled as a string because the
    signature of ``QuantDB.searchData`` is not visible from this file.
    Identifiers and literals are escaped by doubling their quote character,
    which is the standard-SQL rule; if ``searchData`` can accept bound
    parameters, prefer those.
    """

    # Class-level fallback so the attribute always exists; each instance
    # replaces it with its own object in __init__.
    DB = None

    def __init__(self):
        """Create a provider named "Default" holding a new ``QuantDB``."""
        self.Name = "Default"
        self.DB = QuantDB()

    def __repr__(self) -> str:
        """Return the provider's ``Name``."""
        return self.Name

    def initialize(self):
        """Connect the underlying database and return ``DB.connect()``'s result."""
        return self.DB.connect()

    def close(self) -> None:
        """Close the database object if one is held, then drop the reference."""
        if self.DB:
            self.DB.close()
            self.DB = None

    @staticmethod
    def _quote_identifier(name) -> str:
        """Wrap *name* in double quotes, doubling any embedded double quote."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _quote_literal(value) -> str:
        """Wrap *value* in single quotes, doubling any embedded single quote."""
        return "'" + str(value).replace("'", "''") + "'"

    def _select(self, columns: List[str], table: str, where: str, suffix: str = ''):
        """Run ``SELECT <columns> from <table> WHERE <where><suffix>``.

        Args:
            columns: Column names to select; also forwarded to ``searchData``.
            table: Table name (quoted here as an identifier).
            where: Already-escaped condition text placed after ``WHERE``.
            suffix: Optional trailing clause such as an ``order by``.

        Returns:
            ``None`` when no database object is held, otherwise whatever
            ``self.DB.searchData`` returns.

        Raises:
            ValueError: If *columns* is empty, since that cannot form a
                valid SELECT list.
        """
        if not self.DB:
            return None
        if not columns:
            raise ValueError("columns must contain at least one column name")
        column_list = ','.join(self._quote_identifier(column) for column in columns)
        query = ('SELECT ' + column_list
                 + ' from ' + self._quote_identifier(table)
                 + ' WHERE ' + where + suffix)
        return self.DB.searchData(query, columns)

    def findFS(self, columns: List[str], year: int, quarter: int, div: str):
        """Same query as ``findAllFS``; kept as a separate name for existing callers."""
        return self.findAllFS(columns, year, quarter, div)

    def findAllFS(self, columns: List[str], year: int, quarter: int, div: str):
        """Select *columns* from "FinancialStatements" filtered by period and division.

        The filter is ``"Year"=<year> and "Quarter"=<quarter> and "Div"='<div>'``.

        Raises:
            ValueError: If *columns* is empty, or *year* / *quarter* cannot be
                converted with ``int()``.
        """
        # int() keeps the numeric comparisons free of arbitrary SQL text.
        where = ('"Year"=' + str(int(year))
                 + ' and "Quarter"=' + str(int(quarter))
                 + ' and "Div"=' + self._quote_literal(div))
        return self._select(columns, "FinancialStatements", where)

    def findOneFS(self, columns: List[str], stockcode: str, year: int, quarter: int, div: str):
        """Select *columns* from "FinancialStatements" for one "id".

        The id is the concatenation ``stockcode + str(year) + str(quarter) + div``.
        """
        row_id = '{}{}{}{}'.format(stockcode, year, quarter, div)
        where = '"id"=' + self._quote_literal(row_id)
        return self._select(columns, "FinancialStatements", where)

    def findSPbyID(self, columns: List[str], stockcode: str, stockdate: str):
        """Select *columns* from "StockPrice" for the id ``stockcode + stockdate``."""
        where = '"id"=' + self._quote_literal('{}{}'.format(stockcode, stockdate))
        return self._select(columns, "StockPrice", where)

    def findSP(self, columns: List[str], stockcode: str, stock_startdate: str, stock_enddate: str):
        """Select *columns* from "StockPrice" for one stock over a date range.

        Both bounds are inclusive (``>=`` / ``<=`` on "StockDate") and the
        rows are ordered by ``id`` ascending.
        """
        where = ('"StockCode"=' + self._quote_literal(stockcode)
                 + ' and "StockDate" >= ' + self._quote_literal(stock_startdate)
                 + ' and "StockDate" <= ' + self._quote_literal(stock_enddate))
        return self._select(columns, "StockPrice", where, ' order by id asc')

    def findCompany(self, columns: List[str], stockcode: str):
        """Select *columns* from "Company" where "StockCode" equals *stockcode*."""
        where = '"StockCode"=' + self._quote_literal(stockcode)
        return self._select(columns, "Company", where)

    def findStockName(self, stockcode: str):
        """Return the first "StockName" found for *stockcode*, or ``''``.

        ``''`` is returned when no database object is held, when the lookup
        yields ``None``, or when it yields no rows.
        """
        companies = self.findCompany(["StockName"], stockcode)
        if companies is None:
            return ''
        names = companies["StockName"]
        # The container type returned by searchData is not visible here; only
        # len() and [0] are relied on, so an empty result no longer raises.
        if len(names) == 0:
            return ''
        return names[0]