zhangsongcui

Simple log collector using libuv

Jan 22nd, 2018
171
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 5.82 KB | None | 0 0
  1. #include <string>
  2. #include <cstdio>
  3. #include <unordered_map>
  4. #if __has_include(<optional>)
  5. #   include <optional>
  6. #else
  7. #   include <experimental/optional>
  8. namespace std {
  9.     using std::experimental::optional;
  10. }
  11. #endif
  12. #include <utility>
  13. #include <cassert>
  14. #include <boost/coroutine2/all.hpp>
  15. #include <set>
  16. #include "uvw.hpp"
  17.  
  18. struct async {
  19.     using coro_t = boost::coroutines2::coroutine<void>;
  20.  
  21.     template <typename EventType, typename EmitterType>
  22.     EventType await(std::shared_ptr<EmitterType>& emitter) {
  23.         int errorCode = 0;
  24.         std::optional<EventType> okEvent;
  25.  
  26.         typename EmitterType::template Connection<EventType> ok;
  27.  
  28.         auto err = emitter->template once<uvw::ErrorEvent>([&](uvw::ErrorEvent &error, auto&) {
  29.             emitter->template erase(ok);
  30.             errorCode = error.code();
  31.             (*this->pull)();
  32.         });
  33.         ok = emitter->template once<EventType>([&](EventType& event, auto&) {
  34.             // These events are emitted in main routine
  35.             emitter->template erase(err);
  36.             okEvent = std::move(event);
  37.  
  38.             // Continue async code
  39.             (*this->pull)();
  40.         });
  41.  
  42.         // Goes back to the main routine, which eventually gives the control back to libuv
  43.         (*this->push)();
  44.        
  45.         // `(*this->pull)()` inside the event goes here
  46.         assert((okEvent && !errorCode) || (!okEvent && errorCode));
  47.         return errorCode ? throw uvw::ErrorEvent(errorCode) : std::move(*okEvent);
  48.     }
  49.    
  50.     // The callback should be noexcept
  51.     template <typename Callback>
  52.     static void run(Callback&& cb) {
  53.         // We cannot use std::make_unique here or we must make the constructor public
  54.         auto runner = std::unique_ptr<async>(new async(std::move(cb)));
  55.         runner->instanceHolder = std::move(runner);
  56.     }
  57.    
  58. private:
  59.     async(std::function<void (async&)>&& _cb): callback(std::move(_cb)) {
  60.         // Go into the subroutine here
  61.         this->pull = coro_t::pull_type([&](coro_t::push_type& _push) {
  62.             this->push = coro_t::push_type(std::move(_push));
  63.  
  64.             // All async code runs in the subroutine
  65.             this->callback(*this);
  66.            
  67.             // async instance is destructed here
  68.             this->instanceHolder = nullptr;
  69.         });
  70.     }
  71.    
  72.     async() = delete;
  73.     async(const async&) = delete;
  74.  
  75. private:
  76.     std::optional<coro_t::push_type> push;
  77.     std::optional<coro_t::pull_type> pull;
  78.    
  79. private:
  80.     // callback holder, to prevent lambda object from being destructed unexpectly
  81.     std::function<void (async&)> callback;
  82.     // instance holder, to prevent async instance itself from being destructed unexpectly
  83.     std::unique_ptr<async> instanceHolder;
  84. };
  85.  
  86. using namespace std::literals;
  87.  
  88. int main(int argc, char *argv[]) {
  89.     using namespace uvw;
  90.  
  91.     if (argc != 2) {
  92.         fputs("Usage: ./main FOLDER_TO_LISTEN\n", stderr);
  93.         return 1;
  94.     }
  95.     const std::string path2listen = argv[1];
  96.    
  97.     auto loop = Loop::getDefault();
  98.     auto fsEventHandle = loop->resource<FsEventHandle>();
  99.     std::unordered_map<std::string, size_t> progresses;
  100.  
  101.     fsEventHandle->on<FsEventEvent>([&](FsEventEvent& event, FsEventHandle&) {
  102.         // This function will return at the first await call.
  103.         // It's expected behavior and unavoidable. Do NOT use its local variables after that.
  104.         async::run([&, filename = std::string(event.filename)](async& runner) noexcept {
  105.             // We should carefully hold other "closure variables" because the variables outside may have been destructed.
  106.             // Local variables are safe to use
  107.             const char *str;
  108.             switch (event.flags) {
  109.                 case (int)FsEventHandle::Watch::RENAME:
  110.                     str = "RENAME";
  111.                     break;
  112.  
  113.                 case (int)FsEventHandle::Watch::CHANGE:
  114.                     str = "CHANGE";
  115.                     break;
  116.  
  117.                 case (int)FsEventHandle::Watch::RENAME | (int)FsEventHandle::Watch::CHANGE:
  118.                     str = "RENAME | CHANGE";
  119.                     break;
  120.  
  121.                 default:
  122.                     str = "";
  123.                     break;
  124.             }
  125.             std::printf("[Detected changes] %s: %s\n", filename.c_str(), str);
  126.  
  127.             auto fileReq = loop->resource<FileReq>();
  128.  
  129.             try {
  130.                 fileReq->open(path2listen + "/" + filename, Flags<FileReq::FileOpen>(FileReq::FileOpen::RDONLY), S_IRUSR);
  131.                 runner.await<FsEvent<FsReq::Type::OPEN>>(fileReq); // Do NOT use `this->event` after the first await!
  132.  
  133.                 fileReq->stat();
  134.                 const auto fileSize = runner.await<FsEvent<FsReq::Type::FSTAT>>(fileReq).stat.st_size;
  135.  
  136.                 const auto iter = progresses.try_emplace(filename, 0).first;
  137.                 const auto progress = std::exchange(iter->second, fileSize);
  138.  
  139.                 const auto appendedSize = static_cast<unsigned int>(fileSize - progress);
  140.                 if (appendedSize) { // The changed contents may have been handled by other event.
  141.                     fileReq->read(progress, appendedSize);
  142.                     const auto readEvent = runner.await<FsEvent<FsReq::Type::READ>>(fileReq);
  143.                     const auto fileContent = std::string(readEvent.data.get(), readEvent.size);
  144.                     std::printf("[Contents appended]: %s\n", fileContent.c_str());
  145.                 }
  146.  
  147.                 fileReq->close();
  148.                 runner.await<FsEvent<FsReq::Type::CLOSE>>(fileReq);
  149.             } catch (ErrorEvent& e) {
  150.                 std::fprintf(stderr, "Error happens: %s\n", e.what());
  151.                 progresses.erase(filename);
  152.                 fileReq->closeSync();
  153.             }
  154.         });
  155.     });
  156.  
  157.     fsEventHandle->start(path2listen);
  158.     loop->run();
  159. }
Add Comment
Please, Sign In to add comment