SHARE
TWEET

Untitled

a guest Jun 18th, 2019 63 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #ifndef SIMPLE_DRIVER_H_
  2. #define SIMPLE_DRIVER_H_
  3.  
#include "simple.decl.h"
#include "common.h"

#include <algorithm>
#include <numeric>
#include <queue>
#include <utility>
#include <vector>

#include "Reader.h"
#include "Splitter.h"
#include "TreePiece.h"
#include "BoundingBox.h"
#include "BufferedVec.h"
#include "Utility.h"
#include "TreeElement.h"
#include "DensityVisitor.h"
#include "GravityVisitor.h"
#include "PressureVisitor.h"
#include "CountVisitor.h"
#include "CacheManager.h"
#include "CountManager.h"
#include "Resumer.h"
  24.  
  25. extern CProxy_Reader readers;
  26. extern int n_readers;
  27. extern double decomp_tolerance;
  28. extern int max_particles_per_tp; // for OCT decomposition
  29. extern int max_particles_per_leaf; // for local tree build
  30. extern int decomp_type;
  31. extern int tree_type;
  32. extern int num_iterations;
  33. extern int flush_period;
  34. extern CProxy_TreeElement<CentroidData> centroid_calculator;
  35. extern CProxy_CacheManager<CentroidData> centroid_cache;
  36. extern CProxy_Resumer<CentroidData> centroid_resumer;
  37. extern CProxy_CountManager count_manager;
  38. extern CProxy_Driver<CentroidData> centroid_driver;
  39.  
  40. template <typename Data>
  41. struct Comparator {
  42.   bool operator() (const std::pair<Key, Data>& a, const std::pair<Key, Data>& b) {return a.first < b.first;}
  43. };
  44.  
  45. template <typename Data>
  46. class Driver : public CBase_Driver<Data> {
  47. public:
  48.   CProxy_CacheManager<Data> cache_manager;
  49.   std::vector<std::pair<Key, Data>> storage;
  50.   bool storage_sorted;
  51.  
  52.   Driver(CProxy_CacheManager<Data> cache_manageri) : cache_manager(cache_manageri), storage_sorted(false) {}
  53.  
  54.   void countInts(int* intrn_counts) {
  55.     CkPrintf("%d node-part interactions, %d part-part interactions\n", intrn_counts[0], intrn_counts[1] / 2);
  56.   }
  57.  
  58.   void recvTE(std::pair<Key, Data> param) {
  59.     storage.emplace_back(param);
  60.   }
  61.   void loadCache(CkCallback cb) {
  62.     CkPrintf("Broadcasting top %d levels to caches\n", num_share_levels);
  63.     Comparator<Data> comp;
  64.     std::sort(storage.begin(), storage.end(), comp);
  65.     int send_size = storage.size();
  66.     if (num_share_levels >= 0) {
  67.       std::pair<Key, Data> to_search (1 << (LOG_BRANCH_FACTOR * num_share_levels), Data());
  68.       send_size = std::lower_bound(storage.begin(), storage.end(), to_search, comp) - storage.begin();
  69.     }
  70.     cache_manager.recvStarterPack(storage.data(), send_size, cb);
  71.   }
  72.  
  73.   void load(Config config, CkCallback cb) {
  74.     total_start_time = CkWallTimer();
  75.     makeNewTree(0);
  76.     cb.send();
  77.   }
  78.   void makeNewTree(int it) {
  79.     // useful particle keys
  80.     smallest_particle_key = Utility::removeLeadingZeros(Key(1));
  81.     largest_particle_key = (~Key(0));
  82.  
  83.     // load Tipsy data and build universe
  84.     start_time = CkWallTimer();
  85.     CkReductionMsg* result;
  86.     if (it == 0) {
  87.       readers.load(input_file, CkCallbackResumeThread((void*&)result));
  88.       CkPrintf("[Driver, %d] Loading Tipsy data and building universe: %lf seconds\n", it, CkWallTimer() - start_time);
  89.     }
  90.     else {
  91.       readers.computeUniverseBoundingBox(CkCallbackResumeThread((void*&)result));
  92.       CkPrintf("[Driver, %d] Rebuilding universe: %lf seconds\n", it, CkWallTimer() - start_time);
  93.     }
  94.     universe = *((BoundingBox*)result->getData());
  95.     delete result;
  96.  
  97. #ifdef DEBUG
  98.     std::cout << "[Driver] Universal bounding box: " << universe << " with volume " << universe.box.volume() << std::endl;
  99. #endif
  100.  
  101.     // assign keys and sort particles locally
  102.     start_time = CkWallTimer();
  103.     readers.assignKeys(universe, CkCallbackResumeThread());
  104.     CkPrintf("[Driver, %d] Assigning keys and sorting particles: %lf seconds\n", it, CkWallTimer() - start_time);
  105.  
  106.     start_time = CkWallTimer();
  107.     findOctSplitters();
  108.     std::sort(splitters.begin(), splitters.end());
  109.     CkPrintf("[Driver, %d] Finding and sorting splitters: %lf seconds\n", it, CkWallTimer() - start_time);
  110.     readers.setSplitters(splitters, CkCallbackResumeThread());
  111.    
  112.     // create treepieces
  113.     CkWaitQD();
  114.     treepieces = CProxy_TreePiece<CentroidData>::ckNew(CkCallbackResumeThread(), universe.n_particles, n_treepieces, centroid_calculator, centroid_resumer, centroid_cache, centroid_driver, n_treepieces);
  115.     CkWaitQD();
  116.     CkPrintf("[Driver, %d] Created %d TreePieces\n", it, n_treepieces);
  117.  
  118.     // flush particles to home TreePieces
  119.     start_time = CkWallTimer();
  120.     readers.flush(universe.n_particles, n_treepieces, treepieces);
  121.     CkStartQD(CkCallbackResumeThread());
  122.     CkPrintf("[Driver, %d] Flushing particles to TreePieces: %lf seconds\n", it, CkWallTimer() - start_time);
  123.  
  124. #ifdef DEBUG
  125.     // check if all treepieces have received the right number of particles
  126.     treepieces.check(CkCallbackResumeThread());
  127. #endif
  128.  
  129.     // free splitter memory
  130.     splitters.resize(0);
  131.  
  132.   }
  133.  
  134.   void sortStorage() {
  135.     Comparator<Data> comp;
  136.     std::sort(storage.begin(), storage.end(), comp);
  137.     storage_sorted = true;
  138.   }
  139.  
  140.   template <typename Visitor>
  141.   void prefetch(Data nodewide_data, int cm_index, TEHolder<Data> te_holder, CkCallback cb) {
  142.     // do traversal on the root, send everything
  143.     if (!storage_sorted) sortStorage();
  144.     std::queue<int> node_indices; // better for cache. plus no requirement here on order
  145.     node_indices.push(0);
  146.     std::vector<std::pair<Key, Data>> to_send;
  147.     Visitor v;
  148.     Comparator<Data> comp;
  149.     typename std::vector<std::pair<Key, Data> >::iterator it;
  150.  
  151.     while (node_indices.size()) {
  152.       std::pair<Key, Data> node = storage[node_indices.front()];
  153.       node_indices.pop();
  154.       to_send.push_back(node);
  155.  
  156.       Node<Data> dummy_node1, dummy_node2;
  157.       dummy_node1.data = node.second;
  158.       dummy_node2.data = nodewide_data;
  159.       if (v.cell(SourceNode<Data>(&dummy_node1), TargetNode<Data>(&dummy_node2))) {
  160.  
  161.         for (int i = 0; i < BRANCH_FACTOR; i++) {
  162.           Key key = node.first * BRANCH_FACTOR + i;
  163.           it = std::lower_bound(storage.begin(), storage.end(), std::make_pair(key, Data()), comp);
  164.           if (it != storage.end() && it->first == key) {
  165.             node_indices.push(it - storage.begin());
  166.           }
  167.         }
  168.       }
  169.     }
  170.     cache_manager[cm_index].recvStarterPack(to_send.data(), to_send.size(), cb);
  171.   }
  172.   void request(Key* request_list, int list_size, int cm_index, TEHolder<Data> te_holder, CkCallback cb) {
  173.     if (!storage_sorted) sortStorage();
  174.     Comparator<Data> comp;
  175.     typename std::vector<std::pair<Key, Data> >::iterator it;
  176.     std::vector<std::pair<Key, Data>> to_send;
  177.     for (int i = 0; i < list_size; i++) {
  178.       Key key = request_list[i];
  179.       it = std::lower_bound(storage.begin(), storage.end(), std::make_pair(key, Data()), comp);
  180.       if (it != storage.end() && it->first == key) {
  181.         to_send.push_back(*it);
  182.       }
  183.     }
  184.     cache_manager[cm_index].recvStarterPack(to_send.data(), to_send.size(), cb);
  185.   }
  186.  
  187.   void run(CkCallback cb, int num_iterations) {
  188.     for (int it = 0; it < num_iterations; it++) {
  189.       // start local tree build in TreePieces
  190.       start_time = CkWallTimer();
  191.       treepieces.build(true);
  192.       CkWaitQD();
  193.       CkPrintf("[Driver, %d] Local tree build: %lf seconds\n", it, CkWallTimer() - start_time);
  194.       start_time = CkWallTimer();
  195.       centroid_cache.startParentPrefetch(this->thisProxy, centroid_calculator, CkCallback::ignore);
  196.       //centroid_cache.template startPrefetch<GravityVisitor>(this->thisProxy, centroid_calculator, CkCallback::ignore);
  197.       //centroid_driver.loadCache(CkCallbackResumeThread());
  198.       CkWaitQD();
  199.       CkPrintf("[Driver, %d] TE cache loading: %lf seconds\n", it, CkWallTimer() - start_time);
  200.  
  201.       // perform downward and upward traversals (Barnes-Hut)
  202.       start_time = CkWallTimer();
  203.       //treepieces.template startDown<GravityVisitor>();
  204.       treepieces.template startUpAndDown<DensityVisitor>();
  205.       CkWaitQD();
  206.       treepieces.template startDown<PressureVisitor>();
  207.       CkWaitQD();
  208. #if DELAYLOCAL
  209.       //treepieces.processLocal(CkCallbackResumeThread());
  210. #endif
  211.       CkPrintf("[Driver, %d] Traversal done: %lf seconds\n", it, CkWallTimer() - start_time);
  212.       //start_time = CkWallTimer();
  213.       //treepieces.interact(CkCallbackResumeThread());
  214.       //CkPrintf("[Driver, %d] Interactions done: %lf seconds\n", it, CkWallTimer() - start_time);
  215.       //count_manager.sum(CkCallback(CkReductionTarget(Main, terminate), thisProxy));
  216.       start_time = CkWallTimer();
  217.       bool complete_rebuild = (it % flush_period == flush_period-1);
  218.       treepieces.perturb(0.1, complete_rebuild); // 0.1s for example
  219.       CkWaitQD();
  220.       CkPrintf("[Driver, %d] Perturbations done: %lf seconds\n", it, CkWallTimer() - start_time);
  221.       if (complete_rebuild) {
  222.         treepieces.ckDestroy();
  223.         makeNewTree(it+1);
  224.       }
  225.       centroid_cache.destroy(true);
  226.       centroid_resumer.destroy();
  227.       storage.resize(0);
  228.       storage_sorted = false;
  229.       CkWaitQD();
  230.     }
  231.     cb.send();
  232.   }
  233.  
  234. private:
  235.   double total_start_time;
  236.   double start_time;
  237.   BoundingBox universe;
  238.   Key smallest_particle_key;
  239.   Key largest_particle_key;
  240.  
  241.   std::vector<Splitter> splitters;
  242.  
  243.   CProxy_TreePiece<CentroidData> treepieces; // cannot be a global variable
  244.   int n_treepieces;
  245.  
  246.   void findOctSplitters() {
  247.     BufferedVec<Key> keys;
  248.  
  249.     // initial splitter keys (first and last)
  250.     keys.add(Key(1)); // 0000...1
  251.     keys.add(~Key(0)); // 1111...1
  252.     keys.buffer();
  253.  
  254.     int decomp_particle_sum = 0; // to check if all particles are decomposed
  255.  
  256.     // main decomposition loop
  257.     while (keys.size() != 0) {
  258.       // send splitters to Readers for histogramming
  259.       CkReductionMsg *msg;
  260.       readers.countOct(keys.get(), CkCallbackResumeThread((void*&)msg));
  261.       int* counts = (int*)msg->getData();
  262.       int n_counts = msg->getSize() / sizeof(int);
  263.  
  264.       // check counts and create splitters if necessary
  265.       Real threshold = (DECOMP_TOLERANCE * Real(max_particles_per_tp));
  266.       for (int i = 0; i < n_counts; i++) {
  267.         Key from = keys.get(2*i);
  268.         Key to = keys.get(2*i+1);
  269.  
  270.         int n_particles = counts[i];
  271.         if ((Real)n_particles > threshold) {
  272.           // create 8 more splitter key pairs to go one level deeper
  273.           // leading zeros will be removed in Reader::count()
  274.           // to compare splitter key with particle keys
  275.           keys.add(from << 3);
  276.           keys.add((from << 3) + 1);
  277.  
  278.           keys.add((from << 3) + 1);
  279.           keys.add((from << 3) + 2);
  280.  
  281.           keys.add((from << 3) + 2);
  282.           keys.add((from << 3) + 3);
  283.  
  284.           keys.add((from << 3) + 3);
  285.           keys.add((from << 3) + 4);
  286.  
  287.           keys.add((from << 3) + 4);
  288.           keys.add((from << 3) + 5);
  289.  
  290.           keys.add((from << 3) + 5);
  291.           keys.add((from << 3) + 6);
  292.  
  293.           keys.add((from << 3) + 6);
  294.           keys.add((from << 3) + 7);
  295.  
  296.           keys.add((from << 3) + 7);
  297.           if (to == (~Key(0)))
  298.             keys.add(~Key(0));
  299.           else
  300.             keys.add(to << 3);
  301.         }
  302.         else {
  303.           // create and store splitter
  304.           Splitter sp(Utility::removeLeadingZeros(from),
  305.               Utility::removeLeadingZeros(to), from, n_particles);
  306.           splitters.push_back(sp);
  307.  
  308.           // add up number of particles to check if all are flushed
  309.           decomp_particle_sum += n_particles;
  310.         }
  311.       }
  312.  
  313.       keys.buffer();
  314.       delete msg;
  315.     }
  316.  
  317.     if (decomp_particle_sum != universe.n_particles) {
  318.       CkPrintf("[Driver] ERROR! Only %d particles out of %d decomposed\n",
  319.           decomp_particle_sum, universe.n_particles);
  320.       CkAbort("Decomposition error");
  321.     }
  322.  
  323.     // determine number of TreePieces
  324.     // override input from user if there was one
  325.     n_treepieces = splitters.size();
  326.   }
  327. };
  328.  
  329. #endif // SIMPLE_DRIVER_H_
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top