zhangsongcui

Log collector using libuv V2 ( without uvw )

Jan 28th, 2018
225
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 5.43 KB | None | 0 0
  1. #include <string>
  2. #include <cstdio>
  3. #include <unordered_map>
  4. #include <utility>
  5. #if __has_include(<optional>)
  6. #   include <optional>
  7. #else
  8. #   include <experimental/optional>
  9. namespace std {
  10.     using std::experimental::optional;
  11. }
  12. #endif
  13. #include <stdexcept>
  14. #include <boost/coroutine2/all.hpp>
  15. #include <uv.h>
  16.  
  17. struct raii_fs_t: uv_fs_t {
  18.   ~raii_fs_t() {
  19.     uv_fs_req_cleanup(this);
  20.   }
  21. };
  22.  
  23. struct async {
  24.     using coro_t = boost::coroutines2::coroutine<void>;
  25.    
  26.     template <typename UVT, typename Func, typename... Args>
  27.     UVT await(std::nothrow_t, Func* func, uv_loop_t* loop, Args... args) noexcept {
  28.         UVT req;
  29.         req.data = this;
  30.        
  31.         func(loop, &req, args..., (uv_fs_cb)[] (auto *handle) {
  32.             const auto that = (async *)handle->data;
  33.             (*that->pull)();
  34.         });
  35.    
  36.         (*this->push)();
  37.         return req;
  38.     }
  39.  
  40.     template <typename UVT, typename Func, typename... Args>
  41.     UVT await(Func* func, uv_loop_t* loop, Args... args) {
  42.         UVT req = await<UVT>(std::nothrow, func, loop, args...);
  43.        
  44.         if (req.result < 0) {
  45.             throw std::runtime_error(uv_strerror((int)req.result));
  46.         }
  47.         return req;
  48.     }
  49.    
  50.     // The callback should be noexcept
  51.     template <typename Callback>
  52.     static void run(Callback&& cb) {
  53.         // We cannot use std::make_unique here or we must make the constructor public
  54.         auto runner = std::unique_ptr<async>(new async(std::move(cb)));
  55.         runner->instanceHolder = std::move(runner);
  56.     }
  57.    
  58. private:
  59.     async(std::function<void (async&)>&& _cb): callback(std::move(_cb)) {
  60.         // Go into the subroutine here
  61.         this->pull = coro_t::pull_type([&](coro_t::push_type& _push) {
  62.             this->push = coro_t::push_type(std::move(_push));
  63.  
  64.             // All async code runs in the subroutine
  65.             this->callback(*this);
  66.            
  67.             // async instance is destructed here
  68.             this->instanceHolder = nullptr;
  69.         });
  70.     }
  71.    
  72.     async() = delete;
  73.     async(const async&) = delete;
  74.  
  75. private:
  76.     std::optional<coro_t::push_type> push;
  77.     std::optional<coro_t::pull_type> pull;
  78.    
  79. private:
  80.     // callback holder, to prevent lambda object from being destructed unexpectly
  81.     std::function<void (async&)> callback;
  82.     // instance holder, to prevent async instance itself from being destructed unexpectly
  83.     std::unique_ptr<async> instanceHolder;
  84. };
  85.  
  86. template <typename Callback>
  87. inline int
  88. wrap_fs_event_start(uv_fs_event_t* handle,
  89.                     Callback&& cb,
  90.                     const char* path,
  91.                     unsigned int flags) noexcept {
  92.     handle->data = &cb;
  93.     return uv_fs_event_start(handle, [](uv_fs_event_t* handle, const char* filename, int events, int status) {
  94.         Callback& cb = *(Callback *)handle->data;
  95.         cb(handle, filename, events, status);
  96.     }, path, flags);
  97. }
  98.  
  99. int main(int argc, char *argv[]) {
  100. //    if (argc != 2) {
  101. //        fputs("Usage: ./main FOLDER_TO_LISTEN\n", stderr);
  102. //        return 1;
  103. //    }
  104. //    std::string path2listen = argv[1];
  105.     std::string path2listen = "/Users/Carter/test";
  106.    
  107.     std::unordered_map<std::string, size_t> progresses;
  108.    
  109.     auto loop = uv_default_loop();
  110.     uv_fs_event_t fsEventHandle;
  111.     uv_fs_event_init(loop, &fsEventHandle);
  112.     wrap_fs_event_start(&fsEventHandle, [&](uv_fs_event_t* /*handle*/, const char* file_name, int events, int /*status*/) {
  113.         async::run([&, events, filename = std::string(file_name)](async& runner) noexcept {
  114.             // We should carefully hold other "closure variables" because the variables outside may have been destructed.
  115.             // Local variables are safe to use
  116.             std::printf("[Detected changes] %s: %s\n", filename.c_str(), (events == UV_RENAME ? "RENAME" : "CHANGE"));
  117.            
  118.             uv_file file = 0;
  119.            
  120.             try {
  121.                 const std::string fullPath = path2listen + "/" + filename;
  122.                 file = (uv_file)runner.await<raii_fs_t>(uv_fs_open, loop, fullPath.c_str(), UV_FS_O_RDONLY, S_IRUSR).result;
  123.                 const auto fileSize = runner.await<raii_fs_t>(uv_fs_fstat, loop, file).statbuf.st_size;
  124.  
  125.                 const auto iter = progresses.insert({ filename, 0 }).first;
  126.                 const auto progress = std::exchange(iter->second, fileSize);
  127.                
  128.                 const auto appendedSize = static_cast<unsigned int>(fileSize - progress);
  129.                 if (appendedSize) { // The changed contents may have been handled by other event.
  130.                     std::string fileContent(appendedSize, '\0');
  131.                     auto buf = uv_buf_init(&fileContent[0], (unsigned int)fileContent.size());
  132.                     runner.await<raii_fs_t>(uv_fs_read, loop, file, &buf, 1, progress);
  133.                     std::printf("[Contents appended]: %s\n", fileContent.c_str());
  134.                 }
  135.                
  136.                 runner.await<raii_fs_t>(uv_fs_close, loop, file);
  137.             } catch (std::runtime_error& e) {
  138.                 std::fprintf(stderr, "Error happens: %s\n", e.what());
  139.                 progresses.erase(filename);
  140.                
  141.                 if (file) {
  142.                     raii_fs_t req;
  143.                     uv_fs_close(loop, &req, file, nullptr);
  144.                 }
  145.             }
  146.         });
  147.     }, path2listen.c_str(), 0);
  148.    
  149.     uv_run(loop, UV_RUN_DEFAULT);
  150. }
Advertisement
Add Comment
Please, Sign In to add comment