Skip to content

Commit 4c38e43

Browse files
committed
Fix reset dataSource by changing properties, disconnect correctly, removing from workerPool, removing handler from worker and connect again
1 parent fe7464a commit 4c38e43

File tree

8 files changed

+69
-35
lines changed

8 files changed

+69
-35
lines changed

source/core/datasource/DataSource.datasource.js

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ class DataSource {
6666
}
6767

6868
terminate() {
69-
if (this.dataSourceWorker !== null) {
70-
this.dataSourceWorker.terminate();
69+
if (this.getWorker() !== null) {
70+
this.getWorker().terminate();
7171
}
7272
}
7373

@@ -107,7 +107,7 @@ class DataSource {
107107
...this.properties,
108108
...properties
109109
};
110-
return this.dataSourceWorker.postMessageWithAck({
110+
return this.getWorker().postMessageWithAck({
111111
message: 'update-properties',
112112
data: properties,
113113
dsId: this.id
@@ -136,8 +136,7 @@ class DataSource {
136136
}
137137

138138
async initDataSource(properties = this.properties) {
139-
this.dataSourceWorker = this.getWorker();
140-
return this.dataSourceWorker.postMessageWithAck({
139+
return this.getWorker().postMessageWithAck({
141140
message: 'init',
142141
id: this.id,
143142
properties: properties,
@@ -176,7 +175,7 @@ class DataSource {
176175
}
177176

178177
async doConnect() {
179-
return this.dataSourceWorker.postMessageWithAck({
178+
return this.getWorker().postMessageWithAck({
180179
message: 'connect',
181180
dsId: this.id
182181
});
@@ -187,7 +186,7 @@ class DataSource {
187186
return false;
188187
} else {
189188
return this.checkInit().then(() => {
190-
return this.dataSourceWorker.postMessageWithAck({
189+
return this.getWorker().postMessageWithAck({
191190
message: 'is-connected',
192191
dsId: this.id
193192
});
@@ -200,7 +199,7 @@ class DataSource {
200199
*/
201200
async disconnect() {
202201
await this.checkInit();
203-
return this.dataSourceWorker.postMessageWithAck({
202+
return this.getWorker().postMessageWithAck({
204203
message: 'disconnect',
205204
dsId: this.id
206205
});
@@ -209,8 +208,21 @@ class DataSource {
209208
async onDisconnect() {
210209
}
211210

212-
reset() {
213-
this.init = undefined;
211+
async reset() {
212+
await this.disconnect();
213+
this.resetInit();
214+
return this.removeWorker();
215+
}
216+
217+
async removeWorker() {
218+
if (this.id in dataSourceWorkers) {
219+
return this.getWorker().postMessageWithAck({
220+
message: 'remove-handler',
221+
dsId: this.id
222+
}).then(() => {
223+
delete dataSourceWorkers[this.id]; // delete index from pool
224+
});
225+
}
214226
}
215227

216228
onRemovedDataSource(dataSourceId) {

source/core/datasource/TimeSeries.datasource.js

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ class TimeSeriesDatasource {
3535
...properties
3636
});
3737

38-
this.setMode(properties.mode);
38+
this.setMode(properties.mode, false);
3939
}
4040

41-
async setMode(mode) {
42-
if (this.timeSeriesDataSource) {
41+
async setMode(mode, disconnect = true) {
42+
if (disconnect && this.timeSeriesDataSource) {
4343
await this.timeSeriesDataSource.disconnect();
4444
}
4545

@@ -320,11 +320,10 @@ class TimeSeriesDatasource {
320320
return this.timeSeriesDataSource.isConnected();
321321
}
322322

323-
reset() {
324-
this.timeSeriesDataSource.reset();
323+
async reset() {
324+
return this.timeSeriesDataSource.reset();
325325
}
326326

327-
328327
onTimeChanged(min, max, start, end) {
329328
}
330329

source/core/datasource/TimeSeries.realtime.datasource.js

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {DATA_SYNCHRONIZER_TOPIC, DATASOURCE_TIME_TOPIC} from "../Constants";
1818
import DataSource from "./DataSource.datasource";
1919
import {Mode} from './Mode';
2020
import MqttConnector from "../connector/MqttConnector";
21+
import {isDefined} from "../utils/Utils";
2122

2223
/**
2324
* The DataSource is the abstract class used to create different datasources.
@@ -186,7 +187,7 @@ class TimeSeriesRealtimeDatasource extends DataSource {
186187
await this.checkInit();
187188
const topic = DATA_SYNCHRONIZER_TOPIC + this.dataSynchronizer.id;
188189
this.properties.version = this.dataSynchronizer.version();
189-
return this.dataSourceWorker.postMessageWithAck({
190+
return this.getWorker().postMessageWithAck({
190191
message: 'topics',
191192
topics: {
192193
data: topic,
@@ -201,29 +202,32 @@ class TimeSeriesRealtimeDatasource extends DataSource {
201202
}
202203

203204
async removeDataSynchronizer() {
204-
if (this.dataSourceWorker) {
205-
this.dataSourceWorker.terminate();
206-
this.dataSynchronizer = undefined;
207-
}
205+
await this.removeWorker();
206+
this.dataSynchronizer = undefined;
208207
// this.init = undefined;
209208
return this.checkInit();
210-
211209
}
212210

213211
/**
214212
* Disconnect the dataSource then the protocol will be closed as well.
215213
*/
216214
async disconnect() {
217-
await this.checkInit();
218-
return this.dataSourceWorker.postMessageWithAck({
219-
message: 'disconnect',
220-
dsId: this.id,
221-
mode: Mode.REAL_TIME,
222-
});
215+
if (isDefined(this.init)) {
216+
try {
217+
return this.getWorker().postMessageWithAck({
218+
message: 'disconnect',
219+
dsId: this.id,
220+
mode: Mode.REAL_TIME,
221+
});
222+
} catch (ex) {
223+
console.error(ex);
224+
}
225+
}
223226
}
224227

225228
async doConnect() {
226-
return this.dataSourceWorker.postMessageWithAck({
229+
await this.checkInit();
230+
return this.getWorker().postMessageWithAck({
227231
message: 'connect',
228232
startTime: 'now',
229233
version: this.version(),
@@ -247,7 +251,7 @@ class TimeSeriesRealtimeDatasource extends DataSource {
247251
topics.sync = this.dataSynchronizer.getTimeTopicId()
248252
}
249253

250-
return this.dataSourceWorker.postMessageWithAck({
254+
return this.getWorker().postMessageWithAck({
251255
message: 'topics',
252256
topics: topics,
253257
dsId: this.id,
@@ -273,6 +277,12 @@ class TimeSeriesRealtimeDatasource extends DataSource {
273277
version() {
274278
return this.properties.version;
275279
}
280+
281+
async reset() {
282+
console.warn(`dataSource ${this.id} has been reset`);
283+
await super.reset();
284+
return this.doConnect();
285+
}
276286
}
277287

278288
export default TimeSeriesRealtimeDatasource;

source/core/datasource/TimeSeries.replay.datasource.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,11 @@ class TimeSeriesReplayDatasource extends DataSource {
333333
version: version
334334
});
335335
}
336+
async reset() {
337+
console.warn(`dataSource ${this.id} has been reset`);
338+
await super.reset();
339+
return this.doConnect();
340+
}
336341
}
337342

338343
export default TimeSeriesReplayDatasource;

source/core/datasource/worker/DataSourceWorker.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ class DataSourceWorker {
3030
this.handleIsConnected(eventData, resp);
3131
} else if (eventData.message === 'is-init') {
3232
this.handleIsInit(eventData, resp);
33+
} else if (eventData.message === 'remove-handler') {
34+
this.handleRemoveHandler(eventData, resp);
3335
}
3436
} catch (ex) {
3537
console.error(ex);
@@ -82,6 +84,12 @@ class DataSourceWorker {
8284
this.postMessage(resp);
8385
}
8486

87+
handleRemoveHandler(eventData, resp) {
88+
const dsId = eventData.dsId;
89+
delete this.dataSourceHandlers[dsId];
90+
this.postMessage(resp);
91+
}
92+
8593
handleIsInit(eventData, resp) {
8694
const dsId = eventData.dsId;
8795
resp.data = this.dataSourceHandlers[dsId].isInitialized();

source/core/timesync/DataSynchronizer.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class DataSynchronizer {
4444
id: id+'-realtime'
4545
}, this)
4646
this.broadcastChannels = [];
47-
this.setMode(properties.mode || Mode.REPLAY).then(() => {
47+
this.setMode(properties.mode || Mode.REPLAY, false).then(() => {
4848
this.dataSynchronizer.onTimeChanged = (min, max, start, end) => this.onTimeChanged(min, max, start, end);
4949
this.dataSynchronizer.onAddedDataSource = (dataSourceId) => this.onAddedDataSource(dataSourceId);
5050
this.dataSynchronizer.onRemovedDataSource = (dataSourceId) => this.onRemovedDataSource(dataSourceId);
@@ -55,8 +55,8 @@ class DataSynchronizer {
5555
return this.id;
5656
}
5757

58-
async setMode(mode) {
59-
if (this.dataSynchronizer) {
58+
async setMode(mode, disconnect = true) {
59+
if (this.dataSynchronizer && disconnect) {
6060
await this.dataSynchronizer.disconnect();
6161
}
6262
if (mode === Mode.REPLAY) {
@@ -73,7 +73,7 @@ class DataSynchronizer {
7373
this.broadcastChannels = [];
7474
const promises=[];
7575
for(let ds of this.dataSynchronizer.getDataSources()) {
76-
promises.push(ds.setMode(mode));
76+
promises.push(ds.setMode(mode, disconnect));
7777
}
7878
this.dataSynchronizer.onTimeChanged = (min, max, start, end) => this.onTimeChanged(min, max, start, end);
7979
this.dataSynchronizer.onAddedDataSource = (dataSourceId) => this.onAddedDataSource(dataSourceId);

source/core/timesync/rt/DataSynchronizer.realtime.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ class DataSynchronizerRealtime {
214214
* Disconnects all dataSources
215215
*/
216216
async disconnect() {
217+
console.log('disconnect')
217218
await this.reset();
218219
const promises = [];
219220
for (let dataSource of this.dataSources) {

source/core/ui/view/View.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,6 @@ class View {
242242
if (event.data.type === EventType.STATUS && event.data.status === Status.CLOSED_ERROR) {
243243
self.reset();
244244
} else if (event.data.type === EventType.DATA) {
245-
246245
const that = this;
247246

248247
// transform the data

0 commit comments

Comments
 (0)