Advertisement
Guest User

Untitled

a guest
Jun 18th, 2019
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.32 KB | None | 0 0
  1. #ifndef SIMPLE_DRIVER_H_
  2. #define SIMPLE_DRIVER_H_
  3.  
  #include "simple.decl.h"
  #include "common.h"

  #include <algorithm>
  #include <iostream>
  #include <numeric>
  #include <queue>
  #include <utility>
  #include <vector>

  #include "Reader.h"
  #include "Splitter.h"
  #include "TreePiece.h"
  #include "BoundingBox.h"
  #include "BufferedVec.h"
  #include "Utility.h"
  #include "TreeElement.h"
  #include "DensityVisitor.h"
  #include "GravityVisitor.h"
  #include "PressureVisitor.h"
  #include "CountVisitor.h"
  #include "CacheManager.h"
  #include "CountManager.h"
  #include "Resumer.h"
  24.  
  25. extern CProxy_Reader readers;
  26. extern int n_readers;
  27. extern double decomp_tolerance;
  28. extern int max_particles_per_tp; // for OCT decomposition
  29. extern int max_particles_per_leaf; // for local tree build
  30. extern int decomp_type;
  31. extern int tree_type;
  32. extern int num_iterations;
  33. extern int flush_period;
  34. extern CProxy_TreeElement<CentroidData> centroid_calculator;
  35. extern CProxy_CacheManager<CentroidData> centroid_cache;
  36. extern CProxy_Resumer<CentroidData> centroid_resumer;
  37. extern CProxy_CountManager count_manager;
  38. extern CProxy_Driver<CentroidData> centroid_driver;
  39.  
  40. template <typename Data>
  41. struct Comparator {
  42. bool operator() (const std::pair<Key, Data>& a, const std::pair<Key, Data>& b) {return a.first < b.first;}
  43. };
  44.  
  45. template <typename Data>
  46. class Driver : public CBase_Driver<Data> {
  47. public:
  48. CProxy_CacheManager<Data> cache_manager;
  49. std::vector<std::pair<Key, Data>> storage;
  50. bool storage_sorted;
  51.  
  52. Driver(CProxy_CacheManager<Data> cache_manageri) : cache_manager(cache_manageri), storage_sorted(false) {}
  53.  
  54. void countInts(int* intrn_counts) {
  55. CkPrintf("%d node-part interactions, %d part-part interactions\n", intrn_counts[0], intrn_counts[1] / 2);
  56. }
  57.  
  58. void recvTE(std::pair<Key, Data> param) {
  59. storage.emplace_back(param);
  60. }
  61. void loadCache(CkCallback cb) {
  62. CkPrintf("Broadcasting top %d levels to caches\n", num_share_levels);
  63. Comparator<Data> comp;
  64. std::sort(storage.begin(), storage.end(), comp);
  65. int send_size = storage.size();
  66. if (num_share_levels >= 0) {
  67. std::pair<Key, Data> to_search (1 << (LOG_BRANCH_FACTOR * num_share_levels), Data());
  68. send_size = std::lower_bound(storage.begin(), storage.end(), to_search, comp) - storage.begin();
  69. }
  70. cache_manager.recvStarterPack(storage.data(), send_size, cb);
  71. }
  72.  
  73. void load(Config config, CkCallback cb) {
  74. total_start_time = CkWallTimer();
  75. makeNewTree(0);
  76. cb.send();
  77. }
  78. void makeNewTree(int it) {
  79. // useful particle keys
  80. smallest_particle_key = Utility::removeLeadingZeros(Key(1));
  81. largest_particle_key = (~Key(0));
  82.  
  83. // load Tipsy data and build universe
  84. start_time = CkWallTimer();
  85. CkReductionMsg* result;
  86. if (it == 0) {
  87. readers.load(input_file, CkCallbackResumeThread((void*&)result));
  88. CkPrintf("[Driver, %d] Loading Tipsy data and building universe: %lf seconds\n", it, CkWallTimer() - start_time);
  89. }
  90. else {
  91. readers.computeUniverseBoundingBox(CkCallbackResumeThread((void*&)result));
  92. CkPrintf("[Driver, %d] Rebuilding universe: %lf seconds\n", it, CkWallTimer() - start_time);
  93. }
  94. universe = *((BoundingBox*)result->getData());
  95. delete result;
  96.  
  97. #ifdef DEBUG
  98. std::cout << "[Driver] Universal bounding box: " << universe << " with volume " << universe.box.volume() << std::endl;
  99. #endif
  100.  
  101. // assign keys and sort particles locally
  102. start_time = CkWallTimer();
  103. readers.assignKeys(universe, CkCallbackResumeThread());
  104. CkPrintf("[Driver, %d] Assigning keys and sorting particles: %lf seconds\n", it, CkWallTimer() - start_time);
  105.  
  106. start_time = CkWallTimer();
  107. findOctSplitters();
  108. std::sort(splitters.begin(), splitters.end());
  109. CkPrintf("[Driver, %d] Finding and sorting splitters: %lf seconds\n", it, CkWallTimer() - start_time);
  110. readers.setSplitters(splitters, CkCallbackResumeThread());
  111.  
  112. // create treepieces
  113. CkWaitQD();
  114. treepieces = CProxy_TreePiece<CentroidData>::ckNew(CkCallbackResumeThread(), universe.n_particles, n_treepieces, centroid_calculator, centroid_resumer, centroid_cache, centroid_driver, n_treepieces);
  115. CkWaitQD();
  116. CkPrintf("[Driver, %d] Created %d TreePieces\n", it, n_treepieces);
  117.  
  118. // flush particles to home TreePieces
  119. start_time = CkWallTimer();
  120. readers.flush(universe.n_particles, n_treepieces, treepieces);
  121. CkStartQD(CkCallbackResumeThread());
  122. CkPrintf("[Driver, %d] Flushing particles to TreePieces: %lf seconds\n", it, CkWallTimer() - start_time);
  123.  
  124. #ifdef DEBUG
  125. // check if all treepieces have received the right number of particles
  126. treepieces.check(CkCallbackResumeThread());
  127. #endif
  128.  
  129. // free splitter memory
  130. splitters.resize(0);
  131.  
  132. }
  133.  
  134. void sortStorage() {
  135. Comparator<Data> comp;
  136. std::sort(storage.begin(), storage.end(), comp);
  137. storage_sorted = true;
  138. }
  139.  
  140. template <typename Visitor>
  141. void prefetch(Data nodewide_data, int cm_index, TEHolder<Data> te_holder, CkCallback cb) {
  142. // do traversal on the root, send everything
  143. if (!storage_sorted) sortStorage();
  144. std::queue<int> node_indices; // better for cache. plus no requirement here on order
  145. node_indices.push(0);
  146. std::vector<std::pair<Key, Data>> to_send;
  147. Visitor v;
  148. Comparator<Data> comp;
  149. typename std::vector<std::pair<Key, Data> >::iterator it;
  150.  
  151. while (node_indices.size()) {
  152. std::pair<Key, Data> node = storage[node_indices.front()];
  153. node_indices.pop();
  154. to_send.push_back(node);
  155.  
  156. Node<Data> dummy_node1, dummy_node2;
  157. dummy_node1.data = node.second;
  158. dummy_node2.data = nodewide_data;
  159. if (v.cell(SourceNode<Data>(&dummy_node1), TargetNode<Data>(&dummy_node2))) {
  160.  
  161. for (int i = 0; i < BRANCH_FACTOR; i++) {
  162. Key key = node.first * BRANCH_FACTOR + i;
  163. it = std::lower_bound(storage.begin(), storage.end(), std::make_pair(key, Data()), comp);
  164. if (it != storage.end() && it->first == key) {
  165. node_indices.push(it - storage.begin());
  166. }
  167. }
  168. }
  169. }
  170. cache_manager[cm_index].recvStarterPack(to_send.data(), to_send.size(), cb);
  171. }
  172. void request(Key* request_list, int list_size, int cm_index, TEHolder<Data> te_holder, CkCallback cb) {
  173. if (!storage_sorted) sortStorage();
  174. Comparator<Data> comp;
  175. typename std::vector<std::pair<Key, Data> >::iterator it;
  176. std::vector<std::pair<Key, Data>> to_send;
  177. for (int i = 0; i < list_size; i++) {
  178. Key key = request_list[i];
  179. it = std::lower_bound(storage.begin(), storage.end(), std::make_pair(key, Data()), comp);
  180. if (it != storage.end() && it->first == key) {
  181. to_send.push_back(*it);
  182. }
  183. }
  184. cache_manager[cm_index].recvStarterPack(to_send.data(), to_send.size(), cb);
  185. }
  186.  
  187. void run(CkCallback cb, int num_iterations) {
  188. for (int it = 0; it < num_iterations; it++) {
  189. // start local tree build in TreePieces
  190. start_time = CkWallTimer();
  191. treepieces.build(true);
  192. CkWaitQD();
  193. CkPrintf("[Driver, %d] Local tree build: %lf seconds\n", it, CkWallTimer() - start_time);
  194. start_time = CkWallTimer();
  195. centroid_cache.startParentPrefetch(this->thisProxy, centroid_calculator, CkCallback::ignore);
  196. //centroid_cache.template startPrefetch<GravityVisitor>(this->thisProxy, centroid_calculator, CkCallback::ignore);
  197. //centroid_driver.loadCache(CkCallbackResumeThread());
  198. CkWaitQD();
  199. CkPrintf("[Driver, %d] TE cache loading: %lf seconds\n", it, CkWallTimer() - start_time);
  200.  
  201. // perform downward and upward traversals (Barnes-Hut)
  202. start_time = CkWallTimer();
  203. //treepieces.template startDown<GravityVisitor>();
  204. treepieces.template startUpAndDown<DensityVisitor>();
  205. CkWaitQD();
  206. treepieces.template startDown<PressureVisitor>();
  207. CkWaitQD();
  208. #if DELAYLOCAL
  209. //treepieces.processLocal(CkCallbackResumeThread());
  210. #endif
  211. CkPrintf("[Driver, %d] Traversal done: %lf seconds\n", it, CkWallTimer() - start_time);
  212. //start_time = CkWallTimer();
  213. //treepieces.interact(CkCallbackResumeThread());
  214. //CkPrintf("[Driver, %d] Interactions done: %lf seconds\n", it, CkWallTimer() - start_time);
  215. //count_manager.sum(CkCallback(CkReductionTarget(Main, terminate), thisProxy));
  216. start_time = CkWallTimer();
  217. bool complete_rebuild = (it % flush_period == flush_period-1);
  218. treepieces.perturb(0.1, complete_rebuild); // 0.1s for example
  219. CkWaitQD();
  220. CkPrintf("[Driver, %d] Perturbations done: %lf seconds\n", it, CkWallTimer() - start_time);
  221. if (complete_rebuild) {
  222. treepieces.ckDestroy();
  223. makeNewTree(it+1);
  224. }
  225. centroid_cache.destroy(true);
  226. centroid_resumer.destroy();
  227. storage.resize(0);
  228. storage_sorted = false;
  229. CkWaitQD();
  230. }
  231. cb.send();
  232. }
  233.  
  234. private:
  235. double total_start_time;
  236. double start_time;
  237. BoundingBox universe;
  238. Key smallest_particle_key;
  239. Key largest_particle_key;
  240.  
  241. std::vector<Splitter> splitters;
  242.  
  243. CProxy_TreePiece<CentroidData> treepieces; // cannot be a global variable
  244. int n_treepieces;
  245.  
  246. void findOctSplitters() {
  247. BufferedVec<Key> keys;
  248.  
  249. // initial splitter keys (first and last)
  250. keys.add(Key(1)); // 0000...1
  251. keys.add(~Key(0)); // 1111...1
  252. keys.buffer();
  253.  
  254. int decomp_particle_sum = 0; // to check if all particles are decomposed
  255.  
  256. // main decomposition loop
  257. while (keys.size() != 0) {
  258. // send splitters to Readers for histogramming
  259. CkReductionMsg *msg;
  260. readers.countOct(keys.get(), CkCallbackResumeThread((void*&)msg));
  261. int* counts = (int*)msg->getData();
  262. int n_counts = msg->getSize() / sizeof(int);
  263.  
  264. // check counts and create splitters if necessary
  265. Real threshold = (DECOMP_TOLERANCE * Real(max_particles_per_tp));
  266. for (int i = 0; i < n_counts; i++) {
  267. Key from = keys.get(2*i);
  268. Key to = keys.get(2*i+1);
  269.  
  270. int n_particles = counts[i];
  271. if ((Real)n_particles > threshold) {
  272. // create 8 more splitter key pairs to go one level deeper
  273. // leading zeros will be removed in Reader::count()
  274. // to compare splitter key with particle keys
  275. keys.add(from << 3);
  276. keys.add((from << 3) + 1);
  277.  
  278. keys.add((from << 3) + 1);
  279. keys.add((from << 3) + 2);
  280.  
  281. keys.add((from << 3) + 2);
  282. keys.add((from << 3) + 3);
  283.  
  284. keys.add((from << 3) + 3);
  285. keys.add((from << 3) + 4);
  286.  
  287. keys.add((from << 3) + 4);
  288. keys.add((from << 3) + 5);
  289.  
  290. keys.add((from << 3) + 5);
  291. keys.add((from << 3) + 6);
  292.  
  293. keys.add((from << 3) + 6);
  294. keys.add((from << 3) + 7);
  295.  
  296. keys.add((from << 3) + 7);
  297. if (to == (~Key(0)))
  298. keys.add(~Key(0));
  299. else
  300. keys.add(to << 3);
  301. }
  302. else {
  303. // create and store splitter
  304. Splitter sp(Utility::removeLeadingZeros(from),
  305. Utility::removeLeadingZeros(to), from, n_particles);
  306. splitters.push_back(sp);
  307.  
  308. // add up number of particles to check if all are flushed
  309. decomp_particle_sum += n_particles;
  310. }
  311. }
  312.  
  313. keys.buffer();
  314. delete msg;
  315. }
  316.  
  317. if (decomp_particle_sum != universe.n_particles) {
  318. CkPrintf("[Driver] ERROR! Only %d particles out of %d decomposed\n",
  319. decomp_particle_sum, universe.n_particles);
  320. CkAbort("Decomposition error");
  321. }
  322.  
  323. // determine number of TreePieces
  324. // override input from user if there was one
  325. n_treepieces = splitters.size();
  326. }
  327. };
  328.  
  329. #endif // SIMPLE_DRIVER_H_
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement