from time import sleep
import os
import osc4py3
from osc4py3.as_comthreads import *
from osc4py3 import oscmethod as osm
from queue import Queue
import time
q = Queue()
recv = {}
to_print= {}
port = 6565
last_sample = 0
sample_delay = 0.1
clear_cmd = "clear"
try:
os.system(clear_cmd)
except Exception as e:
clear_cmd = "cls"
try:
os.system(clear_cmd)
except Exception as e:
print("Can't run the program, we don't know how to clear the console on your system")
quit()
def handlerfunction(address, args):
global recv
global last_sample, sample_delay
t = time.time()
sample_delay = t - last_sample
last_sample = t
if address not in recv.keys():
recv[address] = time.time()
return
to_print[address] = t - recv[address]
recv[address] = t
# print(recv[address])
osc_startup()
osc_udp_server("0.0.0.0", 6565, "anotherserver")
osc_method("//", handlerfunction, argscheme=osm.OSCARG_ADDRESS + osm.OSCARG_DATA)
def display_title_bar():
os.system('clear')
print("*"*56)
print(" Test notochord send rate | listening at port {:6d}".format(port))
print("*"*56)
bigger_recv_delta = 0.1
for delta in to_print.values():
bigger_recv_delta = max(bigger_recv_delta, delta)
acc = 100
try:
acc = 100 / (bigger_recv_delta / sample_delay)
except Exception as e:
pass
print(" Sample rate = {:4d}ms | Sampling accuracy = {:2.1f}%".format(
int(sample_delay*1000), 100-acc ))
print("-"*56)
count = 1
for ms, t in to_print.items():
print("{:2d}] {:_<26} > {:5.0f}ms | {:2.1f}Hz".format(count, ms, t*1000, 1/t ))
count +=1
while 1:
display_title_bar()
for i in range(500):
osc_process()
sleep(0.001)
osc_terminate()