DataHub Timeseries Pipeline — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
Goal: Fix the 4 interconnected failures preventing the DataHub module from displaying IoT sensor data and simulated weather station data.
Architecture: Self-describing NGSI-LD entities. Each entity carries the metadata needed to locate its own timeseries data — no fragile resolution chains. The timeseries-reader reads the entity once and knows where to query. Additionally: fix attribute name mapping (NGSI-LD names vs DB columns), clean the telemetry notification handler to store only real measurements, and fix subscription management (port + multi-tenant).
Tech Stack: Python (Flask for timeseries-reader, FastAPI for telemetry-worker), psycopg2, asyncpg, requests, NGSI-LD/Orion-LD, TimescaleDB, pytest
Spec: docs/superpowers/specs/2026-04-08-datahub-timeseries-pipeline-design.md
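The self-describing resolution described above can be sketched in isolation (an illustrative Python sketch, not the repository code; the real logic lands in timeseries-reader/urn_resolution.py in Task 3):

```python
from typing import Optional

def resolve_municipality(entity: dict) -> Optional[str]:
    """Read municipalityCode straight off the entity -- no refParcel hops."""
    prop = entity.get("municipalityCode")
    if isinstance(prop, dict):  # normalized NGSI-LD Property
        prop = prop.get("value")
    if isinstance(prop, str) and prop.strip():
        return prop.strip()
    return None  # caller falls back to the legacy chain

# A self-describing WeatherObserved entity carries its own lookup key:
entity = {
    "id": "urn:ngsi-ld:WeatherObserved:test:parcel-p1",
    "type": "WeatherObserved",
    "municipalityCode": {"type": "Property", "value": "31012"},
    "refParcel": {"type": "Relationship", "object": "urn:ngsi-ld:AgriParcel:test:p1"},
}

print(resolve_municipality(entity))  # -> 31012
```

One read of the entity yields the TimescaleDB lookup key; the four-hop refParcel chain remains only as a fallback.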
File Structure
| Service | File | Action | Responsibility |
|---|---|---|---|
| weather-worker | weather_worker/storage/orion_writer.py | Modify | Add municipality_code param, add municipalityCode Property to entity |
| weather-worker | weather_worker/storage/orion_writer.py | Modify | Pass municipality_code through sync_weather_to_orion() |
| timeseries-reader | urn_resolution.py | Modify | Direct municipalityCode resolution for WeatherObserved |
| timeseries-reader | app.py | Modify | Add _WEATHER_ATTRIBUTE_MAP, _resolve_weather_attribute(), add wind_direction_deg |
| telemetry-worker | telemetry_worker/notification_handler.py | Modify | Filter metadata keys, skip Relationship/GeoProperty, only scalar values |
| telemetry-worker | telemetry_worker/subscription_manager.py | Modify | Fix port default, add multi-tenant, cleanup broken subs |
| telemetry-worker | app.py | Modify | Add periodic subscription check task |
| datahub | backend/app/api/entities.py | Modify | Add municipalityCode to _NGSI_SYSTEM_KEYS |
| k8s | k8s/core/services/telemetry-worker-deployment.yaml | Create | New deployment with correct port + image |
| tests | services/tests/test_weather_municipality.py | Create | Tests for Component 1 |
| tests | services/tests/test_weather_attribute_map.py | Create | Tests for Component 2 |
| tests | services/tests/test_telemetry_measurements.py | Create | Tests for Component 3 |
| tests | services/tests/test_subscription_manager.py | Create | Tests for Component 4 |
Task 1: WeatherObserved — Direct Municipality Resolution (weather-worker)
Files:
- Modify: `nkz/services/weather-worker/weather_worker/storage/orion_writer.py:164-308` (`create_weather_observed_entity`)
- Modify: `nkz/services/weather-worker/weather_worker/storage/orion_writer.py:444-552` (`sync_weather_to_orion`)
- Test: `nkz/services/tests/test_weather_municipality.py`

- [ ] Step 1: Write the failing test for `create_weather_observed_entity` with `municipality_code`
Create nkz/services/tests/test_weather_municipality.py:
```python
"""Tests for municipality_code on WeatherObserved entities."""
import json
from unittest.mock import patch, MagicMock

import pytest


def test_create_weather_observed_includes_municipality_code():
    """When municipality_code is provided, entity must include municipalityCode Property."""
    from weather_worker.storage.orion_writer import create_weather_observed_entity

    mock_response = MagicMock()
    mock_response.status_code = 201

    with patch("weather_worker.storage.orion_writer.requests.post", return_value=mock_response) as mock_post:
        result = create_weather_observed_entity(
            parcel_id="urn:ngsi-ld:AgriParcel:test:p1",
            tenant_id="test",
            location=(1.0, 42.0),
            weather_data={"temp_avg": 22.5},
            municipality_code="31012",
        )

    assert result is not None
    # Inspect the entity JSON sent to Orion
    call_args = mock_post.call_args
    entity = call_args.kwargs.get("json") or call_args[1].get("json")
    assert "municipalityCode" in entity
    assert entity["municipalityCode"]["type"] == "Property"
    assert entity["municipalityCode"]["value"] == "31012"


def test_create_weather_observed_without_municipality_code():
    """When municipality_code is None, entity must NOT include municipalityCode."""
    from weather_worker.storage.orion_writer import create_weather_observed_entity

    mock_response = MagicMock()
    mock_response.status_code = 201

    with patch("weather_worker.storage.orion_writer.requests.post", return_value=mock_response) as mock_post:
        result = create_weather_observed_entity(
            parcel_id="urn:ngsi-ld:AgriParcel:test:p1",
            tenant_id="test",
            location=(1.0, 42.0),
            weather_data={"temp_avg": 22.5},
            municipality_code=None,
        )

    assert result is not None
    call_args = mock_post.call_args
    entity = call_args.kwargs.get("json") or call_args[1].get("json")
    assert "municipalityCode" not in entity
```

- [ ] Step 2: Run test to verify it fails
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_weather_municipality.py -v
Expected: FAIL — create_weather_observed_entity() does not accept municipality_code parameter.
- [ ] Step 3: Implement `municipality_code` parameter in `create_weather_observed_entity`
Edit nkz/services/weather-worker/weather_worker/storage/orion_writer.py. Change the function signature at line 164:
```python
def create_weather_observed_entity(
    parcel_id: str,
    tenant_id: str,
    location: Tuple[float, float],
    weather_data: Dict[str, Any],
    observed_at: Optional[datetime] = None,
    municipality_code: Optional[str] = None,
) -> Optional[str]:
```

After the "refParcel" line (line 210), add the municipalityCode Property:

```python
    "refParcel": {"type": "Relationship", "object": parcel_id},
}

# Self-describing: carry municipality code for direct timeseries resolution
if municipality_code:
    entity["municipalityCode"] = {
        "type": "Property",
        "value": municipality_code,
    }
```

- [ ] Step 4: Run test to verify it passes
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_weather_municipality.py -v
Expected: PASS (both tests)
- [ ] Step 5: Write the failing test for `sync_weather_to_orion` passing `municipality_code`
Append to nkz/services/tests/test_weather_municipality.py:
```python
def test_sync_weather_to_orion_passes_municipality_code():
    """sync_weather_to_orion must forward municipality_code to create_weather_observed_entity."""
    from weather_worker.storage.orion_writer import sync_weather_to_orion

    fake_parcel = {
        "id": "urn:ngsi-ld:AgriParcel:test:p1",
        "type": "AgriParcel",
        "location": {
            "type": "GeoProperty",
            "value": {"type": "Point", "coordinates": [1.0, 42.0]},
        },
    }

    with patch("weather_worker.storage.orion_writer.get_parcels_by_location", return_value=[fake_parcel]), \
         patch("weather_worker.storage.orion_writer.create_weather_observed_entity", return_value="urn:ok") as mock_create:
        count = sync_weather_to_orion(
            tenant_id="test",
            latitude=42.0,
            longitude=1.0,
            weather_data={"temp_avg": 20.0},
            municipality_code="31012",
        )

    assert count == 1
    # Verify municipality_code was forwarded (could be keyword or positional)
    call_kwargs = mock_create.call_args.kwargs if mock_create.call_args.kwargs else {}
    if "municipality_code" in call_kwargs:
        assert call_kwargs["municipality_code"] == "31012"
    else:
        # positional args: parcel_id, tenant_id, location, weather_data, observed_at, municipality_code
        assert "31012" in mock_create.call_args.args or any("31012" == str(v) for v in call_kwargs.values())
```

- [ ] Step 6: Run test to verify it fails
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_weather_municipality.py::test_sync_weather_to_orion_passes_municipality_code -v
Expected: FAIL — sync_weather_to_orion() does not accept municipality_code parameter.
- [ ] Step 7: Implement `municipality_code` in `sync_weather_to_orion`
Edit nkz/services/weather-worker/weather_worker/storage/orion_writer.py. Change the function signature at line 444:
```python
def sync_weather_to_orion(
    tenant_id: str,
    latitude: float,
    longitude: float,
    weather_data: Dict[str, Any],
    observed_at: Optional[datetime] = None,
    radius_km: float = 10.0,
    municipality_code: Optional[str] = None,
) -> int:
```

Change the create_weather_observed_entity call at line 534:

```python
entity_id = create_weather_observed_entity(
    parcel_id=parcel_id,
    tenant_id=tenant_id,
    location=parcel_location,
    weather_data=weather_data,
    observed_at=observed_at,
    municipality_code=municipality_code,
)
```

- [ ] Step 8: Run all tests to verify they pass
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_weather_municipality.py -v
Expected: PASS (all 3 tests)
- [ ] Step 9: Commit

```sh
cd /home/g/Documents/nekazari/nkz
git add services/weather-worker/weather_worker/storage/orion_writer.py services/tests/test_weather_municipality.py
git commit -m "feat(weather-worker): add municipalityCode to WeatherObserved entities

Self-describing NGSI-LD entities carry municipality_code for O(1) timeseries resolution.
Eliminates fragile 4-hop chain (refParcel → parcel → address → weather_observations)."
```

Task 2: WeatherObserved — Pass municipality_code from ingest_weather_data
Files:

- Modify: `nkz/services/weather-worker/main.py:259` (call to `sync_weather_to_orion` inside `run_ingestion_cycle`)
- No separate test needed — the existing `ingest_weather_data` does NOT call `sync_weather_to_orion`; the Orion sync is a separate concern. The `municipality_code` is already available in `ingest_weather_data`, but that method writes to TimescaleDB. The Orion sync caller is wherever `sync_weather_to_orion` is invoked.
Looking at the code: main.py calls `self.ingest_weather_data(tenant_id, municipality_code, latitude, longitude)`, which writes to TimescaleDB. The Orion sync (`sync_weather_to_orion`) must be called separately, so the first step is to locate that call site.
- [ ] Step 1: Find where `sync_weather_to_orion` is called
Run: cd /home/g/Documents/nekazari/nkz && grep -rn "sync_weather_to_orion" services/weather-worker/
Look for the call site. If it’s only called from ingest_weather_data or a separate sync step, we need to ensure municipality_code flows through.
- [ ] Step 2: Add Orion sync call with `municipality_code` in `ingest_weather_data`
If sync_weather_to_orion is not already called from ingest_weather_data, it needs to be. Based on the spec, the weather-worker ingest_weather_data() already has municipality_code — it just needs to pass it. Check the actual call site and add the municipality_code=municipality_code kwarg.
If the Orion sync is called from a different location (e.g., a scheduler or separate sync script), update that call site instead.
In main.py, ingest_weather_data() already receives municipality_code at line 91. If it calls sync_weather_to_orion anywhere, add municipality_code=municipality_code.
If it doesn’t call sync_weather_to_orion (the DB write and Orion sync are separate), find the actual caller and update it.
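Whichever caller turns out to own the Orion sync, the change has the same shape. A hedged, self-contained sketch (the function name and argument order here are assumptions, with the real `sync_weather_to_orion` replaced by a recording stub):

```python
recorded_kwargs = {}

def sync_weather_to_orion(**kwargs) -> int:
    """Stand-in for weather_worker.storage.orion_writer.sync_weather_to_orion."""
    recorded_kwargs.update(kwargs)
    return 1  # pretend one parcel was synced

def run_ingestion_cycle(tenant_id, municipality_code, latitude, longitude, weather_data):
    # The one-line change: forward municipality_code into the Orion sync.
    return sync_weather_to_orion(
        tenant_id=tenant_id,
        latitude=latitude,
        longitude=longitude,
        weather_data=weather_data,
        municipality_code=municipality_code,
    )

run_ingestion_cycle("test", "31012", 42.0, 1.0, {"temp_avg": 20.0})
print(recorded_kwargs["municipality_code"])  # -> 31012
```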
- [ ] Step 3: Commit

```sh
cd /home/g/Documents/nekazari/nkz
git add services/weather-worker/
git commit -m "feat(weather-worker): pass municipality_code through to Orion sync"
```

Task 3: Timeseries Reader — Direct municipalityCode Resolution
Files:
- Modify: `nkz/services/timeseries-reader/urn_resolution.py:175-221` (`_resolve_urn_to_weather_key`)
- Test: `nkz/services/tests/test_weather_municipality.py` (extend)

- [ ] Step 1: Write the failing test for direct `municipalityCode` resolution
Append to nkz/services/tests/test_weather_municipality.py:
```python
def test_resolve_urn_weather_key_direct_municipality_code():
    """WeatherObserved with municipalityCode should resolve directly, no chain."""
    from timeseries_reader.urn_resolution import _resolve_urn_to_weather_key

    entity = {
        "id": "urn:ngsi-ld:WeatherObserved:test:parcel-p1",
        "type": "WeatherObserved",
        "municipalityCode": {"type": "Property", "value": "31012"},
        "refParcel": {"type": "Relationship", "object": "urn:ngsi-ld:AgriParcel:test:p1"},
    }

    with patch("timeseries_reader.urn_resolution.fetch_orion_entity", return_value=entity):
        result, source = _resolve_urn_to_weather_key(
            tenant_id="test",
            entity_id="urn:ngsi-ld:WeatherObserved:test:parcel-p1",
            entity=entity,
        )

    assert result == "31012"
    assert source == "municipality"


def test_resolve_urn_weather_key_falls_back_without_municipality_code():
    """WeatherObserved without municipalityCode should fall back to the refParcel chain."""
    from timeseries_reader.urn_resolution import _resolve_urn_to_weather_key

    entity = {
        "id": "urn:ngsi-ld:WeatherObserved:test:parcel-p1",
        "type": "WeatherObserved",
        "refParcel": {"type": "Relationship", "object": "urn:ngsi-ld:AgriParcel:test:p1"},
    }

    # Legacy chain will fail (no parcel entity), returning None
    with patch("timeseries_reader.urn_resolution.fetch_orion_entity") as mock_fetch:
        # First call returns the WeatherObserved itself;
        # second call (for the parcel) returns None — chain fails
        mock_fetch.side_effect = [entity, None]
        result, source = _resolve_urn_to_weather_key(
            tenant_id="test",
            entity_id="urn:ngsi-ld:WeatherObserved:test:parcel-p1",
            entity=entity,
        )

    # Without municipalityCode AND without a resolvable parcel -> no_location
    assert result is None
    assert source == "no_location"
```

- [ ] Step 2: Run test to verify it fails
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_weather_municipality.py::test_resolve_urn_weather_key_direct_municipality_code -v
Expected: FAIL — _resolve_urn_to_weather_key does not check municipalityCode.
- [ ] Step 3: Implement direct `municipalityCode` resolution
Edit nkz/services/timeseries-reader/urn_resolution.py. In _resolve_urn_to_weather_key (line 203), add the direct resolution BEFORE the refParcel chain:
Replace lines 203-215:
```python
if etype_short == "WeatherObserved" or etype.endswith("WeatherObserved"):
    # Direct resolution: entity carries its own municipality code
    muni_prop = entity.get("municipalityCode")
    if muni_prop:
        muni_val = muni_prop.get("value") if isinstance(muni_prop, dict) else muni_prop
        if isinstance(muni_val, str) and muni_val.strip():
            return (muni_val.strip(), "municipality")

    # Fallback: legacy chain via refParcel -> parcel -> address
    ref_parcel = entity.get("refParcel")
    if not ref_parcel:
        return None, "no_location"
    parcel_urn = ref_parcel.get("object") if isinstance(ref_parcel, dict) else ref_parcel
    if not parcel_urn:
        return None, "no_location"
    parcel_urn = str(parcel_urn).strip()
    parcel_entity = fetch_orion_entity(tenant_id, parcel_urn)
    if not parcel_entity:
        return None, "no_location"
    res = _parcel_urn_to_municipality_code(tenant_id, parcel_urn, parcel_entity)
    return (None, "no_location") if res is None else res
```

- [ ] Step 4: Run tests to verify they pass
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_weather_municipality.py -v -k "resolve_urn"
Expected: PASS (both resolution tests)
- [ ] Step 5: Commit

```sh
cd /home/g/Documents/nekazari/nkz
git add services/timeseries-reader/urn_resolution.py services/tests/test_weather_municipality.py
git commit -m "feat(timeseries-reader): direct municipalityCode resolution for WeatherObserved

O(1) resolution from entity property. Falls back to legacy refParcel chain
for entities not yet migrated."
```

Task 4: Weather Attribute Name Mapping (timeseries-reader)
Files:
- Modify: `nkz/services/timeseries-reader/app.py:66-72` (add `wind_direction_deg` to `VALID_ATTRIBUTES`)
- Modify: `nkz/services/timeseries-reader/app.py:66+` (add `_WEATHER_ATTRIBUTE_MAP` and `_resolve_weather_attribute`)
- Modify: `nkz/services/timeseries-reader/app.py:1173-1176` (use resolver instead of direct check)
- Modify: `nkz/services/timeseries-reader/app.py:1031` (`_weather_query_columnar` — use resolved names)
- Test: `nkz/services/tests/test_weather_attribute_map.py`

- [ ] Step 1: Write failing tests for attribute mapping
Create nkz/services/tests/test_weather_attribute_map.py:
```python
"""Tests for NGSI-LD to DB column weather attribute mapping."""
import pytest


def test_resolve_ngsi_ld_name_to_db_column():
    """NGSI-LD attribute name should resolve to DB column name."""
    from timeseries_reader.app import _resolve_weather_attribute

    assert _resolve_weather_attribute("temperature") == "temp_avg"
    assert _resolve_weather_attribute("relativeHumidity") == "humidity_avg"
    assert _resolve_weather_attribute("windSpeed") == "wind_speed_ms"
    assert _resolve_weather_attribute("windDirection") == "wind_direction_deg"
    assert _resolve_weather_attribute("atmosphericPressure") == "pressure_hpa"
    assert _resolve_weather_attribute("precipitation") == "precip_mm"
    assert _resolve_weather_attribute("et0") == "eto_mm"
    assert _resolve_weather_attribute("solarRadiation") == "solar_rad_w_m2"
    assert _resolve_weather_attribute("soilMoisture") == "soil_moisture_0_10cm"


def test_resolve_db_column_passthrough():
    """DB column names should also resolve (backwards compatibility)."""
    from timeseries_reader.app import _resolve_weather_attribute

    assert _resolve_weather_attribute("temp_avg") == "temp_avg"
    assert _resolve_weather_attribute("humidity_avg") == "humidity_avg"
    assert _resolve_weather_attribute("wind_direction_deg") == "wind_direction_deg"


def test_resolve_unknown_attribute_returns_none():
    """Unknown attribute names should return None."""
    from timeseries_reader.app import _resolve_weather_attribute

    assert _resolve_weather_attribute("nonExistent") is None
    assert _resolve_weather_attribute("") is None
    assert _resolve_weather_attribute(None) is None


def test_resolve_strips_whitespace():
    """Attribute names with whitespace should be stripped."""
    from timeseries_reader.app import _resolve_weather_attribute

    assert _resolve_weather_attribute(" temperature ") == "temp_avg"


def test_wind_direction_in_valid_attributes():
    """wind_direction_deg must be in VALID_ATTRIBUTES."""
    from timeseries_reader.app import VALID_ATTRIBUTES

    assert "wind_direction_deg" in VALID_ATTRIBUTES
```

- [ ] Step 2: Run test to verify it fails
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_weather_attribute_map.py -v
Expected: FAIL — _resolve_weather_attribute does not exist, wind_direction_deg not in VALID_ATTRIBUTES.
- [ ] Step 3: Implement the attribute mapping
Edit nkz/services/timeseries-reader/app.py.
First, add wind_direction_deg to VALID_ATTRIBUTES (after line 71):
```python
VALID_ATTRIBUTES = frozenset({
    'temp_avg', 'temp_min', 'temp_max', 'humidity_avg', 'precip_mm',
    'solar_rad_w_m2', 'eto_mm', 'soil_moisture_0_10cm',
    'wind_speed_ms', 'pressure_hpa', 'wind_direction_deg',
})
```

Then, after VALID_ATTRIBUTES (after line 72), add the mapping dict and resolver:

```python
# NGSI-LD (Smart Data Models) attribute name -> weather_observations DB column
_WEATHER_ATTRIBUTE_MAP: Dict[str, str] = {
    "temperature": "temp_avg",
    "relativeHumidity": "humidity_avg",
    "windSpeed": "wind_speed_ms",
    "windDirection": "wind_direction_deg",
    "atmosphericPressure": "pressure_hpa",
    "precipitation": "precip_mm",
    "et0": "eto_mm",
    "solarRadiation": "solar_rad_w_m2",
    "soilMoisture": "soil_moisture_0_10cm",
}
# DB column names also accepted (passthrough for scripts, direct API)
for _col in VALID_ATTRIBUTES:
    _WEATHER_ATTRIBUTE_MAP.setdefault(_col, _col)


def _resolve_weather_attribute(requested: str) -> Optional[str]:
    """Map NGSI-LD attribute name or DB column name to weather_observations column."""
    r = (requested or "").strip()
    return _WEATHER_ATTRIBUTE_MAP.get(r)
```

- [ ] Step 4: Run test to verify it passes
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_weather_attribute_map.py -v
Expected: PASS (all 5 tests)
- [ ] Step 5: Update the v2 endpoint to use the resolver
Edit nkz/services/timeseries-reader/app.py. Replace lines 1173-1176:
```python
# Before:
if attrs_list:
    for a in attrs_list:
        if a not in VALID_ATTRIBUTES:
            return jsonify({"error": f'Invalid attribute: {a}'}), 400

# After:
if attrs_list:
    resolved_attrs = []
    for a in attrs_list:
        resolved = _resolve_weather_attribute(a)
        if resolved is None:
            return jsonify({"error": f"Unknown weather attribute: {a}"}), 400
        resolved_attrs.append(resolved)
    attrs_list = resolved_attrs
```

- [ ] Step 6: Update `_weather_query_columnar` to accept resolved names
Edit nkz/services/timeseries-reader/app.py. In `_weather_query_columnar` (line 1031), the `want` line filters against `VALID_ATTRIBUTES`. Since attrs are already resolved to DB column names by the caller, this still works, and the `attrs_requested is None` case (no filter — return all columns) also behaves as before. The current line `want = [a for a in (attrs_requested or []) if a in VALID_ATTRIBUTES]` works correctly because resolved names ARE in `VALID_ATTRIBUTES`. No change needed here.
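The invariant Step 6 leans on — every value of the mapping is itself a member of `VALID_ATTRIBUTES` — can be checked standalone. This restates the Task 4 structures outside app.py purely to demonstrate the property:

```python
# Standalone restatement of the Task 4 structures (not the app.py module itself).
VALID_ATTRIBUTES = frozenset({
    "temp_avg", "temp_min", "temp_max", "humidity_avg", "precip_mm",
    "solar_rad_w_m2", "eto_mm", "soil_moisture_0_10cm",
    "wind_speed_ms", "pressure_hpa", "wind_direction_deg",
})
WEATHER_ATTRIBUTE_MAP = {
    "temperature": "temp_avg",
    "relativeHumidity": "humidity_avg",
    "windSpeed": "wind_speed_ms",
    "windDirection": "wind_direction_deg",
    "atmosphericPressure": "pressure_hpa",
    "precipitation": "precip_mm",
    "et0": "eto_mm",
    "solarRadiation": "solar_rad_w_m2",
    "soilMoisture": "soil_moisture_0_10cm",
}
for col in VALID_ATTRIBUTES:  # DB column names pass through unchanged
    WEATHER_ATTRIBUTE_MAP.setdefault(col, col)

# Every resolved name survives the `a in VALID_ATTRIBUTES` filter in
# _weather_query_columnar, so that function needs no change.
assert set(WEATHER_ATTRIBUTE_MAP.values()) <= VALID_ATTRIBUTES
```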
- [ ] Step 7: Commit

```sh
cd /home/g/Documents/nekazari/nkz
git add services/timeseries-reader/app.py services/tests/test_weather_attribute_map.py
git commit -m "feat(timeseries-reader): map NGSI-LD attribute names to DB columns

DataHub sends 'temperature' -> reader resolves to 'temp_avg' -> SQL works.
DB column names also accepted (backwards compatible). Adds wind_direction_deg."
```

Task 5: Telemetry Notification Handler — Clean Measurements
Files:
- Modify: `nkz/services/telemetry-worker/telemetry_worker/notification_handler.py:149-179` (`_extract_measurements`)
- Test: `nkz/services/tests/test_telemetry_measurements.py`

- [ ] Step 1: Write failing tests for measurement extraction
Create nkz/services/tests/test_telemetry_measurements.py:
```python
"""Tests for telemetry notification handler measurement extraction."""
import pytest


def test_extract_measurements_only_scalar_properties():
    """Only Property type with scalar values should be extracted."""
    from telemetry_worker.notification_handler import _extract_measurements

    entity = {
        "id": "urn:ngsi-ld:AgriSensor:test:device1",
        "type": "AgriSensor",
        "@context": ["https://example.com/context.jsonld"],
        "temperature": {"type": "Property", "value": 23.5},
        "humidity": {"type": "Property", "value": 65},
        "batteryLevel": {"type": "Property", "value": 88.2},
    }

    measurements = _extract_measurements(entity)
    assert measurements == {
        "temperature": 23.5,
        "humidity": 65,
        "batteryLevel": 88.2,
    }


def test_extract_measurements_skips_relationships():
    """Relationships (refDeviceProfile, refDevice, etc.) must be excluded."""
    from telemetry_worker.notification_handler import _extract_measurements

    entity = {
        "id": "urn:ngsi-ld:AgriSensor:test:device1",
        "type": "AgriSensor",
        "temperature": {"type": "Property", "value": 23.5},
        "refDeviceProfile": {
            "type": "Relationship",
            "object": "urn:ngsi-ld:DeviceProfile:test:soil-sensor",
        },
        "refDevice": {
            "type": "Relationship",
            "object": "urn:ngsi-ld:Device:test:device1",
        },
    }

    measurements = _extract_measurements(entity)
    assert "refDeviceProfile" not in measurements
    assert "refDevice" not in measurements
    assert measurements == {"temperature": 23.5}


def test_extract_measurements_skips_geoproperties():
    """GeoProperties (location) must be excluded."""
    from telemetry_worker.notification_handler import _extract_measurements

    entity = {
        "id": "urn:ngsi-ld:AgriSensor:test:device1",
        "type": "AgriSensor",
        "temperature": {"type": "Property", "value": 23.5},
        "location": {
            "type": "GeoProperty",
            "value": {"type": "Point", "coordinates": [1.0, 42.0]},
        },
    }

    measurements = _extract_measurements(entity)
    assert "location" not in measurements
    assert measurements == {"temperature": 23.5}


def test_extract_measurements_skips_metadata_keys():
    """Known metadata keys (name, description, controlledProperty, etc.) must be excluded."""
    from telemetry_worker.notification_handler import _extract_measurements

    entity = {
        "id": "urn:ngsi-ld:AgriSensor:test:device1",
        "type": "AgriSensor",
        "name": {"type": "Property", "value": "Soil Sensor 1"},
        "description": {"type": "Property", "value": "Installed in parcel P1"},
        "controlledProperty": {"type": "Property", "value": ["temperature", "humidity"]},
        "category": {"type": "Property", "value": "sensor"},
        "temperature": {"type": "Property", "value": 23.5},
    }

    measurements = _extract_measurements(entity)
    assert "name" not in measurements
    assert "description" not in measurements
    assert "controlledProperty" not in measurements
    assert "category" not in measurements
    assert measurements == {"temperature": 23.5}


def test_extract_measurements_skips_non_scalar_values():
    """Properties with dict or list values (nested metadata) must be excluded."""
    from telemetry_worker.notification_handler import _extract_measurements

    entity = {
        "id": "urn:ngsi-ld:AgriSensor:test:device1",
        "type": "AgriSensor",
        "temperature": {"type": "Property", "value": 23.5},
        "address": {"type": "Property", "value": {"streetAddress": "Calle 1", "addressLocality": "Pamplona"}},
        "controlledProperty": {"type": "Property", "value": ["temperature"]},
    }

    measurements = _extract_measurements(entity)
    assert "address" not in measurements
    assert "controlledProperty" not in measurements
    assert measurements == {"temperature": 23.5}


def test_extract_measurements_empty_entity():
    """Entity with only system keys should return empty dict."""
    from telemetry_worker.notification_handler import _extract_measurements

    entity = {
        "id": "urn:ngsi-ld:AgriSensor:test:device1",
        "type": "AgriSensor",
        "name": {"type": "Property", "value": "Empty Sensor"},
        "refDeviceProfile": {
            "type": "Relationship",
            "object": "urn:ngsi-ld:DeviceProfile:test:basic",
        },
    }

    measurements = _extract_measurements(entity)
    assert measurements == {}
```

- [ ] Step 2: Run tests to verify they fail
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_telemetry_measurements.py -v
Expected: FAIL — current _extract_measurements includes Relationships, GeoProperties, and metadata keys.
- [ ] Step 3: Implement the cleaned measurement extraction
Edit nkz/services/telemetry-worker/telemetry_worker/notification_handler.py. Replace lines 149-179:
```python
_ENTITY_METADATA_KEYS = frozenset({
    "id", "type", "@context", "location",
    # NGSI-LD system / metadata
    "name", "description", "dateCreated", "dateModified", "observedAt",
    "controlledProperty", "category", "source", "provider", "seeAlso",
    "ownedBy", "address",
    # Relationships (not measurements)
    "refDeviceProfile", "refDevice", "refAgriParcel", "refParcel",
    "refWeatherStation",
})


def _extract_measurements(entity: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract measurement values from NGSI-LD entity attributes.

    Only extracts Property-type attributes with scalar values. Skips
    Relationships, GeoProperties, metadata keys, and non-scalar values.
    """
    measurements = {}

    for key, attr in entity.items():
        if key in _ENTITY_METADATA_KEYS:
            continue

        if isinstance(attr, dict):
            attr_type = attr.get("type")
            if attr_type == "Property":
                val = attr.get("value")
                if val is not None and not isinstance(val, (dict, list)):
                    measurements[key] = val
            # GeoProperty and Relationship: skip (not measurements)

    return measurements
```

- [ ] Step 4: Run tests to verify they pass
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_telemetry_measurements.py -v
Expected: PASS (all 6 tests)
- [ ] Step 5: Commit

```sh
cd /home/g/Documents/nekazari/nkz
git add services/telemetry-worker/telemetry_worker/notification_handler.py services/tests/test_telemetry_measurements.py
git commit -m "fix(telemetry-worker): only extract scalar Property measurements

Skip Relationships (refDeviceProfile, refDevice), GeoProperties (location),
metadata keys (name, description), and non-scalar values (dicts, lists).
Fixes telemetry_events storing metadata instead of sensor readings."
```

Task 6: Subscription Manager — Fix Port Default
Files:
- Modify: `nkz/services/telemetry-worker/telemetry_worker/subscription_manager.py:11`
- Test: `nkz/services/tests/test_subscription_manager.py`

- [ ] Step 1: Write failing test for port default
Create nkz/services/tests/test_subscription_manager.py:
```python
"""Tests for telemetry-worker subscription manager."""
import os
from unittest.mock import patch, MagicMock

import pytest


def test_default_notification_url_uses_port_80():
    """Default notification URL must use port 80 (K8s Service port), not 8080."""
    # Unset SERVICE_PORT to test the default
    env = {k: v for k, v in os.environ.items() if k != "SERVICE_PORT"}
    with patch.dict(os.environ, env, clear=True):
        # Force reimport to pick up new default
        import importlib
        import telemetry_worker.subscription_manager as sm
        importlib.reload(sm)
        assert ":80/" in sm.NOTIFICATION_URL
        assert ":8080" not in sm.NOTIFICATION_URL
```

- [ ] Step 2: Run test to verify it fails
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_subscription_manager.py::test_default_notification_url_uses_port_80 -v
Expected: FAIL — default is “8080”.
- [ ] Step 3: Fix port default

Edit nkz/services/telemetry-worker/telemetry_worker/subscription_manager.py line 11:

```python
SERVICE_PORT = os.getenv("SERVICE_PORT", "80")
```
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_subscription_manager.py::test_default_notification_url_uses_port_80 -v
Expected: PASS
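The fix is just an environment-default change; the URL construction can be restated standalone to show the corrected default (a sketch mirroring the module's f-string, not the module itself):

```python
import os

def build_notification_url(environ=os.environ) -> str:
    """Mirror of subscription_manager's NOTIFICATION_URL construction."""
    host = environ.get("SERVICE_HOST", "telemetry-worker-service")
    port = environ.get("SERVICE_PORT", "80")  # was "8080"; the K8s Service exposes 80
    return f"http://{host}:{port}/notify"

print(build_notification_url({}))  # -> http://telemetry-worker-service:80/notify
```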
- [ ] Step 5: Commit

```sh
cd /home/g/Documents/nekazari/nkz
git add services/telemetry-worker/telemetry_worker/subscription_manager.py services/tests/test_subscription_manager.py
git commit -m "fix(telemetry-worker): default SERVICE_PORT to 80 (K8s Service port)

K8s Service exposes port 80 -> targetPort 8080. Subscriptions were using
8080 directly, causing notification delivery failures."
```

Task 7: Subscription Manager — Multi-Tenant + Cleanup + Periodic Check
Files:
- Modify: `nkz/services/telemetry-worker/telemetry_worker/subscription_manager.py`
- Modify: `nkz/services/telemetry-worker/app.py`
- Test: `nkz/services/tests/test_subscription_manager.py` (extend)

- [ ] Step 1: Write failing tests for multi-tenant and cleanup
Append to nkz/services/tests/test_subscription_manager.py:
```python
def test_cleanup_broken_subscriptions_deletes_port_8080():
    """Subscriptions with :8080 in URI should be deleted on startup."""
    import importlib
    import telemetry_worker.subscription_manager as sm
    importlib.reload(sm)

    existing_subs = [
        {
            "id": "urn:ngsi-ld:Subscription:broken1",
            "notification": {
                "endpoint": {"uri": "http://telemetry-worker-service:8080/notify"}
            },
        },
        {
            "id": "urn:ngsi-ld:Subscription:good1",
            "notification": {
                "endpoint": {"uri": "http://telemetry-worker-service:80/notify"}
            },
        },
    ]

    mock_get = MagicMock()
    mock_get.status_code = 200
    mock_get.json.return_value = existing_subs

    mock_delete = MagicMock()
    mock_delete.status_code = 204

    with patch("telemetry_worker.subscription_manager.requests.get", return_value=mock_get), \
         patch("telemetry_worker.subscription_manager.requests.delete", return_value=mock_delete) as mock_del:
        sm._cleanup_broken_subscriptions("test-tenant")

    # Should only delete the broken subscription
    mock_del.assert_called_once()
    call_url = mock_del.call_args[0][0]
    assert "broken1" in call_url


def test_get_active_tenants_queries_postgres():
    """_get_active_tenants should query PostgreSQL for distinct tenant_ids."""
    import importlib
    import telemetry_worker.subscription_manager as sm
    importlib.reload(sm)

    mock_conn = MagicMock()
    mock_cursor = MagicMock()
    mock_conn.cursor.return_value = mock_cursor
    mock_cursor.fetchall.return_value = [("tenant-a",), ("tenant-b",)]

    with patch("telemetry_worker.subscription_manager.psycopg2.connect", return_value=mock_conn):
        tenants = sm._get_active_tenants()

    assert tenants == ["tenant-a", "tenant-b"]
    mock_cursor.execute.assert_called_once()


def test_ensure_subscriptions_for_all_tenants():
    """Should create subscriptions for every active tenant."""
    import importlib
    import telemetry_worker.subscription_manager as sm
    importlib.reload(sm)

    with patch.object(sm, "_get_active_tenants", return_value=["tenant-a", "tenant-b"]), \
         patch.object(sm, "_cleanup_broken_subscriptions") as mock_cleanup, \
         patch.object(sm, "_ensure_tenant_subscriptions") as mock_ensure:
        sm.ensure_subscriptions_for_all_tenants()

    assert mock_cleanup.call_count == 2
    assert mock_ensure.call_count == 2
    mock_ensure.assert_any_call("tenant-a")
    mock_ensure.assert_any_call("tenant-b")
```

- [ ] Step 2: Run tests to verify they fail
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_subscription_manager.py -v
Expected: FAIL — _cleanup_broken_subscriptions, _get_active_tenants, ensure_subscriptions_for_all_tenants don’t exist.
- Step 3: Implement multi-tenant subscription manager
Edit nkz/services/telemetry-worker/telemetry_worker/subscription_manager.py. Replace the full file:
```python
import os
import logging
import requests
import psycopg2
from tenacity import retry, stop_after_attempt, wait_fixed

logger = logging.getLogger(__name__)

ORION_URL = os.getenv("ORION_URL", "http://orion-ld-service:1026")
SERVICE_HOST = os.getenv("SERVICE_HOST", "telemetry-worker-service")
SERVICE_PORT = os.getenv("SERVICE_PORT", "80")
NOTIFICATION_URL = f"http://{SERVICE_HOST}:{SERVICE_PORT}/notify"
CONTEXT_URL = os.getenv("CONTEXT_URL", "http://api-gateway-service:5000/ngsi-ld-context.json")
POSTGRES_URL = os.getenv("POSTGRES_URL", "")
DEFAULT_TENANT = os.getenv("DEFAULT_TENANT", "platform")

# NGSI-LD subscriptions — no watchedAttributes = trigger on ANY attribute change
SUBSCRIPTIONS = [
    {
        "description": "Telemetry Worker - AgriSensor updates",
        "type": "Subscription",
        "entities": [{"type": "AgriSensor"}],
        "notification": {
            "endpoint": {"uri": NOTIFICATION_URL, "accept": "application/json"},
            "format": "normalized",
        },
        "throttling": 30,
        "isActive": True,
    },
    {
        "description": "Telemetry Worker - Device updates",
        "type": "Subscription",
        "entities": [{"type": "Device"}],
        "notification": {
            "endpoint": {"uri": NOTIFICATION_URL, "accept": "application/json"},
            "format": "normalized",
        },
        "throttling": 30,
        "isActive": True,
    },
    {
        "description": "Telemetry Worker - AgriParcel updates",
        "type": "Subscription",
        "entities": [{"type": "AgriParcel"}],
        "notification": {
            "endpoint": {"uri": NOTIFICATION_URL, "accept": "application/json"},
            "format": "normalized",
        },
        "throttling": 30,
        "isActive": True,
    },
    {
        "description": "Telemetry Worker - VegetationIndex analysis results",
        "type": "Subscription",
        "entities": [{"type": "VegetationIndex"}],
        "notification": {
            "endpoint": {"uri": NOTIFICATION_URL, "accept": "application/json"},
            "format": "normalized",
        },
        "throttling": 5,
        "isActive": True,
    },
]


def _get_headers(tenant: str) -> dict:
    """Standard NGSI-LD headers with tenant and @context Link."""
    return {
        "Content-Type": "application/json",
        "NGSILD-Tenant": tenant,
        "Link": f'<{CONTEXT_URL}>; rel="http://www.w3.org/ns/json-ld#context"; type="application/ld+json"',
    }


def _get_active_tenants() -> list:
    """Query PostgreSQL for all active tenant IDs."""
    if not POSTGRES_URL:
        logger.warning("POSTGRES_URL not set, cannot query tenants")
        return []
    try:
        conn = psycopg2.connect(POSTGRES_URL)
        try:
            cur = conn.cursor()
            cur.execute("SELECT DISTINCT tenant_id FROM tenants WHERE tenant_id IS NOT NULL")
            rows = cur.fetchall()
            cur.close()
            return [r[0] for r in rows]
        finally:
            conn.close()
    except Exception as e:
        logger.error(f"Error querying active tenants: {e}")
        return []


def _cleanup_broken_subscriptions(tenant_id: str):
    """Delete subscriptions with wrong port (legacy bug)."""
    headers = _get_headers(tenant_id)
    try:
        r = requests.get(f"{ORION_URL}/ngsi-ld/v1/subscriptions", headers=headers)
        if r.status_code != 200:
            return
        for sub in r.json():
            uri = sub.get("notification", {}).get("endpoint", {}).get("uri", "")
            if ":8080" in uri and "telemetry-worker" in uri:
                sub_id = sub.get("id")
                requests.delete(
                    f"{ORION_URL}/ngsi-ld/v1/subscriptions/{sub_id}",
                    headers=headers,
                )
                logger.info(f"Deleted broken subscription {sub_id} (port 8080)")
    except Exception as e:
        logger.warning(f"Error cleaning broken subscriptions for {tenant_id}: {e}")


def _ensure_tenant_subscriptions(tenant_id: str):
    """Create missing NGSI-LD subscriptions for a single tenant."""
    headers = _get_headers(tenant_id)
    try:
        response = requests.get(
            f"{ORION_URL}/ngsi-ld/v1/subscriptions",
            headers=headers,
        )
        response.raise_for_status()
        existing_subs = response.json()
        existing_descriptions = [
            sub.get("description") for sub in existing_subs
        ] if existing_subs else []

        for sub in SUBSCRIPTIONS:
            if sub["description"] in existing_descriptions:
                logger.debug(f"Subscription '{sub['description']}' exists for tenant {tenant_id}")
            else:
                logger.info(f"Creating subscription '{sub['description']}' for tenant {tenant_id}")
                res = requests.post(
                    f"{ORION_URL}/ngsi-ld/v1/subscriptions",
                    json=sub,
                    headers=headers,
                )
                if res.status_code in [200, 201]:
                    logger.info(f"Created: {sub['description']} for {tenant_id}")
                else:
                    logger.error(
                        f"Failed: {sub['description']} for {tenant_id}: "
                        f"{res.status_code} {res.text}"
                    )
    except Exception as e:
        logger.error(f"Error managing subscriptions for {tenant_id}: {e}")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(5))
def ensure_subscriptions_for_all_tenants():
    """Create NGSI-LD subscriptions for all active tenants."""
    tenants = _get_active_tenants()
    if not tenants:
        tenants = [DEFAULT_TENANT]
        logger.info(f"No tenants from DB, using default: {DEFAULT_TENANT}")

    logger.info(f"Ensuring subscriptions for {len(tenants)} tenants: {tenants}")

    for tenant_id in tenants:
        _cleanup_broken_subscriptions(tenant_id)
        _ensure_tenant_subscriptions(tenant_id)


# Keep backwards-compat alias for app.py import
check_or_create_subscription = ensure_subscriptions_for_all_tenants
```

- Step 4: Run tests to verify they pass
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_subscription_manager.py -v
Expected: PASS (all 4 tests)
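Before moving on, the port-cleanup rule can be sanity-checked in isolation. This is just the boolean condition from _cleanup_broken_subscriptions lifted into a standalone function (a sketch; the real code inlines the check):

```python
def is_broken_endpoint(uri: str) -> bool:
    """True for legacy telemetry-worker notification endpoints on port 8080."""
    return ":8080" in uri and "telemetry-worker" in uri

# Legacy endpoint on the wrong port: flagged for deletion
print(is_broken_endpoint("http://telemetry-worker-service:8080/notify"))  # True
# Correct port: kept
print(is_broken_endpoint("http://telemetry-worker-service:80/notify"))    # False
# Other services on 8080 are not ours to delete
print(is_broken_endpoint("http://other-service:8080/notify"))             # False
```

Requiring both the port and the service name in the URI keeps the cleanup from deleting unrelated subscriptions that legitimately use port 8080.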
- Step 5: Add periodic subscription check to app.py

Edit nkz/services/telemetry-worker/app.py. Update the import (line 15):

```python
from telemetry_worker.subscription_manager import ensure_subscriptions_for_all_tenants
```

Replace the lifespan function:
```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("Telemetry Worker starting up...")

    settings = Settings()

    # Initialize async connection pool (shared between sink and profiles)
    sink = PostgreSQLSink(
        dsn=settings.postgres_url,
        min_pool=5,
        max_pool=20,
    )
    await sink.start()

    # ProfileService gets the same pool for async DB queries
    profile_service = ProfileService(settings, pool=sink._pool)

    # Wire dependencies into notification handler
    init_handler(settings, profile_service, sink)

    # Check/create NGSI-LD subscriptions for all tenants (sync, run in executor)
    try:
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, ensure_subscriptions_for_all_tenants)
    except Exception as e:
        logger.warning(f"Auto-subscription failed (non-fatal): {e}")

    # Periodic subscription self-healing (every 60 minutes)
    async def _periodic_subscription_check():
        while True:
            await asyncio.sleep(3600)
            try:
                await asyncio.get_event_loop().run_in_executor(
                    None, ensure_subscriptions_for_all_tenants
                )
                logger.info("Periodic subscription check completed")
            except Exception as e:
                logger.warning(f"Periodic subscription check failed: {e}")

    periodic_task = asyncio.create_task(_periodic_subscription_check())

    yield

    # Shutdown: cancel periodic task and close pool
    periodic_task.cancel()
    await sink.stop()
    logger.info("Telemetry Worker shut down.")
```

- Step 6: Run all subscription tests
Run: cd /home/g/Documents/nekazari/nkz && python -m pytest services/tests/test_subscription_manager.py -v
Expected: PASS
- Step 7: Commit
```sh
cd /home/g/Documents/nekazari/nkz
git add services/telemetry-worker/telemetry_worker/subscription_manager.py services/telemetry-worker/app.py services/tests/test_subscription_manager.py
git commit -m "feat(telemetry-worker): multi-tenant subscriptions + cleanup + periodic check

- Query PostgreSQL for active tenants (fallback to DEFAULT_TENANT)
- Delete broken subscriptions with port 8080 on startup
- Periodic self-healing check every 60 minutes
- Backwards compatible: check_or_create_subscription alias preserved"
```

Task 8: DataHub — Add municipalityCode to System Keys
Files:

- Modify: nkz-module-datahub/backend/app/api/entities.py:41-45

- Step 1: Add municipalityCode to _NGSI_SYSTEM_KEYS
Edit nkz-module-datahub/backend/app/api/entities.py line 41-45. Add "municipalityCode" to the frozenset:
```python
_NGSI_SYSTEM_KEYS = frozenset({
    "id", "type", "@context", "location", "name", "description",
    "address", "source", "provider", "dateCreated", "dateModified",
    "refAgriParcel", "refDevice", "refWeatherStation",
    "municipalityCode",
})
```

This prevents municipalityCode from appearing as a selectable timeseries attribute in the DataHub entity tree (it's metadata, not a measurement).
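A quick illustration of the filtering effect (selectable_attributes and the sample entity are hypothetical names for this sketch; the actual filtering code lives elsewhere in entities.py):

```python
_NGSI_SYSTEM_KEYS = frozenset({
    "id", "type", "@context", "location", "name", "description",
    "address", "source", "provider", "dateCreated", "dateModified",
    "refAgriParcel", "refDevice", "refWeatherStation", "municipalityCode",
})

def selectable_attributes(entity: dict) -> list:
    # Hypothetical helper: keep only non-system keys as chart candidates
    return [k for k in entity if k not in _NGSI_SYSTEM_KEYS]

entity = {
    "id": "urn:ngsi-ld:WeatherObserved:example",  # hypothetical entity
    "type": "WeatherObserved",
    "municipalityCode": {"type": "Property", "value": "48020"},
    "temperature": {"type": "Property", "value": 18.2},
    "relativeHumidity": {"type": "Property", "value": 0.71},
}
print(selectable_attributes(entity))  # ['temperature', 'relativeHumidity']
```

With municipalityCode in the frozenset, only real measurements survive as chartable attributes.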
- Step 2: Commit
```sh
cd /home/g/Documents/nekazari/nkz-module-datahub
git add backend/app/api/entities.py
git commit -m "fix(datahub): hide municipalityCode from entity attribute list

municipalityCode is self-describing metadata for timeseries resolution,
not a user-facing measurement attribute."
```

Task 9: K8s Deployment — New telemetry-worker Manifest
Files:

- Create: nkz/k8s/core/services/telemetry-worker-deployment.yaml

- Step 1: Create the new deployment manifest

Create nkz/k8s/core/services/telemetry-worker-deployment.yaml:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: telemetry-worker
  namespace: nekazari
  labels:
    app: telemetry-worker
    layer: backend
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: telemetry-worker
  template:
    metadata:
      labels:
        app: telemetry-worker
        layer: backend
    spec:
      containers:
        - name: telemetry-worker
          image: ghcr.io/nkz-os/nkz/telemetry-worker:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8080
          env:
            - name: ORION_URL
              value: "http://orion-ld-service:1026"
            - name: SERVICE_HOST
              value: "telemetry-worker-service"
            - name: SERVICE_PORT
              value: "80"
            - name: CONTEXT_URL
              value: "http://api-gateway-service:5000/ngsi-ld-context.json"
            - name: POSTGRES_URL
              valueFrom:
                secretKeyRef:
                  name: timescale-secret
                  key: connection-string
            - name: LOG_LEVEL
              value: "INFO"
          resources:
            requests:
              memory: "128Mi"
              cpu: "50m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: telemetry-worker-service
  namespace: nekazari
  labels:
    app: telemetry-worker
spec:
  selector:
    app: telemetry-worker
  ports:
    - port: 80
      targetPort: 8080
  type: ClusterIP
```

Key differences from the archived version:
- image: ghcr.io/nkz-os/nkz/telemetry-worker:latest (new org)
- NO imagePullSecrets (public GHCR)
- SERVICE_PORT: "80" (fixed)
- CONTEXT_URL added (was missing)
- optional: true removed from the POSTGRES_URL secretKeyRef (it's required)

- Step 2: Dry-run validation
Run: cd /home/g/Documents/nekazari/nkz && sudo kubectl apply -f k8s/core/services/telemetry-worker-deployment.yaml --dry-run=client
Expected: deployment.apps/telemetry-worker created (dry run) and service/telemetry-worker-service created (dry run). (kubectl reports "configured" instead of "created" if the resources already exist in the cluster.)
- Step 3: Commit
```sh
cd /home/g/Documents/nekazari/nkz
git add k8s/core/services/telemetry-worker-deployment.yaml
git commit -m "feat(k8s): add telemetry-worker deployment to core services

Replaces archived deployment. Fixes: correct GHCR org (nkz-os),
no imagePullSecrets (public), SERVICE_PORT=80, CONTEXT_URL added."
```

Task 10: Integration Verification
After deploying all changes, verify the full pipeline end-to-end.
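What these checks ultimately exercise is the self-describing resolution path: the entity carries municipalityCode, and the timeseries-reader reads it directly instead of walking a fragile URN-resolution chain. A minimal sketch of that lookup (attribute shapes assumed; the real logic lives in the timeseries-reader's urn_resolution.py):

```python
def resolve_weather_source(entity: dict):
    """Sketch: read municipalityCode straight off the WeatherObserved
    entity. Returns the code used to locate weather timeseries rows,
    or None if the entity has not been migrated yet."""
    attr = entity.get("municipalityCode")
    if isinstance(attr, dict):
        # Normalized NGSI-LD representation: {"type": "Property", "value": ...}
        return attr.get("value")
    return attr  # simplified/keyValues representation (or None)

migrated = {
    "id": "urn:ngsi-ld:WeatherObserved:example-a",  # hypothetical id
    "municipalityCode": {"type": "Property", "value": "48020"},
}
legacy = {"id": "urn:ngsi-ld:WeatherObserved:example-b"}

print(resolve_weather_source(migrated))  # 48020
print(resolve_weather_source(legacy))    # None
```

A None result is exactly what the migration in Step 5 below is meant to eliminate.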
- Step 1: Rebuild and push Docker images
```sh
# telemetry-worker
cd /home/g/Documents/nekazari/nkz
sudo docker build --network=host --no-cache -t ghcr.io/nkz-os/nkz/telemetry-worker:latest services/telemetry-worker/
sudo docker push ghcr.io/nkz-os/nkz/telemetry-worker:latest

# weather-worker
sudo docker build --network=host --no-cache -t ghcr.io/nkz-os/nkz/weather-worker:latest services/weather-worker/
sudo docker push ghcr.io/nkz-os/nkz/weather-worker:latest

# timeseries-reader
sudo docker build --network=host --no-cache -t ghcr.io/nkz-os/nkz/timeseries-reader:latest services/timeseries-reader/
sudo docker push ghcr.io/nkz-os/nkz/timeseries-reader:latest
```

- Step 2: Deploy telemetry-worker
```sh
sudo kubectl apply -f k8s/core/services/telemetry-worker-deployment.yaml
sudo kubectl rollout status deployment/telemetry-worker -n nekazari
```

- Step 3: Restart weather-worker and timeseries-reader
```sh
sudo kubectl rollout restart deployment/weather-worker -n nekazari
sudo kubectl rollout restart deployment/timeseries-reader -n nekazari
```

- Step 4: Verify subscriptions
```sh
# Check telemetry-worker logs for subscription creation
sudo kubectl logs deployment/telemetry-worker -n nekazari --tail=50 | grep -i subscription

# Verify subscriptions exist in Orion for tenant asociacinallotarra
sudo kubectl exec deployment/telemetry-worker -n nekazari -- python3 -c "
import requests
headers = {'NGSILD-Tenant': 'asociacinallotarra'}
r = requests.get('http://orion-ld-service:1026/ngsi-ld/v1/subscriptions', headers=headers)
for s in r.json():
    print(s.get('description'), '->', s.get('notification', {}).get('endpoint', {}).get('uri'))
"
```

Expected: subscriptions using port 80 for all entity types.
- Step 5: Run migration — PATCH municipalityCode onto existing WeatherObserved entities
```sh
# One-time script: uses tenant_weather_locations as source of truth
sudo kubectl exec deployment/weather-worker -n nekazari -- python3 -c "
import os, requests, psycopg2

ORION = os.getenv('ORION_URL', 'http://orion-ld-service:1026')
PG = os.getenv('POSTGRES_URL', '')

conn = psycopg2.connect(PG)
cur = conn.cursor()
cur.execute('SELECT tenant_id, municipality_code, latitude, longitude FROM tenant_weather_locations')
for tenant_id, muni_code, lat, lon in cur.fetchall():
    headers = {
        'NGSILD-Tenant': tenant_id,
        'Content-Type': 'application/json',
        'Link': '<http://api-gateway-service:5000/ngsi-ld-context.json>; rel=\"http://www.w3.org/ns/json-ld#context\"; type=\"application/ld+json\"',
    }
    r = requests.get(
        f'{ORION}/ngsi-ld/v1/entities?type=WeatherObserved&limit=100',
        headers=headers,
    )
    if r.status_code != 200:
        print(f'Skip {tenant_id}: {r.status_code}')
        continue
    for ent in r.json():
        if ent.get('municipalityCode'):
            continue
        eid = ent['id']
        patch = {'municipalityCode': {'type': 'Property', 'value': muni_code}}
        pr = requests.patch(
            f'{ORION}/ngsi-ld/v1/entities/{eid}/attrs',
            json=patch,
            headers=headers,
        )
        print(f'PATCH {eid} municipalityCode={muni_code}: {pr.status_code}')
cur.close()
conn.close()
"
```

- Step 6: Verify weather data in DataHub
Open the DataHub module in a browser and select a WeatherObserved entity. Verify that the timeseries charts display temperature, humidity, and wind speed data.
- Step 7: Verify telemetry data in DataHub
Check telemetry-worker logs for notification processing:
```sh
sudo kubectl logs deployment/telemetry-worker -n nekazari --tail=20 | grep -i "measurement\|notification\|processing"
```

If an MQTT device sends data, verify that telemetry_events now contains numeric measurements (not metadata).
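For reference, "numeric measurements (not metadata)" corresponds to a filtering rule like the following sketch, a simplification of what notification_handler.py should do: drop metadata keys, skip Relationship and GeoProperty attributes, and keep only scalar Property values (the METADATA_KEYS set and function name are assumptions):

```python
METADATA_KEYS = {"id", "type", "@context", "dateObserved", "location"}  # assumed set

def extract_measurements(entity: dict) -> dict:
    """Keep only scalar numeric Property values from a normalized
    NGSI-LD notification entity (sketch of the real handler)."""
    out = {}
    for name, attr in entity.items():
        if name in METADATA_KEYS or not isinstance(attr, dict):
            continue  # metadata keys and plain strings like id/type
        if attr.get("type") != "Property":
            continue  # skips Relationship and GeoProperty attributes
        value = attr.get("value")
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            out[name] = value
    return out

notif_entity = {
    "id": "urn:ngsi-ld:AgriSensor:example",  # hypothetical id
    "type": "AgriSensor",
    "soilMoisture": {"type": "Property", "value": 33.5},
    "refAgriParcel": {"type": "Relationship", "object": "urn:ngsi-ld:AgriParcel:p1"},
    "location": {"type": "GeoProperty", "value": {"type": "Point", "coordinates": [0, 0]}},
}
print(extract_measurements(notif_entity))  # {'soilMoisture': 33.5}
```

Only soilMoisture survives; the Relationship and GeoProperty attributes that previously polluted telemetry_events are filtered out before the insert.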