diff --git a/src/specificClass.js b/src/specificClass.js index 35f52a4..5578628 100644 --- a/src/specificClass.js +++ b/src/specificClass.js @@ -88,11 +88,11 @@ class PumpingStation { //this.logger.warn(`Unsupported child software type: ${softwareType}`); } - _safetyController(snapshot,remainingTime,direction){ + _safetyController(remainingTime, direction){ this.safetyControllerActive = false; - const vol = this._resolveVolume(snapshot); + const vol = this._resolveVolume(); if(vol == null){ //if we cant get a volume we cant control blind turn all pumps off. @@ -200,21 +200,22 @@ class PumpingStation { } - async _controlLevelBased(snapshot, direction) { + async _controlLevelBased(direction) { const { startLevel, stopLevel } = this.config.control.levelbased; - const flowUnit = this.measurements.getUnit('flow'); // use container as source of truth + const flowUnit = this.measurements.getUnit('flow'); // use container as source of truth + const levelunit = this.measurements.getUnit('level'); // use container as source of truth + let percControl = 0; - const level = (snap) => { - for (const variant of this.levelVariants) { - const levelSnap = snap.levels?.[variant]; - if (levelSnap?.samples?.current?.value !== undefined) { - return levelSnap.samples.current.value; - } - } - return null; - }; + for (const variant of this.levelVariants) { + + const level = this.measurements.type('level').variant(variant).postition('atEquipment').getCurrentValue(levelunit); + + if(!level) continue; + + } + const levelVal = level(snapshot); if (levelVal == null || !Number.isFinite(levelVal)) { @@ -288,11 +289,15 @@ class PumpingStation { } } - - _resolveVolume(snapshot) { + _resolveVolume() { for (const variant of this.volVariants) { - const volsnap = snapshot.vols[variant]; - if (volsnap?.samples?.exists) return volsnap.samples.current?.value ?? null; + const type = 'volume'; + const unit = this.measurements.getUnit(type); + const volume = this.measurements.type(type).variant(variant).position('atEquipment').getCurrentValue(unit) || null; + + if (!volume) continue; + + return volume; } return null; } @@ -408,7 +413,7 @@ class PumpingStation { this._updatePredictedVolume(snapshot); const netFlow = this._selectBestNetFlow(snapshot); - const remaining = this._computeRemainingTime(snapshot, netFlow); + const remaining = this._computeRemainingTime(netFlow); //check safety conditions this._safetyController(snapshot,remaining.seconds,netFlow.direction); @@ -501,28 +506,14 @@ class PumpingStation { .child(childId) .value(eventData.value, ts, unit); - this._refreshAggregatedPredictedFlow(); }; eventNames.forEach((eventName) => child.measurements.emitter.on(eventName, handler)); } - _refreshAggregatedPredictedFlow() { - const preferredUnit = this.measurements.getUnit('flow'); - const childPositions = Object.keys(this.measurements.measurements?.flow?.predictedChild || {}); - const inflowPositions = childPositions.filter((p) => p === 'in'); - const outflowPositions = childPositions.filter((p) => p === 'out'); - - const sumIn = this.measurements.sum('flow', 'predicted', inflowPositions, preferredUnit); - const sumOut = this.measurements.sum('flow', 'predicted', outflowPositions, preferredUnit); - - this.measurements.type('flow').variant('predicted').position('in').value(sumIn, Date.now(), preferredUnit); - this.measurements.type('flow').variant('predicted').position('out').value(sumOut, Date.now(), preferredUnit); - } setManualInflow(value, timestamp = Date.now(), unit) { const num = Number(value); - const unit = this.measurements.getUnit('flow'); // Write manual inflow into the aggregated bucket this.measurements @@ -532,7 +523,6 @@ class PumpingStation { .child('manual-qin') .value(num, timestamp, unit); - this._refreshAggregatedPredictedFlow(); } _handleMeasurement(measurementType, value, position, context) { @@ -718,23 +708,64 @@ class PumpingStation { } _selectBestNetFlow(snapshot) { + const type = 'flow'; + const unit = this.measurements.getUnit(type) || 'm3/s'; + for (const variant of this.flowVariants) { - const flow = snapshot.flows[variant]; + // Check if we have *any* flows for this variant at all + const bucket = this.measurements.measurements?.[type]?.[variant]; + if (!bucket || Object.keys(bucket).length === 0) { + this.logger.debug(`No ${type}.${variant} data; skipping this variant`); + continue; + } - if (!flow.inflow.exists && !flow.outflow.exists) continue; + // Sum all inflow/outflow positions (in/upstream vs out/downstream) + const inflow = this.measurements.sum( + type, + variant, + this.flowPositions.inflow, + unit + ) || 0; - const unit = this.measurements.getUnit('flow'); - const inflow = flow.inflow.current?.value ?? flow.inflow.previous?.value ?? 0; - const outflow = flow.outflow.current?.value ?? flow.outflow.previous?.value ?? 0; - const net = inflow - outflow; // -> pos is filling + const outflow = this.measurements.sum( + type, + variant, + this.flowPositions.outflow, + unit + ) || 0; - this.measurements.type('netFlowRate').variant(variant).position('atequipment').value(net, Date.now(), unit); - this.logger.debug(`inflow : ${inflow} - outflow : ${outflow}`); + // If absolutely nothing is flowing and bucket only just got created, + // we can choose to skip this variant and try the next one. + const absIn = Math.abs(inflow); + const absOut = Math.abs(outflow); - return { value: net,source: variant,direction: this._deriveDirection(net) }; + if (absIn < this.flowThreshold && absOut < this.flowThreshold) { + this.logger.debug( + `Flows for ${type}.${variant} below threshold; inflow=${inflow}, outflow=${outflow}, trying next variant` + ); + continue; + } + + const net = inflow - outflow; // >0 = filling + + this.measurements + .type('netFlowRate') + .variant(variant) + .position('atequipment') + .value(net, Date.now(), unit); + + this.logger.debug( + `netFlow (${variant}): inflow=${inflow} ${unit}, outflow=${outflow} ${unit}, net=${net} ${unit}` + ); + + return { + value: net, + source: variant, + direction: this._deriveDirection(net) + }; } - // fallback using level trend + // --- Fallback: level trend for (const variant of this.levelVariants) { const levelRate = snapshot.levelRates[variant]; if (!Number.isFinite(levelRate)) continue; @@ -751,7 +782,9 @@ class PumpingStation { return { value: 0, source: null, direction: 'steady' }; } - _computeRemainingTime(snapshot, netFlow) { + + _computeRemainingTime(netFlow) { + if (!netFlow || Math.abs(netFlow.value) < this.flowThreshold) { return { seconds: null, source: null }; } @@ -761,17 +794,22 @@ class PumpingStation { this.logger.warn('Invalid basin surface area.'); return { seconds: null, source: null }; } - - + const type = 'level'; + const unit = this.measurements.getUnit(type); for (const variant of this.levelVariants) { - const levelSnap = snapshot.levels[variant]; - const current = levelSnap.samples.current?.value ?? null; - if (!Number.isFinite(current)) continue; + + const lvl = this.measurements + .type(type) + .variant(variant) + .position('atequipment') + .getCurrentValue() + + if (!Number.isFinite(lvl)) continue; const remainingHeight = netFlow.value > 0 - ? Math.max(heightOverflow - current, 0) - : Math.max(current - heightOutlet, 0); + ? Math.max(heightOverflow - lvl, 0) + : Math.max(lvl - heightOutlet, 0); const seconds = (remainingHeight * surfaceArea) / Math.abs(netFlow.value); if (!Number.isFinite(seconds)) continue; @@ -956,7 +994,7 @@ module.exports = PumpingStation; /* ------------------------------------------------------------------------- */ /* Example usage */ /* ------------------------------------------------------------------------- */ -/* + if (require.main === module) { const Measurement = require('../../measurement/src/specificClass'); const RotatingMachine = require('../../rotatingMachine/src/specificClass'); @@ -1101,24 +1139,27 @@ if (require.main === module) { //seedSample(inflowSensor, 'flow', 0.35, 'm3/s'); //seedSample(outflowSensor, 'flow', 0.20, 'm3/s'); + + setInterval( () => station.tick(), 1000); - await new Promise((resolve) => setTimeout(resolve, 10)); + await new Promise((resolve) => setTimeout(resolve, 10)); - console.log('Initial state:', station.state); + console.log('Initial state:', station.state); + station.setManualInflow(10,Date.now(),'l/s'); - await pump1.handleInput('parent', 'execSequence', 'startup'); - await pump1.handleInput('parent', 'execMovement', 10); + await pump1.handleInput('parent', 'execSequence', 'startup'); + await pump1.handleInput('parent', 'execMovement', 10); - await pump2.handleInput('parent', 'execSequence', 'startup'); - await pump2.handleInput('parent', 'execMovement', 10); + await pump2.handleInput('parent', 'execSequence', 'startup'); + await pump2.handleInput('parent', 'execMovement', 10); - console.log('Station state:', station.state); - console.log('Station output:', station.getOutput()); - })().catch((err) => { - console.error('Demo failed:', err); - }); + console.log('Station state:', station.state); + console.log('Station output:', station.getOutput()); + })().catch((err) => { + console.error('Demo failed:', err); + }); } //*/