254 lines
7.3 KiB
Python
254 lines
7.3 KiB
Python
import logging
|
||
import re
|
||
|
||
from PyCRC.CRC16 import CRC16
|
||
|
||
from .objects import MBusObject, MBusObjectV2_2, CosemObject
|
||
from .exceptions import ParseError, InvalidChecksumError
|
||
from .obis_references import GAS_METER_READING
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class TelegramParser(object):
    """Line based telegram parser driven by a telegram specification."""

    def __init__(self, telegram_specification):
        """
        :param telegram_specification: determines how the telegram is parsed
        :type telegram_specification: dict
        """
        self.telegram_specification = telegram_specification

    def _find_line_parser(self, line_value):
        """
        Look up the parser responsible for a single telegram line.

        The keys of the specification are used as regular expressions and
        searched for in the line; the first key that matches wins.

        :param str line_value: one line of the telegram
        :returns: ``(obis_reference, parser)`` for the first matching
            specification entry, or ``(None, None)`` when nothing matches
        :rtype: tuple
        """
        for obis_reference, parser in self.telegram_specification.items():
            if re.search(obis_reference, line_value):
                return obis_reference, parser

        return None, None

    def parse(self, telegram):
        r"""
        Parse telegram from string to dict.

        The telegram str type makes python 2.x integration easier.

        Lines for which the specification has no parser are left out of the
        result.

        :param str telegram: full telegram from start ('/') to checksum
            ('!ABCD') including line endings inbetween the telegram's lines
        :rtype: dict
        :returns: Shortened example:
            {
                ..
                r'0-0:96\.1\.1': <CosemObject>,  # EQUIPMENT_IDENTIFIER
                r'1-0:1\.8\.1': <CosemObject>,   # ELECTRICITY_USED_TARIFF_1
                r'0-\d:24\.3\.0': <MBusObject>,  # GAS_METER_READING
                ..
            }
        """
        parsed_lines = map(self.parse_line, telegram.splitlines())

        # parse_line() reports unrecognised lines as (None, None). Skip those
        # so the result does not end up with a meaningless ``None`` key.
        return {obis_reference: dsmr_object
                for obis_reference, dsmr_object in parsed_lines
                if obis_reference is not None}

    def parse_line(self, line):
        """
        Parse a single telegram line with the parser found for it.

        :param str line: one line of the telegram
        :returns: ``(obis_reference, parsed_object)``, or ``(None, None)``
            when the specification has no parser for the line
        :rtype: tuple
        """
        logger.debug("Parsing line '%s'", line)

        obis_reference, parser = self._find_line_parser(line)

        if not obis_reference:
            logger.debug("No line class found for: '%s'", line)
            return None, None

        return obis_reference, parser.parse(line)
|
||
|
||
|
||
class TelegramParserV4(TelegramParser):
    """Telegram parser that validates the CRC checksum before parsing."""

    @staticmethod
    def validate_telegram_checksum(telegram):
        """
        Compare the CRC16 of the telegram contents with its checksum line.

        :param str telegram: telegram text, expected to contain the contents
            from '/' up to '!' followed by a four digit hexadecimal checksum
        :raises ParseError: when the contents or a well-formed hexadecimal
            checksum cannot be found in the telegram
        :raises InvalidChecksumError: when the calculated CRC differs from
            the checksum found in the telegram
        """

        # Extract the part for which the checksum applies: from the first
        # '/' up to and including the last '!' (DOTALL spans line endings).
        checksum_contents = re.search(r'\/.+\!', telegram, re.DOTALL)

        # Extract the hexadecimal checksum value itself.
        # The line ending '\r\n' for the checksum line can be ignored.
        # Only hexadecimal digits are accepted, so a malformed checksum is
        # reported as ParseError below instead of making int() raise an
        # undocumented ValueError.
        checksum_hex = re.search(r'(?<=\!)[0-9A-F]{4}', telegram)

        if not checksum_contents or not checksum_hex:
            raise ParseError(
                'Failed to perform CRC validation because the telegram is '
                'incomplete. The checksum and/or content values are missing.'
            )

        calculated_crc = CRC16().calculate(checksum_contents.group(0))
        expected_crc = int(checksum_hex.group(0), base=16)

        if calculated_crc != expected_crc:
            raise InvalidChecksumError(
                "Invalid telegram. The CRC checksum '{}' does not match the "
                "expected '{}'".format(
                    calculated_crc,
                    expected_crc
                )
            )

    def parse(self, telegram):
        """
        Validate the telegram checksum, then parse the telegram.

        :param str telegram:
        :rtype: dict
        :raises ParseError:
        :raises InvalidChecksumError:
        """
        self.validate_telegram_checksum(telegram)

        return super().parse(telegram)
|
||
|
||
|
||
class TelegramParserV2_2(TelegramParser):
    """Telegram parser that first merges the two-line gas meter reading."""

    def parse(self, telegram):
        """
        Merge each gas meter reading line with the line after it and parse.

        A line that starts with the GAS_METER_READING pattern is held back
        and glued in front of the following line; all other lines pass
        through unchanged. The result is re-joined with '\\r\\n' and handed to
        the generic parser.

        :param str telegram:
        :rtype: dict
        """
        # TODO fix this in the specification: telegram_specifications.V2_2
        # TODO temporary workaround
        gas_reading = re.compile(GAS_METER_READING)

        merged_lines = []
        held_back = None

        for current_line in telegram.splitlines():
            if held_back:
                # Previous line was the gas reading header: glue them.
                merged_lines.append(held_back + current_line)
                held_back = None
                continue

            if gas_reading.match(current_line):
                held_back = current_line
                continue

            merged_lines.append(current_line)

        return super().parse('\r\n'.join(merged_lines))
|
||
|
||
|
||
class DSMRObjectParser(object):
    """Base line parser that extracts the '(...)' value groups of a line."""

    # Match value groups, but exclude the parentheses. Compiled once for the
    # class instead of on every _parse() call.
    _VALUE_GROUP_PATTERN = re.compile(
        r'((?<=\()[0-9a-zA-Z\.\*]{0,}(?=\)))+'
    )

    def __init__(self, *value_formats):
        """
        :param value_formats: one value parser (an object with a
            ``parse(value)`` method) per value group expected in the line,
            in the order the groups appear
        """
        self.value_formats = value_formats

    def _parse(self, line):
        """
        Extract the value groups of a line and parse each of them.

        :param str line: one line of the telegram
        :returns: the result of ``value_format.parse(value)`` for each value
            group, in order; an empty group is passed on as ``None``
        :rtype: list
        :raises ParseError: when the line has no value groups or their
            number differs from the number of value formats
        """
        values = self._VALUE_GROUP_PATTERN.findall(line)

        # Convert empty value groups to None for clarity.
        values = [None if value == '' else value for value in values]

        if not values or len(values) != len(self.value_formats):
            # Format the message here: exceptions do not interpolate
            # logging-style arguments.
            raise ParseError("Invalid '%s' line for '%s'" % (line, self))

        return [value_format.parse(value)
                for value_format, value in zip(self.value_formats, values)]
|
||
|
||
|
||
class MBusParser(DSMRObjectParser):
    """
    Parser for gas meter values.

    Handles lines that carry a timestamp and a gas meter value.

    Line format:
    'ID (TST) (Mv1*U1)'
     1   2     3   4

    1) OBIS Reduced ID-code
    2) Time Stamp (TST) of capture time of measurement value
    3) Measurement value 1 (most recent entry of buffer attribute without unit)
    4) Unit of measurement values (Unit of capture objects attribute)
    """

    def parse(self, line):
        """
        Parse a gas meter line into an M-Bus object.

        Exactly two parsed values produce an MBusObject; any other number of
        values produces an MBusObjectV2_2.
        """
        parsed_values = self._parse(line)
        object_class = MBusObject if len(parsed_values) == 2 else MBusObjectV2_2
        return object_class(parsed_values)
|
||
|
||
|
||
class CosemParser(DSMRObjectParser):
    """
    Parser for Cosem objects.

    A Cosem object is a data object holding a single value, optionally
    followed by a unit of measurement.

    Line format:
    ID (Mv*U)
    1  23 45

    1) OBIS Reduced ID-code
    2) Separator "(", ASCII 28h
    3) COSEM object attribute value
    4) Unit of measurement values (Unit of capture objects attribute) - only
       if applicable
    5) Separator ")", ASCII 29h
    """

    def parse(self, line):
        """Wrap the parsed values of the line in a CosemObject."""
        parsed_values = self._parse(line)
        return CosemObject(parsed_values)
|
||
|
||
|
||
class ProfileGenericParser(DSMRObjectParser):
    """
    Parser for the power failure log (not implemented yet).

    The log is a data object made of multiple repeating groups of values.

    Line format:
    ID (z) (ID1) (TST) (Bv1*U1) (TST) (Bvz*Uz)
    1   2   3     4     5   6    7     8   9

    1) OBIS Reduced ID-code
    2) Number of values z (max 10).
    3) Identifications of buffer values (OBIS Reduced ID codes of capture
       objects attribute)
    4) Time Stamp (TST) of power failure end time
    5) Buffer value 1 (most recent entry of buffer attribute without unit)
    6) Unit of buffer values (Unit of capture objects attribute)
    7) Time Stamp (TST) of power failure end time
    8) Buffer value 2 (oldest entry of buffer attribute without unit)
    9) Unit of buffer values (Unit of capture objects attribute)
    """

    def parse(self, line):
        """Always raises NotImplementedError; parsing is not available."""
        raise NotImplementedError
|
||
|
||
|
||
class ValueParser(object):
    """Parser for one value group: splits off the unit and coerces the value."""

    def __init__(self, coerce_type):
        """
        :param coerce_type: callable applied to the textual value to convert
            it to its final type
        """
        self.coerce_type = coerce_type

    def parse(self, value):
        """
        Split a 'value*unit' group and coerce the value part.

        :param value: content of a value group, or None for an empty group
        :rtype: dict
        :returns: ``{'value': <coerced value or None>, 'unit': <unit or None>}``
        """
        unit = None

        has_unit = bool(value) and '*' in value
        if has_unit:
            value, unit = value.split('*')

        # An absent value (None) is passed through as-is; coercion only
        # applies when there is something to coerce.
        if value is None:
            coerced = None
        else:
            coerced = self.coerce_type(value)

        return dict(value=coerced, unit=unit)
|