박권수

fix. Mqtt unnecessary call remove

......@@ -2,12 +2,10 @@
const Bottle = require('../../models/bottle');
const Hub = require('../../models/hub');
const Medicine = require('../../models/medicine');
const DataProcess = require('../../lib/DataProcess');
const Mqtt = require('../../lib/MqttModule');
exports.bottleConnect = async(ctx) => {
const { bottleId, hubId } = ctx.request.body;
const topic = 'bottle/' + bottleId + '/bts';
const newBottle = new Bottle({
bottleId,
......@@ -32,13 +30,9 @@ exports.bottleConnect = async(ctx) => {
return;
}
const client = await Mqtt.mqttOn({
host : hosting.host,
port : hosting.port,
clientId : hosting.clientId
});
Mqtt.mqttSubscribe(client, topic, DataProcess.dataPublish);
const client = await Mqtt.mqttOn(hosting);
const topic = 'bottle/' + bottleId + '/bts';
Mqtt.mqttSubscribe(client, topic);
await newBottle.save();
......@@ -46,7 +40,25 @@ exports.bottleConnect = async(ctx) => {
};
exports.bottleDisconnect = async(ctx) => {
const { bottleId } = ctx.params;
const { bottleId } = ctx.params;
const bottle = await Bottle.findByBottleId(bottleId);
if(!bottle) {
ctx.status = 404;
return;
}
const hub = await Hub.findByHubId(bottle.getHubId());
const hosting = await hub.getHubHost();
const client = await Mqtt.mqttOn(hosting);
const topic = 'bottle/' + bottleId + '/bts';
Mqtt.mqttUnsubscribe(client, topic);
await Bottle.deleteOne({ bottleId });
ctx.status = 200;
};
exports.lookupInfo = async(ctx) => {
......
//허브(Mqtt Broker)등록 및 삭제
const Hub = require('../../models/hub');
const Mqtt = require('../../lib/MqttModule');
const DataProcess = require('../../lib/DataProcess');
exports.hubConnect = async (ctx) => {
const { hubId, host, port } = ctx.request.body;
const isExistHub = await Hub.findByHubId(hubId);
if(isExistHub) {
ctx.status = 409;
return;
}
const hosting = {
host,
port
};
Mqtt.mqttOn(hosting);
await Hub.findOneAndUpdate({
hubId
}, { hosting }, {
upsert : true
Mqtt.mqttOn(hosting, DataProcess.dataPublish);
const hub = new Hub({
hubId,
hosting
});
await hub.save();
ctx.status = 200;
}
ctx.body = hub;
};
exports.hubDisconnect = async(ctx) => {
const { hubId } = ctx.params;
......@@ -35,4 +45,4 @@ exports.hubDisconnect = async(ctx) => {
await Hub.deleteOne({ hubId });
ctx.status = 200;
}
\ No newline at end of file
};
\ No newline at end of file
......
const mqtt = require('mqtt');
const clientList = [];
exports.mqttOn = async (hosting) => {
exports.mqttOn = async (hosting, func) => {
const filterIndex = clientList.findIndex(client => {
return (client.options.clientId === hosting.clientId
&& client.options.host === hosting.host
......@@ -11,26 +11,37 @@ exports.mqttOn = async (hosting) => {
if(filterIndex === -1) {
const client = mqtt.connect(hosting);
clientList.push(client);
client.on('connect', () => {
console.log('Client connected: ', client.connected);
});
client.on('message', async (topic, message, packet) => {
const result = await func(topic, message.toString());
this.mqttPublishMessage(client, result);
console.log('\x1b[1;32msubscribe : topic', topic, 'message : ', message.toString(), '\x1b[0m');
});
return client;
} else {
return clientList[filterIndex];
};
}
return clientList[filterIndex];
};
exports.mqttSubscribe = (client, topic, func) => {
exports.mqttSubscribe = (client, topic) => {
client.subscribe(topic);
client.on('message', async (topic, message, packet) => {
const result = await func(topic, message.toString());
this.mqttPublishMessage(client, result);
});
};
exports.mqttPublishMessage = (client, { topic, message }) => {
client.publish(topic, message, () => {});
client.publish(topic, message, () => {
console.log('\x1b[1;33mpublish : topic', topic, 'message : ', message, '\x1b[0m');
});
};
exports.mqttUnsubscribe = (client, topic) => {
client.unsubscribe(topic, () => {
console.log('unsubscribe', topic);
});
};
exports.mqttOff = (hosting) => {
......