Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- from pymodbus.server.sync import StartTcpServer
- from pymodbus.server.sync import StartUdpServer
- from pymodbus.server.sync import StartSerialServer
- from pymodbus.device import ModbusDeviceIdentification
- from pymodbus.datastore import ModbusSparseDataBlock
- from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
- from math import sin, pi
- from datetime import datetime
- from random import random, randint
- from struct import pack, unpack
- import logging
- import time
- logging.basicConfig(format='%(asctime)-15s %(levelname)s:%(filename)s: %(message)s')
- log = logging.getLogger()
- log.setLevel(logging.DEBUG)
- class ThermometrEmulator(ModbusSparseDataBlock):
- def __init__(self, from_t=20, to_t=80, period=120):
- self.start = datetime.utcnow()
- self.from_t = from_t
- self.to_t = to_t
- self.period = period / pi
- @staticmethod
- def clamp(n, minn, maxn):
- return max(min(maxn, n), minn)
- def fluctuation(self, seconds):
- k = 2.0
- if random() > 0.95:
- k = random() * 10.0
- return (random() - 0.5) * k
- def wavelet(self, seconds):
- return sin(seconds / (self.period / 11)) / 2.0
- def wave(self, seconds):
- radians = seconds / self.period
- val = sin(radians)
- one_val = (val + 1.0) / 2.0
- scaled_val = one_val * (self.to_t - self.from_t) + self.from_t
- return scaled_val
- def generate(self, seconds):
- wave = self.wave(seconds)
- wavelet = self.wavelet(seconds)
- fluctuation = self.fluctuation(seconds)
- return wave + wavelet + fluctuation
- def validate(self, address, count=1):
- return True
- # Get values by count ignoring address
- def getValues(self, address, count=1):
- duration = datetime.utcnow() - self.start
- seconds = duration.total_seconds()
- value = self.generate(seconds)
- value = ThermometrEmulator.clamp(value, 0.0, 100.0)
- time.sleep(0.1)
- if count >= 5:
- values16from64f = unpack('<4H', pack('<d', value))
- values_repr = [f"0x{v:04X}" for v in values16from64f]
- log.debug(f" -- .getValues(address:{address}(ignored), count:{count}) -> Number {value} as {values_repr}")
- # First 16bit value is magic number 0x07d0 then four 16bit parts of the 64bit float
- # If more then 5 register requested fill the res with random 16bit numbers
- return [0x07d0] + [*values16from64f] + [randint(0x0000, 0xffff) for _ in range(count - 5)]
- else:
- # Keep first value 0x07d0 in order to make MB_TCP_FC3 example logic work
- # Fill the rest with random 16bit numbers
- log.debug(f" -- .getValues(address:{address}(ignored), count:{count}) -> Whatever")
- return [0x07d0] + [randint(0x0000, 0xffff) for _ in range(count - 1)]
- def setValues(self, address, values):
- log.debug(f" -- .setValues(address:{address}(ignored), values:{values}(ignored)) -> Unimplemented")
- store = ModbusSlaveContext(hr = ThermometrEmulator(20, 80, 120))
- context = ModbusServerContext(slaves=store, single=True)
- identity = ModbusDeviceIdentification()
- identity.VendorName = 'PyModbus'
- identity.ProductCode = 'PM'
- identity.VendorUrl = 'http://example.org'
- identity.ProductName = 'PyModbusServer'
- identity.ModelName = 'PyModbusServer'
- identity.MajorMinorRevision = '1.0'
- StartTcpServer(context, identity=identity, address=("192.168.1.166", 5022))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement