Progress on removing TelegramParserV2_2 and TelegramParserV4 in favor of a generic TelegramParser
This commit is contained in:
@@ -5,28 +5,22 @@ from PyCRC.CRC16 import CRC16
|
||||
|
||||
from dsmr_parser.objects import MBusObject, MBusObjectV2_2, CosemObject
|
||||
from dsmr_parser.exceptions import ParseError, InvalidChecksumError
|
||||
from dsmr_parser.obis_references import GAS_METER_READING
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TelegramParser(object):
|
||||
|
||||
def __init__(self, telegram_specification):
|
||||
def __init__(self, telegram_specification,
|
||||
enable_checksum_validation=False):
|
||||
"""
|
||||
:param telegram_specification: determines how the telegram is parsed
|
||||
:type telegram_specification: dict
|
||||
"""
|
||||
self.telegram_specification = telegram_specification
|
||||
self.enable_checksum_validation = enable_checksum_validation
|
||||
|
||||
def _find_line_parser(self, line_value):
|
||||
for obis_reference, parser in self.telegram_specification.items():
|
||||
if re.search(obis_reference, line_value):
|
||||
return obis_reference, parser
|
||||
|
||||
return None, None
|
||||
|
||||
def parse(self, telegram):
|
||||
def parse(self, telegram_data):
|
||||
"""
|
||||
Parse telegram from string to dict.
|
||||
|
||||
@@ -44,28 +38,22 @@ class TelegramParser(object):
|
||||
..
|
||||
}
|
||||
"""
|
||||
telegram_lines = telegram.splitlines()
|
||||
parsed_lines = map(self.parse_line, telegram_lines)
|
||||
|
||||
return {obis_reference: dsmr_object
|
||||
for obis_reference, dsmr_object in parsed_lines}
|
||||
if self.enable_checksum_validation:
|
||||
self.validate_checksum(telegram_data)
|
||||
|
||||
def parse_line(self, line):
|
||||
logger.debug("Parsing line '%s'", line)
|
||||
telegram = {}
|
||||
|
||||
obis_reference, parser = self._find_line_parser(line)
|
||||
for signature, parser in self.telegram_specification.items():
|
||||
match = re.search(signature, telegram_data, re.DOTALL)
|
||||
|
||||
if not obis_reference:
|
||||
logger.debug("No line class found for: '%s'", line)
|
||||
return None, None
|
||||
if match:
|
||||
telegram[signature] = parser.parse(match.group(0))
|
||||
|
||||
return obis_reference, parser.parse(line)
|
||||
|
||||
|
||||
class TelegramParserV4(TelegramParser):
|
||||
return telegram
|
||||
|
||||
@staticmethod
|
||||
def validate_telegram_checksum(telegram):
|
||||
def validate_checksum(telegram):
|
||||
"""
|
||||
:param str telegram:
|
||||
:raises ParseError:
|
||||
@@ -97,45 +85,6 @@ class TelegramParserV4(TelegramParser):
|
||||
)
|
||||
)
|
||||
|
||||
def parse(self, telegram):
|
||||
"""
|
||||
:param str telegram:
|
||||
:rtype: dict
|
||||
"""
|
||||
self.validate_telegram_checksum(telegram)
|
||||
|
||||
return super().parse(telegram)
|
||||
|
||||
|
||||
class TelegramParserV2_2(TelegramParser):
|
||||
|
||||
def parse(self, telegram):
|
||||
"""
|
||||
:param str telegram:
|
||||
:rtype: dict
|
||||
"""
|
||||
|
||||
# TODO fix this in the specification: telegram_specifications.V2_2
|
||||
def join_lines(telegram):
|
||||
"""Join lines for gas meter."""
|
||||
join_next = re.compile(GAS_METER_READING)
|
||||
|
||||
join = None
|
||||
for line_value in telegram.splitlines():
|
||||
if join:
|
||||
yield join + line_value
|
||||
join = None
|
||||
elif join_next.match(line_value):
|
||||
join = line_value
|
||||
else:
|
||||
yield line_value
|
||||
|
||||
# TODO temporary workaround
|
||||
lines = join_lines(telegram)
|
||||
telegram = '\r\n'.join(lines)
|
||||
|
||||
return super().parse(telegram)
|
||||
|
||||
|
||||
class DSMRObjectParser(object):
|
||||
|
||||
|
||||
Reference in New Issue
Block a user