Skip to content

Commit 8c99253

Browse files
authored
Use Shared MQTT Connector (#746)
* add MqttTopicConnector to shared MqttConnector between workers * Fix issue while switching between REPLAY & RT, the callback functions were not set back * Fix issue depending on new promise architecture * Add new SweApiDataSource updater to update min/max of a dataSource * Fix issue declaring Handler into dataSource worker, after switching the mode, the same handler was use (a map was caching the object). The handler is now automatically re-init after 'init' function * Refactor DataSourceWorker to use internal object * recreate the handler object into DataSourceWorker after switching to REPLAY or RT mode * add MISB live demo * pass mode into DataSource worker to differentiate kind of worker (because dataSource is still the same in RT or REPLAY) * rename misb-live to misb and remove old demo
1 parent 5a2b7ce commit 8c99253

22 files changed

+590
-229
lines changed

demos/misb/src/App.vue

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ export default {
7171
beforeMount() {
7272
const START_TIME = '2012-06-29T14:32:34.099333251Z';
7373
const END_TIME = '2012-06-29T14:36:54.033333251Z';
74-
const MODE = Mode.REPLAY;
74+
const MODE = Mode.REAL_TIME;
7575
//
7676
7777
// const START_TIME = 'now';
7878
// const END_TIME = '2055-01-01'
7979
// const MODE = Mode.REAL_TIME;
8080
8181
const MIN_TIME = '2012-06-29T14:32:34.099333251Z';
82-
const MAX_TIME = '2012-06-29T14:36:54.033333251Z';
82+
const MAX_TIME = '2055-06-29T14:36:54.033333251Z';
8383
const tls = true;
8484
8585
const dsReplaySpeed = 2.6;
@@ -96,15 +96,15 @@ export default {
9696
endTime: END_TIME,
9797
minTime: MIN_TIME,
9898
maxTime: MAX_TIME,
99-
mode: MODE,
99+
mode: Mode.REAL_TIME,
100100
replaySpeed: dsReplaySpeed,
101101
prefetchBatchDuration: 10000,
102102
prefetchBatchSize: 250
103103
};
104104
105105
const droneVideoDataSource = new SweApiDatasource('MISB Drone - Video', {
106106
...commonDatasourceOpts,
107-
resource: '/datastreams/8ni90dbu4uf0g/observations',
107+
resource: '/datastreams/h225hesual08g/observations',
108108
responseFormat: 'application/swe+binary',
109109
});
110110
@@ -113,37 +113,37 @@ export default {
113113
114114
const droneLocationDataSource = new SweApiDatasource('MISB UAS - Platform Location', {
115115
...commonDatasourceOpts,
116-
resource: '/datastreams/fled6eics1cl4/observations',
116+
resource: '/datastreams/o7pce3e60s0ie/observations',
117117
responseFormat: 'application/swe+json',
118118
});
119119
120120
const droneOrientationDataSource = new SweApiDatasource('MISB UAS - Platform Attitude', {
121121
...commonDatasourceOpts,
122-
resource: '/datastreams/adheadf9nghts/observations',
122+
resource: '/datastreams/mlme3gtdfepvc/observations',
123123
responseFormat: 'application/swe+json',
124124
});
125125
126126
const droneCameraOrientationDataSource = new SweApiDatasource('MISB UAS - Gimbal Attitude', {
127127
...commonDatasourceOpts,
128-
resource: '/datastreams/hrpo1u6r5096i/observations',
128+
resource: '/datastreams/h75dmug8e3sae/observations',
129129
responseFormat: 'application/swe+binary',
130130
});
131131
132132
const droneHFovDataSource = new SweApiDatasource('MISB UAS - Horizontal FoV', {
133133
...commonDatasourceOpts,
134-
resource: '/datastreams/d962edate9okm/observations',
134+
resource: '/datastreams/nhcs9rp6713ka/observations',
135135
responseFormat: 'application/swe+binary',
136136
});
137137
138138
const droneVFovDataSource = new SweApiDatasource('MISB UAS - Vertical FoV', {
139139
...commonDatasourceOpts,
140-
resource: '/datastreams/d962edate9okm/observations',
140+
resource: '/datastreams/nhcs9rp6713ka/observations',
141141
responseFormat: 'application/swe+binary',
142142
});
143143
144144
const geoRefImageFrameDataSource = new SweApiDatasource('MISB UAS - GeoReferenced Image Frame', {
145145
...commonDatasourceOpts,
146-
resource: '/datastreams/p3mp2peibksl4/observations',
146+
resource: '/datastreams/iabpf1ivua1qm/observations',
147147
responseFormat: 'application/swe+csv',
148148
});
149149
@@ -166,13 +166,14 @@ export default {
166166
masterTimeRefreshRate: 250,
167167
startTime: START_TIME,
168168
endTime: END_TIME,
169+
mode: Mode.REAL_TIME,
169170
dataSources: [
170171
droneLocationDataSource,
171172
droneVideoDataSource,
172173
droneOrientationDataSource,
173174
droneCameraOrientationDataSource,
174175
geoRefImageFrameDataSource,
175-
targetLocationDataSource,
176+
// targetLocationDataSource,
176177
droneVFovDataSource,
177178
droneHFovDataSource
178179
]

demos/misb/src/components/CollapseTimeController.vue

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
<TimeController
1010
:dataSynchronizer="dataSynchronizer"
1111
@event='onControlEvent'
12+
trackRealtime=""
1213
:skipTimeStep="'5%'"
1314
></TimeController>
1415
<v-btn

showcase/examples/leaflet-location/leaflet-location.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ let gpsDataSource = new SosGetResult("android-GPS", {
1212
endpointUrl: 'sensiasoft.net/sensorhub/sos',
1313
offeringID: 'urn:android:device:060693280a28e015-sos',
1414
observedProperty: 'http://sensorml.com/ont/swe/property/Location',
15-
startTime: '2015-02-16T07:58:15.447Z',
15+
startTime: '2015-02-16T08:01:15.447Z',
1616
endTime: '2015-02-16T08:09:00Z',
1717
mode: Mode.REPLAY,
1818
tls: true,

source/core/connector/MqttConnector.js

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ const mqttProviders = {};
4646
class MqttConnector extends DataConnector {
4747
/**
4848
*
49+
* @param url
4950
* @param properties -
5051
*/
5152
constructor(url, properties) {
@@ -54,6 +55,26 @@ class MqttConnector extends DataConnector {
5455
...properties
5556
});
5657
this.interval = -1;
58+
this.id = `mqtt-connector-${randomUUID()}`;
59+
}
60+
61+
initBc() {
62+
this.onMessage = (data, topic) => {
63+
this.broadcastChannel.postMessage({
64+
data: data,
65+
topic: topic
66+
}, [data]);
67+
}
68+
this.broadcastChannel = new BroadcastChannel(this.id);
69+
let queryString;
70+
this.broadcastChannel.onmessage = (message) => {
71+
if(message.data.message === 'subscribe') {
72+
queryString = message.data.queryString;
73+
this.doRequest(message.data.topic, message.data.queryString);
74+
} else if(message.data.message === 'unsubscribe') {
75+
this.disconnect(message.data.topic+'?'+queryString);
76+
}
77+
}
5778
}
5879

5980
getMqttProvider() {
@@ -99,6 +120,7 @@ class MqttConnector extends DataConnector {
99120
*/
100121
doRequest(topic = '',queryString= undefined) {
101122
const mqttProvider = this.getMqttProvider();
123+
102124
mqttProvider.subscribe(`${topic}?${queryString}`, this.onMessage).then(() => {
103125
this.onChangeStatus(Status.CONNECTED);
104126
});
@@ -112,7 +134,7 @@ class MqttConnector extends DataConnector {
112134
/**
113135
* Disconnects and close the mqtt client.
114136
*/
115-
async disconnect() {
137+
async disconnect(topic) {
116138
// does not call super to avoid reconnection logic and use the one of the mqtt.js lib
117139
// this.checkStatus(Status.DISCONNECTED);
118140
// this.init = false;
@@ -121,9 +143,13 @@ class MqttConnector extends DataConnector {
121143
const client = mqttProviders[this.getUrl()];
122144

123145
if (isDefined(client) && client.isConnected()) {
124-
// unsubscribe all topics
125-
await client.unsubscribeAll();
126-
// client.disconnect();
146+
if(!topic) {
147+
// unsubscribe all topics
148+
return client.unsubscribeAll();
149+
// client.disconnect();
150+
} else {
151+
return client.unsubscribe(topic);
152+
}
127153
}
128154
//delete mqttProviders[this.getUrl()];
129155
//console.warn(`Disconnected from ${this.getUrl()}`);
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import DataConnector from "./DataConnector";
2+
import {Status} from "./Status";
3+
4+
/**
5+
* Defines the MqttTopicConnector to communicate with Shared MqttConnector using broadcast channel
6+
* @extends DataConnector
7+
*/
8+
class MqttTopicConnector extends DataConnector {
9+
/**
10+
*
11+
* @param properties -
12+
*/
13+
constructor(url, properties) {
14+
super(url, properties);
15+
this.lastReceiveTime = -1;
16+
this.interval = -1;
17+
this.broadcastChannel = new BroadcastChannel(url);
18+
this.broadcastChannel.onmessage = (message) => {
19+
if(message.data.topic === this.fullTopic) {
20+
this.onMessage(message.data.data);
21+
}
22+
}
23+
this.topics = [];
24+
}
25+
26+
doRequest(topic = '',queryString= undefined) {
27+
this.fullTopic = topic + '?'+queryString;
28+
this.broadcastChannel.postMessage({
29+
message: 'subscribe',
30+
connectorId: this.id,
31+
topic: topic,
32+
queryString: queryString
33+
});
34+
this.topics.push(topic);
35+
this.onChangeStatus(Status.CONNECTED);
36+
}
37+
/**
38+
* Disconnects.
39+
*/
40+
disconnect() {
41+
this.fullDisconnect(true);
42+
}
43+
44+
/**
45+
* Fully disconnect the websocket connection by sending a close message to the webWorker.
46+
* @param {Boolean} removeInterval - force removing the interval
47+
*/
48+
fullDisconnect(removeInterval) {
49+
if (this.broadcastChannel != null) {
50+
for(let topic of this.topics) {
51+
this.broadcastChannel.postMessage({
52+
message: 'unsubscribe',
53+
connectorId: this.id,
54+
topic: topic,
55+
});
56+
}
57+
this.broadcastChannel.close();
58+
this.broadcastChannel = null;
59+
this.topics = [];
60+
}
61+
if (removeInterval) {
62+
clearInterval(this.interval);
63+
}
64+
this.opened = false;
65+
}
66+
67+
/**
68+
* Try to reconnect if the connexion if closed
69+
*/
70+
reconnect() {
71+
this.onReconnect();
72+
if (this.init) {
73+
this.fullDisconnect(false);
74+
}
75+
this.connect();
76+
77+
}
78+
79+
/**
80+
* The onMessage method used by the websocket to callback the data
81+
* @param data the callback data
82+
* @event
83+
*/
84+
onMessage(data) {
85+
}
86+
87+
/**
88+
* Closes the webSocket.
89+
*/
90+
close() {
91+
this.disconnect();
92+
}
93+
94+
isConnected() {
95+
return this.broadcastChannel !== null && this.opened;
96+
}
97+
98+
checkStatus(status) {
99+
this.onChangeStatus(status);
100+
this.status = status;
101+
}
102+
}
103+
104+
export default MqttTopicConnector;

0 commit comments

Comments
 (0)