refactored TelegramParser.parse to accept a str instead of list

This commit is contained in:
Nigel Dokter
2017-01-05 21:24:41 +01:00
parent 1373d570d2
commit f10032f701
6 changed files with 180 additions and 154 deletions

View File

@@ -26,48 +26,58 @@ class TelegramParser(object):
return None, None
def parse(self, line_values):
telegram = {}
def parse(self, telegram):
"""
Parse telegram from string to dict.
for line_value in line_values:
# TODO temporarily strip newline characters.
line_value = line_value.strip()
The telegram str type makes python 2.x integration easier.
obis_reference, dsmr_object = self.parse_line(line_value)
:param str telegram: full telegram from start ('/') to checksum
('!ABCD') including line endings inbetween the telegram's lines
:rtype: dict
:returns: Shortened example:
{
..
r'0-0:96\.1\.1': <CosemObject>, # EQUIPMENT_IDENTIFIER
r'1-0:1\.8\.1': <CosemObject>, # ELECTRICITY_USED_TARIFF_1
r'0-\d:24\.3\.0': <MBusObject>, # GAS_METER_READING
..
}
"""
telegram_lines = telegram.splitlines()
parsed_lines = map(self.parse_line, telegram_lines)
telegram[obis_reference] = dsmr_object
return {obis_reference: dsmr_object
for obis_reference, dsmr_object in parsed_lines}
return telegram
def parse_line(self, line):
logger.debug("Parsing line '%s'", line)
def parse_line(self, line_value):
logger.debug('Parsing line \'%s\'', line_value)
obis_reference, parser = self._find_line_parser(line)
obis_reference, parser = self._find_line_parser(line_value)
if not parser:
logger.warning("No line class found for: '%s'", line_value)
if not obis_reference:
logger.debug("No line class found for: '%s'", line)
return None, None
return obis_reference, parser.parse(line_value)
return obis_reference, parser.parse(line)
class TelegramParserV4(TelegramParser):
@staticmethod
def validate_telegram_checksum(line_values):
def validate_telegram_checksum(telegram):
"""
:type line_values: list
:param str telegram:
:raises ParseError:
:raises InvalidChecksumError:
"""
full_telegram = ''.join(line_values)
# Extract the bytes that count towards the checksum.
checksum_contents = re.search(r'\/.+\!', full_telegram, re.DOTALL)
# Extract the part for which the checksum applies.
checksum_contents = re.search(r'\/.+\!', telegram, re.DOTALL)
# Extract the hexadecimal checksum value itself.
checksum_hex = re.search(r'((?<=\!)[0-9A-Z]{4}(?=\r\n))+', full_telegram)
# The line ending '\r\n' for the checksum line can be ignored.
checksum_hex = re.search(r'((?<=\!)[0-9A-Z]{4})+', telegram)
if not checksum_contents or not checksum_hex:
raise ParseError(
@@ -76,8 +86,7 @@ class TelegramParserV4(TelegramParser):
)
calculated_crc = CRC16().calculate(checksum_contents.group(0))
expected_crc = checksum_hex.group(0)
expected_crc = int(expected_crc, base=16)
expected_crc = int(checksum_hex.group(0), base=16)
if calculated_crc != expected_crc:
raise InvalidChecksumError(
@@ -88,31 +97,44 @@ class TelegramParserV4(TelegramParser):
)
)
def parse(self, line_values):
self.validate_telegram_checksum(line_values)
def parse(self, telegram):
"""
:param str telegram:
:rtype: dict
"""
self.validate_telegram_checksum(telegram)
return super().parse(line_values)
return super().parse(telegram)
class TelegramParserV2_2(TelegramParser):
def parse(self, line_values):
"""Join lines for gas meter."""
def parse(self, telegram):
"""
:param str telegram:
:rtype: dict
"""
def join_lines(line_values):
# TODO fix this in the specification: telegram_specifications.V2_2
def join_lines(telegram):
"""Join lines for gas meter."""
join_next = re.compile(GAS_METER_READING)
join = None
for line_value in line_values:
for line_value in telegram.splitlines():
if join:
yield join.strip() + line_value
yield join + line_value
join = None
elif join_next.match(line_value):
join = line_value
else:
yield line_value
return super().parse(join_lines(line_values))
# TODO temporary workaround
lines = join_lines(telegram)
telegram = '\r\n'.join(lines)
return super().parse(telegram)
class DSMRObjectParser(object):