Advertisement
homer512

sensor merge input

Apr 20th, 2024
418
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.28 KB | Source Code | 0 0
  1.  
  2. #include <chrono>
  3. #include <future>
  4. #include <ios>
  5. #include <memory>
  6. #include <optional>
  7. #include <string>
  8. #include <vector>
  9.  
  10. namespace bench {
  11.  
  12. using clock_t = std::chrono::system_clock;
  13.  
  14. struct SensorSample
  15. {
  16.     unsigned sensor_id;
  17.     clock_t::time_point timestamp;
  18.     std::string payload;
  19. };
  20.  
  21. struct SensorState
  22. {
  23.     std::vector<SensorSample> samples;
  24.     std::ios::pos_type next_read_pos;
  25.     bool eof;
  26. };
  27.  
  28. class CsvFile
  29. {
  30. public:
  31.     /**
  32.      * Reads several sensor messages, if available
  33.      *
  34.      * Will be called from multiple threads, should parallelize well.
  35.      * Batch size (number of messages) should be tuned for efficient inter-thread
  36.      * communication. Maybe 64 kiB worth of raw data as a starting point.
  37.      * But be careful: 64 kiB for 60,000 sensors is already 3.6 GiB
  38.      */
  39.     SensorState read_batch(unsigned sensor_id, std::ios::pos_type);
  40. };
  41.  
  42. class SensorReader
  43. {
  44.     std::shared_ptr<CsvFile> infile;
  45.     unsigned id;
  46.     SensorState current_state;
  47.     std::vector<SensorSample>::iterator next_pos;
  48.     std::future<SensorState> next_state;
  49.  
  50.     std::optional<SensorSample> next_slowpath();
  51. public:
  52.     SensorReader(const std::shared_ptr<CsvFile>& infile, unsigned id,
  53.                  std::ios::pos_type first_read_pos)
  54.     : infile(infile),
  55.       id(id),
  56.       current_state({{}, first_read_pos, false}),
  57.       next_pos(current_state.samples.end())
  58.     {}
  59.     /**
  60.      * Next sample for this sensor or empty result on EOF
  61.      */
  62.     std::optional<SensorSample> next()
  63.     {
  64.         if(next_pos != current_state.samples.end())
  65.             return std::move(*(next_pos++));
  66.         return next_slowpath();
  67.     }
  68. };
  69.  
  70. std::optional<SensorSample> SensorReader::next_slowpath()
  71. {
  72.     if(current_state.eof) // no more samples for this sensor in file
  73.         return {};
  74.     /*
  75.      * Normally, the next batch should have been queued up, except when
  76.      * this is the first time we read
  77.      */
  78.     if(next_state.valid())
  79.         current_state = next_state.get();
  80.     else
  81.         current_state = infile->read_batch(
  82.                 id, current_state.next_read_pos);
  83.     next_pos = current_state.samples.begin();
  84.     /*
  85.      * Queue up prefetching the next batch
  86.      *
  87.      * Potential optimizations:
  88.      * 1. If the batch contains a lot of dynamic memory allocations,
  89.      *    it might be worth moving the old state into the async function so
  90.      *    that it can be deallocated by another thread than the one doing the
  91.      *    sorting and merging
  92.      * 2. A custom work queue with worker threads allows threads to have
  93.      *    more state persistent between calls. This can improve locality of
  94.      *    data if memory or other resources like file descriptors are
  95.      *    associated with read operations.
  96.      *    However, It won't help with per-sensor state since we don't want
  97.      *    60k threads
  98.      */
  99.     if(! current_state.eof)
  100.         next_state = std::async([infile=this->infile, id=this->id,
  101.                                  read_pos=current_state.next_read_pos]() {
  102.                 return infile->read_batch(id, read_pos); });
  103.     /*
  104.      * Return result
  105.      */
  106.     if(next_pos != current_state.samples.end())
  107.         return std::move(*(next_pos++));
  108.     return {};
  109. }
  110.  
  111. } /* namespace bench */
  112.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement