Advertisement
Guest User

Untitled

a guest
Apr 16th, 2023
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 10.52 KB | None | 0 0
  1. #include <curl/curl.h>
  2. #include <event2/event.h>
  3.  
  4. #include <iostream>
  5. #include <list>
  6. #include <sstream>
  7. #include <vector>
  8.  
  9. namespace {
  10.  
  11. constexpr const char* kUrl =
  12.     "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/"
  13.     "BigBuckBunny.mp4";
  14.  
  15. constexpr int kRequestCount = 5;
  16. constexpr int kHttpVersion = CURL_HTTP_VERSION_2_0;
  17.  
  18. void TimeoutEvent(evutil_socket_t, short, void* handle);
  19.  
  20. template <typename T>
  21. T* CheckNotNull(T* p) {
  22.   if (!p) {
  23.     throw std::runtime_error("Unexpected null pointer.");
  24.   }
  25.   return p;
  26. }
  27.  
  28. void Check(int code) {
  29.   if (code < 0) {
  30.     std::stringstream stream;
  31.     stream << "Unexpected error code: " << code;
  32.     throw std::runtime_error(stream.str().c_str());
  33.   }
  34. }
  35.  
  36. void Check(CURLcode code) {
  37.   if (code != CURLE_OK) {
  38.     throw std::runtime_error(curl_easy_strerror(code));
  39.   }
  40. }
  41.  
  42. void Check(CURLMcode code) {
  43.   if (code != CURLM_OK) {
  44.     throw std::runtime_error(curl_multi_strerror(code));
  45.   }
  46. }
  47.  
  48. struct CurlMultiDeleter {
  49.   void operator()(CURLM* handle) const noexcept {
  50.     Check(curl_multi_cleanup(handle));
  51.   }
  52. };
  53.  
  54. struct CurlHandleDeleter {
  55.   void operator()(CURL* handle) const noexcept {
  56.     Check(curl_multi_remove_handle(multi_handle, handle));
  57.     curl_easy_cleanup(handle);
  58.   }
  59.  
  60.   CURLM* multi_handle;
  61. };
  62.  
  63. struct EventBaseDeleter {
  64.   void operator()(event_base* event_loop) const noexcept {
  65.     event_base_free(event_loop);
  66.   }
  67. };
  68.  
  69. struct EventDeleter {
  70.   void operator()(event* event) const noexcept {
  71.     Check(event_del(event));
  72.     event_free(event);
  73.   }
  74. };
  75.  
  76. class CurlGlobalInitializer {
  77.  public:
  78.   CurlGlobalInitializer() { Check(curl_global_init(CURL_GLOBAL_DEFAULT)); }
  79.   ~CurlGlobalInitializer() noexcept { curl_global_cleanup(); }
  80. };
  81.  
  82. struct MultiContext {
  83.   std::unique_ptr<CURLM, CurlMultiDeleter> handle{
  84.       CheckNotNull(curl_multi_init())};
  85.   event_base* event_loop;
  86.   std::unique_ptr<event, EventDeleter> timeout_event{
  87.       CheckNotNull(event_new(event_loop, /*fd=*/-1,
  88.                              /*events=*/0, TimeoutEvent, handle.get()))};
  89. };
  90.  
  91. struct HandleContext {
  92.   std::unique_ptr<CURL, CurlHandleDeleter> handle;
  93.   int bytes_read = 0;
  94.   event_base* event_loop;
  95.   MultiContext* context;
  96. };
  97.  
  98. auto MakeHandle(CURLM* multi_handle) {
  99.   CURL* handle = CheckNotNull(curl_easy_init());
  100.   if (CURLMcode code = curl_multi_add_handle(multi_handle, handle);
  101.       code != CURLM_OK) {
  102.     curl_easy_cleanup(handle);
  103.     throw std::runtime_error(curl_multi_strerror(code));
  104.   }
  105.   return std::unique_ptr<CURL, CurlHandleDeleter>(
  106.       handle, CurlHandleDeleter{multi_handle});
  107. }
  108.  
  109. void ProcessEvents(CURLM* multi_handle) {
  110.   CURLMsg* message;
  111.   do {
  112.     int message_count;
  113.     message = curl_multi_info_read(multi_handle, &message_count);
  114.     if (message && message->msg == CURLMSG_DONE) {
  115.       std::cerr << "TRANSFER DONE " << curl_easy_strerror(message->data.result)
  116.                 << '\n';
  117.     }
  118.   } while (message != nullptr);
  119. }
  120.  
  121. size_t WriteCallback(char* /*ptr*/, size_t size, size_t nmemb, void* userdata) {
  122.   auto* context = static_cast<HandleContext*>(userdata);
  123.   if (context->bytes_read != -1) {
  124.     context->bytes_read += size * nmemb;
  125.   }
  126.   if (context->bytes_read >= 16 * 1024 * 1024) {
  127.     context->bytes_read = -1;
  128.     std::cerr << "PAUSING " << context << '\n';
  129.     return CURL_WRITEFUNC_PAUSE;
  130.   }
  131.   return size * nmemb;
  132. }
  133.  
  134. void TimeoutEvent(evutil_socket_t, short, void* handle) {
  135.   int running_handles;
  136.   Check(curl_multi_socket_action(handle, CURL_SOCKET_TIMEOUT, 0,
  137.                                  &running_handles));
  138.   ProcessEvents(handle);
  139. }
  140.  
  141. void SocketEvent(evutil_socket_t fd, short event, void* handle) {
  142.   int running_handles;
  143.   Check(
  144.       curl_multi_socket_action(handle, fd,
  145.                                ((event & EV_READ) ? CURL_CSELECT_IN : 0) |
  146.                                    ((event & EV_WRITE) ? CURL_CSELECT_OUT : 0),
  147.                                &running_handles));
  148.   ProcessEvents(handle);
  149. }
  150.  
  151. int SocketCallback(CURL*, curl_socket_t socket, int what, void* userp,
  152.                    void* socketp) {
  153.   auto* context = reinterpret_cast<MultiContext*>(userp);
  154.   if (what == CURL_POLL_REMOVE) {
  155.     auto* data = reinterpret_cast<event*>(socketp);
  156.     if (data) {
  157.       event_free(data);
  158.     }
  159.   } else {
  160.     auto* data = reinterpret_cast<event*>(socketp);
  161.     if (data) {
  162.       event_free(data);
  163.     }
  164.     data = CheckNotNull(
  165.         event_new(context->event_loop, socket,
  166.                   static_cast<short>(((what & CURL_POLL_IN) ? EV_READ : 0) |
  167.                                      ((what & CURL_POLL_OUT) ? EV_WRITE : 0) |
  168.                                      EV_PERSIST),
  169.                   SocketEvent, context->handle.get()));
  170.     Check(curl_multi_assign(context->handle.get(), socket, data));
  171.     Check(event_add(data, /*timeout=*/nullptr));
  172.   }
  173.   return 0;
  174. }
  175.  
  176. int TimerCallback(CURLM*, long timeout_ms, void* userp) {
  177.   auto* http = reinterpret_cast<MultiContext*>(userp);
  178.   if (timeout_ms == -1) {
  179.     Check(event_del(http->timeout_event.get()));
  180.   } else {
  181.     timeval tv = {
  182.         .tv_sec = static_cast<decltype(tv.tv_sec)>(timeout_ms / 1000),
  183.         .tv_usec = static_cast<decltype(tv.tv_usec)>(timeout_ms % 1000 * 1000)};
  184.     Check(event_add(http->timeout_event.get(), &tv));
  185.   }
  186.   return 0;
  187. }
  188.  
  189. }  // namespace
  190.  
  191. int main() {
  192.   CurlGlobalInitializer initializer;
  193.  
  194.   std::cerr << curl_version() << '\n';
  195.  
  196.   std::unique_ptr<event_base, EventBaseDeleter> event_loop(
  197.       CheckNotNull(event_base_new()));
  198.   MultiContext context{.event_loop = event_loop.get()};
  199.   Check(curl_multi_setopt(context.handle.get(), CURLMOPT_SOCKETFUNCTION,
  200.                           SocketCallback));
  201.   Check(curl_multi_setopt(context.handle.get(), CURLMOPT_SOCKETDATA, &context));
  202.   Check(curl_multi_setopt(context.handle.get(), CURLMOPT_TIMERFUNCTION,
  203.                           TimerCallback));
  204.   Check(curl_multi_setopt(context.handle.get(), CURLMOPT_TIMERDATA, &context));
  205.  
  206.   std::list<HandleContext> handle_contexts{kRequestCount};
  207.   for (auto& handle_context : handle_contexts) {
  208.     handle_context.context = &context;
  209.     handle_context.event_loop = event_loop.get();
  210.  
  211.     CURL* handle =
  212.         (handle_context.handle = MakeHandle(context.handle.get())).get();
  213.     Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
  214.     Check(curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
  215.     Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
  216.     Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
  217.   }
  218.   struct Data {
  219.     MultiContext* context;
  220.     event_base* event_loop;
  221.     std::list<HandleContext>* handle_contexts;
  222.   } data = {.context = &context,
  223.             .event_loop = event_loop.get(),
  224.             .handle_contexts = &handle_contexts};
  225.   {
  226.     timeval tv{.tv_sec = 3};
  227.     Check(event_base_once(
  228.         event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
  229.         [](evutil_socket_t, short, void* userdata) {
  230.           auto* context = static_cast<Data*>(userdata);
  231.           for (int i = 0; i < kRequestCount; i++) {
  232.             HandleContext& handle_context =
  233.                 context->handle_contexts->emplace_back();
  234.             handle_context.context = context->context;
  235.             handle_context.event_loop = context->event_loop;
  236.  
  237.             CURL* handle = (handle_context.handle =
  238.                                 MakeHandle(context->context->handle.get()))
  239.                                .get();
  240.             Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
  241.             Check(
  242.                 curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
  243.             Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
  244.             Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
  245.             timeval tv{.tv_sec = 4};
  246.             Check(event_base_once(
  247.                 context->event_loop, /*fd=*/-1, /*events=*/EV_TIMEOUT,
  248.                 [](evutil_socket_t, short, void* userdata) {
  249.                   auto* context = static_cast<HandleContext*>(userdata);
  250.                   std::cerr << "TIMEOUT DELAYED TRANSFER " << context << '\n';
  251.                   Check(curl_multi_remove_handle(context->context->handle.get(),
  252.                                                  context->handle.get()));
  253.                 },
  254.                 &handle_context, &tv));
  255.           }
  256.         },
  257.         &data, &tv));
  258.   }
  259.  
  260.   {
  261.     timeval tv{.tv_sec = 15};
  262.     Check(event_base_once(
  263.         event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
  264.         [](evutil_socket_t, short, void* userdata) {
  265.           auto* context = static_cast<Data*>(userdata);
  266.           for (int i = 0; i < kRequestCount; i++) {
  267.             HandleContext& handle_context =
  268.                 context->handle_contexts->emplace_back();
  269.             handle_context.context = context->context;
  270.             handle_context.event_loop = context->event_loop;
  271.  
  272.             CURL* handle = (handle_context.handle =
  273.                                 MakeHandle(context->context->handle.get()))
  274.                                .get();
  275.             Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
  276.             Check(
  277.                 curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
  278.             Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
  279.             Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
  280.             timeval tv{.tv_sec = 4};
  281.             Check(event_base_once(
  282.                 context->event_loop, /*fd=*/-1, /*events=*/EV_TIMEOUT,
  283.                 [](evutil_socket_t, short, void* userdata) {
  284.                   auto* context = static_cast<HandleContext*>(userdata);
  285.                   Check(curl_easy_pause(context->handle.get(), CURLPAUSE_CONT));
  286.                   std::cerr << "RESUMING TRANSFER " << context->handle.get()
  287.                             << '\n';
  288.                 },
  289.                 &handle_context, &tv));
  290.           }
  291.         },
  292.         &data, &tv));
  293.   }
  294.  
  295.   for (auto& handle_context : handle_contexts) {
  296.     timeval tv{.tv_sec = 2};
  297.     Check(event_base_once(
  298.         event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
  299.         [](evutil_socket_t, short, void* userdata) {
  300.           auto* context = static_cast<HandleContext*>(userdata);
  301.           Check(curl_easy_pause(context->handle.get(), CURLPAUSE_CONT));
  302.           std::cerr << "RESUMING TRANSFER " << context->handle.get() << '\n';
  303.         },
  304.         &handle_context, &tv));
  305.   }
  306.   Check(event_base_loop(event_loop.get(), /*flags=*/0));
  307.  
  308.   return 0;
  309. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement