# Files
# dsmr2mqtt/app.py
# 2023-02-12 16:10:03 +01:00
#
# 77 lines
# 2.5 KiB
# Python

import os
import time
import re
import sys
import threading
import _thread
from dsmr_parser import telegram_specifications
from dsmr_parser.clients import SerialReader, SocketReader, SERIAL_SETTINGS_V2_2, SERIAL_SETTINGS_V4, SERIAL_SETTINGS_V5
from app_settings import Settings
from device_dsmr import Device_DSMR
import logging
# Module-level logger named after this module; main() attaches the file and
# console handlers to the root logger, which this logger's records reach.
logger = logging.getLogger(__name__)
def _configure_logging(loglevel):
    """Send log records to both ``dsmr2mqtt.log`` and standard output.

    Args:
        loglevel: Level applied to the root logger and to the console handler.
    """
    logfmt = "%(asctime)s [%(levelname)-5.5s] [%(name)s] %(message)s"
    logging.basicConfig(level=loglevel, format=logfmt, filename="dsmr2mqtt.log")

    # Mirror everything written to the log file on the console as well.
    consolelog = logging.StreamHandler(sys.stdout)
    consolelog.setLevel(loglevel)
    consolelog.setFormatter(logging.Formatter(logfmt))
    logging.getLogger("").addHandler(consolelog)


def _select_protocol(protocol_version):
    """Pick the serial settings and telegram specification for a protocol.

    Args:
        protocol_version: Configured protocol name; matched by substring
            ("V2", "V3", "V4"), checked in that order.

    Returns:
        A ``(serial_settings, telegram_specification)`` tuple. Any value that
        contains none of the known markers falls back to the V5 pair.
    """
    if "V2" in protocol_version:
        return SERIAL_SETTINGS_V2_2, telegram_specifications.V2_2
    if "V3" in protocol_version:
        # NOTE(review): V3 is paired with the V4 serial settings here; confirm
        # this matches the line settings of V3 meters.
        return SERIAL_SETTINGS_V4, telegram_specifications.V3
    if "V4" in protocol_version:
        return SERIAL_SETTINGS_V4, telegram_specifications.V4
    return SERIAL_SETTINGS_V5, telegram_specifications.V5


def _create_reader(dsmr_port, serial_setup, spec):
    """Build the telegram reader for a serial device or a TCP endpoint.

    Args:
        dsmr_port: Either a serial device name or ``tcp://<host>:<port>``.
        serial_setup: Serial settings, used only for a serial device.
        spec: Telegram specification handed to the reader.

    Returns:
        A ``SocketReader`` when ``dsmr_port`` contains ``tcp:``, otherwise a
        ``SerialReader``.

    Raises:
        ValueError: If ``dsmr_port`` contains ``tcp:`` but is not of the form
            ``tcp://<host>:<port>``, or if the port part is not an integer.
    """
    if "tcp:" in dsmr_port:
        match = re.search(r"tcp://([^:]+):(.+)", dsmr_port)
        if match is None:
            # Fail with a readable message instead of an AttributeError on
            # None when the value merely contains "tcp:".
            raise ValueError(
                f"invalid DSMR port {dsmr_port!r}: expected tcp://<host>:<port>"
            )
        host = match.group(1)
        port = int(match.group(2))
        logger.info("reading from remote host %s on port %s", host, port)
        return SocketReader(host, port, spec)

    logger.info("using serial port %s", dsmr_port)
    return SerialReader(dsmr_port, serial_setup, spec)


def main():
    """Read DSMR telegrams periodically and push them into the DSMR device.

    Loads the settings, configures logging, creates the reader for the
    configured port and protocol, then starts a self-rescheduling timer that
    processes one telegram every ``cfg.dsmr_update_interval`` seconds.

    Raises:
        ValueError: If the configured port is a malformed ``tcp://`` address.
    """
    cfg = Settings()
    _configure_logging(cfg.loglevel)

    serial_setup, spec = _select_protocol(str(cfg.dsmr_protocol))
    device = Device_DSMR("dsmr", name="Digitale Slimme Meter")
    reader = _create_reader(cfg.dsmr_port, serial_setup, spec)

    def handle_next_telegram():
        """Fetch a single telegram from the reader and update the device."""
        telegram = next(reader.read_as_object())
        device.update(telegram)
        logger.info("New telegram at %s", telegram.P1_MESSAGE_TIMESTAMP.value)

    def start_timer():
        """Schedule the next run, then process one telegram right away."""
        # Re-arm before doing the work so that a slow or failing read does not
        # delay or cancel the next scheduled run.
        threading.Timer(cfg.dsmr_update_interval, start_timer).start()
        try:
            handle_next_telegram()
        except UnicodeDecodeError:
            # A garbled telegram is skipped; the next timer run tries again.
            logger.warning("Cannot decode telegram")
        except Exception:
            # Record the cause before signalling the main thread; otherwise
            # the interruption would leave no trace of what went wrong.
            logger.exception("Unexpected error while handling telegram")
            _thread.interrupt_main()

    start_timer()
# Run main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()