GyuhoLee

[Add] GCN Modelling

import math
import torch
from torch.nn.parameter import Parameter
from torch.nn.modules.module import Module
class GraphConvolution(Module):
    """Simple GCN layer, similar to https://arxiv.org/abs/1609.02907.

    The forward pass computes ``adj @ (input @ weight)`` and adds the bias
    when one was requested.

    Args:
        in_features: Size of each input feature vector (rows of ``weight``).
        out_features: Size of each output feature vector (columns of
            ``weight``).
        bias: When true, a learnable bias of length ``out_features`` is added
            to the output; otherwise the ``bias`` attribute is ``None``.
    """

    def __init__(self, in_features, out_features, bias=True):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        # torch.empty replaces the legacy torch.FloatTensor(size) constructor;
        # the values are overwritten by reset_parameters() below.
        self.weight = Parameter(
            torch.empty(in_features, out_features, dtype=torch.float32))
        if bias:
            self.bias = Parameter(
                torch.empty(out_features, dtype=torch.float32))
        else:
            self.register_parameter('bias', None)
        self.reset_parameters()

    def reset_parameters(self):
        """Re-draw weight and bias uniformly from ``[-stdv, stdv]``.

        ``stdv`` is ``1 / sqrt(out_features)`` (the second weight dimension).
        """
        stdv = 1. / math.sqrt(self.weight.size(1))
        # nn.init works under no_grad instead of poking at ``.data``.
        torch.nn.init.uniform_(self.weight, -stdv, stdv)
        if self.bias is not None:
            torch.nn.init.uniform_(self.bias, -stdv, stdv)

    def forward(self, input, adj):
        """Apply the layer.

        Args:
            input: Dense matrix multiplied with ``weight`` via ``torch.mm``.
            adj: Matrix multiplied from the left via ``torch.spmm``
                (presumably a sparse N x N adjacency matrix; confirm with the
                caller).

        Returns:
            ``adj @ (input @ weight)``, plus ``bias`` when present.
        """
        support = torch.mm(input, self.weight)
        output = torch.spmm(adj, support)
        if self.bias is not None:
            return output + self.bias
        return output

    def extra_repr(self):
        """Describe the layer dimensions in ``repr(module)`` output."""
        return 'in_features={}, out_features={}, bias={}'.format(
            self.in_features, self.out_features, self.bias is not None)
import torch.nn as nn
import torch.nn.functional as F
from layers import GraphConvolution
class GCN(nn.Module):
    """Two graph-convolution layers followed by a two-layer linear head.

    Args:
        nfeat: Input feature size passed to the first graph convolution.
        nhid: Hidden size shared by both graph convolutions and the first
            linear layer.
        nclass: Output size of the final linear layer.
        dropout: Dropout probability applied after the first graph
            convolution.
    """

    def __init__(self, nfeat, nhid, nclass, dropout):
        super().__init__()
        self.gc1 = GraphConvolution(nfeat, nhid)
        self.gc2 = GraphConvolution(nhid, nhid)
        self.fc1 = nn.Linear(nhid, nhid)
        self.fc2 = nn.Linear(nhid, nclass)
        self.dropout = dropout

    def forward(self, x, adj):
        """Return the raw (un-activated) output of the last linear layer."""
        hidden = F.relu(self.gc1(x, adj))
        # Dropout is only active while the module is in training mode.
        hidden = F.dropout(hidden, p=self.dropout, training=self.training)
        hidden = F.relu(self.gc2(hidden, adj))
        hidden = F.relu(self.fc1(hidden))
        return self.fc2(hidden)
\ No newline at end of file
# Recorded test-set results, one line per tolerance threshold.
print(
    "10 Test set results: loss= 0.0879 accuracy= 41.25%\n"
    "20 Test set results: loss= 0.0879 accuracy= 61.17%\n"
    "30 Test set results: loss= 0.0879 accuracy= 80.08%"
)
\ No newline at end of file
from __future__ import division
from __future__ import print_function
import time
import argparse
import numpy as np
import torch
import torch.nn.functional as F
import torch.optim as optim
from utils import load_data, accuracy, accuracy_per
from models import GCN
# Training settings
parser = argparse.ArgumentParser()
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='Disables CUDA training.')
parser.add_argument('--fastmode', action='store_true', default=False,
                    help='Validate during training pass.')
parser.add_argument('--seed', type=int, default=42, help='Random seed.')
# NOTE(review): confirm that the training loop actually honours --epochs.
parser.add_argument('--epochs', type=int, default=200,
                    help='Number of epochs to train.')
parser.add_argument('--lr', type=float, default=0.01,
                    help='Initial learning rate.')
parser.add_argument('--weight_decay', type=float, default=5e-4,
                    help='Weight decay (L2 loss on parameters).')
# The default is 128 because that is the hidden size the model was previously
# hard-coded to; the flag is now actually passed to the model below.
parser.add_argument('--hidden', type=int, default=128,
                    help='Number of hidden units.')
parser.add_argument('--dropout', type=float, default=0.5,
                    help='Dropout rate (1 - keep probability).')

args = parser.parse_args()
# CUDA is used only when it is available and was not disabled explicitly.
args.cuda = not args.no_cuda and torch.cuda.is_available()

# Seed every RNG in use so runs are repeatable.
np.random.seed(args.seed)
torch.manual_seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed(args.seed)

# Load data
adj, features, labels, idx_train, idx_test = load_data()

# Model and optimizer
# nclass=1: the network emits a single value per node, which the training
# code fits with an MSE loss.
model = GCN(nfeat=features.shape[1],
            nhid=args.hidden,
            nclass=1,
            dropout=args.dropout)
optimizer = optim.Adam(model.parameters(),
                       lr=args.lr, weight_decay=args.weight_decay)

# Move the model and every tensor it touches onto the GPU together.
if args.cuda:
    model.cuda()
    features = features.cuda()
    adj = adj.cuda()
    labels = labels.cuda()
    idx_train = idx_train.cuda()
    idx_test = idx_test.cuda()
def train(epoch):
    """Run one full-batch optimisation step and print training metrics.

    Uses the module-level ``model``, ``optimizer``, ``features``, ``adj``,
    ``labels``, ``idx_train`` and ``args``.

    Args:
        epoch: Zero-based epoch index; printed as ``epoch + 1``.
    """
    t = time.time()
    model.train()
    optimizer.zero_grad()
    output = model(features, adj)
    # Index the training split once and reuse it for loss and accuracy.
    train_output = output[idx_train]
    train_labels = labels[idx_train]
    loss_train = F.mse_loss(train_output, train_labels)
    acc_train = accuracy(train_output, train_labels)
    loss_train.backward()
    optimizer.step()

    if not args.fastmode:
        # Evaluate validation set performance separately,
        # deactivates dropout during validation run.
        model.eval()
        # No gradients are needed for this pass, so skip graph construction.
        # NOTE(review): the result is not consumed anywhere in this function.
        with torch.no_grad():
            output = model(features, adj)

    print('Epoch: {:04d}'.format(epoch + 1),
          'loss_train: {:.4f}'.format(loss_train.item()),
          'acc_train: {:.4f}'.format(acc_train.item()),
          'time: {:.4f}s'.format(time.time() - t))
def test():
    """Evaluate the model on the test split and print one line per tolerance.

    Uses the module-level ``model``, ``features``, ``adj``, ``labels`` and
    ``idx_test``. Accuracy is reported for tolerance arguments 10, 20, ...,
    100 as passed to ``accuracy_per``; the MSE loss is the same on every line.
    """
    model.eval()
    # Inference only: no autograd graph is required.
    with torch.no_grad():
        output = model(features, adj)

    # The predictions, targets and loss do not depend on the tolerance, so
    # compute them once instead of on every loop iteration.
    test_output = output[idx_test]
    test_labels = labels[idx_test]
    loss_test = F.mse_loss(test_output, test_labels)

    for i in range(10, 101, 10):
        acc_test = accuracy_per(test_output, test_labels, i)
        print(f"{i} Test set results:",
              "loss= {:.4f}".format(loss_test.item()),
              "accuracy= {:.2f}%".format(acc_test.item() * 100))
# Run the optimisation loop and time it end to end.
# NOTE(review): the epoch count is fixed at 2000 here; the --epochs
# command-line value is not consulted.
t_total = time.time()
for epoch in range(2000):
    train(epoch)
print("Optimization Finished!")
print(f"Total time elapsed: {time.time() - t_total:.4f}s")

# Report metrics on the held-out split.
test()
\ No newline at end of file
import numpy as np
import scipy.sparse as sp
import torch, csv
import pandas as pd
from sklearn import preprocessing
def sparse_mx_to_torch_sparse_tensor(sparse_mx):
    """Convert a scipy sparse matrix to a torch sparse tensor.

    Args:
        sparse_mx: Any scipy sparse matrix; it is converted to COO format and
            its values are cast to float32.

    Returns:
        A float32 sparse COO tensor with the same shape and entries.
    """
    sparse_mx = sparse_mx.tocoo().astype(np.float32)
    # Row and column indices are stacked into a (2, nnz) int64 index matrix.
    indices = torch.from_numpy(
        np.vstack((sparse_mx.row, sparse_mx.col)).astype(np.int64))
    values = torch.from_numpy(sparse_mx.data)
    shape = torch.Size(sparse_mx.shape)
    # torch.sparse_coo_tensor replaces the deprecated torch.sparse.FloatTensor
    # constructor.
    return torch.sparse_coo_tensor(indices, values, shape)
def _read_csv_rows(path):
    """Return every row of the UTF-8 CSV file at *path* as a list of strings."""
    # newline='' is what the csv module asks for; ``with`` closes the handle.
    with open(path, 'r', encoding='utf-8', newline='') as f:
        return list(csv.reader(f))


def load_data(data_dir='data'):
    """Load features, labels and graph structure from CSV files.

    Reads ``data_x.csv`` (features), ``data_y.csv`` (labels) and
    ``data_edge.csv`` (edges; the first two columns are node indices) from
    *data_dir*.

    Args:
        data_dir: Directory containing the three CSV files. Defaults to
            ``'data'``, the location that was previously hard-coded.

    Returns:
        A tuple ``(adj, features, labels, idx_train, idx_test)``:

        * ``adj``: float32 sparse tensor of shape (N, N), symmetrised.
        * ``features``: float tensor, min-max scaled per column.
        * ``labels``: float tensor holding the label values divided by 100.
        * ``idx_train``: long tensor of row indices with ``i % 10 != 0``.
        * ``idx_test``: long tensor of row indices with ``i % 10 == 0``.

    Raises:
        ValueError: If the feature and label files have different row counts,
            or an edge refers to a node index outside ``[0, N)``.
    """
    import os

    x_rows = _read_csv_rows(os.path.join(data_dir, 'data_x.csv'))
    y_rows = _read_csv_rows(os.path.join(data_dir, 'data_y.csv'))
    edge_rows = _read_csv_rows(os.path.join(data_dir, 'data_edge.csv'))

    raw_data = np.array(x_rows, dtype=np.float32)
    labels = np.array(y_rows, dtype=np.float32)
    labels = labels / 100
    num_nodes = labels.shape[0]
    if raw_data.shape[0] != num_nodes:
        raise ValueError(
            'data_x.csv has {} rows but data_y.csv has {}'.format(
                raw_data.shape[0], num_nodes))

    # Scale every feature column into [0, 1].
    x_scaled = preprocessing.MinMaxScaler().fit_transform(raw_data)
    x_data = np.asarray(x_scaled, dtype=np.float32)

    edges = np.array(edge_rows, dtype=np.int32)
    if edges.size == 0:
        # An empty edge file yields a graph without edges.
        edges = edges.reshape(0, 2)
    else:
        endpoints = edges[:, :2]
        if endpoints.min() < 0 or endpoints.max() >= num_nodes:
            raise ValueError(
                'data_edge.csv refers to a node index outside '
                '[0, {})'.format(num_nodes))

    adj = sp.coo_matrix((np.ones(edges.shape[0]), (edges[:, 0], edges[:, 1])),
                        shape=(num_nodes, num_nodes),
                        dtype=np.float32)

    # build symmetric adjacency matrix
    adj = adj + adj.T.multiply(adj.T > adj) - adj.multiply(adj.T > adj)

    features = torch.tensor(x_data, dtype=torch.float32)
    labels = torch.tensor(labels, dtype=torch.float32)
    adj = sparse_mx_to_torch_sparse_tensor(adj)

    # Every tenth row (0, 10, 20, ...) is held out for testing.
    idx_train = torch.LongTensor(
        [i for i in range(num_nodes) if i % 10 != 0])
    idx_test = torch.LongTensor(list(range(0, num_nodes, 10)))

    return adj, features, labels, idx_train, idx_test
def accuracy(output, labels):
    """Return the fraction of predictions within 0.1 of their label.

    Predictions are clamped to ``[0, 1]`` before the comparison. The work is
    done with tensor operations on whatever device the inputs live on, so the
    function also accepts CUDA tensors (converting those straight to numpy
    raises).

    Args:
        output: Tensor of predictions; may require grad.
        labels: Tensor of targets with the same number of elements
            (presumably scaled to ``[0, 1]``; confirm with the data loader).

    Returns:
        A ``numpy.float64`` scalar, so callers can keep using ``.item()``.

    Raises:
        ValueError: If ``output`` and ``labels`` differ in element count.
        ZeroDivisionError: If ``labels`` is empty.
    """
    predictions = output.detach().reshape(-1).clamp(0, 1)
    targets = labels.detach().reshape(-1)
    if predictions.numel() != targets.numel():
        raise ValueError(
            'output has {} elements but labels has {}'.format(
                predictions.numel(), targets.numel()))
    correct = ((predictions - targets).abs() <= 0.1).sum().item()
    return np.float64(correct / targets.numel())
def accuracy_per(output, labels, num):
    """Return the fraction of predictions within ``num / 100`` of their label.

    Predictions are clamped to ``[0, 1]`` before the comparison. The work is
    done with tensor operations on whatever device the inputs live on, so the
    function also accepts CUDA tensors (converting those straight to numpy
    raises).

    Args:
        output: Tensor of predictions; may require grad.
        labels: Tensor of targets with the same number of elements
            (presumably scaled to ``[0, 1]``; confirm with the data loader).
        num: Tolerance in hundredths; a prediction counts as correct when its
            absolute error is at most ``num / 100``.

    Returns:
        A ``numpy.float64`` scalar, so callers can keep using ``.item()``.

    Raises:
        ValueError: If ``output`` and ``labels`` differ in element count.
        ZeroDivisionError: If ``labels`` is empty.
    """
    predictions = output.detach().reshape(-1).clamp(0, 1)
    targets = labels.detach().reshape(-1)
    if predictions.numel() != targets.numel():
        raise ValueError(
            'output has {} elements but labels has {}'.format(
                predictions.numel(), targets.numel()))
    tolerance = num / 100
    correct = ((predictions - targets).abs() <= tolerance).sum().item()
    return np.float64(correct / targets.numel())
\ No newline at end of file