Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <iostream>
#include <list>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <vector>

#include <curl/curl.h>
#include <event2/event.h>
- namespace {
- constexpr const char* kUrl =
- "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/"
- "BigBuckBunny.mp4";
- constexpr int kRequestCount = 5;
- constexpr int kHttpVersion = CURL_HTTP_VERSION_2_0;
- void TimeoutEvent(evutil_socket_t, short, void* handle);
// Passes a non-null pointer straight through to the caller.
// Throws std::runtime_error when `p` is null, so a failed C-style allocation
// can be turned into an exception at the call site.
template <typename T>
T* CheckNotNull(T* p) {
  if (p != nullptr) {
    return p;
  }
  throw std::runtime_error("Unexpected null pointer.");
}
// Validates an int status code: negative values are treated as failures and
// reported via std::runtime_error carrying the numeric code; zero and
// positive values are accepted silently.
void Check(int code) {
  if (code >= 0) {
    return;
  }
  std::ostringstream message;
  message << "Unexpected error code: " << code;
  throw std::runtime_error(message.str());
}
- void Check(CURLcode code) {
- if (code != CURLE_OK) {
- throw std::runtime_error(curl_easy_strerror(code));
- }
- }
- void Check(CURLMcode code) {
- if (code != CURLM_OK) {
- throw std::runtime_error(curl_multi_strerror(code));
- }
- }
- struct CurlMultiDeleter {
- void operator()(CURLM* handle) const noexcept {
- Check(curl_multi_cleanup(handle));
- }
- };
- struct CurlHandleDeleter {
- void operator()(CURL* handle) const noexcept {
- Check(curl_multi_remove_handle(multi_handle, handle));
- curl_easy_cleanup(handle);
- }
- CURLM* multi_handle;
- };
- struct EventBaseDeleter {
- void operator()(event_base* event_loop) const noexcept {
- event_base_free(event_loop);
- }
- };
- struct EventDeleter {
- void operator()(event* event) const noexcept {
- Check(event_del(event));
- event_free(event);
- }
- };
- class CurlGlobalInitializer {
- public:
- CurlGlobalInitializer() { Check(curl_global_init(CURL_GLOBAL_DEFAULT)); }
- ~CurlGlobalInitializer() noexcept { curl_global_cleanup(); }
- };
- struct MultiContext {
- std::unique_ptr<CURLM, CurlMultiDeleter> handle{
- CheckNotNull(curl_multi_init())};
- event_base* event_loop;
- std::unique_ptr<event, EventDeleter> timeout_event{
- CheckNotNull(event_new(event_loop, /*fd=*/-1,
- /*events=*/0, TimeoutEvent, handle.get()))};
- };
- struct HandleContext {
- std::unique_ptr<CURL, CurlHandleDeleter> handle;
- int bytes_read = 0;
- event_base* event_loop;
- MultiContext* context;
- };
- auto MakeHandle(CURLM* multi_handle) {
- CURL* handle = CheckNotNull(curl_easy_init());
- if (CURLMcode code = curl_multi_add_handle(multi_handle, handle);
- code != CURLM_OK) {
- curl_easy_cleanup(handle);
- throw std::runtime_error(curl_multi_strerror(code));
- }
- return std::unique_ptr<CURL, CurlHandleDeleter>(
- handle, CurlHandleDeleter{multi_handle});
- }
- void ProcessEvents(CURLM* multi_handle) {
- CURLMsg* message;
- do {
- int message_count;
- message = curl_multi_info_read(multi_handle, &message_count);
- if (message && message->msg == CURLMSG_DONE) {
- std::cerr << "TRANSFER DONE " << curl_easy_strerror(message->data.result)
- << '\n';
- }
- } while (message != nullptr);
- }
- size_t WriteCallback(char* /*ptr*/, size_t size, size_t nmemb, void* userdata) {
- auto* context = static_cast<HandleContext*>(userdata);
- if (context->bytes_read != -1) {
- context->bytes_read += size * nmemb;
- }
- if (context->bytes_read >= 16 * 1024 * 1024) {
- context->bytes_read = -1;
- std::cerr << "PAUSING " << context << '\n';
- return CURL_WRITEFUNC_PAUSE;
- }
- return size * nmemb;
- }
- void TimeoutEvent(evutil_socket_t, short, void* handle) {
- int running_handles;
- Check(curl_multi_socket_action(handle, CURL_SOCKET_TIMEOUT, 0,
- &running_handles));
- ProcessEvents(handle);
- }
- void SocketEvent(evutil_socket_t fd, short event, void* handle) {
- int running_handles;
- Check(
- curl_multi_socket_action(handle, fd,
- ((event & EV_READ) ? CURL_CSELECT_IN : 0) |
- ((event & EV_WRITE) ? CURL_CSELECT_OUT : 0),
- &running_handles));
- ProcessEvents(handle);
- }
- int SocketCallback(CURL*, curl_socket_t socket, int what, void* userp,
- void* socketp) {
- auto* context = reinterpret_cast<MultiContext*>(userp);
- if (what == CURL_POLL_REMOVE) {
- auto* data = reinterpret_cast<event*>(socketp);
- if (data) {
- event_free(data);
- }
- } else {
- auto* data = reinterpret_cast<event*>(socketp);
- if (data) {
- event_free(data);
- }
- data = CheckNotNull(
- event_new(context->event_loop, socket,
- static_cast<short>(((what & CURL_POLL_IN) ? EV_READ : 0) |
- ((what & CURL_POLL_OUT) ? EV_WRITE : 0) |
- EV_PERSIST),
- SocketEvent, context->handle.get()));
- Check(curl_multi_assign(context->handle.get(), socket, data));
- Check(event_add(data, /*timeout=*/nullptr));
- }
- return 0;
- }
- int TimerCallback(CURLM*, long timeout_ms, void* userp) {
- auto* http = reinterpret_cast<MultiContext*>(userp);
- if (timeout_ms == -1) {
- Check(event_del(http->timeout_event.get()));
- } else {
- timeval tv = {
- .tv_sec = static_cast<decltype(tv.tv_sec)>(timeout_ms / 1000),
- .tv_usec = static_cast<decltype(tv.tv_usec)>(timeout_ms % 1000 * 1000)};
- Check(event_add(http->timeout_event.get(), &tv));
- }
- return 0;
- }
- } // namespace
- int main() {
- CurlGlobalInitializer initializer;
- std::cerr << curl_version() << '\n';
- std::unique_ptr<event_base, EventBaseDeleter> event_loop(
- CheckNotNull(event_base_new()));
- MultiContext context{.event_loop = event_loop.get()};
- Check(curl_multi_setopt(context.handle.get(), CURLMOPT_SOCKETFUNCTION,
- SocketCallback));
- Check(curl_multi_setopt(context.handle.get(), CURLMOPT_SOCKETDATA, &context));
- Check(curl_multi_setopt(context.handle.get(), CURLMOPT_TIMERFUNCTION,
- TimerCallback));
- Check(curl_multi_setopt(context.handle.get(), CURLMOPT_TIMERDATA, &context));
- std::list<HandleContext> handle_contexts{kRequestCount};
- for (auto& handle_context : handle_contexts) {
- handle_context.context = &context;
- handle_context.event_loop = event_loop.get();
- CURL* handle =
- (handle_context.handle = MakeHandle(context.handle.get())).get();
- Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
- Check(curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
- Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
- Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
- }
- struct Data {
- MultiContext* context;
- event_base* event_loop;
- std::list<HandleContext>* handle_contexts;
- } data = {.context = &context,
- .event_loop = event_loop.get(),
- .handle_contexts = &handle_contexts};
- {
- timeval tv{.tv_sec = 3};
- Check(event_base_once(
- event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
- [](evutil_socket_t, short, void* userdata) {
- auto* context = static_cast<Data*>(userdata);
- for (int i = 0; i < kRequestCount; i++) {
- HandleContext& handle_context =
- context->handle_contexts->emplace_back();
- handle_context.context = context->context;
- handle_context.event_loop = context->event_loop;
- CURL* handle = (handle_context.handle =
- MakeHandle(context->context->handle.get()))
- .get();
- Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
- Check(
- curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
- Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
- Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
- timeval tv{.tv_sec = 4};
- Check(event_base_once(
- context->event_loop, /*fd=*/-1, /*events=*/EV_TIMEOUT,
- [](evutil_socket_t, short, void* userdata) {
- auto* context = static_cast<HandleContext*>(userdata);
- std::cerr << "TIMEOUT DELAYED TRANSFER " << context << '\n';
- Check(curl_multi_remove_handle(context->context->handle.get(),
- context->handle.get()));
- },
- &handle_context, &tv));
- }
- },
- &data, &tv));
- }
- {
- timeval tv{.tv_sec = 15};
- Check(event_base_once(
- event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
- [](evutil_socket_t, short, void* userdata) {
- auto* context = static_cast<Data*>(userdata);
- for (int i = 0; i < kRequestCount; i++) {
- HandleContext& handle_context =
- context->handle_contexts->emplace_back();
- handle_context.context = context->context;
- handle_context.event_loop = context->event_loop;
- CURL* handle = (handle_context.handle =
- MakeHandle(context->context->handle.get()))
- .get();
- Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
- Check(
- curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
- Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
- Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
- timeval tv{.tv_sec = 4};
- Check(event_base_once(
- context->event_loop, /*fd=*/-1, /*events=*/EV_TIMEOUT,
- [](evutil_socket_t, short, void* userdata) {
- auto* context = static_cast<HandleContext*>(userdata);
- Check(curl_easy_pause(context->handle.get(), CURLPAUSE_CONT));
- std::cerr << "RESUMING TRANSFER " << context->handle.get()
- << '\n';
- },
- &handle_context, &tv));
- }
- },
- &data, &tv));
- }
- for (auto& handle_context : handle_contexts) {
- timeval tv{.tv_sec = 2};
- Check(event_base_once(
- event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
- [](evutil_socket_t, short, void* userdata) {
- auto* context = static_cast<HandleContext*>(userdata);
- Check(curl_easy_pause(context->handle.get(), CURLPAUSE_CONT));
- std::cerr << "RESUMING TRANSFER " << context->handle.get() << '\n';
- },
- &handle_context, &tv));
- }
- Check(event_base_loop(event_loop.get(), /*flags=*/0));
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement