Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- #
- # Author: Kyle Manna
- #
- import collections
- import csv
- import json
- import paramiko
- import sys
- import time
- #
- # Class to interface to Trimble GM100 GPS Discipline PTP Grandmaster Clock
- #
- class GM100:
- def __init__(self, host):
- self._host = host
- self._user = 'trimblesuper'
- self._pass = 'trimblesuper'
- pass
- def connect(self):
- self._ssh = paramiko.SSHClient()
- self._ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
- self._ssh.connect(self._host, username=self._user, password=self._pass)
- self._chan = self._ssh.invoke_shell()
- def wait_for_prompt(self):
- resp = b''
- while not resp.endswith(b'> '):
- buf = self._chan.recv(1024)
- resp += buf
- return resp
- def get_gnss(self):
- self._chan.send('view freq\n\r')
- resp = self.wait_for_prompt()
- if b'No frequency status available at this time.' in resp:
- return None
- # Discard first line (command sent) and last line (command prompt)
- resp = resp.split(b'\r\n')[1:-1]
- # Split the 'Key: Value' byte array in to a stripped string list
- resp = [i.decode().strip().split(': ',1) for i in resp]
- # Convert the list to a kv dictionary
- gnss = collections.OrderedDict([(kv[0], kv[1]) for kv in resp])
- return gnss
- def readline(self):
- resp = b''
- while not resp.endswith(b'\r\n'):
- # HACK cringe reading one byte at a time
- buf = self._chan.recv(1)
- resp += buf
- return resp
- def view_stream(self):
- self._chan.send('view stream\n\r')
- # Discard command echo'd back
- self.readline()
- keys = self.readline().decode().strip().split(',')
- while True:
- values = gm100.readline().decode().strip().split(',')
- yield collections.OrderedDict(zip(keys, values))
- #
- # Test application dumps the frequency data to stdout in CSV format
- #
- if __name__ == '__main__':
- gm100 = GM100(sys.argv[1])
- gm100.connect()
- gm100.wait_for_prompt()
- """
- sample = gm100.get_gnss()
- while not sample:
- sample = gm100.get_gnss()
- fieldnames = sample.keys()
- writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames)
- writer.writeheader()
- while True:
- sample = gm100.get_gnss()
- #print(json.dumps(sample))
- writer.writerow(sample)
- time.sleep(10)
- """
- writer = None
- for i in gm100.view_stream():
- if not writer:
- writer = csv.DictWriter(sys.stdout, fieldnames=i.keys())
- writer.writeheader()
- #print(json.dumps(i))
- writer.writerow(i)
Add Comment
Please, Sign In to add comment