homie dsmr first try

This commit is contained in:
Ard Kuijpers
2020-06-03 21:31:32 +02:00
parent ccf189662d
commit 997ffea4b8
43 changed files with 314 additions and 3226 deletions

View File

@@ -1 +0,0 @@
__version__ = "0.1.0"

44
app.py Normal file
View File

@@ -0,0 +1,44 @@
import os
from dsmr_parser import telegram_specifications
from dsmr_parser.clients import SerialReader, SERIAL_SETTINGS_V2_2, SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5
from app_settings import Settings
from device_dsmr import Device_DSMR
from serial import SerialException
import logging
logger = logging.getLogger(__name__)
def main():
logging.basicConfig(format='%(asctime)s (%(levelname)s) [%(name)s] %(message)s', level=logging.INFO)
cfg = Settings()
protocol_version: str = str(cfg.dsmr_protocol)
dev = str(cfg.dsmr_port)
if 'V2' in protocol_version:
devsettings = SERIAL_SETTINGS_V2_2
spec = telegram_specifications.V2_2
elif 'V3' in protocol_version:
devsettings = SERIAL_SETTINGS_V4
spec = telegram_specifications.V3
elif 'V4' in protocol_version:
devsettings = SERIAL_SETTINGS_V4
spec = telegram_specifications.V4
else:
devsettings = SERIAL_SETTINGS_V5
spec = telegram_specifications.V5
device = Device_DSMR("dsmr", name="Digitale Slimme Meter")
serial_reader = SerialReader(dev, devsettings, spec)
while True:
try:
telegram = next(serial_reader.read_as_object())
except SerialException as serial_exc:
logger.warning("Serial exception", exc_info=serial_exc)
for telegram in serial_reader.read_as_object():
device.update(telegram)
if __name__ == "__main__":
main()

29
app_settings.py Normal file
View File

@@ -0,0 +1,29 @@
from pydantic import BaseSettings, Field
import homie
VERSION = "0.1.0"
class Settings(BaseSettings):
"""Application settings for the DSMR MQTT bridge."""
loglevel: str = Field('INFO', env='LOGLEVEL')
mqtt_host: str = Field(None, env='MQTT_HOST')
mqtt_port: int = Field(1883, env='MQTT_PORT')
mqtt_username: str = Field(None, env='MQTT_USERNAME')
mqtt_password: str = Field(None, env='MQTT_PASSWORD')
dsmr_port: str = Field('/dev/ttyUSB0', env='DSMR_PORT')
dsmr_protocol: str = Field('V5', env='DSMR_PROTOCOL')
homie_update_interval: int = 60
homie_topic: str = Field('homie', env='HOMIE_TOPIC')
homie_implementation: str \
= f"DSMR Homie {VERSION} Homie 4 Version {homie.__version__}"
homie_fw_name: str = "DSMR"
homie_fw_version: str = VERSION
class Config:
"""Where to find the environment file containing the settings."""
env_file = '.env'

View File

@@ -1,12 +1,16 @@
import logging
import datetime
from homie.device_base import Device_Base from homie.device_base import Device_Base
from homie.node.node_base import Node_Base from homie.node.node_base import Node_Base
from homie.node.property.property_datetime import Property_DateTime
from dsmr_parser import telegram_specifications from dsmr_parser.objects import Telegram
from dsmr_parser.clients import SerialReader, SERIAL_SETTINGS_V4
from settings import Settings from node.node_electricitymeter import Node_ElectricityMeter
from node.node_gasmeter import Node_GasMeter
from app_settings import Settings
import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -17,7 +21,7 @@ TRANSLATED_MQTT_SETTINGS = {
'MQTT_PORT': SETTINGS.mqtt_port, 'MQTT_PORT': SETTINGS.mqtt_port,
'MQTT_USERNAME' : SETTINGS.mqtt_username, 'MQTT_USERNAME' : SETTINGS.mqtt_username,
'MQTT_PASSWORD' : SETTINGS.mqtt_password, 'MQTT_PASSWORD' : SETTINGS.mqtt_password,
'MQTT_CLIENT_ID' : SETTINGS.hostname, 'MQTT_CLIENT_ID' : "dsmr2mqtt",
'MQTT_SHARE_CLIENT': False, 'MQTT_SHARE_CLIENT': False,
} }
@@ -29,32 +33,22 @@ TRANSLATED_HOMIE_SETTINGS = {
} }
class Device_DSMR(Device_Base): class Device_DSMR(Device_Base):
def __init__(self, device_id=None, name=None, homie_settings=TRANSLATED_HOMIE_SETTINSG, mqtt_settings=TRANSLATED_MQTT_SETTINGS): def __init__(self, device_id=None, name=None, homie_settings=TRANSLATED_HOMIE_SETTINGS, mqtt_settings=TRANSLATED_MQTT_SETTINGS):
super().__init__(device_id, name, homie_settings, mqtt_settings) super().__init__(device_id, name, homie_settings, mqtt_settings)
node = Node_Base(self, "gasmeter", "Gasmeter", "status") self.dsmrdevice = Node_Base(self, 'dsmrdevice', 'DSMR device', 'status')
self.add_node(node) self.add_node(self.dsmrdevice)
self.last_update = Property_DateTime(self.dsmrdevice, id="timestamp", name="Timestamp", settable=False)
self.dsmrdevice.add_property(self.last_update)
self.electricitymeter = Node_ElectricityMeter(self)
self.add_node(self.electricitymeter)
self.gasmeter = Node_GasMeter(self)
self.add_node(self.gasmeter)
self.start() self.start()
def update(self, telegram: Telegram):
def register_status_properties(self, node): self.last_update.value = telegram.P1_MESSAGE_TIMESTAMP.value.strftime("%Y-%m-%dT%H:%M:%S.%f")
super(Device_Temperature_Humidity_Battery, self).register_status_properties( self.electricitymeter.update(telegram)
node self.gasmeter.update(telegram)
)
self.battery = Property_Battery(node)
node.add_property(self.battery)
def update_battery(self, battery):
logger.info("Updated Battery {}".format(battery))
self.battery.value = battery

22
dsmr_parser_test.py Normal file
View File

@@ -0,0 +1,22 @@
# DSMR v4.2 p1 using dsmr_parser and telegram objects
from dsmr_parser import telegram_specifications
from dsmr_parser.clients import SerialReader, SERIAL_SETTINGS_V5
from dsmr_parser.objects import CosemObject, MBusObject, Telegram
from dsmr_parser.parsers import TelegramParser
import os
import logging
serial_reader = SerialReader(
device='/dev/ttyUSB0',
serial_settings=SERIAL_SETTINGS_V5,
telegram_specification=telegram_specifications.V5
)
# telegram = next(serial_reader.read_as_object())
# print(telegram)
for telegram in serial_reader.read_as_object():
os.system('clear')
print(telegram)

View File

@@ -1,2 +0,0 @@
[run]
branch = True

View File

@@ -1,14 +0,0 @@
.idea
*.pyc
.tox
.cache
.venv
*.egg-info
/.project
/.pydevproject
/.coverage
build/
dist/
venv/
*.*~
*~

View File

@@ -1,18 +0,0 @@
language: python
python:
- 2.7
- 3.4
- 3.5
- 3.6
install: pip install tox-travis codecov
script: tox
after_success:
- codecov
matrix:
allow_failures:
- python: 2.7

View File

@@ -1,109 +0,0 @@
Change Log
----------
**0.21** (2020-05-25)
- All objects can produce a json serialization of their state.
**0.20** (2020-05-12)
- All objects can now print their values
- Add parser + object for generic profile
**0.19** (2020-05-03)
- Add following missing elements to telegram specification v4:
- SHORT_POWER_FAILURE_COUNT,
- INSTANTANEOUS_CURRENT_L1,
- INSTANTANEOUS_CURRENT_L2,
- INSTANTANEOUS_CURRENT_L3
- Add missing tests + fix small test bugs
- Complete telegram object v4 parse test
**0.18** (2020-01-28)
- PyCRC replacement (`pull request #48 <https://github.com/ndokter/dsmr_parser/pull/48>`_).
**0.17** (2019-12-21)
- Add a true telegram object (`pull request #40 <https://github.com/ndokter/dsmr_parser/pull/40>`_).
**0.16** (2019-12-21)
- Add support for Belgian and Smarty meters (`pull request #44 <https://github.com/ndokter/dsmr_parser/pull/44>`_).
**0.15** (2019-12-12)
- Fixed asyncio loop issue (`pull request #43 <https://github.com/ndokter/dsmr_parser/pull/43>`_).
**0.14** (2019-10-08)
- Changed serial reading to reduce CPU usage (`pull request #37 <https://github.com/ndokter/dsmr_parser/pull/37>`_).
**0.13** (2019-03-04)
- Fix DSMR v5.0 serial settings which were not used (`pull request #33 <https://github.com/ndokter/dsmr_parser/pull/33>`_).
**0.12** (2018-09-23)
- Add serial settings for DSMR v5.0 (`pull request #31 <https://github.com/ndokter/dsmr_parser/pull/31>`_).
- Lux-creos-obis-1.8.0 (`pull request #32 <https://github.com/ndokter/dsmr_parser/pull/32>`_).
**0.11** (2017-09-18)
- NULL value fix in checksum (`pull request #26 <https://github.com/ndokter/dsmr_parser/pull/26>`_)
**0.10** (2017-06-05)
- bugfix: don't force full telegram signatures (`pull request #25 <https://github.com/ndokter/dsmr_parser/pull/25>`_)
- removed unused code for automatic telegram detection as this needs reworking after the fix mentioned above
- InvalidChecksumError's are logged as warning instead of error
**0.9** (2017-05-12)
- added DSMR v5 serial settings
**0.8** (2017-01-26)
- added support for DSMR v3
- added support for DSMR v5
**IMPORTANT: this release has the following backwards incompatible changes:**
- Removed TelegramParserV2_2 in favor of TelegramParser
- Removed TelegramParserV4 in favor of TelegramParser
**0.7** (2017-01-14)
- Internal refactoring related to the way clients feed their data into the parse module. Clients can now supply the telegram data in single characters, lines (which was common) or complete telegram strings. (`pull request #17 <https://github.com/ndokter/dsmr_parser/pull/17>`_)
**IMPORTANT: this release has the following backwards incompatible changes:**
- Client related imports from dsmr_parser.serial and dsmr_parser.protocol have been moved to dsmr_parser.clients (import these from the clients/__init__.py module)
- The .parse() method of TelegramParser, TelegramParserV2_2, TelegramParserV4 now accepts a string containing the entire telegram (including \r\n characters) and not a list
**0.6** (2017-01-04)
- Fixed bug in CRC checksum verification for the asyncio client (`pull request #15 <https://github.com/ndokter/dsmr_parser/pull/15>`_)
- Support added for TCP connections using the asyncio client (`pull request #12 <https://github.com/ndokter/dsmr_parser/pull/12/>`_)
**0.5** (2016-12-29)
- CRC checksum verification for DSMR v4 telegrams (`issue #10 <https://github.com/ndokter/dsmr_parser/issues/10>`_)
**0.4** (2016-11-21)
- DSMR v2.2 serial settings now uses parity serial.EVEN by default (`pull request #5 <https://github.com/ndokter/dsmr_parser/pull/5>`_)
- improved asyncio reader and improve it's error handling (`pull request #8 <https://github.com/ndokter/dsmr_parser/pull/8>`_)
**0.3** (2016-11-12)
- asyncio reader for non-blocking reads (`pull request #3 <https://github.com/ndokter/dsmr_parser/pull/3>`_)
**0.2** (2016-11-08)
- support for DMSR version 2.2 (`pull request #2 <https://github.com/ndokter/dsmr_parser/pull/2>`_)
**0.1** (2016-08-22)
- initial version with a serial reader and support for DSMR version 4.x

View File

@@ -1,21 +0,0 @@
The MIT License
Copyright (c) 2016 Nigel Dokter http://nldr.net
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@@ -1,266 +0,0 @@
DSMR Parser
===========
.. image:: https://img.shields.io/pypi/v/dsmr-parser.svg
:target: https://pypi.python.org/pypi/dsmr-parser
.. image:: https://travis-ci.org/ndokter/dsmr_parser.svg?branch=master
:target: https://travis-ci.org/ndokter/dsmr_parser
A library for parsing Dutch Smart Meter Requirements (DSMR) telegram data. It
also includes client implementation to directly read and parse smart meter data.
Features
--------
DSMR Parser supports DSMR versions 2, 3, 4 and 5. It has been tested with Python 3.4, 3.5 and 3.6.
Client module usage
-------------------
**Serial client**
Read the serial port and work with the parsed telegrams. It should be run in a separate
process because the code is blocking (not asynchronous):
.. code-block:: python
from dsmr_parser import telegram_specifications
from dsmr_parser.clients import SerialReader, SERIAL_SETTINGS_V4
serial_reader = SerialReader(
device='/dev/ttyUSB0',
serial_settings=SERIAL_SETTINGS_V4,
telegram_specification=telegram_specifications.V4
)
for telegram in serial_reader.read():
print(telegram) # see 'Telegram object' docs below
**AsyncIO client**
To be documented.
Parsing module usage
--------------------
The parsing module accepts complete unaltered telegram strings and parses these
into a dictionary.
.. code-block:: python
from dsmr_parser import telegram_specifications
from dsmr_parser.parsers import TelegramParser
# String is formatted in separate lines for readability.
telegram_str = (
'/ISk5\2MT382-1000\r\n'
'\r\n'
'0-0:96.1.1(4B384547303034303436333935353037)\r\n'
'1-0:1.8.1(12345.678*kWh)\r\n'
'1-0:1.8.2(12345.678*kWh)\r\n'
'1-0:2.8.1(12345.678*kWh)\r\n'
'1-0:2.8.2(12345.678*kWh)\r\n'
'0-0:96.14.0(0002)\r\n'
'1-0:1.7.0(001.19*kW)\r\n'
'1-0:2.7.0(000.00*kW)\r\n'
'0-0:17.0.0(016*A)\r\n'
'0-0:96.3.10(1)\r\n'
'0-0:96.13.1(303132333435363738)\r\n'
'0-0:96.13.0(303132333435363738393A3B3C3D3E3F303132333435363738393A3B3C3D3E'
'3F303132333435363738393A3B3C3D3E3F303132333435363738393A3B3C3D3E3F30313233'
'3435363738393A3B3C3D3E3F)\r\n'
'0-1:96.1.0(3232323241424344313233343536373839)\r\n'
'0-1:24.1.0(03)\r\n'
'0-1:24.3.0(090212160000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n'
'(00001.001)\r\n'
'0-1:24.4.0(1)\r\n'
'!\r\n'
)
parser = TelegramParser(telegram_specifications.V3)
telegram = parser.parse(telegram_str)
print(telegram) # see 'Telegram object' docs below
Telegram dictionary
-------------------
A dictionary of which the key indicates the field type. These regex values
correspond to one of dsmr_parser.obis_reference constants.
The value is either a CosemObject or MBusObject. These have a 'value' and 'unit'
property. MBusObject's additionally have a 'datetime' property. The 'value' can
contain any python type (int, str, Decimal) depending on the field. The 'unit'
contains 'kW', 'A', 'kWh' or 'm3'.
.. code-block:: python
# Contents of a parsed DSMR v3 telegram
{'\\d-\\d:17\\.0\\.0.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fc39eb8>,
'\\d-\\d:1\\.7\\.0.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10f916390>,
'\\d-\\d:1\\.8\\.1.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fc39e10>,
'\\d-\\d:1\\.8\\.2.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fc39ef0>,
'\\d-\\d:24\\.1\\.0.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fbaef28>,
'\\d-\\d:24\\.3\\.0.+?\\r\\n.+?\\r\\n': <dsmr_parser.objects.MBusObject object at 0x10f9163c8>,
'\\d-\\d:24\\.4\\.0.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fc39f60>,
'\\d-\\d:2\\.7\\.0.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fc39fd0>,
'\\d-\\d:2\\.8\\.1.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fbaee10>,
'\\d-\\d:2\\.8\\.2.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fc39e80>,
'\\d-\\d:96\\.13\\.0.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fc39d30>,
'\\d-\\d:96\\.13\\.1.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fbaeeb8>,
'\\d-\\d:96\\.14\\.0.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fbaef98>,
'\\d-\\d:96\\.1\\.0.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fbaef60>,
'\\d-\\d:96\\.1\\.1.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fc39f98>,
'\\d-\\d:96\\.3\\.10.+?\\r\\n': <dsmr_parser.objects.CosemObject object at 0x10fc39dd8>}
Example to get some of the values:
.. code-block:: python
from dsmr_parser import obis_references
# The telegram message timestamp.
message_datetime = telegram[obis_references.P1_MESSAGE_TIMESTAMP]
# Using the active tariff to determine the electricity being used and
# delivered for the right tariff.
active_tariff = telegram[obis_references.ELECTRICITY_ACTIVE_TARIFF]
active_tariff = int(tariff.value)
electricity_used_total = telegram[obis_references.ELECTRICITY_USED_TARIFF_ALL[active_tariff - 1]]
electricity_delivered_total = telegram[obis_references.ELECTRICITY_DELIVERED_TARIFF_ALL[active_tariff - 1]]
gas_reading = telegram[obis_references.HOURLY_GAS_METER_READING]
# See dsmr_reader.obis_references for all readable telegram values.
# Note that the avilable values differ per DSMR version.
Telegram as an Object
---------------------
An object version of the telegram is available as well.
.. code-block:: python
# DSMR v4.2 p1 using dsmr_parser and telegram objects
from dsmr_parser import telegram_specifications
from dsmr_parser.clients import SerialReader, SERIAL_SETTINGS_V5
from dsmr_parser.objects import CosemObject, MBusObject, Telegram
from dsmr_parser.parsers import TelegramParser
import os
serial_reader = SerialReader(
device='/dev/ttyUSB0',
serial_settings=SERIAL_SETTINGS_V5,
telegram_specification=telegram_specifications.V4
)
# telegram = next(serial_reader.read_as_object())
# print(telegram)
for telegram in serial_reader.read_as_object():
os.system('clear')
print(telegram)
Example of output of print of the telegram object:
.. code-block:: console
P1_MESSAGE_HEADER: 42 [None]
P1_MESSAGE_TIMESTAMP: 2016-11-13 19:57:57+00:00 [None]
EQUIPMENT_IDENTIFIER: 3960221976967177082151037881335713 [None]
ELECTRICITY_USED_TARIFF_1: 1581.123 [kWh]
ELECTRICITY_USED_TARIFF_2: 1435.706 [kWh]
ELECTRICITY_DELIVERED_TARIFF_1: 0.000 [kWh]
ELECTRICITY_DELIVERED_TARIFF_2: 0.000 [kWh]
ELECTRICITY_ACTIVE_TARIFF: 0002 [None]
CURRENT_ELECTRICITY_USAGE: 2.027 [kW]
CURRENT_ELECTRICITY_DELIVERY: 0.000 [kW]
LONG_POWER_FAILURE_COUNT: 7 [None]
VOLTAGE_SAG_L1_COUNT: 0 [None]
VOLTAGE_SAG_L2_COUNT: 0 [None]
VOLTAGE_SAG_L3_COUNT: 0 [None]
VOLTAGE_SWELL_L1_COUNT: 0 [None]
VOLTAGE_SWELL_L2_COUNT: 0 [None]
VOLTAGE_SWELL_L3_COUNT: 0 [None]
TEXT_MESSAGE_CODE: None [None]
TEXT_MESSAGE: None [None]
DEVICE_TYPE: 3 [None]
INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE: 0.170 [kW]
INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE: 1.247 [kW]
INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE: 0.209 [kW]
INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE: 0.000 [kW]
INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE: 0.000 [kW]
INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE: 0.000 [kW]
EQUIPMENT_IDENTIFIER_GAS: 4819243993373755377509728609491464 [None]
HOURLY_GAS_METER_READING: 981.443 [m3]
Accessing the telegrams information as attributes directly:
.. code-block:: python
telegram
Out[3]: <dsmr_parser.objects.Telegram at 0x7f5e995d9898>
telegram.CURRENT_ELECTRICITY_USAGE
Out[4]: <dsmr_parser.objects.CosemObject at 0x7f5e98ae5ac8>
telegram.CURRENT_ELECTRICITY_USAGE.value
Out[5]: Decimal('2.027')
telegram.CURRENT_ELECTRICITY_USAGE.unit
Out[6]: 'kW'
The telegram object has an iterator, can be used to find all the information elements in the current telegram:
.. code-block:: python
[attr for attr, value in telegram]
Out[11]:
['P1_MESSAGE_HEADER',
'P1_MESSAGE_TIMESTAMP',
'EQUIPMENT_IDENTIFIER',
'ELECTRICITY_USED_TARIFF_1',
'ELECTRICITY_USED_TARIFF_2',
'ELECTRICITY_DELIVERED_TARIFF_1',
'ELECTRICITY_DELIVERED_TARIFF_2',
'ELECTRICITY_ACTIVE_TARIFF',
'CURRENT_ELECTRICITY_USAGE',
'CURRENT_ELECTRICITY_DELIVERY',
'LONG_POWER_FAILURE_COUNT',
'VOLTAGE_SAG_L1_COUNT',
'VOLTAGE_SAG_L2_COUNT',
'VOLTAGE_SAG_L3_COUNT',
'VOLTAGE_SWELL_L1_COUNT',
'VOLTAGE_SWELL_L2_COUNT',
'VOLTAGE_SWELL_L3_COUNT',
'TEXT_MESSAGE_CODE',
'TEXT_MESSAGE',
'DEVICE_TYPE',
'INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE',
'INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE',
'INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE',
'INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE',
'INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE',
'INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE',
'EQUIPMENT_IDENTIFIER_GAS',
'HOURLY_GAS_METER_READING']
Installation
------------
To install DSMR Parser:
.. code-block:: bash
$ pip install dsmr-parser
Known issues
------------
If the serial settings SERIAL_SETTINGS_V2_2 or SERIAL_SETTINGS_V4 don't work.
Make sure to try and replace the parity settings to EVEN or NONE.
It's possible that alternative settings will be added in the future if these
settings don't work for the majority of meters.

View File

@@ -1,65 +0,0 @@
from functools import partial
import argparse
import asyncio
import logging
from dsmr_parser.clients import create_dsmr_reader, create_tcp_dsmr_reader
def console():
"""Output DSMR data to console."""
parser = argparse.ArgumentParser(description=console.__doc__)
parser.add_argument('--device', default='/dev/ttyUSB0',
help='port to read DSMR data from')
parser.add_argument('--host', default=None,
help='alternatively connect using TCP host.')
parser.add_argument('--port', default=None,
help='TCP port to use for connection')
parser.add_argument('--version', default='2.2', choices=['2.2', '4'],
help='DSMR version (2.2, 4)')
parser.add_argument('--verbose', '-v', action='count')
args = parser.parse_args()
if args.verbose:
level = logging.DEBUG
else:
level = logging.ERROR
logging.basicConfig(level=level)
loop = asyncio.get_event_loop()
def print_callback(telegram):
"""Callback that prints telegram values."""
for obiref, obj in telegram.items():
if obj:
print(obj.value, obj.unit)
print()
# create tcp or serial connection depending on args
if args.host and args.port:
create_connection = partial(create_tcp_dsmr_reader,
args.host, args.port, args.version,
print_callback, loop=loop)
else:
create_connection = partial(create_dsmr_reader,
args.device, args.version,
print_callback, loop=loop)
try:
# connect and keep connected until interrupted by ctrl-c
while True:
# create serial or tcp connection
conn = create_connection()
transport, protocol = loop.run_until_complete(conn)
# wait until connection it closed
loop.run_until_complete(protocol.wait_closed())
# wait 5 seconds before attempting reconnect
loop.run_until_complete(asyncio.sleep(5))
except KeyboardInterrupt:
# cleanup connection after user initiated shutdown
transport.close()
loop.run_until_complete(asyncio.sleep(0))
finally:
loop.close()

View File

@@ -1,5 +0,0 @@
from dsmr_parser.clients.settings import SERIAL_SETTINGS_V2_2, \
SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5
from dsmr_parser.clients.serial_ import SerialReader, AsyncSerialReader
from dsmr_parser.clients.protocol import create_dsmr_protocol, \
create_dsmr_reader, create_tcp_dsmr_reader

View File

@@ -1,171 +0,0 @@
import logging
import fileinput
import tailer
from dsmr_parser.clients.telegram_buffer import TelegramBuffer
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
from dsmr_parser.objects import Telegram
from dsmr_parser.parsers import TelegramParser
logger = logging.getLogger(__name__)
class FileReader(object):
"""
Filereader to read and parse raw telegram strings from a file and instantiate Telegram objects
for each read telegram.
Usage:
from dsmr_parser import telegram_specifications
from dsmr_parser.clients.filereader import FileReader
if __name__== "__main__":
infile = '/data/smartmeter/readings.txt'
file_reader = FileReader(
file = infile,
telegram_specification = telegram_specifications.V4
)
for telegram in file_reader.read_as_object():
print(telegram)
The file can be created like:
from dsmr_parser import telegram_specifications
from dsmr_parser.clients import SerialReader, SERIAL_SETTINGS_V5
if __name__== "__main__":
outfile = '/data/smartmeter/readings.txt'
serial_reader = SerialReader(
device='/dev/ttyUSB0',
serial_settings=SERIAL_SETTINGS_V5,
telegram_specification=telegram_specifications.V4
)
for telegram in serial_reader.read_as_object():
f=open(outfile,"ab+")
f.write(telegram._telegram_data.encode())
f.close()
"""
def __init__(self, file, telegram_specification):
self._file = file
self.telegram_parser = TelegramParser(telegram_specification)
self.telegram_buffer = TelegramBuffer()
self.telegram_specification = telegram_specification
def read_as_object(self):
"""
Read complete DSMR telegram's from a file and return a Telegram object.
:rtype: generator
"""
with open(self._file, "rb") as file_handle:
while True:
data = file_handle.readline()
str = data.decode()
self.telegram_buffer.append(str)
for telegram in self.telegram_buffer.get_all():
try:
yield Telegram(telegram, self.telegram_parser, self.telegram_specification)
except InvalidChecksumError as e:
logger.warning(str(e))
except ParseError as e:
logger.error('Failed to parse telegram: %s', e)
class FileInputReader(object):
"""
Filereader to read and parse raw telegram strings from stdin or files specified at the commandline
and instantiate Telegram objects for each read telegram.
Usage python script "syphon_smartmeter_readings_stdin.py":
from dsmr_parser import telegram_specifications
from dsmr_parser.clients.filereader import FileInputReader
if __name__== "__main__":
fileinput_reader = FileReader(
file = infile,
telegram_specification = telegram_specifications.V4
)
for telegram in fileinput_reader.read_as_object():
print(telegram)
Command line:
tail -f /data/smartmeter/readings.txt | python3 syphon_smartmeter_readings_stdin.py
"""
def __init__(self, telegram_specification):
self.telegram_parser = TelegramParser(telegram_specification)
self.telegram_buffer = TelegramBuffer()
self.telegram_specification = telegram_specification
def read_as_object(self):
"""
Read complete DSMR telegram's from stdin of filearguments specified on teh command line
and return a Telegram object.
:rtype: generator
"""
with fileinput.input(mode='rb') as file_handle:
while True:
data = file_handle.readline()
str = data.decode()
self.telegram_buffer.append(str)
for telegram in self.telegram_buffer.get_all():
try:
yield Telegram(telegram, self.telegram_parser, self.telegram_specification)
except InvalidChecksumError as e:
logger.warning(str(e))
except ParseError as e:
logger.error('Failed to parse telegram: %s', e)
class FileTailReader(object):
"""
Filereader to read and parse raw telegram strings from the tail of a
given file and instantiate Telegram objects for each read telegram.
Usage python script "syphon_smartmeter_readings_stdin.py":
from dsmr_parser import telegram_specifications
from dsmr_parser.clients.filereader import FileTailReader
if __name__== "__main__":
infile = '/data/smartmeter/readings.txt'
filetail_reader = FileTailReader(
file = infile,
telegram_specification = telegram_specifications.V5
)
for telegram in filetail_reader.read_as_object():
print(telegram)
"""
def __init__(self, file, telegram_specification):
self._file = file
self.telegram_parser = TelegramParser(telegram_specification)
self.telegram_buffer = TelegramBuffer()
self.telegram_specification = telegram_specification
def read_as_object(self):
"""
Read complete DSMR telegram's from a files tail and return a Telegram object.
:rtype: generator
"""
with open(self._file, "rb") as file_handle:
for data in tailer.follow(file_handle):
str = data.decode()
self.telegram_buffer.append(str)
for telegram in self.telegram_buffer.get_all():
try:
yield Telegram(telegram, self.telegram_parser, self.telegram_specification)
except InvalidChecksumError as e:
logger.warning(str(e))
except ParseError as e:
logger.error('Failed to parse telegram: %s', e)

View File

@@ -1,119 +0,0 @@
"""Asyncio protocol implementation for handling telegrams."""
from functools import partial
import asyncio
import logging
from serial_asyncio import create_serial_connection
from dsmr_parser import telegram_specifications
from dsmr_parser.clients.telegram_buffer import TelegramBuffer
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
from dsmr_parser.parsers import TelegramParser
from dsmr_parser.clients.settings import SERIAL_SETTINGS_V2_2, \
SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5
def create_dsmr_protocol(dsmr_version, telegram_callback, loop=None):
"""Creates a DSMR asyncio protocol."""
if dsmr_version == '2.2':
specification = telegram_specifications.V2_2
serial_settings = SERIAL_SETTINGS_V2_2
elif dsmr_version == '4':
specification = telegram_specifications.V4
serial_settings = SERIAL_SETTINGS_V4
elif dsmr_version == '5':
specification = telegram_specifications.V5
serial_settings = SERIAL_SETTINGS_V5
elif dsmr_version == '5B':
specification = telegram_specifications.BELGIUM_FLUVIUS
serial_settings = SERIAL_SETTINGS_V5
else:
raise NotImplementedError("No telegram parser found for version: %s",
dsmr_version)
protocol = partial(DSMRProtocol, loop, TelegramParser(specification),
telegram_callback=telegram_callback)
return protocol, serial_settings
def create_dsmr_reader(port, dsmr_version, telegram_callback, loop=None):
"""Creates a DSMR asyncio protocol coroutine using serial port."""
protocol, serial_settings = create_dsmr_protocol(
dsmr_version, telegram_callback, loop=None)
serial_settings['url'] = port
conn = create_serial_connection(loop, protocol, **serial_settings)
return conn
def create_tcp_dsmr_reader(host, port, dsmr_version,
telegram_callback, loop=None):
"""Creates a DSMR asyncio protocol coroutine using TCP connection."""
if not loop:
loop = asyncio.get_event_loop()
protocol, _ = create_dsmr_protocol(
dsmr_version, telegram_callback, loop=loop)
conn = loop.create_connection(protocol, host, port)
return conn
class DSMRProtocol(asyncio.Protocol):
"""Assemble and handle incoming data into complete DSM telegrams."""
transport = None
telegram_callback = None
def __init__(self, loop, telegram_parser, telegram_callback=None):
"""Initialize class."""
self.loop = loop
self.log = logging.getLogger(__name__)
self.telegram_parser = telegram_parser
# callback to call on complete telegram
self.telegram_callback = telegram_callback
# buffer to keep incomplete incoming data
self.telegram_buffer = TelegramBuffer()
# keep a lock until the connection is closed
self._closed = asyncio.Event()
def connection_made(self, transport):
"""Just logging for now."""
self.transport = transport
self.log.debug('connected')
def data_received(self, data):
"""Add incoming data to buffer."""
data = data.decode('ascii')
self.log.debug('received data: %s', data)
self.telegram_buffer.append(data)
for telegram in self.telegram_buffer.get_all():
self.handle_telegram(telegram)
def connection_lost(self, exc):
"""Stop when connection is lost."""
if exc:
self.log.exception('disconnected due to exception')
else:
self.log.info('disconnected because of close/abort.')
self._closed.set()
def handle_telegram(self, telegram):
"""Send off parsed telegram to handling callback."""
self.log.debug('got telegram: %s', telegram)
try:
parsed_telegram = self.telegram_parser.parse(telegram)
except InvalidChecksumError as e:
self.log.warning(str(e))
except ParseError:
self.log.exception("failed to parse telegram")
else:
self.telegram_callback(parsed_telegram)
@asyncio.coroutine
def wait_closed(self):
"""Wait until connection is closed."""
yield from self._closed.wait()

View File

@@ -1,99 +0,0 @@
import asyncio
import logging
import serial
import serial_asyncio
from dsmr_parser.clients.telegram_buffer import TelegramBuffer
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
from dsmr_parser.parsers import TelegramParser
from dsmr_parser.objects import Telegram
logger = logging.getLogger(__name__)
class SerialReader(object):
PORT_KEY = 'port'
def __init__(self, device, serial_settings, telegram_specification):
self.serial_settings = serial_settings
self.serial_settings[self.PORT_KEY] = device
self.telegram_parser = TelegramParser(telegram_specification)
self.telegram_buffer = TelegramBuffer()
self.telegram_specification = telegram_specification
def read(self):
"""
Read complete DSMR telegram's from the serial interface and parse it
into CosemObject's and MbusObject's
:rtype: generator
"""
with serial.Serial(**self.serial_settings) as serial_handle:
while True:
data = serial_handle.read(max(1, min(1024, serial_handle.in_waiting)))
self.telegram_buffer.append(data.decode('ascii'))
for telegram in self.telegram_buffer.get_all():
try:
yield self.telegram_parser.parse(telegram)
except InvalidChecksumError as e:
logger.warning(str(e))
except ParseError as e:
logger.error('Failed to parse telegram: %s', e)
def read_as_object(self):
"""
Read complete DSMR telegram's from the serial interface and return a Telegram object.
:rtype: generator
"""
with serial.Serial(**self.serial_settings) as serial_handle:
while True:
data = serial_handle.readline()
self.telegram_buffer.append(data.decode('ascii'))
for telegram in self.telegram_buffer.get_all():
try:
yield Telegram(telegram, self.telegram_parser, self.telegram_specification)
except InvalidChecksumError as e:
logger.warning(str(e))
except ParseError as e:
logger.error('Failed to parse telegram: %s', e)
class AsyncSerialReader(SerialReader):
"""Serial reader using asyncio pyserial."""
PORT_KEY = 'url'
@asyncio.coroutine
def read(self, queue):
"""
Read complete DSMR telegram's from the serial interface and parse it
into CosemObject's and MbusObject's.
Instead of being a generator, values are pushed to provided queue for
asynchronous processing.
:rtype: None
"""
# create Serial StreamReader
conn = serial_asyncio.open_serial_connection(**self.serial_settings)
reader, _ = yield from conn
while True:
# Read line if available or give control back to loop until new
# data has arrived.
data = yield from reader.readline()
self.telegram_buffer.append(data.decode('ascii'))
for telegram in self.telegram_buffer.get_all():
try:
# Push new parsed telegram onto queue.
queue.put_nowait(
self.telegram_parser.parse(telegram)
)
except ParseError as e:
logger.warning('Failed to parse telegram: %s', e)

View File

@@ -1,32 +0,0 @@
import serial
SERIAL_SETTINGS_V2_2 = {
'baudrate': 9600,
'bytesize': serial.SEVENBITS,
'parity': serial.PARITY_EVEN,
'stopbits': serial.STOPBITS_ONE,
'xonxoff': 0,
'rtscts': 0,
'timeout': 20
}
SERIAL_SETTINGS_V4 = {
'baudrate': 115200,
'bytesize': serial.SEVENBITS,
'parity': serial.PARITY_EVEN,
'stopbits': serial.STOPBITS_ONE,
'xonxoff': 0,
'rtscts': 0,
'timeout': 20
}
SERIAL_SETTINGS_V5 = {
'baudrate': 115200,
'bytesize': serial.EIGHTBITS,
'parity': serial.PARITY_NONE,
'stopbits': serial.STOPBITS_ONE,
'xonxoff': 0,
'rtscts': 0,
'timeout': 20
}

View File

@@ -1,57 +0,0 @@
import re
class TelegramBuffer(object):
"""
Used as a buffer for a stream of telegram data. Constructs full telegram
strings from the buffered data and returns it.
"""
def __init__(self):
self._buffer = ''
def get_all(self):
"""
Remove complete telegrams from buffer and yield them.
:rtype generator:
"""
for telegram in self._find_telegrams():
self._remove(telegram)
yield telegram
def append(self, data):
"""
Add telegram data to buffer.
:param str data: chars, lines or full telegram strings of telegram data
"""
self._buffer += data
def _remove(self, telegram):
"""
Remove telegram from buffer and incomplete data preceding it. This
is easier than validating the data before adding it to the buffer.
:param str telegram:
:return:
"""
# Remove data leading up to the telegram and the telegram itself.
index = self._buffer.index(telegram) + len(telegram)
self._buffer = self._buffer[index:]
def _find_telegrams(self):
"""
Find complete telegrams in buffer from start ('/') till ending
checksum ('!AB12\r\n').
:rtype: list
"""
# - Match all characters after start of telegram except for the start
# itself again '^\/]+', which eliminates incomplete preceding telegrams.
# - Do non greedy match using '?' so start is matched up to the first
# checksum that's found.
# - The checksum is optional '{0,4}' because not all telegram versions
# support it.
return re.findall(
r'\/[^\/]+?\![A-F0-9]{0,4}\0?\r\n',
self._buffer,
re.DOTALL
)

View File

@@ -1,6 +0,0 @@
class ParseError(Exception):
pass
class InvalidChecksumError(ParseError):
pass

View File

@@ -1,54 +0,0 @@
from dsmr_parser import obis_references as obis
"""
dsmr_parser.obis_name_mapping
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This module contains a mapping of obis references to names.
"""
EN = {
obis.P1_MESSAGE_HEADER: 'P1_MESSAGE_HEADER',
obis.P1_MESSAGE_TIMESTAMP: 'P1_MESSAGE_TIMESTAMP',
obis.ELECTRICITY_IMPORTED_TOTAL: 'ELECTRICITY_IMPORTED_TOTAL',
obis.ELECTRICITY_USED_TARIFF_1: 'ELECTRICITY_USED_TARIFF_1',
obis.ELECTRICITY_USED_TARIFF_2: 'ELECTRICITY_USED_TARIFF_2',
obis.ELECTRICITY_DELIVERED_TARIFF_1: 'ELECTRICITY_DELIVERED_TARIFF_1',
obis.ELECTRICITY_DELIVERED_TARIFF_2: 'ELECTRICITY_DELIVERED_TARIFF_2',
obis.ELECTRICITY_ACTIVE_TARIFF: 'ELECTRICITY_ACTIVE_TARIFF',
obis.EQUIPMENT_IDENTIFIER: 'EQUIPMENT_IDENTIFIER',
obis.CURRENT_ELECTRICITY_USAGE: 'CURRENT_ELECTRICITY_USAGE',
obis.CURRENT_ELECTRICITY_DELIVERY: 'CURRENT_ELECTRICITY_DELIVERY',
obis.LONG_POWER_FAILURE_COUNT: 'LONG_POWER_FAILURE_COUNT',
obis.SHORT_POWER_FAILURE_COUNT: 'SHORT_POWER_FAILURE_COUNT',
obis.POWER_EVENT_FAILURE_LOG: 'POWER_EVENT_FAILURE_LOG',
obis.VOLTAGE_SAG_L1_COUNT: 'VOLTAGE_SAG_L1_COUNT',
obis.VOLTAGE_SAG_L2_COUNT: 'VOLTAGE_SAG_L2_COUNT',
obis.VOLTAGE_SAG_L3_COUNT: 'VOLTAGE_SAG_L3_COUNT',
obis.VOLTAGE_SWELL_L1_COUNT: 'VOLTAGE_SWELL_L1_COUNT',
obis.VOLTAGE_SWELL_L2_COUNT: 'VOLTAGE_SWELL_L2_COUNT',
obis.VOLTAGE_SWELL_L3_COUNT: 'VOLTAGE_SWELL_L3_COUNT',
obis.INSTANTANEOUS_VOLTAGE_L1: 'INSTANTANEOUS_VOLTAGE_L1',
obis.INSTANTANEOUS_VOLTAGE_L2: 'INSTANTANEOUS_VOLTAGE_L2',
obis.INSTANTANEOUS_VOLTAGE_L3: 'INSTANTANEOUS_VOLTAGE_L3',
obis.INSTANTANEOUS_CURRENT_L1: 'INSTANTANEOUS_CURRENT_L1',
obis.INSTANTANEOUS_CURRENT_L2: 'INSTANTANEOUS_CURRENT_L2',
obis.INSTANTANEOUS_CURRENT_L3: 'INSTANTANEOUS_CURRENT_L3',
obis.TEXT_MESSAGE_CODE: 'TEXT_MESSAGE_CODE',
obis.TEXT_MESSAGE: 'TEXT_MESSAGE',
obis.DEVICE_TYPE: 'DEVICE_TYPE',
obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE: 'INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE',
obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE: 'INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE',
obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE: 'INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE',
obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE: 'INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE',
obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE: 'INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE',
obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE: 'INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE',
obis.EQUIPMENT_IDENTIFIER_GAS: 'EQUIPMENT_IDENTIFIER_GAS',
obis.HOURLY_GAS_METER_READING: 'HOURLY_GAS_METER_READING',
obis.GAS_METER_READING: 'GAS_METER_READING',
obis.ACTUAL_TRESHOLD_ELECTRICITY: 'ACTUAL_TRESHOLD_ELECTRICITY',
obis.ACTUAL_SWITCH_POSITION: 'ACTUAL_SWITCH_POSITION',
obis.VALVE_POSITION_GAS: 'VALVE_POSITION_GAS'
}
REVERSE_EN = dict([(v, k) for k, v in EN.items()])

View File

@@ -1,67 +0,0 @@
"""
Contains the signatures of each telegram line.
Previously contained the channel + obis reference signatures, but has been
refactored to full line signatures to maintain backwards compatibility.
Might be refactored in a backwards incompatible way as soon as proper telegram
objects are introduced.
"""
P1_MESSAGE_HEADER = r'\d-\d:0\.2\.8.+?\r\n'
P1_MESSAGE_TIMESTAMP = r'\d-\d:1\.0\.0.+?\r\n'
ELECTRICITY_IMPORTED_TOTAL = r'\d-\d:1\.8\.0.+?\r\n'
ELECTRICITY_USED_TARIFF_1 = r'\d-\d:1\.8\.1.+?\r\n'
ELECTRICITY_USED_TARIFF_2 = r'\d-\d:1\.8\.2.+?\r\n'
ELECTRICITY_DELIVERED_TARIFF_1 = r'\d-\d:2\.8\.1.+?\r\n'
ELECTRICITY_DELIVERED_TARIFF_2 = r'\d-\d:2\.8\.2.+?\r\n'
ELECTRICITY_ACTIVE_TARIFF = r'\d-\d:96\.14\.0.+?\r\n'
EQUIPMENT_IDENTIFIER = r'\d-\d:96\.1\.1.+?\r\n'
CURRENT_ELECTRICITY_USAGE = r'\d-\d:1\.7\.0.+?\r\n'
CURRENT_ELECTRICITY_DELIVERY = r'\d-\d:2\.7\.0.+?\r\n'
LONG_POWER_FAILURE_COUNT = r'96\.7\.9.+?\r\n'
SHORT_POWER_FAILURE_COUNT = r'96\.7\.21.+?\r\n'
POWER_EVENT_FAILURE_LOG = r'99\.97\.0.+?\r\n'
VOLTAGE_SAG_L1_COUNT = r'\d-\d:32\.32\.0.+?\r\n'
VOLTAGE_SAG_L2_COUNT = r'\d-\d:52\.32\.0.+?\r\n'
VOLTAGE_SAG_L3_COUNT = r'\d-\d:72\.32\.0.+?\r\n'
VOLTAGE_SWELL_L1_COUNT = r'\d-\d:32\.36\.0.+?\r\n'
VOLTAGE_SWELL_L2_COUNT = r'\d-\d:52\.36\.0.+?\r\n'
VOLTAGE_SWELL_L3_COUNT = r'\d-\d:72\.36\.0.+?\r\n'
INSTANTANEOUS_VOLTAGE_L1 = r'\d-\d:32\.7\.0.+?\r\n'
INSTANTANEOUS_VOLTAGE_L2 = r'\d-\d:52\.7\.0.+?\r\n'
INSTANTANEOUS_VOLTAGE_L3 = r'\d-\d:72\.7\.0.+?\r\n'
INSTANTANEOUS_CURRENT_L1 = r'\d-\d:31\.7\.0.+?\r\n'
INSTANTANEOUS_CURRENT_L2 = r'\d-\d:51\.7\.0.+?\r\n'
INSTANTANEOUS_CURRENT_L3 = r'\d-\d:71\.7\.0.+?\r\n'
TEXT_MESSAGE_CODE = r'\d-\d:96\.13\.1.+?\r\n'
TEXT_MESSAGE = r'\d-\d:96\.13\.0.+?\r\n'
DEVICE_TYPE = r'\d-\d:24\.1\.0.+?\r\n'
INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE = r'\d-\d:21\.7\.0.+?\r\n'
INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE = r'\d-\d:41\.7\.0.+?\r\n'
INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE = r'\d-\d:61\.7\.0.+?\r\n'
INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE = r'\d-\d:22\.7\.0.+?\r\n'
INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE = r'\d-\d:42\.7\.0.+?\r\n'
INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE = r'\d-\d:62\.7\.0.+?\r\n'
EQUIPMENT_IDENTIFIER_GAS = r'\d-\d:96\.1\.0.+?\r\n'
# TODO differences between gas meter readings in v3 and lower and v4 and up
HOURLY_GAS_METER_READING = r'\d-\d:24\.2\.1.+?\r\n'
GAS_METER_READING = r'\d-\d:24\.3\.0.+?\r\n.+?\r\n'
ACTUAL_TRESHOLD_ELECTRICITY = r'\d-\d:17\.0\.0.+?\r\n'
ACTUAL_SWITCH_POSITION = r'\d-\d:96\.3\.10.+?\r\n'
VALVE_POSITION_GAS = r'\d-\d:24\.4\.0.+?\r\n'
# TODO 17.0.0
# TODO 96.3.10
ELECTRICITY_USED_TARIFF_ALL = (
ELECTRICITY_USED_TARIFF_1,
ELECTRICITY_USED_TARIFF_2
)
ELECTRICITY_DELIVERED_TARIFF_ALL = (
ELECTRICITY_DELIVERED_TARIFF_1,
ELECTRICITY_DELIVERED_TARIFF_2
)
# Alternate codes for foreign countries.
BELGIUM_HOURLY_GAS_METER_READING = r'\d-\d:24\.2\.3.+?\r\n' # Different code, same format.
LUXEMBOURG_ELECTRICITY_USED_TARIFF_GLOBAL = r'\d-\d:1\.8\.0.+?\r\n' # Total imported energy register (P+)
LUXEMBOURG_ELECTRICITY_DELIVERED_TARIFF_GLOBAL = r'\d-\d:2\.8\.0.+?\r\n' # Total exported energy register (P-)

View File

@@ -1,207 +0,0 @@
import dsmr_parser.obis_name_mapping
import datetime
import json
from decimal import Decimal
class Telegram(object):
"""
Container for raw and parsed telegram data.
Initializing:
from dsmr_parser import telegram_specifications
from dsmr_parser.exceptions import InvalidChecksumError, ParseError
from dsmr_parser.objects import CosemObject, MBusObject, Telegram
from dsmr_parser.parsers import TelegramParser
from test.example_telegrams import TELEGRAM_V4_2
parser = TelegramParser(telegram_specifications.V4)
telegram = Telegram(TELEGRAM_V4_2, parser, telegram_specifications.V4)
Attributes can be accessed on a telegram object by addressing by their english name, for example:
telegram.ELECTRICITY_USED_TARIFF_1
All attributes in a telegram can be iterated over, for example:
[k for k,v in telegram]
yields:
['P1_MESSAGE_HEADER', 'P1_MESSAGE_TIMESTAMP', 'EQUIPMENT_IDENTIFIER', ...]
"""
def __init__(self, telegram_data, telegram_parser, telegram_specification):
self._telegram_data = telegram_data
self._telegram_specification = telegram_specification
self._telegram_parser = telegram_parser
self._obis_name_mapping = dsmr_parser.obis_name_mapping.EN
self._reverse_obis_name_mapping = dsmr_parser.obis_name_mapping.REVERSE_EN
self._dictionary = self._telegram_parser.parse(telegram_data)
self._item_names = self._get_item_names()
def __getattr__(self, name):
''' will only get called for undefined attributes '''
obis_reference = self._reverse_obis_name_mapping[name]
value = self._dictionary[obis_reference]
setattr(self, name, value)
return value
def _get_item_names(self):
return [self._obis_name_mapping[k] for k, v in self._dictionary.items()]
def __iter__(self):
for attr in self._item_names:
value = getattr(self, attr)
yield attr, value
def __str__(self):
output = ""
for attr, value in self:
output += "{}: \t {}\n".format(attr, str(value))
return output
def to_json(self):
return json.dumps(dict([[attr, json.loads(value.to_json())] for attr, value in self]))
class DSMRObject(object):
"""
Represents all data from a single telegram line.
"""
def __init__(self, values):
self.values = values
class MBusObject(DSMRObject):
@property
def datetime(self):
return self.values[0]['value']
@property
def value(self):
# TODO temporary workaround for DSMR v2.2. Maybe use the same type of
# TODO object, but let the parse set them differently? So don't use
# TODO hardcoded indexes here.
if len(self.values) != 2: # v2
return self.values[6]['value']
else:
return self.values[1]['value']
@property
def unit(self):
# TODO temporary workaround for DSMR v2.2. Maybe use the same type of
# TODO object, but let the parse set them differently? So don't use
# TODO hardcoded indexes here.
if len(self.values) != 2: # v2
return self.values[5]['value']
else:
return self.values[1]['unit']
def __str__(self):
output = "{}\t[{}] at {}".format(str(self.value), str(self.unit), str(self.datetime.astimezone().isoformat()))
return output
def to_json(self):
timestamp = self.datetime
if isinstance(self.datetime, datetime.datetime):
timestamp = self.datetime.astimezone().isoformat()
value = self.value
if isinstance(self.value, datetime.datetime):
value = self.value.astimezone().isoformat()
if isinstance(self.value, Decimal):
value = float(self.value)
output = {
'datetime': timestamp,
'value': value,
'unit': self.unit
}
return json.dumps(output)
class CosemObject(DSMRObject):
@property
def value(self):
return self.values[0]['value']
@property
def unit(self):
return self.values[0]['unit']
def __str__(self):
print_value = self.value
if isinstance(self.value, datetime.datetime):
print_value = self.value.astimezone().isoformat()
output = "{}\t[{}]".format(str(print_value), str(self.unit))
return output
def to_json(self):
json_value = self.value
if isinstance(self.value, datetime.datetime):
json_value = self.value.astimezone().isoformat()
if isinstance(self.value, Decimal):
json_value = float(self.value)
output = {
'value': json_value,
'unit': self.unit
}
return json.dumps(output)
class ProfileGenericObject(DSMRObject):
"""
Represents all data in a GenericProfile value.
All buffer values are returned as a list of MBusObjects,
containing the datetime (timestamp) and the value.
"""
def __init__(self, values):
super().__init__(values)
self._buffer_list = None
@property
def buffer_length(self):
return self.values[0]['value']
@property
def buffer_type(self):
return self.values[1]['value']
@property
def buffer(self):
if self._buffer_list is None:
self._buffer_list = []
values_offset = 2
for i in range(self.buffer_length):
offset = values_offset + i*2
self._buffer_list.append(MBusObject([self.values[offset], self.values[offset + 1]]))
return self._buffer_list
def __str__(self):
output = "\t buffer length: {}\n".format(self.buffer_length)
output += "\t buffer type: {}".format(self.buffer_type)
for buffer_value in self.buffer:
timestamp = buffer_value.datetime
if isinstance(timestamp, datetime.datetime):
timestamp = str(timestamp.astimezone().isoformat())
output += "\n\t event occured at: {}".format(timestamp)
output += "\t for: {} [{}]".format(buffer_value.value, buffer_value.unit)
return output
def to_json(self):
"""
:return: A json of all values in the GenericProfileObject , with the following structure
{'buffer_length': n,
'buffer_type': obis_ref,
'buffer': [{'datetime': d1,
'value': v1,
'unit': u1},
...
{'datetime': dn,
'value': vn,
'unit': un}
]
}
"""
list = [['buffer_length', self.buffer_length]]
list.append(['buffer_type', self.buffer_type])
buffer_repr = [json.loads(buffer_item.to_json()) for buffer_item in self.buffer]
list.append(['buffer', buffer_repr])
output = dict(list)
return json.dumps(output)

View File

@@ -1,274 +0,0 @@
import logging
import re
from ctypes import c_ushort
from dsmr_parser.objects import MBusObject, CosemObject, ProfileGenericObject
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
logger = logging.getLogger(__name__)
class TelegramParser(object):
def __init__(self, telegram_specification, apply_checksum_validation=True):
"""
:param telegram_specification: determines how the telegram is parsed
:param apply_checksum_validation: validate checksum if applicable for
telegram DSMR version (v4 and up).
:type telegram_specification: dict
"""
self.telegram_specification = telegram_specification
self.apply_checksum_validation = apply_checksum_validation
def parse(self, telegram_data):
"""
Parse telegram from string to dict.
The telegram str type makes python 2.x integration easier.
:param str telegram_data: full telegram from start ('/') to checksum
('!ABCD') including line endings in between the telegram's lines
:rtype: dict
:returns: Shortened example:
{
..
r'\d-\d:96\.1\.1.+?\r\n': <CosemObject>, # EQUIPMENT_IDENTIFIER
r'\d-\d:1\.8\.1.+?\r\n': <CosemObject>, # ELECTRICITY_USED_TARIFF_1
r'\d-\d:24\.3\.0.+?\r\n.+?\r\n': <MBusObject>, # GAS_METER_READING
..
}
:raises ParseError:
:raises InvalidChecksumError:
"""
if self.apply_checksum_validation \
and self.telegram_specification['checksum_support']:
self.validate_checksum(telegram_data)
telegram = {}
for signature, parser in self.telegram_specification['objects'].items():
match = re.search(signature, telegram_data, re.DOTALL)
# Some signatures are optional and may not be present,
# so only parse lines that match
if match:
telegram[signature] = parser.parse(match.group(0))
return telegram
@staticmethod
def validate_checksum(telegram):
"""
:param str telegram:
:raises ParseError:
:raises InvalidChecksumError:
"""
# Extract the part for which the checksum applies.
checksum_contents = re.search(r'\/.+\!', telegram, re.DOTALL)
# Extract the hexadecimal checksum value itself.
# The line ending '\r\n' for the checksum line can be ignored.
checksum_hex = re.search(r'((?<=\!)[0-9A-Z]{4})+', telegram)
if not checksum_contents or not checksum_hex:
raise ParseError(
'Failed to perform CRC validation because the telegram is '
'incomplete. The checksum and/or content values are missing.'
)
calculated_crc = TelegramParser.crc16(checksum_contents.group(0))
expected_crc = int(checksum_hex.group(0), base=16)
if calculated_crc != expected_crc:
raise InvalidChecksumError(
"Invalid telegram. The CRC checksum '{}' does not match the "
"expected '{}'".format(
calculated_crc,
expected_crc
)
)
@staticmethod
def crc16(telegram):
crc16_tab = []
for i in range(0, 256):
crc = c_ushort(i).value
for j in range(0, 8):
if (crc & 0x0001):
crc = c_ushort(crc >> 1).value ^ 0xA001
else:
crc = c_ushort(crc >> 1).value
crc16_tab.append(hex(crc))
crcValue = 0x0000
for c in telegram:
d = ord(c)
tmp = crcValue ^ d
rotated = c_ushort(crcValue >> 8).value
crcValue = rotated ^ int(crc16_tab[(tmp & 0x00ff)], 0)
return crcValue
class DSMRObjectParser(object):
"""
Parses an object (can also be see as a 'line') from a telegram.
"""
def __init__(self, *value_formats):
self.value_formats = value_formats
def _is_line_wellformed(self, line, values):
# allows overriding by child class
return (values and (len(values) == len(self.value_formats)))
def _parse_values(self, values):
# allows overriding by child class
return [self.value_formats[i].parse(value)
for i, value in enumerate(values)]
def _parse(self, line):
# Match value groups, but exclude the parentheses
pattern = re.compile(r'((?<=\()[0-9a-zA-Z\.\*\-\:]{0,}(?=\)))')
values = re.findall(pattern, line)
if not self._is_line_wellformed(line, values):
raise ParseError("Invalid '%s' line for '%s'", line, self)
# Convert empty value groups to None for clarity.
values = [None if value == '' else value for value in values]
return self._parse_values(values)
class MBusParser(DSMRObjectParser):
"""
Gas meter value parser.
These are lines with a timestamp and gas meter value.
Line format:
'ID (TST) (Mv1*U1)'
1 2 3 4
1) OBIS Reduced ID-code
2) Time Stamp (TST) of capture time of measurement value
3) Measurement value 1 (most recent entry of buffer attribute without unit)
4) Unit of measurement values (Unit of capture objects attribute)
"""
def parse(self, line):
return MBusObject(self._parse(line))
class CosemParser(DSMRObjectParser):
"""
Cosem object parser.
These are data objects with a single value that optionally have a unit of
measurement.
Line format:
ID (Mv*U)
1 23 45
1) OBIS Reduced ID-code
2) Separator "(", ASCII 28h
3) COSEM object attribute value
4) Unit of measurement values (Unit of capture objects attribute) - only if
applicable
5) Separator ")", ASCII 29h
"""
def parse(self, line):
return CosemObject(self._parse(line))
class ProfileGenericParser(DSMRObjectParser):
"""
Power failure log parser.
These are data objects with multiple repeating groups of values.
Line format:
ID (z) (ID1) (TST) (Bv1*U1) (TST) (Bvz*Uz)
1 2 3 4 5 6 7 8 9
1) OBIS Reduced ID-code
2) Number of values z (max 10).
3) Identifications of buffer values (OBIS Reduced ID codes of capture objects attribute)
4) Time Stamp (TST) of power failure end time
5) Buffer value 1 (most recent entry of buffer attribute without unit)
6) Unit of buffer values (Unit of capture objects attribute)
7) Time Stamp (TST) of power failure end time
8) Buffer value 2 (oldest entry of buffer attribute without unit)
9) Unit of buffer values (Unit of capture objects attribute)
"""
def __init__(self, buffer_types, head_parsers, parsers_for_unidentified):
self.value_formats = head_parsers
self.buffer_types = buffer_types
self.parsers_for_unidentified = parsers_for_unidentified
def _is_line_wellformed(self, line, values):
if values and (len(values) >= 2) and (values[0].isdigit()):
buffer_length = int(values[0])
return (buffer_length <= 10) and (len(values) == (buffer_length * 2 + 2))
else:
return False
def _parse_values(self, values):
buffer_length = int(values[0])
buffer_value_obis_ID = values[1]
if (buffer_length > 0):
if buffer_value_obis_ID in self.buffer_types:
bufferValueParsers = self.buffer_types[buffer_value_obis_ID]
else:
bufferValueParsers = self.parsers_for_unidentified
# add the parsers for the encountered value type z times
for _ in range(buffer_length):
self.value_formats.extend(bufferValueParsers)
return [self.value_formats[i].parse(value) for i, value in enumerate(values)]
def parse(self, line):
return ProfileGenericObject(self._parse(line))
class ValueParser(object):
"""
Parses a single value from DSMRObject's.
Example with coerce_type being int:
(002*A) becomes {'value': 1, 'unit': 'A'}
Example with coerce_type being str:
(42) becomes {'value': '42', 'unit': None}
"""
def __init__(self, coerce_type):
self.coerce_type = coerce_type
def parse(self, value):
unit_of_measurement = None
if value and '*' in value:
value, unit_of_measurement = value.split('*')
# A value group is not required to have a value, and then coercing does
# not apply.
value = self.coerce_type(value) if value is not None else value
return {
'value': value,
'unit': unit_of_measurement
}

View File

@@ -1,10 +0,0 @@
from dsmr_parser.parsers import ValueParser
from dsmr_parser.value_types import timestamp
PG_FAILURE_EVENT = r'0-0:96.7.19'
PG_HEAD_PARSERS = [ValueParser(int), ValueParser(str)]
PG_UNIDENTIFIED_BUFFERTYPE_PARSERS = [ValueParser(str), ValueParser(str)]
BUFFER_TYPES = {
PG_FAILURE_EVENT: [ValueParser(timestamp), ValueParser(int)]
}

View File

@@ -1,157 +0,0 @@
from decimal import Decimal
from copy import deepcopy
from dsmr_parser import obis_references as obis
from dsmr_parser.parsers import CosemParser, ValueParser, MBusParser, ProfileGenericParser
from dsmr_parser.value_types import timestamp
from dsmr_parser.profile_generic_specifications import BUFFER_TYPES, PG_HEAD_PARSERS, PG_UNIDENTIFIED_BUFFERTYPE_PARSERS
"""
dsmr_parser.telegram_specifications
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This module contains DSMR telegram specifications. Each specifications describes
how the telegram lines are parsed.
"""
V2_2 = {
'checksum_support': False,
'objects': {
obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)),
obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_ACTIVE_TARIFF: CosemParser(ValueParser(str)),
obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)),
obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)),
obis.ACTUAL_TRESHOLD_ELECTRICITY: CosemParser(ValueParser(Decimal)),
obis.ACTUAL_SWITCH_POSITION: CosemParser(ValueParser(str)),
obis.TEXT_MESSAGE_CODE: CosemParser(ValueParser(int)),
obis.TEXT_MESSAGE: CosemParser(ValueParser(str)),
obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)),
obis.DEVICE_TYPE: CosemParser(ValueParser(str)),
obis.VALVE_POSITION_GAS: CosemParser(ValueParser(str)),
obis.GAS_METER_READING: MBusParser(
ValueParser(timestamp),
ValueParser(int),
ValueParser(int),
ValueParser(int),
ValueParser(str), # obis ref
ValueParser(str), # unit, position 5
ValueParser(Decimal), # meter reading, position 6
),
}
}
V3 = V2_2
V4 = {
'checksum_support': True,
'objects': {
obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)),
obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)),
obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)),
obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_ACTIVE_TARIFF: CosemParser(ValueParser(str)),
obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)),
obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)),
obis.SHORT_POWER_FAILURE_COUNT: CosemParser(ValueParser(int)),
obis.LONG_POWER_FAILURE_COUNT: CosemParser(ValueParser(int)),
obis.POWER_EVENT_FAILURE_LOG:
ProfileGenericParser(BUFFER_TYPES,
PG_HEAD_PARSERS,
PG_UNIDENTIFIED_BUFFERTYPE_PARSERS),
obis.VOLTAGE_SAG_L1_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SAG_L2_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SAG_L3_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SWELL_L1_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SWELL_L2_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SWELL_L3_COUNT: CosemParser(ValueParser(int)),
obis.TEXT_MESSAGE_CODE: CosemParser(ValueParser(int)),
obis.TEXT_MESSAGE: CosemParser(ValueParser(str)),
obis.DEVICE_TYPE: CosemParser(ValueParser(int)),
obis.INSTANTANEOUS_CURRENT_L1: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_CURRENT_L2: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_CURRENT_L3: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE: CosemParser(ValueParser(Decimal)),
obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)),
obis.HOURLY_GAS_METER_READING: MBusParser(
ValueParser(timestamp),
ValueParser(Decimal)
)
}
}
V5 = {
'checksum_support': True,
'objects': {
obis.P1_MESSAGE_HEADER: CosemParser(ValueParser(str)),
obis.P1_MESSAGE_TIMESTAMP: CosemParser(ValueParser(timestamp)),
obis.EQUIPMENT_IDENTIFIER: CosemParser(ValueParser(str)),
obis.ELECTRICITY_IMPORTED_TOTAL: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_USED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_USED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_1: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_DELIVERED_TARIFF_2: CosemParser(ValueParser(Decimal)),
obis.ELECTRICITY_ACTIVE_TARIFF: CosemParser(ValueParser(str)),
obis.CURRENT_ELECTRICITY_USAGE: CosemParser(ValueParser(Decimal)),
obis.CURRENT_ELECTRICITY_DELIVERY: CosemParser(ValueParser(Decimal)),
obis.LONG_POWER_FAILURE_COUNT: CosemParser(ValueParser(int)),
obis.SHORT_POWER_FAILURE_COUNT: CosemParser(ValueParser(int)),
obis.POWER_EVENT_FAILURE_LOG:
ProfileGenericParser(BUFFER_TYPES,
PG_HEAD_PARSERS,
PG_UNIDENTIFIED_BUFFERTYPE_PARSERS),
obis.VOLTAGE_SAG_L1_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SAG_L2_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SAG_L3_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SWELL_L1_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SWELL_L2_COUNT: CosemParser(ValueParser(int)),
obis.VOLTAGE_SWELL_L3_COUNT: CosemParser(ValueParser(int)),
obis.INSTANTANEOUS_VOLTAGE_L1: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_VOLTAGE_L2: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_VOLTAGE_L3: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_CURRENT_L1: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_CURRENT_L2: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_CURRENT_L3: CosemParser(ValueParser(Decimal)),
obis.TEXT_MESSAGE: CosemParser(ValueParser(str)),
obis.DEVICE_TYPE: CosemParser(ValueParser(int)),
obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE: CosemParser(ValueParser(Decimal)),
obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE: CosemParser(ValueParser(Decimal)),
obis.EQUIPMENT_IDENTIFIER_GAS: CosemParser(ValueParser(str)),
obis.HOURLY_GAS_METER_READING: MBusParser(
ValueParser(timestamp),
ValueParser(Decimal)
)
}
}
ALL = (V2_2, V3, V4, V5)
BELGIUM_FLUVIUS = deepcopy(V5)
BELGIUM_FLUVIUS['objects'].update({
obis.BELGIUM_HOURLY_GAS_METER_READING: MBusParser(
ValueParser(timestamp),
ValueParser(Decimal)
)
})
LUXEMBOURG_SMARTY = deepcopy(V5)
LUXEMBOURG_SMARTY['objects'].update({
obis.LUXEMBOURG_ELECTRICITY_USED_TARIFF_GLOBAL: CosemParser(ValueParser(Decimal)),
obis.LUXEMBOURG_ELECTRICITY_DELIVERED_TARIFF_GLOBAL: CosemParser(ValueParser(Decimal)),
})

View File

@@ -1,18 +0,0 @@
import datetime
import pytz
def timestamp(value):
naive_datetime = datetime.datetime.strptime(value[:-1], '%y%m%d%H%M%S')
# TODO comment on this exception
if len(value) == 13:
is_dst = value[12] == 'S' # assume format 160322150000W
else:
is_dst = False
local_tz = pytz.timezone('Europe/Amsterdam')
localized_datetime = local_tz.localize(naive_datetime, is_dst=is_dst)
return localized_datetime.astimezone(pytz.utc)

View File

@@ -1,20 +0,0 @@
from setuptools import setup, find_packages
setup(
name='dsmr-parser',
description='Library to parse Dutch Smart Meter Requirements (DSMR)',
author='Nigel Dokter',
author_email='nigel@nldr.net',
url='https://github.com/ndokter/dsmr_parser',
version='0.21',
packages=find_packages(),
install_requires=[
'pyserial>=3,<4',
'pyserial-asyncio<1',
'pytz',
'Tailer==0.4.1'
],
entry_points={
'console_scripts': ['dsmr_console=dsmr_parser.__main__:console']
},
)

View File

@@ -1,130 +0,0 @@
TELEGRAM_V2_2 = (
'/ISk5\2MT382-1004\r\n'
'\r\n'
'0-0:96.1.1(00000000000000)\r\n'
'1-0:1.8.1(00001.001*kWh)\r\n'
'1-0:1.8.2(00001.001*kWh)\r\n'
'1-0:2.8.1(00001.001*kWh)\r\n'
'1-0:2.8.2(00001.001*kWh)\r\n'
'0-0:96.14.0(0001)\r\n'
'1-0:1.7.0(0001.01*kW)\r\n'
'1-0:2.7.0(0000.00*kW)\r\n'
'0-0:17.0.0(0999.00*kW)\r\n'
'0-0:96.3.10(1)\r\n'
'0-0:96.13.1()\r\n'
'0-0:96.13.0()\r\n'
'0-1:24.1.0(3)\r\n'
'0-1:96.1.0(000000000000)\r\n'
'0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n'
'(00001.001)\r\n'
'0-1:24.4.0(1)\r\n'
'!\r\n'
)
TELEGRAM_V3 = (
'/ISk5\2MT382-1000\r\n'
'\r\n'
'0-0:96.1.1(4B384547303034303436333935353037)\r\n'
'1-0:1.8.1(12345.678*kWh)\r\n'
'1-0:1.8.2(12345.678*kWh)\r\n'
'1-0:2.8.1(12345.678*kWh)\r\n'
'1-0:2.8.2(12345.678*kWh)\r\n'
'0-0:96.14.0(0002)\r\n'
'1-0:1.7.0(001.19*kW)\r\n'
'1-0:2.7.0(000.00*kW)\r\n'
'0-0:17.0.0(016*A)\r\n'
'0-0:96.3.10(1)\r\n'
'0-0:96.13.1(303132333435363738)\r\n'
'0-0:96.13.0(303132333435363738393A3B3C3D3E3F303132333435363738393A3B3C3D3E'
'3F303132333435363738393A3B3C3D3E3F303132333435363738393A3B3C3D3E3F30313233'
'3435363738393A3B3C3D3E3F)\r\n'
'0-1:96.1.0(3232323241424344313233343536373839)\r\n'
'0-1:24.1.0(03)\r\n'
'0-1:24.3.0(090212160000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n'
'(00001.001)\r\n'
'0-1:24.4.0(1)\r\n'
'!\r\n'
)
TELEGRAM_V4_2 = (
'/KFM5KAIFA-METER\r\n'
'\r\n'
'1-3:0.2.8(42)\r\n'
'0-0:1.0.0(161113205757W)\r\n'
'0-0:96.1.1(3960221976967177082151037881335713)\r\n'
'1-0:1.8.1(001581.123*kWh)\r\n'
'1-0:1.8.2(001435.706*kWh)\r\n'
'1-0:2.8.1(000000.000*kWh)\r\n'
'1-0:2.8.2(000000.000*kWh)\r\n'
'0-0:96.14.0(0002)\r\n'
'1-0:1.7.0(02.027*kW)\r\n'
'1-0:2.7.0(00.000*kW)\r\n'
'0-0:96.7.21(00015)\r\n'
'0-0:96.7.9(00007)\r\n'
'1-0:99.97.0(3)(0-0:96.7.19)(000104180320W)(0000237126*s)(000101000001W)'
'(2147583646*s)(000102000003W)(2317482647*s)\r\n'
'1-0:32.32.0(00000)\r\n'
'1-0:52.32.0(00000)\r\n'
'1-0:72.32.0(00000)\r\n'
'1-0:32.36.0(00000)\r\n'
'1-0:52.36.0(00000)\r\n'
'1-0:72.36.0(00000)\r\n'
'0-0:96.13.1()\r\n'
'0-0:96.13.0()\r\n'
'1-0:31.7.0(000*A)\r\n'
'1-0:51.7.0(006*A)\r\n'
'1-0:71.7.0(002*A)\r\n'
'1-0:21.7.0(00.170*kW)\r\n'
'1-0:22.7.0(00.000*kW)\r\n'
'1-0:41.7.0(01.247*kW)\r\n'
'1-0:42.7.0(00.000*kW)\r\n'
'1-0:61.7.0(00.209*kW)\r\n'
'1-0:62.7.0(00.000*kW)\r\n'
'0-1:24.1.0(003)\r\n'
'0-1:96.1.0(4819243993373755377509728609491464)\r\n'
'0-1:24.2.1(161129200000W)(00981.443*m3)\r\n'
'!6796\r\n'
)
TELEGRAM_V5 = (
'/ISk5\2MT382-1000\r\n'
'\r\n'
'1-3:0.2.8(50)\r\n'
'0-0:1.0.0(170102192002W)\r\n'
'0-0:96.1.1(4B384547303034303436333935353037)\r\n'
'1-0:1.8.1(000004.426*kWh)\r\n'
'1-0:1.8.2(000002.399*kWh)\r\n'
'1-0:2.8.1(000002.444*kWh)\r\n'
'1-0:2.8.2(000000.000*kWh)\r\n'
'0-0:96.14.0(0002)\r\n'
'1-0:1.7.0(00.244*kW)\r\n'
'1-0:2.7.0(00.000*kW)\r\n'
'0-0:96.7.21(00013)\r\n'
'0-0:96.7.9(00000)\r\n'
'1-0:99.97.0(0)(0-0:96.7.19)\r\n'
'1-0:32.32.0(00000)\r\n'
'1-0:52.32.0(00000)\r\n'
'1-0:72.32.0(00000)\r\n'
'1-0:32.36.0(00000)\r\n'
'1-0:52.36.0(00000)\r\n'
'1-0:72.36.0(00000)\r\n'
'0-0:96.13.0()\r\n'
'1-0:32.7.0(0230.0*V)\r\n'
'1-0:52.7.0(0230.0*V)\r\n'
'1-0:72.7.0(0229.0*V)\r\n'
'1-0:31.7.0(0.48*A)\r\n'
'1-0:51.7.0(0.44*A)\r\n'
'1-0:71.7.0(0.86*A)\r\n'
'1-0:21.7.0(00.070*kW)\r\n'
'1-0:41.7.0(00.032*kW)\r\n'
'1-0:61.7.0(00.142*kW)\r\n'
'1-0:22.7.0(00.000*kW)\r\n'
'1-0:42.7.0(00.000*kW)\r\n'
'1-0:62.7.0(00.000*kW)\r\n'
'0-1:24.1.0(003)\r\n'
'0-1:96.1.0(3232323241424344313233343536373839)\r\n'
'0-1:24.2.1(170102161005W)(00000.107*m3)\r\n'
'0-2:24.1.0(003)\r\n'
'0-2:96.1.0()\r\n'
'!87B3\r\n'
)

View File

@@ -1,8 +0,0 @@
from dsmr_parser import telegram_specifications
from dsmr_parser.objects import Telegram
from dsmr_parser.parsers import TelegramParser
from example_telegrams import TELEGRAM_V4_2
parser = TelegramParser(telegram_specifications.V4)
telegram = Telegram(TELEGRAM_V4_2, parser, telegram_specifications.V4)
print(telegram)

View File

@@ -1,92 +0,0 @@
import unittest
from decimal import Decimal
from dsmr_parser.objects import MBusObject, CosemObject
from dsmr_parser.parsers import TelegramParser
from dsmr_parser import telegram_specifications
from dsmr_parser import obis_references as obis
from test.example_telegrams import TELEGRAM_V2_2
class TelegramParserV2_2Test(unittest.TestCase):
""" Test parsing of a DSMR v2.2 telegram. """
def test_parse(self):
parser = TelegramParser(telegram_specifications.V2_2)
result = parser.parse(TELEGRAM_V2_2)
# ELECTRICITY_USED_TARIFF_1 (1-0:1.8.1)
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_1].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_1].value == Decimal('1.001')
# ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2)
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_2].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_2].value == Decimal('1.001')
# ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1)
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1], CosemObject)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value, Decimal)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value == Decimal('1.001')
# ELECTRICITY_DELIVERED_TARIFF_2 (1-0:2.8.2)
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2], CosemObject)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value, Decimal)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value == Decimal('1.001')
# ELECTRICITY_ACTIVE_TARIFF (0-0:96.14.0)
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF], CosemObject)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].unit is None
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF].value, str)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].value == '0001'
# EQUIPMENT_IDENTIFIER (0-0:96.1.1)
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER].value == '00000000000000'
# CURRENT_ELECTRICITY_USAGE (1-0:1.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE], CosemObject)
assert result[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE].value, Decimal)
assert result[obis.CURRENT_ELECTRICITY_USAGE].value == Decimal('1.01')
# CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY], CosemObject)
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].unit == 'kW'
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY].value, Decimal)
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].value == Decimal('0')
# TEXT_MESSAGE_CODE (0-0:96.13.1)
assert isinstance(result[obis.TEXT_MESSAGE_CODE], CosemObject)
assert result[obis.TEXT_MESSAGE_CODE].unit is None
# TEXT_MESSAGE (0-0:96.13.0)
assert isinstance(result[obis.TEXT_MESSAGE], CosemObject)
assert result[obis.TEXT_MESSAGE].unit is None
assert result[obis.TEXT_MESSAGE].value is None
# DEVICE_TYPE (0-x:24.1.0)
assert isinstance(result[obis.TEXT_MESSAGE], CosemObject)
assert result[obis.DEVICE_TYPE].unit is None
assert isinstance(result[obis.DEVICE_TYPE].value, str)
assert result[obis.DEVICE_TYPE].value == '3'
# EQUIPMENT_IDENTIFIER_GAS (0-x:96.1.0)
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].value == '000000000000'
# GAS_METER_READING (0-1:24.3.0)
assert isinstance(result[obis.GAS_METER_READING], MBusObject)
assert result[obis.GAS_METER_READING].unit == 'm3'
assert isinstance(result[obis.GAS_METER_READING].value, Decimal)
assert result[obis.GAS_METER_READING].value == Decimal('1.001')

View File

@@ -1,98 +0,0 @@
import unittest
from decimal import Decimal
from dsmr_parser.objects import CosemObject, MBusObject
from dsmr_parser.parsers import TelegramParser
from dsmr_parser import telegram_specifications
from dsmr_parser import obis_references as obis
from test.example_telegrams import TELEGRAM_V3
class TelegramParserV3Test(unittest.TestCase):
""" Test parsing of a DSMR v3 telegram. """
def test_parse(self):
parser = TelegramParser(telegram_specifications.V3)
result = parser.parse(TELEGRAM_V3)
# ELECTRICITY_USED_TARIFF_1 (1-0:1.8.1)
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_1].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_1].value == Decimal('12345.678')
# ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2)
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_2].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_2].value == Decimal('12345.678')
# ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1)
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1], CosemObject)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value, Decimal)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value == Decimal('12345.678')
# ELECTRICITY_DELIVERED_TARIFF_2 (1-0:2.8.2)
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2], CosemObject)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value, Decimal)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value == Decimal('12345.678')
# ELECTRICITY_ACTIVE_TARIFF (0-0:96.14.0)
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF], CosemObject)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].unit is None
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF].value, str)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].value == '0002'
# EQUIPMENT_IDENTIFIER (0-0:96.1.1)
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER].value == '4B384547303034303436333935353037'
# CURRENT_ELECTRICITY_USAGE (1-0:1.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE], CosemObject)
assert result[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE].value, Decimal)
assert result[obis.CURRENT_ELECTRICITY_USAGE].value == Decimal('1.19')
# CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY], CosemObject)
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].unit == 'kW'
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY].value, Decimal)
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].value == Decimal('0')
# TEXT_MESSAGE_CODE (0-0:96.13.1)
assert isinstance(result[obis.TEXT_MESSAGE_CODE], CosemObject)
assert result[obis.TEXT_MESSAGE_CODE].unit is None
assert isinstance(result[obis.TEXT_MESSAGE_CODE].value, int)
assert result[obis.TEXT_MESSAGE_CODE].value == 303132333435363738
# TEXT_MESSAGE (0-0:96.13.0)
assert isinstance(result[obis.TEXT_MESSAGE], CosemObject)
assert result[obis.TEXT_MESSAGE].unit is None
assert isinstance(result[obis.TEXT_MESSAGE].value, str)
assert result[obis.TEXT_MESSAGE].value == \
'303132333435363738393A3B3C3D3E3F303132333435363738393A3B3C3D3E3F' \
'303132333435363738393A3B3C3D3E3F303132333435363738393A3B3C3D3E3F' \
'303132333435363738393A3B3C3D3E3F'
# DEVICE_TYPE (0-x:24.1.0)
assert isinstance(result[obis.TEXT_MESSAGE], CosemObject)
assert result[obis.DEVICE_TYPE].unit is None
assert isinstance(result[obis.DEVICE_TYPE].value, str)
assert result[obis.DEVICE_TYPE].value == '03'
# EQUIPMENT_IDENTIFIER_GAS (0-x:96.1.0)
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].value == '3232323241424344313233343536373839'
# GAS_METER_READING (0-1:24.3.0)
assert isinstance(result[obis.GAS_METER_READING], MBusObject)
assert result[obis.GAS_METER_READING].unit == 'm3'
assert isinstance(result[obis.GAS_METER_READING].value, Decimal)
assert result[obis.GAS_METER_READING].value == Decimal('1.001')

View File

@@ -1,245 +0,0 @@
from decimal import Decimal
import datetime
import unittest
import pytz
from dsmr_parser import obis_references as obis
from dsmr_parser import telegram_specifications
from dsmr_parser.exceptions import InvalidChecksumError, ParseError
from dsmr_parser.objects import CosemObject, MBusObject
from dsmr_parser.parsers import TelegramParser
from test.example_telegrams import TELEGRAM_V4_2
class TelegramParserV4_2Test(unittest.TestCase):
""" Test parsing of a DSMR v4.2 telegram. """
def test_parse(self):
parser = TelegramParser(telegram_specifications.V4)
result = parser.parse(TELEGRAM_V4_2)
# P1_MESSAGE_HEADER (1-3:0.2.8)
assert isinstance(result[obis.P1_MESSAGE_HEADER], CosemObject)
assert result[obis.P1_MESSAGE_HEADER].unit is None
assert isinstance(result[obis.P1_MESSAGE_HEADER].value, str)
assert result[obis.P1_MESSAGE_HEADER].value == '42'
# P1_MESSAGE_TIMESTAMP (0-0:1.0.0)
assert isinstance(result[obis.P1_MESSAGE_TIMESTAMP], CosemObject)
assert result[obis.P1_MESSAGE_TIMESTAMP].unit is None
assert isinstance(result[obis.P1_MESSAGE_TIMESTAMP].value, datetime.datetime)
assert result[obis.P1_MESSAGE_TIMESTAMP].value == \
datetime.datetime(2016, 11, 13, 19, 57, 57, tzinfo=pytz.UTC)
# ELECTRICITY_USED_TARIFF_1 (1-0:1.8.1)
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_1].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_1].value == Decimal('1581.123')
# ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2)
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_2].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_2].value == Decimal('1435.706')
# ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1)
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1], CosemObject)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value, Decimal)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value == Decimal('0')
# ELECTRICITY_DELIVERED_TARIFF_2 (1-0:2.8.2)
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2], CosemObject)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value, Decimal)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value == Decimal('0')
# ELECTRICITY_ACTIVE_TARIFF (0-0:96.14.0)
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF], CosemObject)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].unit is None
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF].value, str)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].value == '0002'
# EQUIPMENT_IDENTIFIER (0-0:96.1.1)
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER].value == '3960221976967177082151037881335713'
# CURRENT_ELECTRICITY_USAGE (1-0:1.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE], CosemObject)
assert result[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE].value, Decimal)
assert result[obis.CURRENT_ELECTRICITY_USAGE].value == Decimal('2.027')
# CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY], CosemObject)
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].unit == 'kW'
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY].value, Decimal)
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].value == Decimal('0')
# SHORT_POWER_FAILURE_COUNT (1-0:96.7.21)
assert isinstance(result[obis.SHORT_POWER_FAILURE_COUNT], CosemObject)
assert result[obis.SHORT_POWER_FAILURE_COUNT].unit is None
assert isinstance(result[obis.SHORT_POWER_FAILURE_COUNT].value, int)
assert result[obis.SHORT_POWER_FAILURE_COUNT].value == 15
# LONG_POWER_FAILURE_COUNT (96.7.9)
assert isinstance(result[obis.LONG_POWER_FAILURE_COUNT], CosemObject)
assert result[obis.LONG_POWER_FAILURE_COUNT].unit is None
assert isinstance(result[obis.LONG_POWER_FAILURE_COUNT].value, int)
assert result[obis.LONG_POWER_FAILURE_COUNT].value == 7
# VOLTAGE_SAG_L1_COUNT (1-0:32.32.0)
assert isinstance(result[obis.VOLTAGE_SAG_L1_COUNT], CosemObject)
assert result[obis.VOLTAGE_SAG_L1_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SAG_L1_COUNT].value, int)
assert result[obis.VOLTAGE_SAG_L1_COUNT].value == 0
# VOLTAGE_SAG_L2_COUNT (1-0:52.32.0)
assert isinstance(result[obis.VOLTAGE_SAG_L2_COUNT], CosemObject)
assert result[obis.VOLTAGE_SAG_L2_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SAG_L2_COUNT].value, int)
assert result[obis.VOLTAGE_SAG_L2_COUNT].value == 0
# VOLTAGE_SAG_L3_COUNT (1-0:72.32.0)
assert isinstance(result[obis.VOLTAGE_SAG_L3_COUNT], CosemObject)
assert result[obis.VOLTAGE_SAG_L3_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SAG_L3_COUNT].value, int)
assert result[obis.VOLTAGE_SAG_L3_COUNT].value == 0
# VOLTAGE_SWELL_L1_COUNT (1-0:32.36.0)
assert isinstance(result[obis.VOLTAGE_SWELL_L1_COUNT], CosemObject)
assert result[obis.VOLTAGE_SWELL_L1_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SWELL_L1_COUNT].value, int)
assert result[obis.VOLTAGE_SWELL_L1_COUNT].value == 0
# VOLTAGE_SWELL_L2_COUNT (1-0:52.36.0)
assert isinstance(result[obis.VOLTAGE_SWELL_L2_COUNT], CosemObject)
assert result[obis.VOLTAGE_SWELL_L2_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SWELL_L2_COUNT].value, int)
assert result[obis.VOLTAGE_SWELL_L2_COUNT].value == 0
# VOLTAGE_SWELL_L3_COUNT (1-0:72.36.0)
assert isinstance(result[obis.VOLTAGE_SWELL_L3_COUNT], CosemObject)
assert result[obis.VOLTAGE_SWELL_L3_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SWELL_L3_COUNT].value, int)
assert result[obis.VOLTAGE_SWELL_L3_COUNT].value == 0
# TEXT_MESSAGE_CODE (0-0:96.13.1)
assert isinstance(result[obis.TEXT_MESSAGE_CODE], CosemObject)
assert result[obis.TEXT_MESSAGE_CODE].unit is None
assert result[obis.TEXT_MESSAGE_CODE].value is None
# TEXT_MESSAGE (0-0:96.13.0)
assert isinstance(result[obis.TEXT_MESSAGE], CosemObject)
assert result[obis.TEXT_MESSAGE].unit is None
assert result[obis.TEXT_MESSAGE].value is None
# INSTANTANEOUS_CURRENT_L1 (1-0:31.7.0)
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L1], CosemObject)
assert result[obis.INSTANTANEOUS_CURRENT_L1].unit == 'A'
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L1].value, Decimal)
assert result[obis.INSTANTANEOUS_CURRENT_L1].value == Decimal('0')
# INSTANTANEOUS_CURRENT_L2 (1-0:51.7.0)
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L2], CosemObject)
assert result[obis.INSTANTANEOUS_CURRENT_L2].unit == 'A'
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L2].value, Decimal)
assert result[obis.INSTANTANEOUS_CURRENT_L2].value == Decimal('6')
# INSTANTANEOUS_CURRENT_L3 (1-0:71.7.0)
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L3], CosemObject)
assert result[obis.INSTANTANEOUS_CURRENT_L3].unit == 'A'
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L3].value, Decimal)
assert result[obis.INSTANTANEOUS_CURRENT_L3].value == Decimal('2')
# DEVICE_TYPE (0-x:24.1.0)
assert isinstance(result[obis.DEVICE_TYPE], CosemObject)
assert result[obis.DEVICE_TYPE].unit is None
assert isinstance(result[obis.DEVICE_TYPE].value, int)
assert result[obis.DEVICE_TYPE].value == 3
# INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE (1-0:21.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value == Decimal('0.170')
# INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE (1-0:41.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value == Decimal('1.247')
# INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE (1-0:61.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value == Decimal('0.209')
# INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE (1-0:22.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].value == Decimal('0')
# INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE (1-0:42.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].value == Decimal('0')
# INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE (1-0:62.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].value == Decimal('0')
# EQUIPMENT_IDENTIFIER_GAS (0-x:96.1.0)
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].value == '4819243993373755377509728609491464'
# HOURLY_GAS_METER_READING (0-1:24.2.1)
assert isinstance(result[obis.HOURLY_GAS_METER_READING], MBusObject)
assert result[obis.HOURLY_GAS_METER_READING].unit == 'm3'
assert isinstance(result[obis.HOURLY_GAS_METER_READING].value, Decimal)
assert result[obis.HOURLY_GAS_METER_READING].value == Decimal('981.443')
# POWER_EVENT_FAILURE_LOG (99.97.0)
# TODO to be implemented
# ACTUAL_TRESHOLD_ELECTRICITY (0-0:17.0.0)
# TODO to be implemented
# ACTUAL_SWITCH_POSITION (0-0:96.3.10)
# TODO to be implemented
# VALVE_POSITION_GAS (0-x:24.4.0)
# TODO to be implemented
def test_checksum_valid(self):
# No exception is raised.
TelegramParser.validate_checksum(TELEGRAM_V4_2)
def test_checksum_invalid(self):
# Remove the electricty used data value. This causes the checksum to
# not match anymore.
corrupted_telegram = TELEGRAM_V4_2.replace(
'1-0:1.8.1(001581.123*kWh)\r\n',
''
)
with self.assertRaises(InvalidChecksumError):
TelegramParser.validate_checksum(corrupted_telegram)
def test_checksum_missing(self):
# Remove the checksum value causing a ParseError.
corrupted_telegram = TELEGRAM_V4_2.replace('!6796\r\n', '')
with self.assertRaises(ParseError):
TelegramParser.validate_checksum(corrupted_telegram)

View File

@@ -1,247 +0,0 @@
from decimal import Decimal
import datetime
import unittest
import pytz
from dsmr_parser import obis_references as obis
from dsmr_parser import telegram_specifications
from dsmr_parser.exceptions import InvalidChecksumError, ParseError
from dsmr_parser.objects import CosemObject, MBusObject
from dsmr_parser.parsers import TelegramParser
from test.example_telegrams import TELEGRAM_V5
class TelegramParserV5Test(unittest.TestCase):
""" Test parsing of a DSMR v5.x telegram. """
def test_parse(self):
parser = TelegramParser(telegram_specifications.V5)
result = parser.parse(TELEGRAM_V5)
# P1_MESSAGE_HEADER (1-3:0.2.8)
assert isinstance(result[obis.P1_MESSAGE_HEADER], CosemObject)
assert result[obis.P1_MESSAGE_HEADER].unit is None
assert isinstance(result[obis.P1_MESSAGE_HEADER].value, str)
assert result[obis.P1_MESSAGE_HEADER].value == '50'
# P1_MESSAGE_TIMESTAMP (0-0:1.0.0)
assert isinstance(result[obis.P1_MESSAGE_TIMESTAMP], CosemObject)
assert result[obis.P1_MESSAGE_TIMESTAMP].unit is None
assert isinstance(result[obis.P1_MESSAGE_TIMESTAMP].value, datetime.datetime)
assert result[obis.P1_MESSAGE_TIMESTAMP].value == \
datetime.datetime(2017, 1, 2, 18, 20, 2, tzinfo=pytz.UTC)
# ELECTRICITY_USED_TARIFF_1 (1-0:1.8.1)
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_1].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_1].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_1].value == Decimal('4.426')
# ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2)
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2], CosemObject)
assert result[obis.ELECTRICITY_USED_TARIFF_2].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_USED_TARIFF_2].value, Decimal)
assert result[obis.ELECTRICITY_USED_TARIFF_2].value == Decimal('2.399')
# ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1)
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1], CosemObject)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value, Decimal)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_1].value == Decimal('2.444')
# ELECTRICITY_DELIVERED_TARIFF_2 (1-0:2.8.2)
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2], CosemObject)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].unit == 'kWh'
assert isinstance(result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value, Decimal)
assert result[obis.ELECTRICITY_DELIVERED_TARIFF_2].value == Decimal('0')
# ELECTRICITY_ACTIVE_TARIFF (0-0:96.14.0)
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF], CosemObject)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].unit is None
assert isinstance(result[obis.ELECTRICITY_ACTIVE_TARIFF].value, str)
assert result[obis.ELECTRICITY_ACTIVE_TARIFF].value == '0002'
# EQUIPMENT_IDENTIFIER (0-0:96.1.1)
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER].value == '4B384547303034303436333935353037'
# CURRENT_ELECTRICITY_USAGE (1-0:1.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE], CosemObject)
assert result[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
assert isinstance(result[obis.CURRENT_ELECTRICITY_USAGE].value, Decimal)
assert result[obis.CURRENT_ELECTRICITY_USAGE].value == Decimal('0.244')
# CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0)
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY], CosemObject)
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].unit == 'kW'
assert isinstance(result[obis.CURRENT_ELECTRICITY_DELIVERY].value, Decimal)
assert result[obis.CURRENT_ELECTRICITY_DELIVERY].value == Decimal('0')
# LONG_POWER_FAILURE_COUNT (96.7.9)
assert isinstance(result[obis.LONG_POWER_FAILURE_COUNT], CosemObject)
assert result[obis.LONG_POWER_FAILURE_COUNT].unit is None
assert isinstance(result[obis.LONG_POWER_FAILURE_COUNT].value, int)
assert result[obis.LONG_POWER_FAILURE_COUNT].value == 0
# SHORT_POWER_FAILURE_COUNT (1-0:96.7.21)
assert isinstance(result[obis.SHORT_POWER_FAILURE_COUNT], CosemObject)
assert result[obis.SHORT_POWER_FAILURE_COUNT].unit is None
assert isinstance(result[obis.SHORT_POWER_FAILURE_COUNT].value, int)
assert result[obis.SHORT_POWER_FAILURE_COUNT].value == 13
# VOLTAGE_SAG_L1_COUNT (1-0:32.32.0)
assert isinstance(result[obis.VOLTAGE_SAG_L1_COUNT], CosemObject)
assert result[obis.VOLTAGE_SAG_L1_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SAG_L1_COUNT].value, int)
assert result[obis.VOLTAGE_SAG_L1_COUNT].value == 0
# VOLTAGE_SAG_L2_COUNT (1-0:52.32.0)
assert isinstance(result[obis.VOLTAGE_SAG_L2_COUNT], CosemObject)
assert result[obis.VOLTAGE_SAG_L2_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SAG_L2_COUNT].value, int)
assert result[obis.VOLTAGE_SAG_L2_COUNT].value == 0
# VOLTAGE_SAG_L3_COUNT (1-0:72.32.0)
assert isinstance(result[obis.VOLTAGE_SAG_L3_COUNT], CosemObject)
assert result[obis.VOLTAGE_SAG_L3_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SAG_L3_COUNT].value, int)
assert result[obis.VOLTAGE_SAG_L3_COUNT].value == 0
# VOLTAGE_SWELL_L1_COUNT (1-0:32.36.0)
assert isinstance(result[obis.VOLTAGE_SWELL_L1_COUNT], CosemObject)
assert result[obis.VOLTAGE_SWELL_L1_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SWELL_L1_COUNT].value, int)
assert result[obis.VOLTAGE_SWELL_L1_COUNT].value == 0
# VOLTAGE_SWELL_L2_COUNT (1-0:52.36.0)
assert isinstance(result[obis.VOLTAGE_SWELL_L2_COUNT], CosemObject)
assert result[obis.VOLTAGE_SWELL_L2_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SWELL_L2_COUNT].value, int)
assert result[obis.VOLTAGE_SWELL_L2_COUNT].value == 0
# VOLTAGE_SWELL_L3_COUNT (1-0:72.36.0)
assert isinstance(result[obis.VOLTAGE_SWELL_L3_COUNT], CosemObject)
assert result[obis.VOLTAGE_SWELL_L3_COUNT].unit is None
assert isinstance(result[obis.VOLTAGE_SWELL_L3_COUNT].value, int)
assert result[obis.VOLTAGE_SWELL_L3_COUNT].value == 0
# INSTANTANEOUS_VOLTAGE_L1 (1-0:32.7.0)
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L1], CosemObject)
assert result[obis.INSTANTANEOUS_VOLTAGE_L1].unit == 'V'
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L1].value, Decimal)
assert result[obis.INSTANTANEOUS_VOLTAGE_L1].value == Decimal('230.0')
# INSTANTANEOUS_VOLTAGE_L2 (1-0:52.7.0)
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L2], CosemObject)
assert result[obis.INSTANTANEOUS_VOLTAGE_L2].unit == 'V'
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L2].value, Decimal)
assert result[obis.INSTANTANEOUS_VOLTAGE_L2].value == Decimal('230.0')
# INSTANTANEOUS_VOLTAGE_L3 (1-0:72.7.0)
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L3], CosemObject)
assert result[obis.INSTANTANEOUS_VOLTAGE_L3].unit == 'V'
assert isinstance(result[obis.INSTANTANEOUS_VOLTAGE_L3].value, Decimal)
assert result[obis.INSTANTANEOUS_VOLTAGE_L3].value == Decimal('229.0')
# INSTANTANEOUS_CURRENT_L1 (1-0:31.7.0)
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L1], CosemObject)
assert result[obis.INSTANTANEOUS_CURRENT_L1].unit == 'A'
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L1].value, Decimal)
assert result[obis.INSTANTANEOUS_CURRENT_L1].value == Decimal('0.48')
# INSTANTANEOUS_CURRENT_L2 (1-0:51.7.0)
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L2], CosemObject)
assert result[obis.INSTANTANEOUS_CURRENT_L2].unit == 'A'
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L2].value, Decimal)
assert result[obis.INSTANTANEOUS_CURRENT_L2].value == Decimal('0.44')
# INSTANTANEOUS_CURRENT_L3 (1-0:71.7.0)
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L3], CosemObject)
assert result[obis.INSTANTANEOUS_CURRENT_L3].unit == 'A'
assert isinstance(result[obis.INSTANTANEOUS_CURRENT_L3].value, Decimal)
assert result[obis.INSTANTANEOUS_CURRENT_L3].value == Decimal('0.86')
# TEXT_MESSAGE (0-0:96.13.0)
assert isinstance(result[obis.TEXT_MESSAGE], CosemObject)
assert result[obis.TEXT_MESSAGE].unit is None
assert result[obis.TEXT_MESSAGE].value is None
# DEVICE_TYPE (0-x:24.1.0)
assert isinstance(result[obis.DEVICE_TYPE], CosemObject)
assert result[obis.DEVICE_TYPE].unit is None
assert isinstance(result[obis.DEVICE_TYPE].value, int)
assert result[obis.DEVICE_TYPE].value == 3
# INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE (1-0:21.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE].value == Decimal('0.070')
# INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE (1-0:41.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE].value == Decimal('0.032')
# INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE (1-0:61.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE].value == Decimal('0.142')
# INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE (1-0:22.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE].value == Decimal('0')
# INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE (1-0:42.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE].value == Decimal('0')
# INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE (1-0:62.7.0)
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE], CosemObject)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].unit == 'kW'
assert isinstance(result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].value, Decimal)
assert result[obis.INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE].value == Decimal('0')
# EQUIPMENT_IDENTIFIER_GAS (0-x:96.1.0)
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS], CosemObject)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].unit is None
assert isinstance(result[obis.EQUIPMENT_IDENTIFIER_GAS].value, str)
assert result[obis.EQUIPMENT_IDENTIFIER_GAS].value == '3232323241424344313233343536373839'
# HOURLY_GAS_METER_READING (0-1:24.2.1)
assert isinstance(result[obis.HOURLY_GAS_METER_READING], MBusObject)
assert result[obis.HOURLY_GAS_METER_READING].unit == 'm3'
assert isinstance(result[obis.HOURLY_GAS_METER_READING].value, Decimal)
assert result[obis.HOURLY_GAS_METER_READING].value == Decimal('0.107')
def test_checksum_valid(self):
# No exception is raised.
TelegramParser.validate_checksum(TELEGRAM_V5)
def test_checksum_invalid(self):
# Remove the electricty used data value. This causes the checksum to
# not match anymore.
corrupted_telegram = TELEGRAM_V5.replace(
'1-0:1.8.1(000004.426*kWh)\r\n',
''
)
with self.assertRaises(InvalidChecksumError):
TelegramParser.validate_checksum(corrupted_telegram)
def test_checksum_missing(self):
# Remove the checksum value causing a ParseError.
corrupted_telegram = TELEGRAM_V5.replace('!87B3\r\n', '')
with self.assertRaises(ParseError):
TelegramParser.validate_checksum(corrupted_telegram)

View File

@@ -1,54 +0,0 @@
from unittest.mock import Mock
import unittest
from dsmr_parser import obis_references as obis
from dsmr_parser import telegram_specifications
from dsmr_parser.parsers import TelegramParser
from dsmr_parser.clients.protocol import DSMRProtocol
TELEGRAM_V2_2 = (
'/ISk5\2MT382-1004\r\n'
'\r\n'
'0-0:96.1.1(00000000000000)\r\n'
'1-0:1.8.1(00001.001*kWh)\r\n'
'1-0:1.8.2(00001.001*kWh)\r\n'
'1-0:2.8.1(00001.001*kWh)\r\n'
'1-0:2.8.2(00001.001*kWh)\r\n'
'0-0:96.14.0(0001)\r\n'
'1-0:1.7.0(0001.01*kW)\r\n'
'1-0:2.7.0(0000.00*kW)\r\n'
'0-0:17.0.0(0999.00*kW)\r\n'
'0-0:96.3.10(1)\r\n'
'0-0:96.13.1()\r\n'
'0-0:96.13.0()\r\n'
'0-1:24.1.0(3)\r\n'
'0-1:96.1.0(000000000000)\r\n'
'0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n'
'(00001.001)\r\n'
'0-1:24.4.0(1)\r\n'
'!\r\n'
)
class ProtocolTest(unittest.TestCase):
def setUp(self):
telegram_parser = TelegramParser(telegram_specifications.V2_2)
self.protocol = DSMRProtocol(None, telegram_parser,
telegram_callback=Mock())
def test_complete_packet(self):
"""Protocol should assemble incoming lines into complete packet."""
self.protocol.data_received(TELEGRAM_V2_2.encode('ascii'))
telegram = self.protocol.telegram_callback.call_args_list[0][0][0]
assert isinstance(telegram, dict)
assert float(telegram[obis.CURRENT_ELECTRICITY_USAGE].value) == 1.01
assert telegram[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
assert float(telegram[obis.GAS_METER_READING].value) == 1.001
assert telegram[obis.GAS_METER_READING].unit == 'm3'

View File

@@ -1,322 +0,0 @@
import unittest
import datetime
import pytz
from dsmr_parser import telegram_specifications
from dsmr_parser import obis_name_mapping
from dsmr_parser.objects import CosemObject
from dsmr_parser.objects import MBusObject
from dsmr_parser.objects import Telegram
from dsmr_parser.objects import ProfileGenericObject
from dsmr_parser.parsers import TelegramParser
from test.example_telegrams import TELEGRAM_V4_2
from decimal import Decimal
class TelegramTest(unittest.TestCase):
""" Test instantiation of Telegram object """
def __init__(self, *args, **kwargs):
self.item_names_tested = []
super(TelegramTest, self).__init__(*args, **kwargs)
def verify_telegram_item(self, telegram, testitem_name, object_type, unit_val, value_type, value_val):
testitem = eval("telegram.{}".format(testitem_name))
assert isinstance(testitem, object_type)
assert testitem.unit == unit_val
assert isinstance(testitem.value, value_type)
assert testitem.value == value_val
self.item_names_tested.append(testitem_name)
def test_instantiate(self):
parser = TelegramParser(telegram_specifications.V4)
telegram = Telegram(TELEGRAM_V4_2, parser, telegram_specifications.V4)
# P1_MESSAGE_HEADER (1-3:0.2.8)
self.verify_telegram_item(telegram,
'P1_MESSAGE_HEADER',
object_type=CosemObject,
unit_val=None,
value_type=str,
value_val='42')
# P1_MESSAGE_TIMESTAMP (0-0:1.0.0)
self.verify_telegram_item(telegram,
'P1_MESSAGE_TIMESTAMP',
CosemObject,
unit_val=None,
value_type=datetime.datetime,
value_val=datetime.datetime(2016, 11, 13, 19, 57, 57, tzinfo=pytz.UTC))
# ELECTRICITY_USED_TARIFF_1 (1-0:1.8.1)
self.verify_telegram_item(telegram,
'ELECTRICITY_USED_TARIFF_1',
object_type=CosemObject,
unit_val='kWh',
value_type=Decimal,
value_val=Decimal('1581.123'))
# ELECTRICITY_USED_TARIFF_2 (1-0:1.8.2)
self.verify_telegram_item(telegram,
'ELECTRICITY_USED_TARIFF_2',
object_type=CosemObject,
unit_val='kWh',
value_type=Decimal,
value_val=Decimal('1435.706'))
# ELECTRICITY_DELIVERED_TARIFF_1 (1-0:2.8.1)
self.verify_telegram_item(telegram,
'ELECTRICITY_DELIVERED_TARIFF_1',
object_type=CosemObject,
unit_val='kWh',
value_type=Decimal,
value_val=Decimal('0'))
# ELECTRICITY_DELIVERED_TARIFF_2 (1-0:2.8.2)
self.verify_telegram_item(telegram,
'ELECTRICITY_DELIVERED_TARIFF_2',
object_type=CosemObject,
unit_val='kWh',
value_type=Decimal,
value_val=Decimal('0'))
# ELECTRICITY_ACTIVE_TARIFF (0-0:96.14.0)
self.verify_telegram_item(telegram,
'ELECTRICITY_ACTIVE_TARIFF',
object_type=CosemObject,
unit_val=None,
value_type=str,
value_val='0002')
# EQUIPMENT_IDENTIFIER (0-0:96.1.1)
self.verify_telegram_item(telegram,
'EQUIPMENT_IDENTIFIER',
object_type=CosemObject,
unit_val=None,
value_type=str,
value_val='3960221976967177082151037881335713')
# CURRENT_ELECTRICITY_USAGE (1-0:1.7.0)
self.verify_telegram_item(telegram,
'CURRENT_ELECTRICITY_USAGE',
object_type=CosemObject,
unit_val='kW',
value_type=Decimal,
value_val=Decimal('2.027'))
# CURRENT_ELECTRICITY_DELIVERY (1-0:2.7.0)
self.verify_telegram_item(telegram,
'CURRENT_ELECTRICITY_DELIVERY',
object_type=CosemObject,
unit_val='kW',
value_type=Decimal,
value_val=Decimal('0'))
# SHORT_POWER_FAILURE_COUNT (1-0:96.7.21)
self.verify_telegram_item(telegram,
'SHORT_POWER_FAILURE_COUNT',
object_type=CosemObject,
unit_val=None,
value_type=int,
value_val=15)
# LONG_POWER_FAILURE_COUNT (96.7.9)
self.verify_telegram_item(telegram,
'LONG_POWER_FAILURE_COUNT',
object_type=CosemObject,
unit_val=None,
value_type=int,
value_val=7)
# VOLTAGE_SAG_L1_COUNT (1-0:32.32.0)
self.verify_telegram_item(telegram,
'VOLTAGE_SAG_L1_COUNT',
object_type=CosemObject,
unit_val=None,
value_type=int,
value_val=0)
# VOLTAGE_SAG_L2_COUNT (1-0:52.32.0)
self.verify_telegram_item(telegram,
'VOLTAGE_SAG_L2_COUNT',
object_type=CosemObject,
unit_val=None,
value_type=int,
value_val=0)
# VOLTAGE_SAG_L3_COUNT (1-0:72.32.0)
self.verify_telegram_item(telegram,
'VOLTAGE_SAG_L3_COUNT',
object_type=CosemObject,
unit_val=None,
value_type=int,
value_val=0)
# VOLTAGE_SWELL_L1_COUNT (1-0:32.36.0)
self.verify_telegram_item(telegram,
'VOLTAGE_SWELL_L1_COUNT',
object_type=CosemObject,
unit_val=None,
value_type=int,
value_val=0)
# VOLTAGE_SWELL_L2_COUNT (1-0:52.36.0)
self.verify_telegram_item(telegram,
'VOLTAGE_SWELL_L2_COUNT',
object_type=CosemObject,
unit_val=None,
value_type=int,
value_val=0)
# VOLTAGE_SWELL_L3_COUNT (1-0:72.36.0)
self.verify_telegram_item(telegram,
'VOLTAGE_SWELL_L3_COUNT',
object_type=CosemObject,
unit_val=None,
value_type=int,
value_val=0)
# TEXT_MESSAGE_CODE (0-0:96.13.1)
self.verify_telegram_item(telegram,
'TEXT_MESSAGE_CODE',
object_type=CosemObject,
unit_val=None,
value_type=type(None),
value_val=None)
# TEXT_MESSAGE (0-0:96.13.0)
self.verify_telegram_item(telegram,
'TEXT_MESSAGE',
object_type=CosemObject,
unit_val=None,
value_type=type(None),
value_val=None)
# INSTANTANEOUS_CURRENT_L1 (1-0:31.7.0)
self.verify_telegram_item(telegram,
'INSTANTANEOUS_CURRENT_L1',
object_type=CosemObject,
unit_val='A',
value_type=Decimal,
value_val=Decimal('0'))
# INSTANTANEOUS_CURRENT_L2 (1-0:51.7.0)
self.verify_telegram_item(telegram,
'INSTANTANEOUS_CURRENT_L2',
object_type=CosemObject,
unit_val='A',
value_type=Decimal,
value_val=Decimal('6'))
# INSTANTANEOUS_CURRENT_L3 (1-0:71.7.0)
self.verify_telegram_item(telegram,
'INSTANTANEOUS_CURRENT_L3',
object_type=CosemObject,
unit_val='A',
value_type=Decimal,
value_val=Decimal('2'))
# DEVICE_TYPE (0-x:24.1.0)
self.verify_telegram_item(telegram,
'DEVICE_TYPE',
object_type=CosemObject,
unit_val=None,
value_type=int,
value_val=3)
# INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE (1-0:21.7.0)
self.verify_telegram_item(telegram,
'INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE',
object_type=CosemObject,
unit_val='kW',
value_type=Decimal,
value_val=Decimal('0.170'))
# INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE (1-0:41.7.0)
self.verify_telegram_item(telegram,
'INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE',
object_type=CosemObject,
unit_val='kW',
value_type=Decimal,
value_val=Decimal('1.247'))
# INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE (1-0:61.7.0)
self.verify_telegram_item(telegram,
'INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE',
object_type=CosemObject,
unit_val='kW',
value_type=Decimal,
value_val=Decimal('0.209'))
# INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE (1-0:22.7.0)
self.verify_telegram_item(telegram,
'INSTANTANEOUS_ACTIVE_POWER_L1_NEGATIVE',
object_type=CosemObject,
unit_val='kW',
value_type=Decimal,
value_val=Decimal('0'))
# INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE (1-0:42.7.0)
self.verify_telegram_item(telegram,
'INSTANTANEOUS_ACTIVE_POWER_L2_NEGATIVE',
object_type=CosemObject,
unit_val='kW',
value_type=Decimal,
value_val=Decimal('0'))
# INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE (1-0:62.7.0)
self.verify_telegram_item(telegram,
'INSTANTANEOUS_ACTIVE_POWER_L3_NEGATIVE',
object_type=CosemObject,
unit_val='kW',
value_type=Decimal,
value_val=Decimal('0'))
# EQUIPMENT_IDENTIFIER_GAS (0-x:96.1.0)
self.verify_telegram_item(telegram,
'EQUIPMENT_IDENTIFIER_GAS',
object_type=CosemObject,
unit_val=None,
value_type=str,
value_val='4819243993373755377509728609491464')
# HOURLY_GAS_METER_READING (0-1:24.2.1)
self.verify_telegram_item(telegram,
'HOURLY_GAS_METER_READING',
object_type=MBusObject,
unit_val='m3',
value_type=Decimal,
value_val=Decimal('981.443'))
# POWER_EVENT_FAILURE_LOG (1-0:99.97.0)
testitem_name = 'POWER_EVENT_FAILURE_LOG'
object_type = ProfileGenericObject
testitem = eval("telegram.{}".format(testitem_name))
assert isinstance(testitem, object_type)
assert testitem.buffer_length == 3
assert testitem.buffer_type == '0-0:96.7.19'
buffer = testitem.buffer
assert isinstance(testitem.buffer, list)
assert len(buffer) == 3
assert all([isinstance(item, MBusObject) for item in buffer])
date0 = datetime.datetime(2000, 1, 4, 17, 3, 20, tzinfo=datetime.timezone.utc)
date1 = datetime.datetime(1999, 12, 31, 23, 0, 1, tzinfo=datetime.timezone.utc)
date2 = datetime.datetime(2000, 1, 1, 23, 0, 3, tzinfo=datetime.timezone.utc)
assert buffer[0].datetime == date0
assert buffer[1].datetime == date1
assert buffer[2].datetime == date2
assert buffer[0].value == 237126
assert buffer[1].value == 2147583646
assert buffer[2].value == 2317482647
assert all([isinstance(item.value, int) for item in buffer])
assert all([isinstance(item.unit, str) for item in buffer])
assert all([(item.unit == 's') for item in buffer])
self.item_names_tested.append(testitem_name)
# check if all items in telegram V4 specification are covered
V4_name_list = [obis_name_mapping.EN[signature] for signature, parser in
telegram_specifications.V4['objects'].items()]
V4_name_set = set(V4_name_list)
item_names_tested_set = set(self.item_names_tested)
assert item_names_tested_set == V4_name_set

View File

@@ -1,105 +0,0 @@
import unittest
from dsmr_parser.clients.telegram_buffer import TelegramBuffer
from test.example_telegrams import TELEGRAM_V2_2, TELEGRAM_V4_2
class TelegramBufferTest(unittest.TestCase):
def setUp(self):
self.telegram_buffer = TelegramBuffer()
def test_v22_telegram(self):
self.telegram_buffer.append(TELEGRAM_V2_2)
telegram = next(self.telegram_buffer.get_all())
self.assertEqual(telegram, TELEGRAM_V2_2)
self.assertEqual(self.telegram_buffer._buffer, '')
def test_v42_telegram(self):
self.telegram_buffer.append(TELEGRAM_V4_2)
telegram = next(self.telegram_buffer.get_all())
self.assertEqual(telegram, TELEGRAM_V4_2)
self.assertEqual(self.telegram_buffer._buffer, '')
def test_multiple_mixed_telegrams(self):
self.telegram_buffer.append(
''.join((TELEGRAM_V2_2, TELEGRAM_V4_2, TELEGRAM_V2_2))
)
telegrams = list(self.telegram_buffer.get_all())
self.assertListEqual(
telegrams,
[
TELEGRAM_V2_2,
TELEGRAM_V4_2,
TELEGRAM_V2_2
]
)
self.assertEqual(self.telegram_buffer._buffer, '')
def test_v42_telegram_preceded_with_unclosed_telegram(self):
# There are unclosed telegrams at the start of the buffer.
incomplete_telegram = TELEGRAM_V4_2[:-1]
self.telegram_buffer.append(incomplete_telegram + TELEGRAM_V4_2)
telegram = next(self.telegram_buffer.get_all())
self.assertEqual(telegram, TELEGRAM_V4_2)
self.assertEqual(self.telegram_buffer._buffer, '')
def test_v42_telegram_preceded_with_unopened_telegram(self):
# There is unopened telegrams at the start of the buffer indicating that
# the buffer was being filled while the telegram was outputted halfway.
incomplete_telegram = TELEGRAM_V4_2[1:]
self.telegram_buffer.append(incomplete_telegram + TELEGRAM_V4_2)
telegram = next(self.telegram_buffer.get_all())
self.assertEqual(telegram, TELEGRAM_V4_2)
self.assertEqual(self.telegram_buffer._buffer, '')
def test_v42_telegram_trailed_by_unclosed_telegram(self):
incomplete_telegram = TELEGRAM_V4_2[:-1]
self.telegram_buffer.append(TELEGRAM_V4_2 + incomplete_telegram)
telegram = next(self.telegram_buffer.get_all())
self.assertEqual(telegram, TELEGRAM_V4_2)
self.assertEqual(self.telegram_buffer._buffer, incomplete_telegram)
def test_v42_telegram_trailed_by_unopened_telegram(self):
incomplete_telegram = TELEGRAM_V4_2[1:]
self.telegram_buffer.append(TELEGRAM_V4_2 + incomplete_telegram)
telegram = next(self.telegram_buffer.get_all())
self.assertEqual(telegram, TELEGRAM_V4_2)
self.assertEqual(self.telegram_buffer._buffer, incomplete_telegram)
def test_v42_telegram_adding_line_by_line(self):
for line in TELEGRAM_V4_2.splitlines(keepends=True):
self.telegram_buffer.append(line)
telegram = next(self.telegram_buffer.get_all())
self.assertEqual(telegram, TELEGRAM_V4_2)
self.assertEqual(self.telegram_buffer._buffer, '')
def test_v42_telegram_adding_char_by_char(self):
for char in TELEGRAM_V4_2:
self.telegram_buffer.append(char)
telegram = next(self.telegram_buffer.get_all())
self.assertEqual(telegram, TELEGRAM_V4_2)
self.assertEqual(self.telegram_buffer._buffer, '')

View File

@@ -1,26 +0,0 @@
[tox]
envlist = py34,py35,py36,py37
[testenv]
deps=
pytest
pytest-cov
pylama
pytest-asyncio
pytest-catchlog
pytest-mock
commands=
py.test --cov=dsmr_parser test {posargs}
pylama dsmr_parser test
[pylama:dsmr_parser/clients/__init__.py]
ignore = W0611
[pylama:dsmr_parser/parsers.py]
ignore = W605
[pylama:pylint]
max_line_length = 120
[pylama:pycodestyle]
max_line_length = 120

View File

@@ -1,11 +1,17 @@
from dsmr_parser.objects import Telegram
from homie.node.node_base import Node_Base from homie.node.node_base import Node_Base
from homie.node.property.property_enum import Property_Enum from homie.node.property.property_enum import Property_Enum
from homie.node.property.property_integer import Property_Integer from homie.node.property.property_integer import Property_Integer
from dsmr2mqtt.node.property.property_energy import Property_Energy from node.property.property_energy import Property_Energy
from dsmr2mqtt.node.property.property_power import Property_Power from node.property.property_power import Property_Power
from dsmr2mqtt.node.property.property_current import Property_Current from node.property.property_current import Property_Current
from dsmr2mqtt.node.property.property_voltage import Property_Voltage from node.property.property_voltage import Property_Voltage
import logging
logger = logging.getLogger(__name__)
class Node_ElectricityMeter(Node_Base): class Node_ElectricityMeter(Node_Base):
@@ -16,70 +22,50 @@ class Node_ElectricityMeter(Node_Base):
name="Electricity meter", name="Electricity meter",
type_="state", type_="state",
retain=True, retain=True,
qos=1, qos=1
state_values=None,
set_state=None,
): ):
assert state_values
assert set_state
logger.debug("Configuring electricity meter")
super().__init__(device, id, name, type_, retain, qos) super().__init__(device, id, name, type_, retain, qos)
self.add_property(Property_Integer(self, "tariff-indicator", "Tariff indicator", data_format="1:2", settable=False))
self.add_property(Property_Enum(self, "tariff", "Tariff", data_format="tariff1,tariff2", settable=False))
self._tariffs = {1: "tariff1", 2: "tariff2" }
self.add_property(Property_Integer(self, "tariff_indicator", "Tariff indicator", data_format="1:2", settable=False)) self.add_property(Property_Energy(self, "delivery-tariff1", "Delivery tariff 1"))
self.add_property(Property_Energy(self, "delivery-tariff2", "Delivery tariff 2"))
self.add_property(Property_Energy(self, "delivery_tariff1", "Delivery tariff 1"))
self.add_property(Property_Energy(self, "delivery_tariff2", "Delivery tariff 2"))
self.add_property(Property_Power(self, "power", "Power")) self.add_property(Property_Power(self, "power", "Power"))
self.add_property(Property_Voltage(self, "voltage_L1", "Voltage L1")) self.add_property(Property_Voltage(self, "voltage-l1", "Voltage L1"))
self.add_property(Property_Voltage(self, "voltage_L2", "Voltage L2")) self.add_property(Property_Voltage(self, "voltage-l2", "Voltage L2"))
self.add_property(Property_Voltage(self, "voltage_L3", "Voltage L3")) self.add_property(Property_Voltage(self, "voltage-l3", "Voltage L3"))
self.add_property(Property_Current(self, "current_L1", "Current L1")) self.add_property(Property_Current(self, "current-l1", "Current L1"))
self.add_property(Property_Current(self, "current_L2", "Current L2")) self.add_property(Property_Current(self, "current-l2", "Current L2"))
self.add_property(Property_Current(self, "current_L3", "Current L3")) self.add_property(Property_Current(self, "current-l3", "Current L3"))
self.add_property(Property_Power(self, "power_L1", "Power L1")) self.add_property(Property_Power(self, "power-l1", "Power L1"))
self.add_property(Property_Power(self, "power_L2", "Power L2")) self.add_property(Property_Power(self, "power-l2", "Power L2"))
self.add_property(Property_Power(self, "power_L3", "Power L3")) self.add_property(Property_Power(self, "power-l3", "Power L3"))
def update(self, telegram: Telegram):
logger.debug("Updating electricity meter properties")
self.get_property("tariff-indicator").value = int(telegram.ELECTRICITY_ACTIVE_TARIFF.value)
self.get_property("tariff").value = self._tariffs[int(telegram.ELECTRICITY_ACTIVE_TARIFF.value)]
self.get_property("delivery-tariff1").value = float(telegram.ELECTRICITY_USED_TARIFF_1.value)
self.get_property("delivery-tariff2").value = float(telegram.ELECTRICITY_USED_TARIFF_2.value)
self.get_property("power").value = float(telegram.CURRENT_ELECTRICITY_USAGE.value)
self.get_property("voltage-l1").value = float(telegram.INSTANTANEOUS_VOLTAGE_L1.value)
self.get_property("current-l1").value = float(telegram.INSTANTANEOUS_CURRENT_L1.value)
self.get_property("power-l1").value = float(telegram.INSTANTANEOUS_ACTIVE_POWER_L1_POSITIVE.value)
self.get_property("voltage-l2").value = float(telegram.INSTANTANEOUS_VOLTAGE_L2.value)
self.get_property("current-l2").value = float(telegram.INSTANTANEOUS_CURRENT_L2.value)
self.get_property("power-l2").value = float(telegram.INSTANTANEOUS_ACTIVE_POWER_L2_POSITIVE.value)
self.get_property("voltage-l3").value = float(telegram.INSTANTANEOUS_VOLTAGE_L3.value)
self.get_property("current-l3").value = float(telegram.INSTANTANEOUS_CURRENT_L3.value)
self.get_property("power-l3").value = float(telegram.INSTANTANEOUS_ACTIVE_POWER_L3_POSITIVE.value)
def update_status(self, telegram: str):
#Telegram
# 1-3:0.2.8(50)
# 0-0:1.0.0(200603122725S)
# 0-0:96.1.1(4530303438303030303339393038333139)
# 1-0:1.8.1(001807.864*kWh)
# 1-0:1.8.2(001173.872*kWh)
# 1-0:2.8.1(000000.091*kWh)
# 1-0:2.8.2(000000.000*kWh)
# 0-0:96.14.0(0002)
# 1-0:1.7.0(00.909*kW)
# 1-0:2.7.0(00.000*kW)
# 0-0:96.7.21(00016)
# 0-0:96.7.9(00003)
# 1-0:99.97.0(0)(0-0:96.7.19)
# 1-0:32.32.0(00002)
# 1-0:52.32.0(00002)
# 1-0:72.32.0(00002)
# 1-0:32.36.0(00000)
# 1-0:52.36.0(00000)
# 1-0:72.36.0(00000)
# 0-0:96.13.0()
# 1-0:32.7.0(235.0*V)
# 1-0:52.7.0(237.0*V)
# 1-0:72.7.0(236.0*V)
# 1-0:31.7.0(001*A)
# 1-0:51.7.0(001*A)
# 1-0:71.7.0(001*A)
# 1-0:21.7.0(00.290*kW)
# 1-0:41.7.0(00.268*kW)
# 1-0:61.7.0(00.350*kW)
# 1-0:22.7.0(00.000*kW)
# 1-0:42.7.0(00.000*kW)
# 1-0:62.7.0(00.000*kW)
# 0-1:24.1.0(003)
# 0-1:96.1.0(4730303732303033393435373234323139)
# 0-1:24.2.1(200603122500S)(01741.782*m3)
pass

View File

@@ -1,10 +1,15 @@
import logging
from dsmr_parser.objects import Telegram
from homie.node.node_base import Node_Base from homie.node.node_base import Node_Base
from homie.node.property.property_enum import Property_Enum from homie.node.property.property_enum import Property_Enum
from dsmr2mqtt.node.property.property_volume import Property_Volume
from .property.property_volume import Property_Volume
logger = logging.getLogger(__name__)
class Node_Gasmeter(Node_Base): class Node_GasMeter(Node_Base):
def __init__( def __init__(
self, self,
device, device,
@@ -12,20 +17,13 @@ class Node_Gasmeter(Node_Base):
name="Gas meter", name="Gas meter",
type_="state", type_="state",
retain=True, retain=True,
qos=1, qos=1
state_values=None, ):
set_state=None, logger.debug("Configuring gas meter")
):
assert state_values
assert set_state
super().__init__(device, id, name, type_, retain, qos) super().__init__(device, id, name, type_, retain, qos)
self.add_property( self.add_property(Property_Volume(self, "volume", "Volume"))
Property_Volume(
self, "volume", "Volume", data_format=state_values, set_value=set_state
)
)
def update_volume(self, volume): def update(self, telegram: Telegram):
self.get_property("volume").value = volume logger.debug("Updating electricity meter properties")
self.get_property("volume").value = float(telegram.HOURLY_GAS_METER_READING.value)

View File

@@ -10,7 +10,7 @@ class Property_Power(Property_Float):
settable=False, settable=False,
retained=True, retained=True,
qos=1, qos=1,
unit="W", unit="kW",
data_type=None, data_type=None,
data_format=None, data_format=None,
value=None, value=None,

134
requirements.txt Normal file
View File

@@ -0,0 +1,134 @@
appdirs==1.4.3
asn1crypto==0.24.0
astroid==2.1.0
asttokens==1.1.13
automationhat==0.2.0
beautifulsoup4==4.7.1
blinker==1.4
blinkt==0.1.2
buttonshim==0.0.2
Cap1xxx==0.1.3
certifi==2018.8.24
chardet==3.0.4
Click==7.0
colorama==0.3.7
colorzero==1.1
cookies==2.2.1
cryptography==2.6.1
cycler==0.10.0
decorator==4.3.0
docutils==0.14
drumhat==0.1.0
dsmr-parser==0.21
entrypoints==0.3
envirophat==1.0.0
ExplorerHAT==0.4.2
Flask==1.0.2
fourletterphat==0.1.0
gpiozero==1.5.1
guizero==0.6.0
Homie4==0.3.5
html5lib==1.0.1
idna==2.6
ipykernel==4.9.0
ipython==5.8.0
ipython-genutils==0.2.0
isort==4.3.4
itsdangerous==0.24
jedi==0.13.2
Jinja2==2.10
jupyter-client==5.2.3
jupyter-core==4.4.0
keyring==17.1.1
keyrings.alt==3.1.1
kiwisolver==1.0.1
lazy-object-proxy==1.3.1
logilab-common==1.4.2
lxml==4.3.2
MarkupSafe==1.1.0
matplotlib==3.0.2
mccabe==0.6.1
microdotphat==0.2.1
mote==0.0.4
motephat==0.0.2
mypy==0.670
mypy-extensions==0.4.1
netifaces==0.10.9
nudatus==0.0.4
numpy==1.16.2
oauthlib==2.1.0
olefile==0.46
paho-mqtt==1.5.0
pantilthat==0.0.7
parso==0.3.1
pexpect==4.6.0
pgzero==1.2
phatbeat==0.1.1
pianohat==0.1.0
picamera==1.13
pickleshare==0.7.5
picraft==1.0
piglow==1.2.5
pigpio==1.44
Pillow==5.4.1
prompt-toolkit==1.0.15
psutil==5.5.1
pycairo==1.16.2
pycodestyle==2.4.0
pycrypto==2.6.1
pydantic==1.5.1
pyflakes==2.0.0
pygame==1.9.4.post1
Pygments==2.3.1
PyGObject==3.30.4
pyinotify==0.9.6
PyJWT==1.7.0
pylint==2.2.2
pyOpenSSL==19.0.0
pyparsing==2.2.0
pyserial==3.4
pyserial-asyncio==0.4
python-apt==1.8.4.1
python-dateutil==2.7.3
python-dotenv==0.13.0
pytz==2020.1
pyxdg==0.25
pyzmq==17.1.2
qtconsole==4.3.1
rainbowhat==0.1.0
requests==2.21.0
requests-oauthlib==1.0.0
responses==0.9.0
roman==2.0.0
RPi.GPIO==0.7.0
RTIMULib==7.2.1
scrollphat==0.0.7
scrollphathd==1.2.1
SecretStorage==2.3.1
semver==2.0.1
Send2Trash==1.5.0
sense-emu==1.1
sense-hat==2.2.0
simplegeneric==0.8.1
simplejson==3.16.0
six==1.12.0
skywriter==0.0.7
sn3218==1.2.7
soupsieve==1.8
spidev==3.4
ssh-import-id==5.7
tailer==0.4.1
thonny==3.2.6
tornado==5.1.1
touchphat==0.0.1
traitlets==4.3.2
twython==3.7.0
typed-ast==1.3.1
uflash==1.2.4
unicornhathd==0.0.4
urllib3==1.24.1
virtualenv==15.1.0
wcwidth==0.1.7
webencodings==0.5.1
Werkzeug==0.14.1
wrapt==1.10.11