Advertisement
nr0q

multihreadosm

Nov 11th, 2023
106
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 4.82 KB | None | 0 0
  1. #include <fstream>
  2. #include <iostream>
  3. #include <vector>
  4. #include <string>
  5. #include <future>
  6. #include <thread>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include <functional>
  10. #include <osmium/io/any_input.hpp>
  11. #include <osmium/handler.hpp>
  12. #include <osmium/visitor.hpp>
  13.  
  14. class WayData {
  15. public:
  16.     osmium::object_id_type id;
  17.     std::vector<std::pair<double, double>> nodes; // Store lon, lat pairs
  18.     std::string name;
  19.  
  20.     WayData(const osmium::Way& way) {
  21.         id = way.id();
  22.         for (const auto& node : way.nodes()) {
  23.             if (node.location().valid()) {
  24.                 nodes.emplace_back(node.lon(), node.lat());
  25.             }
  26.         }
  27.         const char* name_tag = way.tags()["name"];
  28.         if (name_tag) {
  29.             name = name_tag;
  30.         }
  31.     }
  32. };
  33.  
  34. std::string process_way(const WayData& wayData) {
  35.     std::string feature = "{ \"type\": \"Feature\", \"properties\": { \"id\": \"" + std::to_string(wayData.id) + "\", ";
  36.  
  37.     if (!wayData.name.empty()) {
  38.         feature += "\"name\": \"" + wayData.name + "\", ";
  39.     }
  40.  
  41.     feature += "\"geometry\": { \"type\": \"LineString\", \"coordinates\": [";
  42.     for (const auto& node : wayData.nodes) {
  43.         feature += "[" + std::to_string(node.first) + ", " + std::to_string(node.second) + "],";
  44.     }
  45.  
  46.     if (feature.back() == ',') {
  47.         feature.pop_back();
  48.     }
  49.  
  50.     feature += "] } }";
  51.     return feature;
  52. }
  53.  
  54. class ThreadPool {
  55. public:
  56.     ThreadPool(size_t threads) : stop(false) {
  57.         for(size_t i = 0; i < threads; ++i)
  58.             workers.emplace_back([this] {
  59.                 while(true) {
  60.                     std::function<void()> task;
  61.                     {
  62.                         std::unique_lock<std::mutex> lock(this->queue_mutex);
  63.                         this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
  64.                         if(this->stop && this->tasks.empty())
  65.                             return;
  66.                         task = std::move(this->tasks.front());
  67.                         this->tasks.pop();
  68.                     }
  69.                     task();
  70.                 }
  71.             });
  72.     }
  73.  
  74.     ~ThreadPool() {
  75.         {
  76.             std::unique_lock<std::mutex> lock(queue_mutex);
  77.             stop = true;
  78.         }
  79.         condition.notify_all();
  80.         for(std::thread &worker: workers)
  81.             worker.join();
  82.     }
  83.  
  84.     template<class F, class... Args>
  85.     auto enqueue(F&& f, Args&&... args)
  86.         -> std::future<typename std::result_of<F(Args...)>::type> {
  87.         using return_type = typename std::result_of<F(Args...)>::type;
  88.  
  89.         auto task = std::make_shared< std::packaged_task<return_type()> >(
  90.             std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  91.         );
  92.        
  93.         std::future<return_type> res = task->get_future();
  94.         {
  95.             std::unique_lock<std::mutex> lock(queue_mutex);
  96.             if(stop)
  97.                 throw std::runtime_error("enqueue on stopped ThreadPool");
  98.  
  99.             tasks.emplace([task](){ (*task)(); });
  100.         }
  101.         condition.notify_one();
  102.         return res;
  103.     }
  104.  
  105. private:
  106.     std::vector< std::thread > workers;
  107.     std::queue< std::function<void()> > tasks;
  108.    
  109.     std::mutex queue_mutex;
  110.     std::condition_variable condition;
  111.     bool stop;
  112. };
  113.  
  114. int main(int argc, char* argv[]) {
  115.     int num_threads = std::thread::hardware_concurrency();
  116.     std::string input_file_name;
  117.  
  118.     for (int i = 1; i < argc; ++i) {
  119.         std::string arg = argv[i];
  120.         if (arg == "-j" && i + 1 < argc) {
  121.             num_threads = std::stoi(argv[++i]);
  122.         } else {
  123.             input_file_name = argv[i];
  124.         }
  125.     }
  126.  
  127.     if (input_file_name.empty()) {
  128.         std::cerr << "Usage: " << argv[0] << " [-j num_threads] OSM_FILE.osm.pbf" << std::endl;
  129.         return 1;
  130.     }
  131.  
  132.     osmium::io::File input_file(input_file_name);
  133.     osmium::io::Reader reader(input_file);
  134.  
  135.     ThreadPool pool(num_threads);
  136.     std::vector<std::future<std::string>> results;
  137.  
  138.     while (osmium::memory::Buffer buffer = reader.read()) {
  139.         for (const auto& way : buffer.select<osmium::Way>()) {
  140.             WayData wayData(way);
  141.             results.emplace_back(pool.enqueue(process_way, wayData));
  142.         }
  143.     }
  144.  
  145.     std::vector<std::string> features;
  146.     for (auto& result : results) {
  147.         std::string feature = result.get();
  148.         if (!feature.empty()) {
  149.             features.push_back(feature);
  150.         }
  151.     }
  152.  
  153.     std::ofstream file("output.geojson");
  154.     file << "{ \"type\": \"FeatureCollection\", \"features\": [";
  155.     for (const auto& feature : features) {
  156.         file << feature << ",";
  157.     }
  158.     if (!features.empty()) {
  159.         file.seekp(-1, std::ios_base::end);
  160.     }
  161.     file << "] }";
  162.     file.close();
  163.  
  164.     reader.close();
  165. }
  166.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement