Libmodbus Client Example

#!/usr/bin/env python3
# pylint: disable=missing-type-doc,missing-param-doc,differing-param-doc,missing-raises-doc
"""Libmodbus Protocol Wrapper.

What follows is an example wrapper of the libmodbus library
(https://libmodbus.org/documentation/) for use with pymodbus.
There are two utilities involved here:

* LibmodbusLevel1Client

  This is simply a python wrapper around the c library. It is
  mostly a clone of the pylibmodbus implementation, but I plan
  on extending it to implement all the available protocol using
  the raw execute methods.

* LibmodbusClient

  This is just another modbus client that can be used just like
  any other client in pymodbus.

For these to work, you must have `cffi` and `libmodbus-dev` installed:

    sudo apt-get install libmodbus-dev
    pip install cffi
"""
# -------------------------------------------------------------------------- #
# import system libraries
# -------------------------------------------------------------------------- #
from cffi import FFI  # pylint: disable=import-error

from pymodbus.bit_read_message import (
    ReadCoilsResponse,
    ReadDiscreteInputsResponse,
)
from pymodbus.bit_write_message import (
    WriteMultipleCoilsResponse,
    WriteSingleCoilResponse,
)
from pymodbus.client.mixin import ModbusClientMixin
from pymodbus.constants import Defaults
from pymodbus.exceptions import ModbusException
from pymodbus.register_read_message import (
    ReadHoldingRegistersResponse,
    ReadInputRegistersResponse,
    ReadWriteMultipleRegistersResponse,
)
from pymodbus.register_write_message import (
    WriteMultipleRegistersResponse,
    WriteSingleRegisterResponse,
)


# -------------------------------------------------------------------------- #
# import pymodbus libraries
# -------------------------------------------------------------------------- #


# --------------------------------------------------------------------------- #
# create the C interface
# --------------------------------------------------------------------------- #
# * TODO add the protocol needed for the servers
# --------------------------------------------------------------------------- #

compiler = FFI()
compiler.cdef(
    """
    typedef struct _modbus modbus_t;

    int modbus_connect(modbus_t *ctx);
    int modbus_flush(modbus_t *ctx);
    void modbus_close(modbus_t *ctx);

    const char *modbus_strerror(int errnum);
    int modbus_set_slave(modbus_t *ctx, int slave);

    void modbus_get_response_timeout(modbus_t *ctx, uint32_t *to_sec, uint32_t *to_usec);
    void modbus_set_response_timeout(modbus_t *ctx, uint32_t to_sec, uint32_t to_usec);

    int modbus_read_bits(modbus_t *ctx, int addr, int nb, uint8_t *dest);
    int modbus_read_input_bits(modbus_t *ctx, int addr, int nb, uint8_t *dest);
    int modbus_read_registers(modbus_t *ctx, int addr, int nb, uint16_t *dest);
    int modbus_read_input_registers(modbus_t *ctx, int addr, int nb, uint16_t *dest);

    int modbus_write_bit(modbus_t *ctx, int coil_addr, int status);
    int modbus_write_bits(modbus_t *ctx, int addr, int nb, const uint8_t *data);
    int modbus_write_register(modbus_t *ctx, int reg_addr, int value);
    int modbus_write_registers(modbus_t *ctx, int addr, int nb, const uint16_t *data);
    int modbus_write_and_read_registers(modbus_t *ctx, int write_addr, int write_nb,
                                        const uint16_t *src, int read_addr, int read_nb, uint16_t *dest);

    int modbus_mask_write_register(modbus_t *ctx, int addr, uint16_t and_mask, uint16_t or_mask);
    int modbus_send_raw_request(modbus_t *ctx, uint8_t *raw_req, int raw_req_length);

    float modbus_get_float(const uint16_t *src);
    void modbus_set_float(float f, uint16_t *dest);

    modbus_t* modbus_new_tcp(const char *ip_address, int port);
    modbus_t* modbus_new_rtu(const char *device, int baud, char parity, int data_bit, int stop_bit);
    void modbus_free(modbus_t *ctx);

    int modbus_receive(modbus_t *ctx, uint8_t *req);
    int modbus_receive_from(modbus_t *ctx, int sockfd, uint8_t *req);
    int modbus_receive_confirmation(modbus_t *ctx, uint8_t *rsp);
"""
)
LIB = compiler.dlopen("modbus")  # create our bindings

# -------------------------------------------------------------------------- #
# helper utilities
# -------------------------------------------------------------------------- #


def get_float(data):
    """Get float."""
    return LIB.modbus_get_float(data)


def set_float(value, data):
    """Set float."""
    LIB.modbus_set_float(value, data)


def cast_to_int16(data):
    """Cast to int16."""
    return int(compiler.cast("int16_t", data))


def cast_to_int32(data):
    """Cast to int32."""
    return int(compiler.cast("int32_t", data))


class NotImplementedException(Exception):
    """Not implemented exception."""


# -------------------------------------------------------------------------- #
# level1 client
# -------------------------------------------------------------------------- #


class LibmodbusLevel1Client:
    """A raw wrapper around the libmodbus c library.

    Feel free to use it if you want increased performance and don"t mind the
    entire protocol not being implemented.
    """

    @classmethod
    def create_tcp_client(cls, my_host="127.0.0.1", my_port=Defaults.TcpPort):
        """Create a TCP modbus client for the supplied parameters.

        :param host: The host to connect to
        :param port: The port to connect to on that host
        :returns: A new level1 client
        """
        my_client = LIB.modbus_new_tcp(my_host.encode(), my_port)
        return cls(my_client)

    @classmethod
    def create_rtu_client(cls, **kwargs):
        """Create a TCP modbus client for the supplied parameters.

        :param port: The serial port to attach to
        :param stopbits: The number of stop bits to use
        :param bytesize: The bytesize of the serial messages
        :param parity: Which kind of parity to use
        :param baudrate: The baud rate to use for the serial device
        :returns: A new level1 client
        """
        my_port = kwargs.get("port", "/dev/ttyS0")
        baudrate = kwargs.get("baud", Defaults.Baudrate)
        parity = kwargs.get("parity", Defaults.Parity)
        bytesize = kwargs.get("bytesize", Defaults.Bytesize)
        stopbits = kwargs.get("stopbits", Defaults.Stopbits)
        my_client = LIB.modbus_new_rtu(my_port, baudrate, parity, bytesize, stopbits)
        return cls(my_client)

    def __init__(self, my_client):
        """Initialize a new instance of the LibmodbusLevel1Client.

        This method should not be used, instead new instances should be created
        using the two supplied factory methods:

        * LibmodbusLevel1Client.create_rtu_client(...)
        * LibmodbusLevel1Client.create_tcp_client(...)

        :param client: The underlying client instance to operate with.
        """
        self.client = my_client
        self.slave = Defaults.Slave

    def set_slave(self, slave):
        """Set the current slave to operate against.

        :param slave: The new slave to operate against
        :returns: The resulting slave to operate against
        """
        self.slave = self._execute(  # pylint: disable=no-member
            LIB.modbus_set_slave, slave
        )
        return self.slave

    def connect(self):
        """Attempt to connect to the client target.

        :returns: True if successful, throws otherwise
        """
        return not self.__execute(LIB.modbus_connect)

    def flush(self):
        """Discard the existing bytes on the wire.

        :returns: The number of flushed bytes, or throws
        """
        return self.__execute(LIB.modbus_flush)

    def close(self):
        """Close and frees the underlying connection and context structure.

        :returns: Always True
        """
        LIB.modbus_close(self.client)
        LIB.modbus_free(self.client)
        return True

    def __execute(self, command, *args):
        """Run the supplied command against the currently instantiated client with the supplied arguments.

        This will make sure to correctly handle resulting errors.

        :param command: The command to execute against the context
        :param *args: The arguments for the given command
        :returns: The result of the operation unless -1 which throws
        """
        if (result := command(self.client, *args)) == -1:
            message = LIB.modbus_strerror(compiler.errno)
            raise ModbusException(compiler.string(message))
        return result

    def read_bits(self, address, count=1):
        """Read bits.

        :param address: The starting address to read from
        :param count: The number of coils to read
        :returns: The resulting bits
        """
        result = compiler.new("uint8_t[]", count)
        self.__execute(LIB.modbus_read_bits, address, count, result)
        return result

    def read_input_bits(self, address, count=1):
        """Read input bits.

        :param address: The starting address to read from
        :param count: The number of discretes to read
        :returns: The resulting bits
        """
        result = compiler.new("uint8_t[]", count)
        self.__execute(LIB.modbus_read_input_bits, address, count, result)
        return result

    def write_bit(self, address, value):
        """Write bit.

        :param address: The starting address to write to
        :param value: The value to write to the specified address
        :returns: The number of written bits
        """
        return self.__execute(LIB.modbus_write_bit, address, value)

    def write_bits(self, address, values):
        """Write bits.

        :param address: The starting address to write to
        :param values: The values to write to the specified address
        :returns: The number of written bits
        """
        count = len(values)
        return self.__execute(LIB.modbus_write_bits, address, count, values)

    def write_register(self, address, value):
        """Write register.

        :param address: The starting address to write to
        :param value: The value to write to the specified address
        :returns: The number of written registers
        """
        return self.__execute(LIB.modbus_write_register, address, value)

    def write_registers(self, address, values):
        """Write registers.

        :param address: The starting address to write to
        :param values: The values to write to the specified address
        :returns: The number of written registers
        """
        count = len(values)
        return self.__execute(LIB.modbus_write_registers, address, count, values)

    def read_registers(self, address, count=1):
        """Read registers.

        :param address: The starting address to read from
        :param count: The number of registers to read
        :returns: The resulting read registers
        """
        result = compiler.new("uint16_t[]", count)
        self.__execute(LIB.modbus_read_registers, address, count, result)
        return result

    def read_input_registers(self, address, count=1):
        """Read input registers.

        :param address: The starting address to read from
        :param count: The number of registers to read
        :returns: The resulting read registers
        """
        result = compiler.new("uint16_t[]", count)
        self.__execute(LIB.modbus_read_input_registers, address, count, result)
        return result

    def read_and_write_registers(
        self, read_address, read_count, write_address, write_registers
    ):
        """Read/write registers.

        :param read_address: The address to start reading from
        :param read_count: The number of registers to read from address
        :param write_address: The address to start writing to
        :param write_registers: The registers to write to the specified address
        :returns: The resulting read registers
        """
        write_count = len(write_registers)
        read_result = compiler.new("uint16_t[]", read_count)
        self.__execute(
            LIB.modbus_write_and_read_registers,
            write_address,
            write_count,
            write_registers,
            read_address,
            read_count,
            read_result,
        )
        return read_result


# -------------------------------------------------------------------------- #
# level2 client
# -------------------------------------------------------------------------- #


class LibmodbusClient(ModbusClientMixin):
    """A facade around the raw level 1 libmodbus client.

    that implements the pymodbus protocol on top of the lower level client.
    """

    # ----------------------------------------------------------------------- #
    # these are used to convert from the pymodbus request types to the
    # libmodbus operations (overloaded operator).
    # ----------------------------------------------------------------------- #

    __methods = {
        "ReadCoilsRequest": lambda c, r: c.read_bits(r.address, r.count),
        "ReadDiscreteInputsRequest": lambda c, r: c.read_input_bits(r.address, r.count),
        "WriteSingleCoilRequest": lambda c, r: c.write_bit(r.address, r.value),
        "WriteMultipleCoilsRequest": lambda c, r: c.write_bits(r.address, r.values),
        "WriteSingleRegisterRequest": lambda c, r: c.write_register(r.address, r.value),
        "WriteMultipleRegistersRequest": lambda c, r: c.write_registers(
            r.address, r.values
        ),
        "ReadHoldingRegistersRequest": lambda c, r: c.read_registers(
            r.address, r.count
        ),
        "ReadInputRegistersRequest": lambda c, r: c.read_input_registers(
            r.address, r.count
        ),
        "ReadWriteMultipleRegistersRequest": lambda c, r: c.read_and_write_registers(
            r.read_address, r.read_count, r.write_address, r.write_registers
        ),
    }

    # ----------------------------------------------------------------------- #
    # these are used to convert from the libmodbus result to the
    # pymodbus response type
    # ----------------------------------------------------------------------- #

    __adapters = {
        "ReadCoilsRequest": lambda tx, rx: ReadCoilsResponse(list(rx)),
        "ReadDiscreteInputsRequest": lambda tx, rx: ReadDiscreteInputsResponse(
            list(rx)
        ),
        "WriteSingleCoilRequest": lambda tx, rx: WriteSingleCoilResponse(
            tx.address, rx
        ),
        "WriteMultipleCoilsRequest": lambda tx, rx: WriteMultipleCoilsResponse(
            tx.address, rx
        ),
        "WriteSingleRegisterRequest": lambda tx, rx: WriteSingleRegisterResponse(
            tx.address, rx
        ),
        "WriteMultipleRegistersRequest": lambda tx, rx: WriteMultipleRegistersResponse(
            tx.address, rx
        ),
        "ReadHoldingRegistersRequest": lambda tx, rx: ReadHoldingRegistersResponse(
            list(rx)
        ),
        "ReadInputRegistersRequest": lambda tx, rx: ReadInputRegistersResponse(
            list(rx)
        ),
        "ReadWriteMultipleRegistersRequest": lambda tx, rx: ReadWriteMultipleRegistersResponse(
            list(rx)
        ),
    }

    def __init__(self, my_client):
        """Initialize a new instance of the LibmodbusClient.

        This should be initialized with one of the LibmodbusLevel1Client instances:

        * LibmodbusLevel1Client.create_rtu_client(...)
        * LibmodbusLevel1Client.create_tcp_client(...)
        :param client: The underlying client instance to operate with.
        """
        self.client = my_client

    # ----------------------------------------------------------------------- #
    # We use the client mixin to implement the api methods which are all
    # forwarded to this method. It is implemented using the previously
    # defined lookup tables. Any method not defined simply throws.
    # ----------------------------------------------------------------------- #

    def execute(self, request):
        """Execute the supplied request against the server.

        :param request: The request to process
        :returns: The result of the request execution
        """
        if self.client.slave != request.unit_id:
            self.client.set_slave(request.unit_id)

        method = request.__class__.__name__
        operation = self.__methods.get(method, None)
        adapter = self.__adapters.get(method, None)

        if not operation or not adapter:
            raise NotImplementedException("Method not implemented: " + operation)

        response = operation(self.client, request)
        return adapter(request, response)

    # ----------------------------------------------------------------------- #
    # Other methods can simply be forwarded using the decorator pattern
    # ----------------------------------------------------------------------- #

    def connect(self):
        """Connect."""
        return self.client.connect()

    def close(self):
        """Close."""
        return self.client.close()

    # ----------------------------------------------------------------------- #
    # magic methods
    # ----------------------------------------------------------------------- #

    def __enter__(self):
        """Implement the client with enter block

        :returns: The current instance of the client
        """
        self.client.connect()
        return self

    def __exit__(self, klass, value, traceback):
        """Implement the client with exit block"""
        self.client.close()


# -------------------------------------------------------------------------- #
# main example runner
# -------------------------------------------------------------------------- #


if __name__ == "__main__":

    # create our low level client
    host = "127.0.0.1"  # pylint: disable=invalid-name
    port = 502  # pylint: disable=invalid-name
    protocol = LibmodbusLevel1Client.create_tcp_client(host, port)

    # operate with our high level client
    with LibmodbusClient(protocol) as client:
        registers = client.write_registers(0, [13, 12, 11])
        print(registers)
        registers = client.read_holding_registers(0, 10)
        print(registers.registers)