fixed multiple children being able to pull and push to pumpingstation

This commit is contained in:
znetsixe
2025-11-06 16:46:54 +01:00
parent 1848486f1c
commit 9e4b149b64

View File

@@ -23,7 +23,9 @@ class PumpingStation {
//variants in determining what gets priority //variants in determining what gets priority
this.flowVariants = ['measured', 'predicted']; this.flowVariants = ['measured', 'predicted'];
this.levelVariants = ['measured', 'predicted']; this.levelVariants = ['measured', 'predicted'];
this.volVariants = ['measured', 'predicted'];
this.flowPositions = { inflow: ['in', 'upstream'], outflow: ['out', 'downstream'] }; this.flowPositions = { inflow: ['in', 'upstream'], outflow: ['out', 'downstream'] };
this.predictedFlowChildren = new Map(); // childId -> { in: 0, out: 0 }
this.basin = {}; this.basin = {};
this.state = { this.state = {
@@ -49,7 +51,7 @@ class PumpingStation {
return; return;
} }
if (softwareType === 'machine' || softwareType === 'pumpingStation') { if (softwareType === 'machine' || softwareType === 'pumpingStation' || softwareType === 'machineGroupController') {
this._registerPredictedFlowChild(child); this._registerPredictedFlowChild(child);
return; return;
} }
@@ -57,16 +59,38 @@ class PumpingStation {
this.logger.warn(`Unsupported child software type: ${softwareType}`); this.logger.warn(`Unsupported child software type: ${softwareType}`);
} }
_safeGuardSystem(snapshot,remainingTime){
let vol = null;
for (const variant of this.volVariants){
const volsnap = snapshot.vols[variant];
//go through with variants until we find one that exists
if (!volsnap.samples.exists){ continue};
const vol = volsnap.samples.current?.value ?? null;
}
if(vol == null){
//if we cant get a volume, we must force whole system off.
};
/*
if(remainingTime < timeThreshhold || vol > maxVolume || vol < minVolume){}
*/
}
tick() { tick() {
const snapshot = this._takeMeasurementSnapshot(); const snapshot = this._takeMeasurementSnapshot();
this._updatePredictedVolume(snapshot); this._updatePredictedVolume(snapshot);
const netFlow = this._selectBestNetFlow(snapshot);
//write netflow in measurment container
const netFlow = this._selectBestNetFlow(snapshot);
const remaining = this._computeRemainingTime(snapshot, netFlow); const remaining = this._computeRemainingTime(snapshot, netFlow);
this._safeGuardSystem(snapshot,remaining.seconds);
this.state = { this.state = {
direction: netFlow.direction, direction: netFlow.direction,
netFlow: netFlow.value, netFlow: netFlow.value,
@@ -100,32 +124,63 @@ class PumpingStation {
}); });
} }
//register machines or pumping stations that can provide predicted flow data
_registerPredictedFlowChild(child) { _registerPredictedFlowChild(child) {
const position = child.config.functionality.positionVsParent; const position = child.config.functionality.positionVsParent;
const childName = child.config.general.name; const childName = child.config.general.name;
const childId = child.config.general.id ?? childName;
const listener = (eventName, posKey) => { const posKey =
child.measurements.emitter.on(eventName, (eventData) => { position === 'downstream' || position === 'out' || position === 'atequipment'
this.logger.debug( ? 'out'
`Predicted flow update from ${childName} (${position}) -> ${eventData.value} ${eventData.unit}` : position === 'upstream' || position === 'in'
); ? 'in'
this.measurements : null;
.type('flow')
.variant('predicted') if (!posKey) {
.position(posKey) this.logger.warn(`Unsupported predicted flow position "${position}" from ${childName}`);
.value(eventData.value, eventData.timestamp, eventData.unit); return;
}); }
if (!this.predictedFlowChildren.has(childId)) {
this.predictedFlowChildren.set(childId, { in: 0, out: 0 });
}
const handler = (eventData = {}) => {
const value = Number.isFinite(eventData.value) ? eventData.value : 0;
const timestamp = eventData.timestamp ?? Date.now();
const unit = eventData.unit ?? 'm3/s';
this.logger.debug(
`Predicted flow update from ${childName} (${childId}, ${posKey}) -> ${value} ${unit}`
);
this.predictedFlowChildren.get(childId)[posKey] = value;
this._refreshAggregatedPredictedFlow(posKey, timestamp, unit);
}; };
if (position === 'downstream' || position === 'atequipment' || position === 'out') { const eventNames =
listener('flow.predicted.downstream', 'out'); posKey === 'in'
} else if (position === 'upstream' || position === 'in') { ? ['flow.predicted.downstream', 'flow.predicted.upstream']
listener('flow.predicted.downstream', 'in'); : ['flow.predicted.downstream'];
} else {
this.logger.warn(`Unsupported predicted flow position "${position}" from ${childName}`); for (const eventName of eventNames) {
child.measurements.emitter.on(eventName, handler);
} }
} }
_refreshAggregatedPredictedFlow(direction, timestamp = Date.now(), unit = 'm3/s') {
const sum = Array.from(this.predictedFlowChildren.values())
.map((entry) => (Number.isFinite(entry[direction]) ? entry[direction] : 0))
.reduce((acc, val) => acc + val, 0);
this.measurements
.type('flow')
.variant('predicted')
.position(direction)
.value(sum, timestamp, unit);
}
_handleMeasurement(measurementType, value, position, context) { _handleMeasurement(measurementType, value, position, context) {
switch (measurementType) { switch (measurementType) {
case 'level': case 'level':
@@ -213,13 +268,18 @@ class PumpingStation {
const snapshot = { const snapshot = {
flows: {}, flows: {},
levels: {}, levels: {},
levelRates: {} levelRates: {},
vols:{},
}; };
for (const variant of this.flowVariants) { for (const variant of this.flowVariants) {
snapshot.flows[variant] = this._snapshotFlowsForVariant(variant); snapshot.flows[variant] = this._snapshotFlowsForVariant(variant);
} }
for (const variant of this.volVariants){
snapshot.vols[variant] = this._snapshotVolsForVariant(variant);
}
for (const variant of this.levelVariants) { for (const variant of this.levelVariants) {
snapshot.levels[variant] = this._snapshotLevelForVariant(variant); snapshot.levels[variant] = this._snapshotLevelForVariant(variant);
snapshot.levelRates[variant] = this._estimateLevelRate(snapshot.levels[variant]); snapshot.levelRates[variant] = this._estimateLevelRate(snapshot.levels[variant]);
@@ -228,15 +288,17 @@ class PumpingStation {
return snapshot; return snapshot;
} }
_snapshotVolsForVariant(variant) {
const volumeSeries = this._locateSeries('volume', variant, ['atequipment']);
return {variant,samples: this._seriesSamples(volumeSeries)};
}
_snapshotFlowsForVariant(variant) { _snapshotFlowsForVariant(variant) {
const inflowSeries = this._locateSeries('flow', variant, this.flowPositions.inflow); const inflowSeries = this._locateSeries('flow', variant, this.flowPositions.inflow);
const outflowSeries = this._locateSeries('flow', variant, this.flowPositions.outflow); const outflowSeries = this._locateSeries('flow', variant, this.flowPositions.outflow);
return { return {variant, inflow: this._seriesSamples(inflowSeries), outflow: this._seriesSamples(outflowSeries) };
variant,
inflow: this._seriesSamples(inflowSeries),
outflow: this._seriesSamples(outflowSeries)
};
} }
_snapshotLevelForVariant(variant) { _snapshotLevelForVariant(variant) {
@@ -342,6 +404,7 @@ class PumpingStation {
return { seconds: null, source: null }; return { seconds: null, source: null };
} }
for (const variant of this.levelVariants) { for (const variant of this.levelVariants) {
const levelSnap = snapshot.levels[variant]; const levelSnap = snapshot.levels[variant];
const current = levelSnap.samples.current?.value ?? null; const current = levelSnap.samples.current?.value ?? null;
@@ -405,10 +468,7 @@ class PumpingStation {
const writeTimestamp = timestampPrev + Math.max(deltaSeconds, 0) * 1000; const writeTimestamp = timestampPrev + Math.max(deltaSeconds, 0) * 1000;
const volumeSeries = this.measurements const volumeSeries = this.measurements.type('volume').variant('predicted').position('atEquipment');
.type('volume')
.variant('predicted')
.position('atEquipment');
const currentVolume = volumeSeries.getCurrentValue('m3') ?? this.basin.minVol; const currentVolume = volumeSeries.getCurrentValue('m3') ?? this.basin.minVol;