Advertisement
Guest User

Untitled

a guest
Jun 20th, 2017
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 9.12 KB | None | 0 0
  1. // database.h
  2. #pragma once
  3.  
  4. #ifndef DATABASE_H
  5. #define DATABASE_H
  6.  
  7. #include <boost/asio.hpp>
  8. #include <boost/unordered_map.hpp>
  9. #include <boost/shared_ptr.hpp>
  10. #include <boost/scoped_ptr.hpp>
  11. #include <boost/function.hpp>
  12. #include <boost/thread/mutex.hpp>
  13. #include <boost/thread/locks.hpp>
  14. #include <libpq-fe.h>
  15. #include <string>
  16. #include <queue>
  17. #include "query.h"
  18. #include "result.h"
  19. #include "notification_handler.h"
  20.  
  21.  
  22. void null_handler(result&);
  23.  
  24. class database
  25. {
  26. public:
  27.     database(boost::asio::io_service& io_service, std::string host, unsigned short port, std::string name, std::string user, std::string password);
  28.     ~database();
  29.    
  30.     result exec(const query& query);
  31.  
  32.     typedef boost::function<void(result&)> result_handler;
  33.     static boost::arg<1> result_placeholder;
  34.  
  35.     void async_exec(const query& query, result_handler handler = null_handler);
  36.  
  37.     bool connected() const;
  38.     std::string error_message() const;
  39.  
  40.     void add_notification_handler(const std::string& channel, boost::shared_ptr<notification_handler> handler);
  41.  
  42. private:
  43.     database(const database& other);
  44.     database& operator=(const database& other);
  45.  
  46.     std::string escape_connection_param(const std::string& escape_string);
  47.     std::string escape_string(const std::string& escape_string);
  48.  
  49.     void on_data_available(const boost::system::error_code& error);
  50.     void start_waiting_for_data();
  51.     void handle_notifications();
  52.     void handle_results();
  53.     void send_next_command();
  54.  
  55.     PGconn* connection_;
  56.     boost::asio::ip::tcp::socket socket_;
  57.  
  58.     boost::unordered_map<std::string, boost::shared_ptr<notification_handler> > notification_handlers_;
  59.  
  60.     boost::mutex exec_mutex_;
  61.     result_handler result_handler_;
  62.     std::queue<std::pair<query, result_handler> > exec_queue_;
  63. };
  64.  
  65. #endif
  66.  
  67. // database.cpp
  68. #include "database.h"
  69. #include <sstream>
  70. #include <algorithm>
  71. #include <iostream>
  72. #include <boost/bind.hpp>
  73. #include <boost/format.hpp>
  74. #include <cassert>
  75. #include "log.h"
  76.  
  77. boost::arg<1> database::result_placeholder;
  78.  
  79. void null_handler(result&)
  80. {
  81. }
  82.  
  83. void log_error_result(const PGresult* result)
  84. {
  85.     log::error("Database Error: %1%", PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY));
  86.     const char* detail = PQresultErrorField(result, PG_DIAG_MESSAGE_DETAIL);
  87.     if (detail)
  88.     {
  89.         log::error("%1%", detail);
  90.     }
  91. }
  92.  
  93. void notice_receiver(void* arg, const PGresult* result)
  94. {
  95.     log_error_result(result);
  96. }
  97.  
  98. database::database(boost::asio::io_service& io_service, std::string host, unsigned short port, std::string name, std::string user, std::string password)
  99.     : socket_(io_service), connection_(0)
  100. {
  101.     // Assuming TCP is used to connect to Postgres, this code would have to be changed for Unix socket descriptors
  102.  
  103.     std::stringstream ss;
  104.     ss << "host='" << escape_connection_param(host) << "' port=" << port << " dbname='" << escape_connection_param(name) << "' user='" << escape_connection_param(user) << "' password='" << escape_connection_param(password) << "'";
  105.     connection_ = PQconnectdb(ss.str().c_str());
  106.  
  107.     if (PQstatus(connection_) == CONNECTION_OK)
  108.     {
  109.         PQsetNoticeReceiver(connection_, notice_receiver, 0);
  110.  
  111.         if (PQsetnonblocking(connection_, 1) != 0)
  112.         {
  113.             throw std::runtime_error("PQsetnonblocking failed");
  114.         }
  115.  
  116.         socket_.assign(boost::asio::ip::tcp::v4(), PQsocket(connection_));
  117.  
  118.         start_waiting_for_data();
  119.  
  120.         log::success("Connected to database '%1%' as user '%2%' on %3%:%4%", name, user, host, port);
  121.         result res = exec(query::create("SELECT version()"));
  122.         log::info("Database Version: %1%", res.get(0, 0));
  123.     }
  124.     else
  125.     {
  126.         throw std::runtime_error(boost::str(boost::format("Database connection failed! %1%") % error_message()));
  127.     }
  128. }
  129.  
  130. database::~database()
  131. {
  132.     if (connection_)
  133.     {
  134.         PQfinish(connection_);
  135.     }
  136. }
  137.  
  138. bool database::connected() const
  139. {
  140.     return socket_.is_open();
  141. }
  142.  
  143. std::string database::error_message() const
  144. {
  145.     return PQerrorMessage(connection_);
  146. }
  147.  
  148. std::string database::escape_connection_param(const std::string& escape_string)
  149. {
  150.     std::string new_string;
  151.     size_t needed = escape_string.size() + std::count(escape_string.begin(), escape_string.end(), '\'') + std::count(escape_string.begin(), escape_string.end(), '\\');
  152.     new_string.reserve(needed);
  153.     for (std::string::const_iterator iter = escape_string.begin(); iter != escape_string.end(); ++iter)
  154.     {
  155.         switch(*iter)
  156.         {
  157.         case '\'':
  158.             new_string.append("\\'");
  159.             break;
  160.         case '\\':
  161.             new_string.append("\\\\");
  162.             break;
  163.         default:
  164.             new_string.append(1, *iter);
  165.         }
  166.     }
  167.     return new_string;
  168. }
  169.  
  170. void database::start_waiting_for_data()
  171. {
  172.     socket_.async_receive(boost::asio::null_buffers(), boost::bind(&database::on_data_available, this, boost::asio::placeholders::error));
  173. }
  174.  
  175. void database::on_data_available(const boost::system::error_code& error)
  176. {
  177.     if (!error)
  178.     {
  179.         if (PQconsumeInput(connection_) == 1)
  180.         {
  181.             if (PQisBusy(connection_) == 0)
  182.             {
  183.                 handle_notifications();
  184.  
  185.                 handle_results();
  186.             }
  187.  
  188.             start_waiting_for_data();
  189.         }
  190.         else
  191.         {
  192.             log::error("Database Error: %1%", error_message());
  193.         }
  194.     }
  195. }
  196.  
  197. void database::async_exec(const query& query, result_handler handler)
  198. {
  199.     if (query.valid())
  200.     {
  201.         boost::lock_guard<boost::mutex> lock(exec_mutex_);
  202.  
  203.         if (!result_handler_)
  204.         {
  205.             if (1 == PQsendQueryParams(connection_, query.command(), query.num_params(), query.param_types(), query.param_values(), query.param_lengths(), query.param_formats(), query.binary_results() ? 1 : 0))
  206.             {
  207.                 result_handler_ = handler;
  208.             }
  209.             else
  210.             {
  211.                 log::error("Database Error: %1%", error_message());
  212.             }
  213.         }
  214.         else
  215.         {
  216.             exec_queue_.push(std::make_pair(query, handler));
  217.         }
  218.     }
  219.     else
  220.     {
  221.         throw std::runtime_error("Invalid query object");
  222.     }
  223. }
  224.  
  225. result database::exec(const query& query)
  226. {
  227.     if (query.valid())
  228.     {
  229.         {
  230.             // Make sure asynchronous executions will get queued during the synchronous execution
  231.             boost::lock_guard<boost::mutex> lock(exec_mutex_);
  232.             result_handler_ = null_handler;
  233.         }
  234.  
  235.         PGresult *result = PQexecParams(connection_, query.command(), query.num_params(), query.param_types(), query.param_values(), query.param_lengths(), query.param_formats(), query.binary_results() ? 1 : 0);
  236.  
  237.         if (!result)
  238.         {
  239.             throw std::runtime_error(boost::str(boost::format("Fatal database error! %1%") % error_message()));
  240.         }
  241.  
  242.         ExecStatusType status = PQresultStatus(result);
  243.         if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK)
  244.         {
  245.             log_error_result(result);
  246.         }
  247.  
  248.         handle_notifications();
  249.        
  250.         boost::lock_guard<boost::mutex> lock(exec_mutex_);
  251.         result_handler_.clear();
  252.         send_next_command();
  253.  
  254.         return result;
  255.     }
  256.     else
  257.     {
  258.         throw std::runtime_error("Invalid query object");
  259.     }
  260. }
  261.  
  262. void database::handle_notifications()
  263. {
  264.     while (PGnotify *notify = PQnotifies(connection_))
  265.     {
  266.         boost::unordered_map<std::string, boost::shared_ptr<notification_handler> >::iterator handler = notification_handlers_.find(notify->relname);
  267.         if (handler != notification_handlers_.end())
  268.         {
  269.             (*handler->second)(notify->extra);
  270.         }
  271.         else
  272.         {
  273.             log::warning("Unhandled database notification: Channel = %1%, Message = %2%", notify->relname, notify->extra);
  274.         }
  275.  
  276.         PQfreemem(notify);
  277.     }
  278. }
  279.  
  280. void database::handle_results()
  281. {
  282.     if (PGresult *result = PQgetResult(connection_))
  283.     {
  284.         std::vector<::result> results;
  285.  
  286.         do
  287.         {
  288.             ExecStatusType status = PQresultStatus(result);
  289.             if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK)
  290.             {
  291.                 log_error_result(result);
  292.             }
  293.             else
  294.             {
  295.                 results.push_back(result);
  296.             }
  297.         } while (result = PQgetResult(connection_));
  298.  
  299.         result_handler handler;
  300.  
  301.         {
  302.             boost::lock_guard<boost::mutex> lock(exec_mutex_);
  303.  
  304.             assert (result_handler_ && "result_handler_ must be bound when a query result is ready");
  305.  
  306.             std::swap(handler, result_handler_);
  307.  
  308.             send_next_command();
  309.         }
  310.  
  311.         for (std::vector<::result>::iterator iter = results.begin(); iter != results.end(); ++iter)
  312.         {
  313.             handler(*iter);
  314.         }
  315.     }
  316. }
  317.  
  318. void database::send_next_command()
  319. {
  320.     if (!exec_queue_.empty())
  321.     {
  322.         std::pair<query, result_handler>& queued_command = exec_queue_.front();
  323.         query& query = queued_command.first;
  324.         result_handler& handler = queued_command.second;
  325.  
  326.         if (1 == PQsendQueryParams(connection_, query.command(), query.num_params(), query.param_types(), query.param_values(), query.param_lengths(), query.param_formats(), query.binary_results() ? 1 : 0))
  327.         {
  328.             result_handler_ = handler;
  329.         }
  330.         else
  331.         {
  332.             log::error("Database Error: %1%", error_message());
  333.         }
  334.  
  335.         exec_queue_.pop();
  336.     }
  337. }
  338.  
  339. std::string database::escape_string(const std::string& escape_string)
  340. {
  341.     int error = 0;
  342.     boost::scoped_ptr<char> buf(new char[escape_string.size() * 2 + 1]);
  343.     size_t new_size = PQescapeStringConn(connection_, buf.get(), escape_string.c_str(), escape_string.size(), &error);
  344.  
  345.     if (error)
  346.     {
  347.         log::error("Database Error: %1%", error_message());
  348.     }
  349.  
  350.     return std::string(buf.get(), buf.get() + new_size);
  351. }
  352.  
  353. void database::add_notification_handler(const std::string& channel, boost::shared_ptr<notification_handler> handler)
  354. {
  355.     notification_handlers_[channel] = handler;
  356.     exec(query::create("LISTEN " + escape_string(channel)));
  357. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement