added telegram CRC verification

This commit is contained in:
Nigel Dokter
2016-12-28 20:29:34 +01:00
parent 81fd581e57
commit 1c69b4e9ee
9 changed files with 198 additions and 81 deletions

View File

@@ -1,8 +1,10 @@
import logging
import re
from PyCRC.CRC16 import CRC16
from .objects import MBusObject, MBusObjectV2_2, CosemObject
from .exceptions import ParseError
from .exceptions import ParseError, InvalidChecksumError
from .obis_references import GAS_METER_READING
logger = logging.getLogger(__name__)
@@ -18,7 +20,6 @@ class TelegramParser(object):
self.telegram_specification = telegram_specification
def _find_line_parser(self, line_value):
for obis_reference, parser in self.telegram_specification.items():
if re.search(obis_reference, line_value):
return obis_reference, parser
@@ -29,7 +30,10 @@ class TelegramParser(object):
telegram = {}
for line_value in line_values:
obis_reference, dsmr_object = self.parse_line(line_value.strip())
# TODO temporarily strip newline characters.
line_value = line_value.strip()
obis_reference, dsmr_object = self.parse_line(line_value)
telegram[obis_reference] = dsmr_object
@@ -47,7 +51,51 @@ class TelegramParser(object):
return obis_reference, parser.parse(line_value)
class TelegramParserV4(TelegramParser):
@staticmethod
def validate_telegram_checksum(line_values):
"""
:type line_values: list
:raises ParseError:
:raises InvalidChecksumError:
"""
full_telegram = ''.join(line_values)
# Extract the bytes that count towards the checksum.
checksum_contents = re.search(r'\/.+\!', full_telegram, re.DOTALL)
# Extract the hexadecimal checksum value itself.
checksum_hex = re.search(r'((?<=\!)[0-9A-Z]{4}(?=\r\n))+', full_telegram)
if not checksum_contents or not checksum_hex:
raise ParseError(
'Failed to perform CRC validation because the telegram is '
'incomplete. The checksum and/or content values are missing.'
)
calculated_crc = CRC16().calculate(checksum_contents.group(0))
expected_crc = checksum_hex.group(0)
expected_crc = int(expected_crc, base=16)
if calculated_crc != expected_crc:
raise InvalidChecksumError(
"Invalid telegram. The CRC checksum '{}' does not match the "
"expected '{}'".format(
calculated_crc,
expected_crc
)
)
def parse(self, line_values):
self.validate_telegram_checksum(line_values)
return super(self, TelegramParserV4).parse(line_values)
class TelegramParserV2_2(TelegramParser):
def parse(self, line_values):
"""Join lines for gas meter."""