Advertisement
Guest User

Untitled

a guest
May 20th, 2017
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 14.72 KB | None | 0 0
  1. #include <cstdio>
  2. #include <cassert>
  3. #include <cmath>
  4. #include <cstring>
  5. #include <cstdint>
  6.  
  7. #include <pthread.h>
  8. #include <unistd.h>
  9.  
  10. #include <sys/eventfd.h>
  11. #include <sys/time.h>
  12.  
  13. #include <string>
  14. #include <set>
  15. #include <map>
  16. #include <queue>
  17. using namespace std;
  18.  
  19. #include "netcdf.h"
  20.  
  21. #include "suite.h"
  22. #include "utils.h"
  23.  
  24. struct abstract_worker_t
  25. {
  26.     string name;
  27.     signed int target_state;
  28.     uint64_t background_mask;
  29.  
  30.     virtual ~abstract_worker_t() { }
  31.     abstract_worker_t() { target_state = 1; background_mask = 0; }
  32.     virtual void do_work() = 0;
  33.     bool can_fork() { return background_mask & (1 << target_state); }
  34.     void set_fail() { if (target_state>0) target_state = -target_state; }
  35.     void set_pass() { if (target_state>0) target_state = 0; }
  36.     bool is_finished() { return target_state <= 0; }
  37.     bool is_failed() { return target_state < 0; }
  38. };
  39.  
  40. struct worker_thread_t
  41. {
  42.     abstract_worker_t *worker;
  43.     int event; // file decriptor as in eventfd(2)
  44.     pthread_t thread;
  45.     pthread_attr_t thread_attr;
  46.  
  47.     static void *main(void *thread_data)
  48.     {
  49.         worker_thread_t *self = (worker_thread_t*) thread_data;
  50.         while(not self->worker->is_finished() and self->worker->can_fork())
  51.             self->worker->do_work();
  52.         uint64_t one = 1;
  53.         size_t res = write(self->event, &one, sizeof(uint64_t));
  54.         if (res != sizeof(uint64_t))
  55.             fprintf(stderr, "[%s]: write(event) failed, res=%lu: %m\n", self->worker->name.c_str(), res);
  56.         pthread_exit(NULL);
  57.     }
  58.     worker_thread_t(abstract_worker_t *worker)
  59.     {
  60.         this->worker = worker;
  61.     }
  62.     bool start()
  63.     {
  64.         if ((event = eventfd(0, EFD_NONBLOCK)) < 0)
  65.         {
  66.             fprintf(stderr, "[%s]: eventfd() failed: %m\n", worker->name.c_str());
  67.             return false;
  68.         }
  69.         if (int error = pthread_attr_init(&thread_attr))
  70.         {
  71.             fprintf(stderr, "[%s]: pthread_attr_init() failed: %s\n", worker->name.c_str(), strerror(error));
  72.             return false;
  73.         }
  74.         if (int error = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE))
  75.         {
  76.             fprintf(stderr, "[%s]: pthread_attr_setdetachstate() failed: %s\n", worker->name.c_str(), strerror(error));
  77.             return false;
  78.         }
  79.         if (int error = pthread_create(&thread, &thread_attr, main, this))
  80.         {
  81.             fprintf(stderr, "[%s]: pthread_create() failed: %s\n", worker->name.c_str(), strerror(error));
  82.             return false;
  83.         }
  84.  
  85.         return true;
  86.     }
  87. };
  88.  
  89. void dispatch(const set<abstract_worker_t *> &workers)
  90. {
  91.     set<abstract_worker_t *> finished;
  92.     set<worker_thread_t *> running;
  93.     queue<abstract_worker_t *> waiting;
  94.  
  95.     for (auto it=workers.begin(); it != workers.end(); ++it)
  96.         waiting.push(*it);
  97.  
  98.     while (not waiting.empty() or not running.empty())
  99.     {
  100.         map<int, unsigned> stat;
  101.         for (auto it=running.begin(); it!=running.end(); ++it)
  102.             stat[(*it)->worker->target_state] ++;
  103.  
  104.         printf("stage 1. waiting=%lu; finished=%lu; running=%lu {", waiting.size(), finished.size(), running.size());
  105.         for (auto it=stat.begin(); it!=stat.end(); ++it)
  106.             printf("%s%i=>%u", it==stat.begin() ? "" : ", ", it->first, it->second);
  107.         printf("}\n");
  108.  
  109.         // Stage 1: process waiting queue once
  110.         unsigned n = waiting.size();
  111.         for (unsigned i=0; i<n and not waiting.empty(); ++i)
  112.         {
  113.             abstract_worker_t *w = waiting.front();
  114.             waiting.pop();
  115.             if (w->is_finished())
  116.                 finished.insert(w);
  117.             else if(w->can_fork())
  118.             {
  119.                 worker_thread_t *th = new worker_thread_t(w);
  120.                 if (th->start())
  121.                     running.insert(th);
  122.                 else
  123.                 {
  124.                     w->set_fail();
  125.                     finished.insert(w);
  126.                 }
  127.             }
  128.             else
  129.             {
  130.                 w->do_work();
  131.                 waiting.push(w);
  132.             }
  133.         }
  134.  
  135.         // printf("stage 2. waiting: %lu; running: %lu; finished: %lu\n", waiting.size(), running.size(), finished.size());
  136.  
  137.         // Stage 2: find finished threads
  138.         set<worker_thread_t *> finished_threads;
  139.         for (auto it=running.begin(); it!=running.end(); ++it)
  140.         {
  141.             worker_thread_t *th = *it;
  142.             uint64_t one;
  143.             ssize_t res = read(th->event, &one, sizeof(uint64_t));
  144.             if (res >= 0) // can read => thread finished
  145.                 finished_threads.insert(th);
  146.             else if (errno!=EAGAIN and errno!=EWOULDBLOCK) // unexpected error, something is *really* wrong
  147.             {
  148.                 fprintf(stderr, "[%s]: read(event) failed: %m\n", th->worker->name.c_str());
  149.                 exit(1);
  150.             }
  151.         }
  152.  
  153.         // printf("stage 3. waiting: %lu; running: %lu; finished: %lu\n", waiting.size(), running.size(), finished.size());
  154.  
  155.         // Stage 3: join finished threads
  156.         for (auto it=finished_threads.begin(); it!=finished_threads.end(); ++it)
  157.         {
  158.             worker_thread_t *th = *it;
  159.             void *status;
  160.             running.erase(th);
  161.             if (int res = pthread_join(th->thread, &status)) // something is *really* wrong
  162.             {
  163.                 fprintf(stderr, "[%s]: pthread_join() failed: %s\n", th->worker->name.c_str(), strerror(res));
  164.                 exit(1);
  165.             }
  166.             waiting.push(th->worker);
  167.             delete th;
  168.         }
  169.  
  170.         // printf("stage 4. waiting: %lu; running: %lu; finished: %lu\n", waiting.size(), running.size(), finished.size());
  171.  
  172.         // Stage 4: sleep a second if nothing else can be done
  173.         if (waiting.empty() and not running.empty())
  174.         {
  175.             fd_set readfds;
  176.             int max_fd = 0;
  177.             struct timeval timeout;
  178.  
  179.             FD_ZERO(&readfds);
  180.             unsigned descriptor_amount = 0;
  181.             for (auto it=running.begin(); it != running.end(); ++it)
  182.             {
  183.                 worker_thread_t *th = *it;
  184.                 FD_SET(th->event, &readfds);
  185.                 max_fd = std::max(max_fd, th->event);
  186.                 ++descriptor_amount;
  187.             }
  188.  
  189.             timeout.tv_sec = 1, timeout.tv_usec = 0;
  190.             printf("sleep for a second, descriptor_amount=%d\n", descriptor_amount);
  191.             int res = select (max_fd+1, &readfds, NULL, NULL, &timeout);
  192.  
  193.             if (res < 0) // shouldn't happen normally, let's terminate
  194.             {
  195.                 fprintf(stderr, "select() failed: %m\n");
  196.                 exit(1);
  197.             }
  198.         }
  199.     }
  200. }
  201.  
  202. static uint64_t urandom(uint64_t n)
  203. {
  204.     assert(n>0);
  205.     FILE *fp = fopen("/dev/urandom", "r");
  206.     assert(fp != NULL);
  207.     uint64_t x;
  208.     size_t res1 = fread(&x, sizeof(uint64_t), 1, fp);
  209.     assert(res1==1);
  210.     int res2 = fclose(fp);
  211.     assert(res2==0);
  212.     return x % n;
  213. }
  214.  
  215. double busy_wait(uint64_t n)
  216. {
  217.     double sum = 0;
  218.     uint64_t x = 1 + (n>0 ? urandom(n) : 1);
  219.     for (uint64_t i=0; i<x; ++i)
  220.     {
  221.         struct timeval tv;
  222.         gettimeofday(&tv, NULL);
  223.         sum += tv.tv_sec + 1e-6*tv.tv_usec;
  224.     }
  225.     return sum / x;
  226. }
  227.  
  228. struct simple_worker_t : public abstract_worker_t
  229. {
  230.     double factorial, sum;
  231.     simple_worker_t(const string &name)
  232.     {
  233.         this->name = name;
  234.         background_mask = urandom(0x10000000) | urandom(0x10000000);
  235.         sum = 0;
  236.         factorial = 1;
  237.     }
  238.     void do_work()
  239.     {
  240.         assert(not is_finished());
  241.         if (target_state > 20) // enough calculations
  242.         {
  243.             double diff = fabs(sum - 2.718281828459045);
  244.             if (diff < 1e-8)
  245.                 set_pass();
  246.             else
  247.                 set_fail();
  248.         }
  249.         else
  250.         {
  251.             if (can_fork())
  252.                 busy_wait(urandom(1));
  253.             sum += 1.0 / factorial;
  254.             factorial *= target_state;
  255.             target_state ++;
  256.         }
  257.     }
  258. };
  259.  
  260. struct netcdf_worker_t : public abstract_worker_t
  261. {
  262.     string path;
  263.     vector<double> data;
  264.     int file_id;
  265.     int group_id;
  266.     int dim_ids[2];
  267.     int var_id;
  268.     const size_t dim1_size = 500, dim2_size = 200;
  269.     const std::string group_name = "group_test";
  270.     const std::string dim1_name = "dim_1";
  271.     const std::string dim2_name = "dim_2";
  272.     const std::string var_name = "variable_test";
  273.  
  274.     netcdf_worker_t(unsigned n, const string &dir, unsigned background_mask)
  275.     {
  276.         this->background_mask = background_mask;
  277.         char buf[80];
  278.         snprintf(buf, 80, "%06u", n);
  279.         path = dir + "/" + "junk_" + buf + ".nc";
  280.         this->name = path;
  281.     }
  282.  
  283.     void do_work()
  284.     {
  285.         switch (target_state)
  286.         {
  287.             case 1: // Generate some data
  288.             {
  289.                 data.resize(dim1_size*dim2_size);
  290.                 for (unsigned i=0, rnd=urandom(0x12345678); i < data.size(); ++i)
  291.                     data[i] = i * rnd + i * i;
  292.                 break;
  293.             }
  294.             case 2: // Open file for writing
  295.             {
  296.                 int status = nc_create(path.c_str(), NC_WRITE | NC_NETCDF4, &file_id);
  297.                 if (status != NC_NOERR)
  298.                 {
  299.                     fprintf(stderr, "%s: nc_create() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
  300.                     set_fail();
  301.                     return;
  302.                 }
  303.                 break;
  304.             }
  305.             case 3: // Define a group
  306.             {
  307.                 int status = nc_def_grp(file_id, group_name.c_str(), &group_id);
  308.                 if (status != NC_NOERR)
  309.                 {
  310.                     fprintf(stderr, "%s: nc_def_grp() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
  311.                     set_fail();
  312.                     return;
  313.                 }
  314.                 break;
  315.             }
  316.             case 4: // Define one dimension
  317.             {
  318.                 int status = nc_def_dim(group_id, dim1_name.c_str(), dim1_size, &dim_ids[0]);
  319.                 if (status != NC_NOERR)
  320.                 {
  321.                     fprintf(stderr, "%s: first nc_def_dim() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
  322.                     set_fail();
  323.                     return;
  324.                 }
  325.                 break;
  326.             }
  327.             case 5: // Define one more dimension
  328.             {
  329.                 int status = nc_def_dim(group_id, dim2_name.c_str(), dim2_size, &dim_ids[1]);
  330.                 if (status != NC_NOERR)
  331.                 {
  332.                     fprintf(stderr, "%s: second nc_def_dim() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
  333.                     set_fail();
  334.                     return;
  335.                 }
  336.                 break;
  337.             }
  338.             case 6: // Define a variable
  339.             {
  340.                 int status = nc_def_var(group_id, var_name.c_str(), NC_DOUBLE, 2, dim_ids, &var_id);
  341.                 if (status != NC_NOERR)
  342.                 {
  343.                     fprintf(stderr, "%s: nc_def_var() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
  344.                     set_fail();
  345.                     return;
  346.                 }
  347.                 break;
  348.             }
  349.             case 7: // Write the data to the file
  350.             {
  351.                 // NOTE this is what could cause trouble if writing data to multiple files simultaniously!
  352.                 const size_t start[2] = {0, 0};
  353.                 const size_t count[2] = {dim1_size, dim2_size};
  354.                 int status = nc_put_vara(group_id, var_id, start, count, data.data());
  355.                 if (status != NC_NOERR)
  356.                 {
  357.                     fprintf(stderr, "%s: nc_put_vara() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
  358.                     set_fail();
  359.                     return;
  360.                 }
  361.                 break;
  362.             }
  363.             case 8: // Close the file
  364.             {
  365.                 int status = nc_close(file_id);
  366.                 if (status != NC_NOERR)
  367.                 {
  368.                     fprintf(stderr, "%s: nc_close() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
  369.                     set_fail();
  370.                     return;
  371.                 }
  372.                 break;
  373.             }
  374.             case 9: // All done
  375.                 set_pass();
  376.                 return;
  377.             default:
  378.                 fprintf(stderr, "%s: invalid target_state=%d (1..9 expected)", name.c_str(), target_state);
  379.                 return;
  380.         }
  381.         ++target_state;
  382.     }
  383. };
  384.  
  385. void write_in_threads(test_suite_t *suite)
  386. {
  387.     printf("__LINE__=%u\n", __LINE__);
  388.     unsigned int N = 500;
  389.     printf("__LINE__=%u\n", __LINE__);
  390.     set<abstract_worker_t *> tasks;
  391.     printf("__LINE__=%u\n", __LINE__);
  392.  
  393.     char dir[580], cmd[580];
  394.     printf("__LINE__=%u\n", __LINE__);
  395.  
  396.     const char *username = getenv("USER");
  397.     printf("__LINE__=%u\n", __LINE__);
  398.     if (username == NULL)
  399.         username = "NOBODY";
  400.     printf("__LINE__=%u\n", __LINE__);
  401.  
  402.     string dir_suffix = "";
  403.     printf("__LINE__=%u\n", __LINE__);
  404.  
  405.     uint64_t background_mask = 0;
  406.     printf("__LINE__=%u\n", __LINE__);
  407.     if (const char *bm_env = getenv("WRITE_IN_THREADS_BACKGROUND_MASK"))
  408.     {
  409.     printf("__LINE__=%u\n", __LINE__);
  410.         for (unsigned x=0; bm_env[x] and x<=63; ++x)
  411.             if (bm_env[x] == 'Y')
  412.                 background_mask |= (1<<x);
  413.     printf("__LINE__=%u\n", __LINE__);
  414.         dir_suffix = (string)"." + bm_env;
  415.     printf("__LINE__=%u\n", __LINE__);
  416.     }
  417.     printf("__LINE__=%u\n", __LINE__);
  418.  
  419.     // prepare the working dir
  420.     snprintf(dir, sizeof(dir), "/junk/%s/write-in-threads%s", username, dir_suffix.c_str());
  421.     printf("__LINE__=%u\n", __LINE__);
  422.     snprintf(cmd, sizeof(cmd), "test -d '%s' || mkdir -p '%s'", dir, dir);
  423.     printf("__LINE__=%u\n", __LINE__);
  424.     system(cmd); // don't care about the result
  425.     printf("__LINE__=%u\n", __LINE__);
  426.  
  427.     printf("using background_mask=0x%03lx\n", background_mask);
  428.     printf("__LINE__=%u\n", __LINE__);
  429.  
  430.     for (unsigned i=0; i<N; ++i)
  431.         tasks.insert(new netcdf_worker_t(i, dir, background_mask));
  432.  
  433.     dispatch(tasks);
  434.  
  435.     unsigned fail = 0, pass = 0, unfinished = 0;
  436.     for (auto it = tasks.begin(); it != tasks.end(); ++it)
  437.     {
  438.         abstract_worker_t *w = *it;
  439.         if (not w->is_finished())
  440.             ++ unfinished;
  441.         else if (not w->is_failed())
  442.             ++ pass;
  443.         else
  444.             ++ fail;
  445.         delete w;
  446.     }
  447.  
  448.     test_check_value(unfinished, 0);
  449.     test_check_value(fail, 0);
  450.     test_check_value(pass, N);
  451. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement