Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <cstdio>
- #include <cassert>
- #include <cmath>
- #include <cstring>
- #include <cstdint>
- #include <pthread.h>
- #include <unistd.h>
- #include <sys/eventfd.h>
- #include <sys/time.h>
- #include <string>
- #include <set>
- #include <map>
- #include <queue>
- using namespace std;
- #include "netcdf.h"
- #include "suite.h"
- #include "utils.h"
- struct abstract_worker_t
- {
- string name;
- signed int target_state;
- uint64_t background_mask;
- virtual ~abstract_worker_t() { }
- abstract_worker_t() { target_state = 1; background_mask = 0; }
- virtual void do_work() = 0;
- bool can_fork() { return background_mask & (1 << target_state); }
- void set_fail() { if (target_state>0) target_state = -target_state; }
- void set_pass() { if (target_state>0) target_state = 0; }
- bool is_finished() { return target_state <= 0; }
- bool is_failed() { return target_state < 0; }
- };
- struct worker_thread_t
- {
- abstract_worker_t *worker;
- int event; // file decriptor as in eventfd(2)
- pthread_t thread;
- pthread_attr_t thread_attr;
- static void *main(void *thread_data)
- {
- worker_thread_t *self = (worker_thread_t*) thread_data;
- while(not self->worker->is_finished() and self->worker->can_fork())
- self->worker->do_work();
- uint64_t one = 1;
- size_t res = write(self->event, &one, sizeof(uint64_t));
- if (res != sizeof(uint64_t))
- fprintf(stderr, "[%s]: write(event) failed, res=%lu: %m\n", self->worker->name.c_str(), res);
- pthread_exit(NULL);
- }
- worker_thread_t(abstract_worker_t *worker)
- {
- this->worker = worker;
- }
- bool start()
- {
- if ((event = eventfd(0, EFD_NONBLOCK)) < 0)
- {
- fprintf(stderr, "[%s]: eventfd() failed: %m\n", worker->name.c_str());
- return false;
- }
- if (int error = pthread_attr_init(&thread_attr))
- {
- fprintf(stderr, "[%s]: pthread_attr_init() failed: %s\n", worker->name.c_str(), strerror(error));
- return false;
- }
- if (int error = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE))
- {
- fprintf(stderr, "[%s]: pthread_attr_setdetachstate() failed: %s\n", worker->name.c_str(), strerror(error));
- return false;
- }
- if (int error = pthread_create(&thread, &thread_attr, main, this))
- {
- fprintf(stderr, "[%s]: pthread_create() failed: %s\n", worker->name.c_str(), strerror(error));
- return false;
- }
- return true;
- }
- };
- void dispatch(const set<abstract_worker_t *> &workers)
- {
- set<abstract_worker_t *> finished;
- set<worker_thread_t *> running;
- queue<abstract_worker_t *> waiting;
- for (auto it=workers.begin(); it != workers.end(); ++it)
- waiting.push(*it);
- while (not waiting.empty() or not running.empty())
- {
- map<int, unsigned> stat;
- for (auto it=running.begin(); it!=running.end(); ++it)
- stat[(*it)->worker->target_state] ++;
- printf("stage 1. waiting=%lu; finished=%lu; running=%lu {", waiting.size(), finished.size(), running.size());
- for (auto it=stat.begin(); it!=stat.end(); ++it)
- printf("%s%i=>%u", it==stat.begin() ? "" : ", ", it->first, it->second);
- printf("}\n");
- // Stage 1: process waiting queue once
- unsigned n = waiting.size();
- for (unsigned i=0; i<n and not waiting.empty(); ++i)
- {
- abstract_worker_t *w = waiting.front();
- waiting.pop();
- if (w->is_finished())
- finished.insert(w);
- else if(w->can_fork())
- {
- worker_thread_t *th = new worker_thread_t(w);
- if (th->start())
- running.insert(th);
- else
- {
- w->set_fail();
- finished.insert(w);
- }
- }
- else
- {
- w->do_work();
- waiting.push(w);
- }
- }
- // printf("stage 2. waiting: %lu; running: %lu; finished: %lu\n", waiting.size(), running.size(), finished.size());
- // Stage 2: find finished threads
- set<worker_thread_t *> finished_threads;
- for (auto it=running.begin(); it!=running.end(); ++it)
- {
- worker_thread_t *th = *it;
- uint64_t one;
- ssize_t res = read(th->event, &one, sizeof(uint64_t));
- if (res >= 0) // can read => thread finished
- finished_threads.insert(th);
- else if (errno!=EAGAIN and errno!=EWOULDBLOCK) // unexpected error, something is *really* wrong
- {
- fprintf(stderr, "[%s]: read(event) failed: %m\n", th->worker->name.c_str());
- exit(1);
- }
- }
- // printf("stage 3. waiting: %lu; running: %lu; finished: %lu\n", waiting.size(), running.size(), finished.size());
- // Stage 3: join finished threads
- for (auto it=finished_threads.begin(); it!=finished_threads.end(); ++it)
- {
- worker_thread_t *th = *it;
- void *status;
- running.erase(th);
- if (int res = pthread_join(th->thread, &status)) // something is *really* wrong
- {
- fprintf(stderr, "[%s]: pthread_join() failed: %s\n", th->worker->name.c_str(), strerror(res));
- exit(1);
- }
- waiting.push(th->worker);
- delete th;
- }
- // printf("stage 4. waiting: %lu; running: %lu; finished: %lu\n", waiting.size(), running.size(), finished.size());
- // Stage 4: sleep a second if nothing else can be done
- if (waiting.empty() and not running.empty())
- {
- fd_set readfds;
- int max_fd = 0;
- struct timeval timeout;
- FD_ZERO(&readfds);
- unsigned descriptor_amount = 0;
- for (auto it=running.begin(); it != running.end(); ++it)
- {
- worker_thread_t *th = *it;
- FD_SET(th->event, &readfds);
- max_fd = std::max(max_fd, th->event);
- ++descriptor_amount;
- }
- timeout.tv_sec = 1, timeout.tv_usec = 0;
- printf("sleep for a second, descriptor_amount=%d\n", descriptor_amount);
- int res = select (max_fd+1, &readfds, NULL, NULL, &timeout);
- if (res < 0) // shouldn't happen normally, let's terminate
- {
- fprintf(stderr, "select() failed: %m\n");
- exit(1);
- }
- }
- }
- }
- static uint64_t urandom(uint64_t n)
- {
- assert(n>0);
- FILE *fp = fopen("/dev/urandom", "r");
- assert(fp != NULL);
- uint64_t x;
- size_t res1 = fread(&x, sizeof(uint64_t), 1, fp);
- assert(res1==1);
- int res2 = fclose(fp);
- assert(res2==0);
- return x % n;
- }
- double busy_wait(uint64_t n)
- {
- double sum = 0;
- uint64_t x = 1 + (n>0 ? urandom(n) : 1);
- for (uint64_t i=0; i<x; ++i)
- {
- struct timeval tv;
- gettimeofday(&tv, NULL);
- sum += tv.tv_sec + 1e-6*tv.tv_usec;
- }
- return sum / x;
- }
- struct simple_worker_t : public abstract_worker_t
- {
- double factorial, sum;
- simple_worker_t(const string &name)
- {
- this->name = name;
- background_mask = urandom(0x10000000) | urandom(0x10000000);
- sum = 0;
- factorial = 1;
- }
- void do_work()
- {
- assert(not is_finished());
- if (target_state > 20) // enough calculations
- {
- double diff = fabs(sum - 2.718281828459045);
- if (diff < 1e-8)
- set_pass();
- else
- set_fail();
- }
- else
- {
- if (can_fork())
- busy_wait(urandom(1));
- sum += 1.0 / factorial;
- factorial *= target_state;
- target_state ++;
- }
- }
- };
- struct netcdf_worker_t : public abstract_worker_t
- {
- string path;
- vector<double> data;
- int file_id;
- int group_id;
- int dim_ids[2];
- int var_id;
- const size_t dim1_size = 500, dim2_size = 200;
- const std::string group_name = "group_test";
- const std::string dim1_name = "dim_1";
- const std::string dim2_name = "dim_2";
- const std::string var_name = "variable_test";
- netcdf_worker_t(unsigned n, const string &dir, unsigned background_mask)
- {
- this->background_mask = background_mask;
- char buf[80];
- snprintf(buf, 80, "%06u", n);
- path = dir + "/" + "junk_" + buf + ".nc";
- this->name = path;
- }
- void do_work()
- {
- switch (target_state)
- {
- case 1: // Generate some data
- {
- data.resize(dim1_size*dim2_size);
- for (unsigned i=0, rnd=urandom(0x12345678); i < data.size(); ++i)
- data[i] = i * rnd + i * i;
- break;
- }
- case 2: // Open file for writing
- {
- int status = nc_create(path.c_str(), NC_WRITE | NC_NETCDF4, &file_id);
- if (status != NC_NOERR)
- {
- fprintf(stderr, "%s: nc_create() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
- set_fail();
- return;
- }
- break;
- }
- case 3: // Define a group
- {
- int status = nc_def_grp(file_id, group_name.c_str(), &group_id);
- if (status != NC_NOERR)
- {
- fprintf(stderr, "%s: nc_def_grp() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
- set_fail();
- return;
- }
- break;
- }
- case 4: // Define one dimension
- {
- int status = nc_def_dim(group_id, dim1_name.c_str(), dim1_size, &dim_ids[0]);
- if (status != NC_NOERR)
- {
- fprintf(stderr, "%s: first nc_def_dim() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
- set_fail();
- return;
- }
- break;
- }
- case 5: // Define one more dimension
- {
- int status = nc_def_dim(group_id, dim2_name.c_str(), dim2_size, &dim_ids[1]);
- if (status != NC_NOERR)
- {
- fprintf(stderr, "%s: second nc_def_dim() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
- set_fail();
- return;
- }
- break;
- }
- case 6: // Define a variable
- {
- int status = nc_def_var(group_id, var_name.c_str(), NC_DOUBLE, 2, dim_ids, &var_id);
- if (status != NC_NOERR)
- {
- fprintf(stderr, "%s: nc_def_var() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
- set_fail();
- return;
- }
- break;
- }
- case 7: // Write the data to the file
- {
- // NOTE this is what could cause trouble if writing data to multiple files simultaniously!
- const size_t start[2] = {0, 0};
- const size_t count[2] = {dim1_size, dim2_size};
- int status = nc_put_vara(group_id, var_id, start, count, data.data());
- if (status != NC_NOERR)
- {
- fprintf(stderr, "%s: nc_put_vara() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
- set_fail();
- return;
- }
- break;
- }
- case 8: // Close the file
- {
- int status = nc_close(file_id);
- if (status != NC_NOERR)
- {
- fprintf(stderr, "%s: nc_close() failed: status=%d (%s)\n", name.c_str(), status, nc_strerror(status));
- set_fail();
- return;
- }
- break;
- }
- case 9: // All done
- set_pass();
- return;
- default:
- fprintf(stderr, "%s: invalid target_state=%d (1..9 expected)", name.c_str(), target_state);
- return;
- }
- ++target_state;
- }
- };
- void write_in_threads(test_suite_t *suite)
- {
- printf("__LINE__=%u\n", __LINE__);
- unsigned int N = 500;
- printf("__LINE__=%u\n", __LINE__);
- set<abstract_worker_t *> tasks;
- printf("__LINE__=%u\n", __LINE__);
- char dir[580], cmd[580];
- printf("__LINE__=%u\n", __LINE__);
- const char *username = getenv("USER");
- printf("__LINE__=%u\n", __LINE__);
- if (username == NULL)
- username = "NOBODY";
- printf("__LINE__=%u\n", __LINE__);
- string dir_suffix = "";
- printf("__LINE__=%u\n", __LINE__);
- uint64_t background_mask = 0;
- printf("__LINE__=%u\n", __LINE__);
- if (const char *bm_env = getenv("WRITE_IN_THREADS_BACKGROUND_MASK"))
- {
- printf("__LINE__=%u\n", __LINE__);
- for (unsigned x=0; bm_env[x] and x<=63; ++x)
- if (bm_env[x] == 'Y')
- background_mask |= (1<<x);
- printf("__LINE__=%u\n", __LINE__);
- dir_suffix = (string)"." + bm_env;
- printf("__LINE__=%u\n", __LINE__);
- }
- printf("__LINE__=%u\n", __LINE__);
- // prepare the working dir
- snprintf(dir, sizeof(dir), "/junk/%s/write-in-threads%s", username, dir_suffix.c_str());
- printf("__LINE__=%u\n", __LINE__);
- snprintf(cmd, sizeof(cmd), "test -d '%s' || mkdir -p '%s'", dir, dir);
- printf("__LINE__=%u\n", __LINE__);
- system(cmd); // don't care about the result
- printf("__LINE__=%u\n", __LINE__);
- printf("using background_mask=0x%03lx\n", background_mask);
- printf("__LINE__=%u\n", __LINE__);
- for (unsigned i=0; i<N; ++i)
- tasks.insert(new netcdf_worker_t(i, dir, background_mask));
- dispatch(tasks);
- unsigned fail = 0, pass = 0, unfinished = 0;
- for (auto it = tasks.begin(); it != tasks.end(); ++it)
- {
- abstract_worker_t *w = *it;
- if (not w->is_finished())
- ++ unfinished;
- else if (not w->is_failed())
- ++ pass;
- else
- ++ fail;
- delete w;
- }
- test_check_value(unfinished, 0);
- test_check_value(fail, 0);
- test_check_value(pass, N);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement