Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * @fileoverview
 * I use several devices and try to synchronize their time as closely as possible, using an OctoClock as the PPS reference and one thread per device.
 */
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <csignal>
#include <exception>
#include <fstream>
#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>

#include <uhd/exception.hpp>
#include <uhd/types/tune_request.hpp>
#include <uhd/usrp/multi_usrp.hpp>
#include <uhd/utils/safe_main.hpp>
#include <uhd/utils/thread_priority.hpp>

#include <boost/format.hpp>
#include <boost/program_options.hpp>
#include <boost/thread.hpp>
#include <boost/variant/detail/substitute.hpp>
//region Colorize
/**
 * Helpers for colorizing console output with ANSI escape sequences.
 */
namespace Color {
    /**
     * Numeric parameters placed between "\033[" and "m" by Modifier.
     */
    enum Code {
        FG_RED = 31,
        FG_GREEN = 32,
        FG_BLUE = 34,
        FG_DEFAULT = 39,
        BG_RED = 41,
        BG_GREEN = 42,
        BG_BLUE = 44,
        BG_DEFAULT = 49,
        S_BOLD = 1
    };

    /**
     * Stream manipulator: inserting a Modifier into a std::ostream writes the
     * escape sequence "\033[<code>m" for the Code it was constructed with.
     */
    class Modifier {
        Code code;
    public:
        Modifier(Code pCode) : code(pCode) {}

        friend std::ostream &operator<<(std::ostream &os, const Modifier &mod) {
            // Spell out the enum -> int conversion so the numeric value is
            // what gets printed, independent of any operator<< for Code.
            return os << "\033[" << static_cast<int>(mod.code) << "m";
        }
    };
}
- Color::Modifier red(Color::FG_RED);
- Color::Modifier green(Color::FG_GREEN);
- Color::Modifier blue(Color::FG_BLUE);
- Color::Modifier def(Color::FG_DEFAULT);
- Color::Modifier bold(Color::S_BOLD);
- //endregion
/**
 * Stop request flag: set by sig_int_handler, polled by the device worker loop.
 * It is atomic because it is written from a signal handler while other
 * threads read it concurrently.
 */
static std::atomic<bool> stop_signal_called{false};

/**
 * Signal handler: prints a notice and raises the stop flag.
 *
 * The notice is emitted with write(2) rather than std::cout because iostream
 * operations are not async-signal-safe. The bytes written are the same ones
 * the stream-based version produced (green text, default colour, newline).
 */
void sig_int_handler(int) {
    const int saved_errno = errno;  // write() may clobber errno of interrupted code
    const char notice[] = "\033[32mSIG INT HANDLED\033[39m\n";
    const ssize_t written = ::write(STDOUT_FILENO, notice, sizeof(notice) - 1);
    (void) written;  // best effort: nothing useful can be done on failure here
    stop_signal_called.store(true);
    errno = saved_errno;
}
/**
 * The target into which data from the receive buffer is written.
 */
class SinkBase {
public:
    // Virtual so that deleting a concrete sink through a SinkBase pointer is
    // well defined.
    virtual ~SinkBase() = default;

    /** Writes the given number of bytes starting at the given address. */
    virtual void write(const char *, std::streamsize) = 0;

    /** Returns a name identifying the sink's destination. */
    virtual std::string get_full_file_name() = 0;

    /** Reports whether the sink can currently accept writes. */
    virtual bool is_open() = 0;

    /** Closes the sink. */
    virtual void close() = 0;
};
/**
 * Sink that writes to "nothing": every write is discarded.
 *
 * It offers the same member functions as SinkBase but does not derive from it.
 */
class SinkNull {
public:
    /**
     * @param file_name Accepted for signature compatibility with other sinks; ignored.
     */
    SinkNull(const std::string & /*file_name*/) {
    }

    /** Discards the data. Parameters are unnamed because they are unused. */
    void write(const char * /*data*/, std::streamsize /*size*/) {
    }

    /** @return Always true: there is nothing that could fail to open. */
    bool is_open() {
        return true;
    }

    /** No-op: there is nothing to close. */
    void close() {
    }

    /** @return The fixed label "NULL SINK". */
    std::string get_full_file_name() {
        return "NULL SINK";
    }
};
- /**
- * Just a wrapper over the device.
- */
- class USRPController {
- public:
- uhd::usrp::multi_usrp::sptr device;
- USRPController(std::string serial, double rate, double freq, std::string file_name) :
- serial(serial), rate(rate), freq(freq), sink(file_name) {
- this->gain = 25;
- std::ostringstream string_stream;
- string_stream << bold << "[DEV " << blue << serial << def << "] ";
- log_prepend = string_stream.str();
- std::stringstream ss;
- ss << log_prepend << "Creating dev with rate " << rate << ", freq " << freq << ", file_name " << file_name << std::endl;
- std::cout << ss.str();
- this->device = uhd::usrp::multi_usrp::make("serial=" + serial);
- this->device->set_time_source("external");
- this->device->set_rx_subdev_spec(std::string("A:A"));
- std::string cpu_format = "fc32";
- std::string wire_format = "sc12";
- uhd::stream_args_t stream_args(cpu_format, wire_format);
- this->rx_stream = this->device->get_rx_stream(stream_args);
- size_t samps_per_buff = 10000;
- this->buff = std::vector<std::complex<float>>(samps_per_buff);
- }
- /**
- * Timed initialization of freq, gain, rate, etc.,
- * and wait for lo_locked
- * @param cmd_time Time to start command
- */
- void initialize(uhd::time_spec_t cmd_time){
- this->device->set_command_time(cmd_time);
- uhd::tune_request_t tune_request(this->freq);
- this->device->set_rx_freq(tune_request);
- this->device->set_rx_gain(this->gain);
- this->device->set_rx_rate(this->rate);
- this->device->set_rx_bandwidth(this->rate);
- this->device->set_rx_antenna("RX2");
- //end timed commands
- this->device->clear_command_time();
- while (not this->device->get_rx_sensor("lo_locked").to_bool()){
- boost::this_thread::sleep(boost::posix_time::milliseconds(1));
- }
- }
- /**
- * Issue timed stream command
- * @param cmd_time Time to start stream
- */
- void start_stream(uhd::time_spec_t cmd_time) {
- uhd::stream_cmd_t stream_cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
- stream_cmd.num_samps = size_t(0);// What is this?)
- stream_cmd.stream_now = false;
- stream_cmd.time_spec = cmd_time;
- this->rx_stream->issue_stream_cmd(stream_cmd);
- }
- /**
- * Receive samples from device,
- * log metadata's timestamp to std::out,
- * check for errors and
- * write to sink
- * @return Returns "false" on fail: it will breaks record from this device
- */
- bool work() {
- size_t num_rx_samps = this->rx_stream->recv(&buff.front(), buff.size(), md, 3.0, true);// Try TRUE
- std::stringstream ss;
- ss << this->serial << " : ";
- ss << md.time_spec.get_tick_count(1e6) << std::endl;
- std::cout << ss.str();
- if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_TIMEOUT) {
- std::cout << log_prepend << boost::format("Timeout while streaming") << std::endl;
- } else if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) {
- if (overflow_message) {
- overflow_message = false;
- std::cerr << log_prepend << boost::format(
- "Got an overflow indication. Please consider the following:\n"
- " Your write medium must sustain a rate of %fMB/s.\n"
- " Dropped samples will not be written to the file.\n"
- " Please modify this example for your purposes.\n"
- " This message will not appear again.\n"
- ) % (this->device->get_rx_rate() * sizeof(std::complex<float>) / 1e6);
- }
- return true;
- } else if (md.error_code == uhd::rx_metadata_t::ERROR_CODE_NONE) {
- // its o'k'?))
- } else {
- std::string error = str(boost::format("Uncaught error at %s %s") % (log_prepend, md.strerror()));
- throw std::runtime_error(error);
- }
- if (sink.is_open()) {
- sink.write((const char *) &buff.front(), num_rx_samps * sizeof(std::complex<float>));
- }
- return true;
- }
- uhd::rx_streamer::sptr rx_stream;
- private:
- std::string serial;
- double rate;
- double freq;
- double gain;
- std::vector<std::complex<float>> buff;
- uhd::rx_metadata_t md;
- bool overflow_message = true;
- std::string log_prepend;
- SinkNull sink;
- };
- /**
- * Function-thread to call in the "infinite" loop of the device's worker method.
- * @param controller Device to work
- */
- void deviceThreadFunc(USRPController *controller) {
- while (not stop_signal_called) {
- if (not controller->work()) break;
- }
- }
- /**
- * Device fabric, used to return the created device to promise.
- * @param p Promise
- * @param serial Target device's serial
- * @param rate Sample rate
- * @param freq Frequency
- * @param file_name Filename, used for "Sinks"
- */
- void create_device(std::promise<USRPController *> &&p, std::string serial, double rate, double freq, std::string file_name) {
- USRPController *device = new USRPController(serial, rate, freq, file_name);
- p.set_value(device);
- }
- /**
- * @todo Refactor to use vectors
- */
- const int DEVICES_LENGTH_MAX = 6;
- int DEVICES_LENGTH = 0;
- /**
- * Collection of device's serials
- */
- std::string device_serials [DEVICES_LENGTH_MAX] = {
- "a_____",
- "bb____",
- "ccc___",
- "dddd__",
- "eeeee_",
- "ffffff"
- };
- //Promises for the return of the result of the work of the devices factory.
- std::promise<USRPController *> device_create_promises [DEVICES_LENGTH_MAX];
- std::future<USRPController *> device_create_futures [DEVICES_LENGTH_MAX];
- //Devices factory threads collection
- std::thread device_create_threads [DEVICES_LENGTH_MAX];
- //Array of created devices
- USRPController * devices_array [DEVICES_LENGTH_MAX];
- //Threads who used for "Work" (recv. samples, etc) in infinity loop
- std::thread work_threads [6];
- /**
- * Method for starting the creation of devices.
- * @param name Nameof device, used in Sinks
- * @param rate Sample rate
- * @param freq Frequency
- */
- void initialize_device(std::string name, double rate, double freq){
- std::stringstream ss;
- ss << "Create dev " << name << std::endl;
- std::cout << ss.str();
- device_create_futures[DEVICES_LENGTH] = device_create_promises[DEVICES_LENGTH].get_future();
- device_create_threads[DEVICES_LENGTH] = std::thread(
- &create_device,
- std::move(device_create_promises[DEVICES_LENGTH]),
- device_serials[DEVICES_LENGTH],
- rate,
- freq,
- name
- );
- DEVICES_LENGTH++;
- std::cout << "Done" << std::endl;
- }
- /**
- * Starts this all together
- */
- void run_work(){
- std::cout << "Create birth threads" << std::endl;
- for (int i = 0; i < DEVICES_LENGTH; i++){
- device_create_threads[i].join();
- }
- std::cout << "Create futures" << std::endl;
- for (int i = 0; i < DEVICES_LENGTH; i++){
- devices_array[i] = device_create_futures[i].get();
- }
- USRPController * common_device = devices_array[0];
- std::cout << "Setting PPS time" << std::endl;
- const uhd::time_spec_t last_pps_time = common_device->device->get_time_last_pps();
- while (last_pps_time == common_device->device->get_time_last_pps()){
- boost::this_thread::sleep(boost::posix_time::milliseconds(100));
- }
- uhd::time_spec_t clock_time = common_device->device->get_time_now() + uhd::time_spec_t(1.0);
- for (int i = 0; i < DEVICES_LENGTH; i++){
- devices_array[i]->device->set_time_next_pps(clock_time);
- }
- boost::this_thread::sleep(boost::posix_time::seconds(2));
- std::cout << "Initializing" << std::endl;
- uhd::time_spec_t init_timeout = common_device->device->get_time_now() + uhd::time_spec_t(1.0);
- boost::thread_group thread_group;
- for (int i = 0; i < DEVICES_LENGTH; i++) {
- thread_group.create_thread(boost::bind(&USRPController::initialize, devices_array[i], init_timeout));
- }
- thread_group.join_all();
- boost::this_thread::sleep(boost::posix_time::seconds(2));
- std::cout << "Starting stream" << std::endl;
- uhd::time_spec_t start_timeout = common_device->device->get_time_now() + uhd::time_spec_t(1.0);
- for (int i = 0; i < DEVICES_LENGTH; i++){
- devices_array[i]->start_stream(start_timeout);
- }
- std::cout << "Make work threads" << std::endl;
- for (int i = 0; i < DEVICES_LENGTH; i++){
- work_threads[i] = std::thread(deviceThreadFunc, devices_array[i]);
- }
- std::cout << "Done. Working..." << std::endl;
- for (int i = 0; i < DEVICES_LENGTH; i++){
- work_threads[i].join();
- }
- std::cout << "Finished" << std::endl;
- }
- int UHD_SAFE_MAIN(int argc, char *argv[]) {
- uhd::set_thread_priority_safe();
- std::signal(SIGINT, &sig_int_handler);
- std::signal(SIGHUP, &sig_int_handler);
- double sample_rate(40e6);
- double freq(120e6);
- initialize_device("_", sample_rate, freq);
- initialize_device("_", sample_rate, freq);
- initialize_device("_", sample_rate, freq);
- initialize_device("_", sample_rate, freq);
- initialize_device("_", sample_rate, freq);
- initialize_device("_", sample_rate, freq);
- run_work();
- return EXIT_SUCCESS;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement