Advertisement
Guest User

Untitled

a guest
Aug 8th, 2021
48
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.56 KB | None | 0 0
  1. #!/usr/bin/env python3
  2.  
  3. from pymodbus.server.sync import StartTcpServer
  4. from pymodbus.server.sync import StartUdpServer
  5. from pymodbus.server.sync import StartSerialServer
  6.  
  7. from pymodbus.device import ModbusDeviceIdentification
  8. from pymodbus.datastore import ModbusSparseDataBlock
  9. from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
  10.  
  11. from math import sin, pi
  12. from datetime import datetime
  13. from random import random, randint
  14. from struct import pack, unpack
  15.  
  16. import logging
  17. import time
  18.  
  19.  
  20. logging.basicConfig(format='%(asctime)-15s %(levelname)s:%(filename)s: %(message)s')
  21. log = logging.getLogger()
  22. log.setLevel(logging.DEBUG)
  23.  
  24. class ThermometrEmulator(ModbusSparseDataBlock):
  25.     def __init__(self, from_t=20, to_t=80, period=120):
  26.         self.start = datetime.utcnow()
  27.         self.from_t = from_t
  28.         self.to_t = to_t
  29.         self.period = period / pi
  30.    
  31.     @staticmethod
  32.     def clamp(n, minn, maxn):
  33.         return max(min(maxn, n), minn)
  34.  
  35.     def fluctuation(self, seconds):
  36.         k = 2.0
  37.         if random() > 0.95:
  38.             k = random() * 10.0
  39.         return (random() - 0.5) * k
  40.  
  41.     def wavelet(self, seconds):
  42.         return sin(seconds / (self.period / 11)) / 2.0
  43.  
  44.     def wave(self, seconds):
  45.         radians = seconds / self.period
  46.         val = sin(radians)
  47.         one_val = (val + 1.0) / 2.0
  48.         scaled_val = one_val * (self.to_t - self.from_t) + self.from_t
  49.         return scaled_val
  50.    
  51.     def generate(self, seconds):
  52.         wave = self.wave(seconds)
  53.         wavelet = self.wavelet(seconds)
  54.         fluctuation = self.fluctuation(seconds)
  55.         return wave + wavelet + fluctuation
  56.    
  57.     def validate(self, address, count=1):
  58.         return True
  59.    
  60.     # Get values by count ignoring address
  61.     def getValues(self, address, count=1):
  62.         duration = datetime.utcnow() - self.start
  63.  
  64.         seconds = duration.total_seconds()
  65.         value = self.generate(seconds)
  66.  
  67.         value = ThermometrEmulator.clamp(value, 0.0, 100.0)
  68.        
  69.         time.sleep(0.1)
  70.  
  71.         if count >= 5:
  72.             values16from64f = unpack('<4H', pack('<d', value))
  73.             values_repr = [f"0x{v:04X}" for v in values16from64f]
  74.  
  75.             log.debug(f" -- .getValues(address:{address}(ignored), count:{count}) -> Number {value} as {values_repr}")
  76.            
  77.             # First 16bit value is magic number 0x07d0 then four 16bit parts of the 64bit float
  78.             # If more then 5 register requested fill the res with random 16bit numbers
  79.             return [0x07d0] + [*values16from64f] + [randint(0x0000, 0xffff) for _ in range(count - 5)]
  80.         else:
  81.             # Keep first value 0x07d0 in order to make MB_TCP_FC3 example logic work
  82.             # Fill the rest with random 16bit numbers
  83.             log.debug(f" -- .getValues(address:{address}(ignored), count:{count}) -> Whatever")
  84.             return [0x07d0] + [randint(0x0000, 0xffff) for _ in range(count - 1)]
  85.  
  86.     def setValues(self, address, values):
  87.         log.debug(f" -- .setValues(address:{address}(ignored), values:{values}(ignored)) -> Unimplemented")
  88.  
  89.  
  90. store = ModbusSlaveContext(hr = ThermometrEmulator(20, 80, 120))
  91. context = ModbusServerContext(slaves=store, single=True)
  92.  
  93. identity = ModbusDeviceIdentification()
  94. identity.VendorName  = 'PyModbus'
  95. identity.ProductCode = 'PM'
  96. identity.VendorUrl   = 'http://example.org'
  97. identity.ProductName = 'PyModbusServer'
  98. identity.ModelName   = 'PyModbusServer'
  99. identity.MajorMinorRevision = '1.0'
  100.  
  101. StartTcpServer(context, identity=identity, address=("192.168.1.166", 5022))
  102.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement