Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <ctype.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <list>
- #include <mutex>
- #include <string>
- #include <unordered_map>
- #include <unordered_set>
- #include <vector>
- #include <condition_variable>
- #include <functional>
- #include <future>
- #include <optional>
- #include <thread>
- struct notification_queue {
- /**
- * Work queue for a single thread
- * to implement task stealing
- */
- private:
- std::list<std::function<void()>> work;
- std::mutex mtx;
- std::condition_variable has_work;
- std::atomic<bool> done;
- public:
- void push(std::function<void()> task) {
- std::unique_lock<std::mutex> lck(mtx);
- work.push_back(task);
- has_work.notify_one();
- }
- std::optional<std::function<void()>> pop() {
- // hold until we get a job in or are told to finish
- std::unique_lock<std::mutex> lck(mtx);
- has_work.wait(lck, [=] { return work.size() || done; });
- if (!work.size())
- return {};
- auto ret = work.front();
- work.pop_front();
- return ret;
- }
- std::optional<std::function<void()>> try_pop() {
- std::unique_lock<std::mutex> lck(mtx, std::try_to_lock);
- if (!lck || !work.size())
- return {};
- auto ret = work.front();
- work.pop_front();
- return ret;
- }
- void set_done(bool arg) {
- done = arg;
- has_work.notify_all();
- }
- };
- /**
- * Thread pool implementation that assumes that its
- * operations run on threadsafe data structures,
- */
- struct workers {
- private:
- std::vector<std::thread> threads;
- std::vector<notification_queue> work_queues;
- std::atomic<int> _counter;
- int thread_count;
- public:
- workers(int thread_count) {
- this->thread_count = thread_count;
- this->_counter = 0;
- work_queues = std::vector<notification_queue>(thread_count);
- }
- void add_task(std::function<void()> task) {
- auto copy = _counter++;
- work_queues.at(copy % thread_count).push(task);
- }
- void do_work() {
- for (int i = 0; i < thread_count; i++) {
- threads.push_back(std::thread([=] {
- while (true) {
- std::optional<std::function<void()>> task;
- for (int j = 0; j < thread_count; j++) {
- if (j == i)
- continue;
- task = work_queues.at(j).try_pop();
- // if we manage to steal work leave and execute the function
- if (task.has_value()) {
- break;
- }
- }
- // check if we managed to steal a function
- if (task.has_value()) {
- task.value()();
- continue;
- }
- // we are getting work from our own queue
- // hold whilst we need to get the task
- task = work_queues.at(i).pop();
- if (!task.has_value()) {
- break;
- }
- task.value()();
- }
- }));
- }
- }
- /**
- * Tells the threads to begin finishing
- * the work that they have to do
- */
- void setFinish() {
- for (int i = 0; i < thread_count; i++) {
- work_queues.at(i).set_done(true);
- threads.at(i).join();
- }
- }
- };
- struct ts_unordered_map {
- private:
- std::unordered_map<std::string, std::list<std::string>> _results;
- std::mutex mtx;
- public:
- // thread safe wrapper to determine whether a given key is in the map
- bool contains(std::string arg) {
- std::unique_lock<std::mutex> lck(mtx);
- return _results.find(arg) != _results.end();
- }
- // threadsafe wrapper for adding new kv pairs to the map
- bool insert(const std::string &arg, std::list<std::string> values) {
- std::unique_lock<std::mutex> lck(mtx);
- _results.insert({arg, values});
- return true;
- }
- // thread safe wrapper for adding new values to a bucket
- void append(std::string target, std::string value) {
- std::unique_lock<std::mutex> lck(mtx);
- auto bucket = _results.find(target);
- // check if the bucket exists, if note exit
- if (bucket == _results.end())
- return;
- (bucket->second).push_back(value);
- }
- // return a pointer to a given bucket, not threadsafe but used in a single threaded
- //context
- std::list<std::string> *leak(std::string arg) { return &_results[arg]; }
- };
- std::vector<std::string> dirs;
- // work queue functionality is now in workers class
- ts_unordered_map theTable;
- void arg_binder(std::string filename, workers *threads);
- std::string dirName(const char *c_str) {
- std::string s = c_str; // s takes ownership of the string content by
- // allocating memory for it
- if (s.back() != '/') {
- s += '/';
- }
- return s;
- }
- std::pair<std::string, std::string> parseFile(const char *c_file) {
- std::string file = c_file;
- std::string::size_type pos = file.rfind('.');
- if (pos == std::string::npos) {
- return {file, ""};
- } else {
- return {file.substr(0, pos), file.substr(pos + 1)};
- }
- }
- // open file using the directory search path constructed in main()
- static FILE *openFile(const char *file) {
- FILE *fd;
- for (unsigned int i = 0; i < dirs.size(); i++) {
- std::string path = dirs[i] + file;
- fd = fopen(path.c_str(), "r");
- if (fd != NULL)
- return fd; // return the first file that successfully opens
- }
- return NULL;
- }
- /**
- * Generates specific tasks for a thread pool
- * given a filename, adds all of its dependencies as tasks
- */
- void generate_tasks(std::string filename, workers *threads) {
- char buf[4096], name[4096];
- // 1. open the file
- FILE *fd = openFile(filename.c_str());
- if (fd == NULL) {
- fprintf(stderr, "Error opening %s\n", filename.c_str());
- exit(-1);
- }
- while (fgets(buf, sizeof(buf), fd) != NULL) {
- char *p = buf;
- // 2a. skip leading whitespace
- while (isspace((int)*p)) {
- p++;
- }
- // 2b. if match #include
- if (strncmp(p, "#include", 8) != 0) {
- continue;
- }
- p += 8; // point to first character past #include
- // 2bi. skip leading whitespace
- while (isspace((int)*p)) {
- p++;
- }
- if (*p != '"') {
- continue;
- }
- // 2bii. next character is a "
- p++; // skip "
- // 2bii. collect remaining characters of file name
- char *q = name;
- while (*p != '\0') {
- if (*p == '"') {
- break;
- }
- *q++ = *p++;
- }
- *q = '\0';
- // 2bii. append file name to dependency list
- theTable.append(filename, name);
- // 2bii. if file name not already in table ...
- if (theTable.contains(name)) {
- continue;
- }
- // ... insert mapping from file name to empty list in table ...
- theTable.insert(name, {});
- // add a new task to the work queue for the threads
- arg_binder(name, threads);
- }
- // 3. close file
- fclose(fd);
- }
- /**
- * Binds arguements to a function
- * so threads simply execute functions
- * without passing args
- * mutually recurses with generate tasks
- */
- void arg_binder(std::string filename, workers *threads) {
- // add a single unit of work to the work queue
- threads->add_task([=]() { generate_tasks(filename, threads); });
- }
- // iteratively print dependencies
- static void printDependencies(std::unordered_set<std::string> *printed,
- std::list<std::string> *toProcess, FILE *fd) {
- if (!printed || !toProcess || !fd)
- return;
- // 1. while there is still a file in the toProcess list
- while (toProcess->size() > 0) {
- // 2. fetch next file to process
- std::string name = toProcess->front();
- toProcess->pop_front();
- // 3. lookup file in the table, yielding list of dependencies
- std::list<std::string> *ll = theTable.leak(name);
- // 4. iterate over dependencies
- for (auto iter = ll->begin(); iter != ll->end(); iter++) {
- // 4a. if filename is already in the printed table, continue
- if (printed->find(*iter) != printed->end()) {
- continue;
- }
- // 4b. print filename
- fprintf(fd, " %s", iter->c_str());
- // 4c. insert into printed
- printed->insert(*iter);
- // 4d. append to toProcess
- toProcess->push_back(*iter);
- }
- }
- }
- int main(int argc, char *argv[]) {
- // 1. look up CPATH in environment
- char *cpath = getenv("CPATH");
- char *crawl_str = getenv("CRAWLER_THREADS");
- int thread_count;
- if (!crawl_str)
- thread_count = 2;
- else
- thread_count = strtol(crawl_str, NULL, 10);
- if (thread_count <= 0) {
- fprintf(stderr,
- "ERR: %d is an invalid number of threads check CRAWLER_THREADS\n",
- thread_count);
- return -1;
- }
- // determine the number of -Idir arguments
- int i;
- for (i = 1; i < argc; i++) {
- if (strncmp(argv[i], "-I", 2) != 0)
- break;
- }
- int start = i;
- // 2. start assembling dirs vector
- dirs.push_back(dirName("./")); // always search current directory first
- for (i = 1; i < start; i++) {
- dirs.push_back(dirName(argv[i] + 2 /* skip -I */));
- }
- if (cpath != NULL) {
- std::string str(cpath);
- std::string::size_type last = 0;
- std::string::size_type next = 0;
- while ((next = str.find(":", last)) != std::string::npos) {
- dirs.push_back(str.substr(last, next - last));
- last = next + 1;
- }
- dirs.push_back(str.substr(last));
- }
- // 2. finished assembling dirs vector
- workers threads(thread_count);
- threads.do_work();
- // 3. for each file argument ...
- for (i = start; i < argc; i++) {
- std::pair<std::string, std::string> pair = parseFile(argv[i]);
- if (pair.second != "c" && pair.second != "y" && pair.second != "l") {
- fprintf(stderr, "Illegal extension: %s - must be .c, .y or .l\n",
- pair.second.c_str());
- return -1;
- }
- std::string obj = pair.first + ".o";
- // 3a. insert mapping from file.o to file.ext
- theTable.insert(obj, {argv[i]});
- // 3b. insert mapping from file.ext to empty list
- theTable.insert(argv[i], {});
- // 3c. add file to the work queue
- arg_binder(argv[i], &threads);
- }
- threads.setFinish();
- // 5. for each file argument
- for (i = start; i < argc; i++) {
- // 5a. create hash table in which to track file names already printed
- std::unordered_set<std::string> printed;
- // 5b. create list to track dependencies yet to print
- std::list<std::string> toProcess;
- std::pair<std::string, std::string> pair = parseFile(argv[i]);
- std::string obj = pair.first + ".o";
- // 5c. print "foo.o:" ...
- printf("%s:", obj.c_str());
- // 5c. ... insert "foo.o" into hash table and append to list
- printed.insert(obj);
- toProcess.push_back(obj);
- // 5d. invoke
- printDependencies(&printed, &toProcess, stdout);
- printf("\n");
- }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement