Advertisement
ANIK123

uhd with pps time sync

Jun 8th, 2017
150
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 11.61 KB | None | 0 0
  1. /**
  2.  * @fileoverview
  3.  * I use several devices and try to synchronize their time as much as possible, using an OctoClock as PPS reference and multithreading.
  4.  */
  5.  
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <atomic>
#include <complex>
#include <csignal>
#include <exception>
#include <fstream>
#include <future>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <boost/format.hpp>
#include <boost/program_options.hpp>
#include <boost/thread.hpp>
#include <boost/variant/detail/substitute.hpp>

#include <uhd/exception.hpp>
#include <uhd/types/tune_request.hpp>
#include <uhd/usrp/multi_usrp.hpp>
#include <uhd/utils/safe_main.hpp>
#include <uhd/utils/thread_priority.hpp>
  22.  
  23. //region Colorize
  24. /**
  25.  * Just colorized std::output, nevermind
  26.  */
  27. namespace Color {
  28.     enum Code {
  29.         FG_RED = 31,
  30.         FG_GREEN = 32,
  31.         FG_BLUE = 34,
  32.         FG_DEFAULT = 39,
  33.         BG_RED = 41,
  34.         BG_GREEN = 42,
  35.         BG_BLUE = 44,
  36.         BG_DEFAULT = 49,
  37.         S_BOLD = 1
  38.     };
  39.  
  40.     class Modifier {
  41.         Code code;
  42.     public:
  43.         Modifier(Code pCode) : code(pCode) {}
  44.  
  45.         friend std::ostream &
  46.         operator<<(std::ostream &os, const Modifier &mod) {
  47.             return os << "\033[" << mod.code << "m";
  48.         }
  49.     };
  50. }
  51.  
  52. Color::Modifier red(Color::FG_RED);
  53. Color::Modifier green(Color::FG_GREEN);
  54. Color::Modifier blue(Color::FG_BLUE);
  55. Color::Modifier def(Color::FG_DEFAULT);
  56. Color::Modifier bold(Color::S_BOLD);
  57. //endregion
  58.  
  59. static bool stop_signal_called = false;
  60.  
  61. void sig_int_handler(int) {
  62.     std::cout << green << "SIG INT HANDLED" << def << std::endl;
  63.     stop_signal_called = true;
  64. }
  65.  
  66. /**
  67.  * The target in which data is written from the buffer.
  68.  */
  69. class SinkBase{
  70. public:
  71.     virtual void write(const char*, std::streamsize) = 0;
  72.     virtual std::string get_full_file_name() = 0;
  73.     virtual bool is_open() = 0;
  74.     virtual void close() = 0;
  75. };
  76.  
  77. /**
  78.  * Write to "nothing"
  79.  */
  80. class SinkNull{
  81. public:
  82.     SinkNull(std::string file_name){
  83.     }
  84.     void write(const char* __s, std::streamsize __n){
  85.     }
  86.     bool is_open(){
  87.         return true;
  88.     }
  89.     void close(){
  90.     }
  91.     std::string get_full_file_name(){
  92.         return "NULL SINK";
  93.     }
  94. };
  95.  
  96. /**
  97.  * Just a wrapper over the device.
  98.  */
  99. class USRPController {
  100. public:
  101.     uhd::usrp::multi_usrp::sptr device;
  102.  
  103.     USRPController(std::string serial, double rate, double freq, std::string file_name) :
  104.             serial(serial), rate(rate), freq(freq), sink(file_name) {
  105.         this->gain = 25;
  106.         std::ostringstream string_stream;
  107.         string_stream << bold << "[DEV " << blue << serial << def << "] ";
  108.         log_prepend = string_stream.str();
  109.         std::stringstream ss;
  110.         ss << log_prepend << "Creating dev with rate " << rate << ", freq " << freq << ", file_name " << file_name << std::endl;
  111.         std::cout << ss.str();
  112.  
  113.         this->device = uhd::usrp::multi_usrp::make("serial=" + serial);
  114.         this->device->set_time_source("external");
  115.         this->device->set_rx_subdev_spec(std::string("A:A"));
  116.         std::string cpu_format = "fc32";
  117.         std::string wire_format = "sc12";
  118.         uhd::stream_args_t stream_args(cpu_format, wire_format);
  119.         this->rx_stream = this->device->get_rx_stream(stream_args);
  120.  
  121.         size_t samps_per_buff = 10000;
  122.         this->buff = std::vector<std::complex<float>>(samps_per_buff);
  123.     }
  124.  
  125.     /**
  126.      * Timed initialization of freq, gain, rate, etc.,
  127.      * and wait for lo_locked
  128.      * @param cmd_time Time to start command
  129.      */
  130.     void initialize(uhd::time_spec_t cmd_time){
  131.         this->device->set_command_time(cmd_time);
  132.  
  133.         uhd::tune_request_t tune_request(this->freq);
  134.         this->device->set_rx_freq(tune_request);
  135.         this->device->set_rx_gain(this->gain);
  136.         this->device->set_rx_rate(this->rate);
  137.         this->device->set_rx_bandwidth(this->rate);
  138.         this->device->set_rx_antenna("RX2");
  139.  
  140.         //end timed commands
  141.         this->device->clear_command_time();
  142.  
  143.         while (not this->device->get_rx_sensor("lo_locked").to_bool()){
  144.             boost::this_thread::sleep(boost::posix_time::milliseconds(1));
  145.         }
  146.     }
  147.  
  148.     /**
  149.      * Issue timed stream command
  150.      * @param cmd_time Time to start stream
  151.      */
  152.     void start_stream(uhd::time_spec_t cmd_time) {
  153.         uhd::stream_cmd_t stream_cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
  154.         stream_cmd.num_samps = size_t(0);// What is this?)
  155.         stream_cmd.stream_now = false;
  156.         stream_cmd.time_spec = cmd_time;
  157.         this->rx_stream->issue_stream_cmd(stream_cmd);
  158.     }
  159.  
  160.     /**
  161.      * Receive samples from device,
  162.      * log metadata's timestamp to std::out,
  163.      * check for errors and
  164.      * write to sink
  165.      * @return Returns "false" on fail: it will breaks record from this device
  166.      */
  167.     bool work() {
  168.         size_t num_rx_samps = this->rx_stream->recv(&buff.front(), buff.size(), md, 3.0, true);// Try TRUE
  169.  
  170.         std::stringstream ss;
  171.         ss << this->serial << " : ";
  172.         ss << md.time_spec.get_tick_count(1e6) << std::endl;
  173.         std::cout << ss.str();
  174.  
  175.         if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_TIMEOUT) {
  176.             std::cout << log_prepend << boost::format("Timeout while streaming") << std::endl;
  177.         } else if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) {
  178.             if (overflow_message) {
  179.                 overflow_message = false;
  180.                 std::cerr << log_prepend << boost::format(
  181.                         "Got an overflow indication. Please consider the following:\n"
  182.                                 "  Your write medium must sustain a rate of %fMB/s.\n"
  183.                                 "  Dropped samples will not be written to the file.\n"
  184.                                 "  Please modify this example for your purposes.\n"
  185.                                 "  This message will not appear again.\n"
  186.                 ) % (this->device->get_rx_rate() * sizeof(std::complex<float>) / 1e6);
  187.             }
  188.             return true;
  189.         } else if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_NONE) {
  190. //            its o'k'?))
  191.         } else {
  192.             std::string error = str(boost::format("Uncaught error at %s %s") % (log_prepend, md.strerror()));
  193.             throw std::runtime_error(error);
  194.         }
  195.  
  196.         if (sink.is_open()) {
  197.             sink.write((const char *) &buff.front(), num_rx_samps * sizeof(std::complex<float>));
  198.         }
  199.  
  200.         return true;
  201.     }
  202.  
  203.     uhd::rx_streamer::sptr rx_stream;
  204.  
  205. private:
  206.     std::string serial;
  207.     double rate;
  208.     double freq;
  209.     double gain;
  210.  
  211.     std::vector<std::complex<float>> buff;
  212.     uhd::rx_metadata_t md;
  213.     bool overflow_message = true;
  214.  
  215.     std::string log_prepend;
  216.  
  217.     SinkNull sink;
  218. };
  219.  
  220. /**
  221.  * Function-thread to call in the "infinite" loop of the device's worker method.
  222.  * @param controller Device to work
  223.  */
  224. void deviceThreadFunc(USRPController *controller) {
  225.     while (not stop_signal_called) {
  226.         if (not controller->work()) break;
  227.     }
  228. }
  229.  
  230. /**
  231.  * Device fabric, used to return the created device to promise.
  232.  * @param p Promise
  233.  * @param serial Target device's serial
  234.  * @param rate Sample rate
  235.  * @param freq Frequency
  236.  * @param file_name Filename, used for "Sinks"
  237.  */
  238. void create_device(std::promise<USRPController *> &&p, std::string serial, double rate, double freq, std::string file_name) {
  239.     USRPController *device = new USRPController(serial, rate, freq, file_name);
  240.     p.set_value(device);
  241. }
  242.  
  243. /**
  244.  * @todo Refactor to use vectors
  245.  */
  246. const int DEVICES_LENGTH_MAX = 6;
  247. int DEVICES_LENGTH = 0;
  248.  
  249. /**
  250.  * Collection of device's serials
  251.  */
  252. std::string device_serials [DEVICES_LENGTH_MAX] = {
  253.         "a_____",
  254.         "bb____",
  255.         "ccc___",
  256.         "dddd__",
  257.         "eeeee_",
  258.         "ffffff"
  259. };
  260. //Promises for the return of the result of the work of the devices factory.
  261. std::promise<USRPController *> device_create_promises [DEVICES_LENGTH_MAX];
  262. std::future<USRPController *> device_create_futures [DEVICES_LENGTH_MAX];
  263. //Devices factory threads collection
  264. std::thread device_create_threads [DEVICES_LENGTH_MAX];
  265. //Array of created devices
  266. USRPController * devices_array [DEVICES_LENGTH_MAX];
  267. //Threads who used for "Work" (recv. samples, etc) in infinity loop
  268. std::thread work_threads [6];
  269.  
  270. /**
  271.  * Method for starting the creation of devices.
  272.  * @param name Nameof device, used in Sinks
  273.  * @param rate Sample rate
  274.  * @param freq Frequency
  275.  */
  276. void initialize_device(std::string name, double rate, double freq){
  277.     std::stringstream ss;
  278.     ss << "Create dev " << name << std::endl;
  279.     std::cout << ss.str();
  280.     device_create_futures[DEVICES_LENGTH] = device_create_promises[DEVICES_LENGTH].get_future();
  281.     device_create_threads[DEVICES_LENGTH] = std::thread(
  282.             &create_device,
  283.             std::move(device_create_promises[DEVICES_LENGTH]),
  284.             device_serials[DEVICES_LENGTH],
  285.             rate,
  286.             freq,
  287.             name
  288.     );
  289.     DEVICES_LENGTH++;
  290.     std::cout << "Done" << std::endl;
  291. }
  292.  
  293. /**
  294.  * Starts this all together
  295.  */
  296. void run_work(){
  297.     std::cout << "Create birth threads" << std::endl;
  298.     for (int i = 0; i < DEVICES_LENGTH; i++){
  299.         device_create_threads[i].join();
  300.     }
  301.  
  302.     std::cout << "Create futures" << std::endl;
  303.     for (int i = 0; i < DEVICES_LENGTH; i++){
  304.         devices_array[i] = device_create_futures[i].get();
  305.     }
  306.     USRPController * common_device = devices_array[0];
  307.  
  308.     std::cout << "Setting PPS time" << std::endl;
  309.     const uhd::time_spec_t last_pps_time = common_device->device->get_time_last_pps();
  310.     while (last_pps_time == common_device->device->get_time_last_pps()){
  311.         boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  312.     }
  313.     uhd::time_spec_t clock_time = common_device->device->get_time_now() + uhd::time_spec_t(1.0);
  314.     for (int i = 0; i < DEVICES_LENGTH; i++){
  315.         devices_array[i]->device->set_time_next_pps(clock_time);
  316.     }
  317.     boost::this_thread::sleep(boost::posix_time::seconds(2));
  318.  
  319.     std::cout << "Initializing" << std::endl;
  320.     uhd::time_spec_t init_timeout = common_device->device->get_time_now() + uhd::time_spec_t(1.0);
  321.     boost::thread_group thread_group;
  322.     for (int i = 0; i < DEVICES_LENGTH; i++) {
  323.         thread_group.create_thread(boost::bind(&USRPController::initialize, devices_array[i], init_timeout));
  324.     }
  325.     thread_group.join_all();
  326.     boost::this_thread::sleep(boost::posix_time::seconds(2));
  327.  
  328.     std::cout << "Starting stream" << std::endl;
  329.     uhd::time_spec_t start_timeout = common_device->device->get_time_now() + uhd::time_spec_t(1.0);
  330.     for (int i = 0; i < DEVICES_LENGTH; i++){
  331.         devices_array[i]->start_stream(start_timeout);
  332.     }
  333.  
  334.     std::cout << "Make work threads" << std::endl;
  335.     for (int i = 0; i < DEVICES_LENGTH; i++){
  336.         work_threads[i] = std::thread(deviceThreadFunc, devices_array[i]);
  337.     }
  338.  
  339.     std::cout << "Done. Working..." << std::endl;
  340.     for (int i = 0; i < DEVICES_LENGTH; i++){
  341.         work_threads[i].join();
  342.     }
  343.  
  344.     std::cout << "Finished" << std::endl;
  345. }
  346.  
  347. int UHD_SAFE_MAIN(int argc, char *argv[]) {
  348.     uhd::set_thread_priority_safe();
  349.  
  350.     std::signal(SIGINT, &sig_int_handler);
  351.     std::signal(SIGHUP, &sig_int_handler);
  352.  
  353.     double sample_rate(40e6);
  354.     double freq(120e6);
  355.     initialize_device("_", sample_rate, freq);
  356.     initialize_device("_", sample_rate, freq);
  357.     initialize_device("_", sample_rate, freq);
  358.     initialize_device("_", sample_rate, freq);
  359.     initialize_device("_", sample_rate, freq);
  360.     initialize_device("_", sample_rate, freq);
  361.     run_work();
  362.  
  363.     return EXIT_SUCCESS;
  364. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement