daily pastebin goal
38%
SHARE
TWEET

AsyncSQL.cpp

CFlyPGunP Feb 10th, 2019 (edited) 187 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #ifndef __WIN32__
  2. #include <sys/time.h>
  3. #endif
  4.  
  5. #include <cstdlib>
  6. #include <cstring>
  7. #include <chrono>
  8. #include "AsyncSQL.h"
  9.  
  10. #ifndef __WIN32__
  11. void * AsyncSQLThread(CAsyncSQL* sql)
  12. #else
  13. unsigned int __stdcall AsyncSQLThread(CAsyncSQL* sql)
  14. #endif
  15. {
  16.  
  17.     if (!sql->Connect())
  18.         return NULL;
  19.  
  20.     sql->ChildLoop();
  21.     return NULL;
  22. }
  23.  
  24.  
  25. CAsyncSQL::CAsyncSQL()
  26.     : m_stHost(""), m_stUser(""), m_stPassword(""), m_stDB(""), m_stLocale(""),
  27.     m_iMsgCount(0), m_iPort(0), m_bEnd(false),
  28.     m_mtxQuery(), m_mtxResult(),
  29.     m_iQueryFinished(0), m_ulThreadID(0), m_bConnected(false),
  30.     m_iCopiedQuery(0)
  31. {
  32.     memset( &m_hDB, 0, sizeof(m_hDB) );
  33.  
  34.     m_aiPipe[0] = 0;
  35.     m_aiPipe[1] = 0;
  36. }
  37.  
  38. CAsyncSQL::~CAsyncSQL()
  39. {
  40.     Quit();
  41.     Destroy();
  42. }
  43.  
  44. void CAsyncSQL::Destroy()
  45. {
  46.     if (m_hDB.host)
  47.     {
  48.         sys_log(0, "AsyncSQL: closing mysql connection.");
  49.         mysql_close(&m_hDB);
  50.         m_hDB.host = NULL;
  51.     }
  52.     if(m_pThread)
  53.         delete m_pThread;
  54. }
  55.  
  56.  
  57. bool CAsyncSQL::QueryLocaleSet()
  58. {
  59.     if (0 == m_stLocale.length())
  60.     {
  61.         sys_err("m_stLocale == 0");
  62.         return true;
  63.     }
  64.  
  65.     else if (m_stLocale == "ascii")
  66.     {
  67.         sys_err("m_stLocale == ascii");
  68.         return true;
  69.     }
  70.  
  71.     if (mysql_set_character_set(&m_hDB, m_stLocale.c_str()))
  72.     {
  73.         sys_err("cannot set locale %s by 'mysql_set_character_set', errno %u %s", m_stLocale.c_str(), mysql_errno(&m_hDB) , mysql_error(&m_hDB));
  74.         return false;
  75.     }
  76.  
  77.     sys_log(0, "\t--mysql_set_character_set(%s)", m_stLocale.c_str());
  78.  
  79.     return true;
  80. }
  81.  
  82. bool CAsyncSQL::Connect()
  83. {
  84.     if (0 == mysql_init(&m_hDB))
  85.     {
  86.         fprintf(stderr, "mysql_init failed on [%s]\n", m_stDB.c_str());
  87.         return false;
  88.     }
  89.  
  90.     if (!m_stLocale.empty())
  91.     {
  92.         if (mysql_options(&m_hDB, MYSQL_SET_CHARSET_NAME, m_stLocale.c_str()) != 0)
  93.         {
  94.             fprintf(stderr, "mysql_option failed : MYSQL_SET_CHARSET_NAME %s ", mysql_error(&m_hDB));
  95.         }
  96.     }
  97.  
  98.     if (!mysql_real_connect(&m_hDB, m_stHost.c_str(), m_stUser.c_str(), m_stPassword.c_str(), m_stDB.c_str(), m_iPort, NULL, CLIENT_MULTI_STATEMENTS))
  99.     {
  100.         fprintf(stderr, "mysql_real_connect: %s\n", mysql_error(&m_hDB));
  101.         return false;
  102.     }
  103.  
  104.     bool reconnect = true;
  105.  
  106.     if (0 != mysql_options(&m_hDB, MYSQL_OPT_RECONNECT, &reconnect))
  107.         fprintf(stderr, "mysql_option: %s\n", mysql_error(&m_hDB));
  108.  
  109.     fprintf(stdout, "AsyncSQL: connected to %s (reconnect %d)\n", m_stHost.c_str(), m_hDB.reconnect);
  110.  
  111.     // The db cache finds the locale in the LOCALE table of the common db, and then modifies the character set.
  112.     // Therefore, when making the initial connection,
  113.     // it is forced to set the character set to "euckr" even though the character set can not be determined because the locale is not known.
  114.     // (If you uncomment the following, you will not be able to access the database where mysql does not have euckr installed.)
  115.     //while (!QueryLocaleSet());
  116.     m_ulThreadID = mysql_thread_id(&m_hDB);
  117.  
  118.     m_bConnected = true;
  119.     return true;
  120. }
  121.  
  122. bool CAsyncSQL::Setup(CAsyncSQL * sql, bool bNoThread)
  123. {
  124.     return Setup(sql->m_stHost.c_str(),
  125.             sql->m_stUser.c_str(),
  126.             sql->m_stPassword.c_str(),
  127.             sql->m_stDB.c_str(),
  128.             sql->m_stLocale.c_str(),
  129.             bNoThread,
  130.             sql->m_iPort);
  131. }
  132.  
  133. bool CAsyncSQL::Setup(const char * c_pszHost, const char * c_pszUser,
  134.                       const char * c_pszPassword, const char * c_pszDB,
  135.                       const char * c_pszLocale, bool bNoThread, int iPort)
  136. {
  137.     m_stHost = c_pszHost;
  138.     m_stUser = c_pszUser;
  139.     m_stPassword = c_pszPassword;
  140.     m_stDB = c_pszDB;
  141.     m_iPort = iPort;
  142.  
  143.     if (c_pszLocale)
  144.     {
  145.         m_stLocale = c_pszLocale;
  146.         sys_log(0, "AsyncSQL: locale %s", m_stLocale.c_str());
  147.     }
  148.  
  149.     if (!bNoThread)
  150.     {
  151.  
  152.         if (!mysql_thread_safe())
  153.         {
  154.             fprintf(stderr, "FATAL ERROR!! mysql client library was not compiled with thread safety\n");
  155.             return false;
  156.         }
  157.  
  158.         m_pThread = new std::thread(AsyncSQLThread, this);
  159.         return true;
  160.     }
  161.     else
  162.         return Connect();
  163. }
  164.  
  165. void CAsyncSQL::Quit()
  166. {
  167.     m_bEnd = true;
  168.     m_sem.Release();
  169.  
  170.     if(m_pThread)
  171.         m_pThread->join();
  172. }
  173.  
  174. SQLMsg * CAsyncSQL::DirectQuery(const char * c_pszQuery)
  175. {
  176.     if (m_ulThreadID != mysql_thread_id(&m_hDB))
  177.     {
  178.         sys_err("MySQL connection was reconnected. querying locale set");
  179.         while (!QueryLocaleSet());
  180.         m_ulThreadID = mysql_thread_id(&m_hDB);
  181.     }
  182.  
  183.     SQLMsg * p = new SQLMsg;
  184.  
  185.     p->m_pkSQL = &m_hDB;
  186.     p->iID = ++m_iMsgCount;
  187.     p->stQuery = c_pszQuery;
  188.  
  189.     if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length()))
  190.     {
  191.         char buf[1024];
  192.  
  193.         snprintf(buf, sizeof(buf),
  194.                 "AsyncSQL::DirectQuery : mysql_query error: %s\nquery: %s",
  195.                 mysql_error(&m_hDB), p->stQuery.c_str());
  196.  
  197.         sys_err(buf);
  198.         p->uiSQLErrno = mysql_errno(&m_hDB);
  199.     }
  200.  
  201.     p->Store();
  202.     return p;
  203. }
  204.  
  205. void CAsyncSQL::AsyncQuery(const char * c_pszQuery)
  206. {
  207.     SQLMsg * p = new SQLMsg;
  208.  
  209.     p->m_pkSQL = &m_hDB;
  210.     p->iID = ++m_iMsgCount;
  211.     p->stQuery = c_pszQuery;
  212.  
  213.     PushQuery(p);
  214. }
  215.  
  216. void CAsyncSQL::ReturnQuery(const char * c_pszQuery, void*  pvUserData)
  217. {
  218.     SQLMsg * p = new SQLMsg;
  219.  
  220.     p->m_pkSQL = &m_hDB;
  221.     p->iID = ++m_iMsgCount;
  222.     p->stQuery = c_pszQuery;
  223.     p->bReturn = true;
  224.     p->pvUserData = pvUserData;
  225.  
  226.     PushQuery(p);
  227. }
  228.  
  229. void CAsyncSQL::PushResult(SQLMsg * p)
  230. {
  231.     std::lock_guard<std::mutex> g(m_mtxResult);
  232.     m_queue_result.push(p);
  233. }
  234.  
  235. bool CAsyncSQL::PopResult(SQLMsg ** pp)
  236. {
  237.     std::lock_guard<std::mutex> g(m_mtxResult);
  238.     if (m_queue_result.empty())
  239.     {
  240.         return false;
  241.     }
  242.  
  243.     *pp = m_queue_result.front();
  244.     m_queue_result.pop();
  245.     return true;
  246. }
  247.  
  248. void CAsyncSQL::PushQuery(SQLMsg * p)
  249. {
  250.     std::lock_guard<std::mutex> g(m_mtxQuery);
  251.     m_queue_query.push(p);
  252.     m_sem.Release();
  253. }
  254.  
  255. bool CAsyncSQL::PeekQuery(SQLMsg ** pp)
  256. {
  257.     std::lock_guard<std::mutex> g(m_mtxQuery);
  258.  
  259.     if (m_queue_query.empty())
  260.     {
  261.         return false;
  262.     }
  263.  
  264.     *pp = m_queue_query.front();
  265.     return true;
  266. }
  267.  
  268. void CAsyncSQL::PopQuery(int iID)
  269. {
  270.     std::lock_guard<std::mutex> g(m_mtxQuery);
  271.     m_queue_query.pop();
  272. }
  273.  
  274. bool CAsyncSQL::PeekQueryFromCopyQueue(SQLMsg ** pp)
  275. {
  276.     if (m_queue_query_copy.empty())
  277.         return false;
  278.  
  279.     *pp = m_queue_query_copy.front();
  280.     return true;
  281. }
  282.  
  283. int CAsyncSQL::CopyQuery()
  284. {
  285.     std::lock_guard<std::mutex> g(m_mtxQuery);
  286.     if (m_queue_query.empty())
  287.         return -1;
  288.  
  289.     while (!m_queue_query.empty())
  290.     {
  291.         m_queue_query_copy.push(m_queue_query.front());
  292.         m_queue_query.pop();
  293.     }
  294.     return m_queue_query_copy.size();
  295. }
  296.  
  297. void CAsyncSQL::PopQueryFromCopyQueue()
  298. {
  299.     m_queue_query_copy.pop();
  300. }
  301. int CAsyncSQL::GetCopiedQueryCount()
  302. {
  303.     return m_iCopiedQuery;
  304. }
  305. void CAsyncSQL::ResetCopiedQueryCount()
  306. {
  307.     m_iCopiedQuery = 0;
  308. }
  309.  
  310. void CAsyncSQL::AddCopiedQueryCount(int iCopiedQuery)
  311. {
  312.     m_iCopiedQuery += iCopiedQuery;
  313. }
  314.  
  315. DWORD CAsyncSQL::CountQuery()
  316. {
  317.     return m_queue_query.size();
  318. }
  319.  
  320. DWORD CAsyncSQL::CountResult()
  321. {
  322.     return m_queue_result.size();
  323. }
  324.  
  325. class cProfiler
  326. {
  327. private:
  328.     std::chrono::time_point<std::chrono::steady_clock> m_start;
  329.     std::chrono::duration<float>& m_duration;
  330. public:
  331.     cProfiler(std::chrono::duration<float>& duration):
  332.         m_duration(duration)
  333.     {
  334.         m_start = std::chrono::steady_clock::now();
  335.     }
  336.  
  337.     ~cProfiler()
  338.     {
  339.         m_duration = (std::chrono::steady_clock::now() - m_start);
  340.     }
  341. };
  342.  
  343. void CAsyncSQL::ChildLoop()
  344. {
  345.     while (!m_bEnd)
  346.     {
  347.         m_sem.Wait();
  348.  
  349.         int count = CopyQuery();
  350.  
  351.         if (count <= 0)
  352.             continue;
  353.  
  354.         AddCopiedQueryCount(count);
  355.  
  356.         SQLMsg * p;
  357.         std::chrono::duration<float> timer;
  358.         while (count--)
  359.         {
  360.             {// Start time check
  361.                
  362.                 cProfiler profiler(timer);
  363.  
  364.                 if (!PeekQueryFromCopyQueue(&p))
  365.                     continue;
  366.  
  367.                 if (m_ulThreadID != mysql_thread_id(&m_hDB))
  368.                 {
  369.                     sys_err("MySQL connection was reconnected. querying locale set");
  370.                     while (!QueryLocaleSet());
  371.                     m_ulThreadID = mysql_thread_id(&m_hDB);
  372.                 }
  373.  
  374.                 if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length()))
  375.                 {
  376.                     p->uiSQLErrno = mysql_errno(&m_hDB);
  377.  
  378.                     sys_err("AsyncSQL: query failed: %s (query: %s errno: %d)",
  379.                             mysql_error(&m_hDB), p->stQuery.c_str(), p->uiSQLErrno);
  380.  
  381.                     switch (p->uiSQLErrno)
  382.                     {
  383.                         case CR_SOCKET_CREATE_ERROR:
  384.                         case CR_CONNECTION_ERROR:
  385.                         case CR_IPSOCK_ERROR:
  386.                         case CR_UNKNOWN_HOST:
  387.                         case CR_SERVER_GONE_ERROR:
  388.                         case CR_CONN_HOST_ERROR:
  389.                         case ER_NOT_KEYFILE:
  390.                         case ER_CRASHED_ON_USAGE:
  391.                         case ER_CANT_OPEN_FILE:
  392.                         case ER_HOST_NOT_PRIVILEGED:
  393.                         case ER_HOST_IS_BLOCKED:
  394.                         case ER_PASSWORD_NOT_ALLOWED:
  395.                         case ER_PASSWORD_NO_MATCH:
  396.                         case ER_CANT_CREATE_THREAD:
  397.                         case ER_INVALID_USE_OF_NULL:
  398.                             m_sem.Release();
  399.                             sys_err("AsyncSQL: retrying");
  400.                             continue;
  401.                     }
  402.                 }
  403.             }
  404.             // Output to Log if it takes more than 0.5 seconds
  405.             if (timer.count() > 0.5)
  406.                 sys_log(0, "[QUERY : LONG INTERVAL(OverSec %f)] : %s", static_cast<float>(timer.count()), p->stQuery.c_str());
  407.  
  408.             PopQueryFromCopyQueue();
  409.  
  410.             if (p->bReturn)
  411.             {
  412.                 p->Store();
  413.                 PushResult(p);
  414.             }
  415.             else
  416.                 delete p;
  417.  
  418.             ++m_iQueryFinished;
  419.         }
  420.     }
  421.  
  422.     SQLMsg * p;
  423.  
  424.     while (PeekQuery(&p))
  425.     {
  426.         if (m_ulThreadID != mysql_thread_id(&m_hDB))
  427.         {
  428.             sys_err("MySQL connection was reconnected. querying locale set");
  429.             while (!QueryLocaleSet());
  430.             m_ulThreadID = mysql_thread_id(&m_hDB);
  431.         }
  432.  
  433.         if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length()))
  434.         {
  435.             p->uiSQLErrno = mysql_errno(&m_hDB);
  436.  
  437.             sys_err("AsyncSQL::ChildLoop : mysql_query error: %s:\nquery: %s",
  438.                     mysql_error(&m_hDB), p->stQuery.c_str());
  439.  
  440.             switch (p->uiSQLErrno)
  441.             {
  442.                 case CR_SOCKET_CREATE_ERROR:
  443.                 case CR_CONNECTION_ERROR:
  444.                 case CR_IPSOCK_ERROR:
  445.                 case CR_UNKNOWN_HOST:
  446.                 case CR_SERVER_GONE_ERROR:
  447.                 case CR_CONN_HOST_ERROR:
  448.                 case ER_NOT_KEYFILE:
  449.                 case ER_CRASHED_ON_USAGE:
  450.                 case ER_CANT_OPEN_FILE:
  451.                 case ER_HOST_NOT_PRIVILEGED:
  452.                 case ER_HOST_IS_BLOCKED:
  453.                 case ER_PASSWORD_NOT_ALLOWED:
  454.                 case ER_PASSWORD_NO_MATCH:
  455.                 case ER_CANT_CREATE_THREAD:
  456.                 case ER_INVALID_USE_OF_NULL:
  457.                     continue;
  458.             }
  459.         }
  460.  
  461.         sys_log(0, "QUERY_FLUSH: %s", p->stQuery.c_str());
  462.  
  463.         PopQuery(p->iID);
  464.  
  465.         if (p->bReturn)
  466.         {
  467.             p->Store();
  468.             PushResult(p);
  469.         }
  470.         else
  471.             delete p;
  472.  
  473.         ++m_iQueryFinished;
  474.     }
  475. }
  476.  
  477. int CAsyncSQL::CountQueryFinished()
  478. {
  479.     return m_iQueryFinished;
  480. }
  481.  
  482. void CAsyncSQL::ResetQueryFinished()
  483. {
  484.     m_iQueryFinished = 0;
  485. }
  486.  
  487. MYSQL * CAsyncSQL::GetSQLHandle()
  488. {
  489.     return &m_hDB;
  490. }
  491.  
  492. size_t CAsyncSQL::EscapeString(char* dst, size_t dstSize, const char *src, size_t srcSize)
  493. {
  494.     if (0 == srcSize)
  495.     {
  496.         memset(dst, 0, dstSize);
  497.         return 0;
  498.     }
  499.  
  500.     if (0 == dstSize)
  501.         return 0;
  502.  
  503.     if (dstSize < srcSize * 2 + 1)
  504.     {
  505.         // Copy only 256 bytes to log when \0 is not attached
  506.         char tmp[256];
  507.         size_t tmpLen = sizeof(tmp) > srcSize ? srcSize : sizeof(tmp);
  508.         strlcpy(tmp, src, tmpLen);
  509.  
  510.         sys_err("FATAL ERROR!! not enough buffer size (dstSize %u srcSize %u src%s: %s)",
  511.                 dstSize, srcSize, tmpLen != srcSize ? "(trimmed to 255 characters)" : "", tmp);
  512.  
  513.         dst[0] = '\0';
  514.         return 0;
  515.     }
  516.  
  517.     return mysql_real_escape_string(GetSQLHandle(), dst, src, srcSize);
  518. }
  519.  
  520. void CAsyncSQL2::SetLocale(const std::string & stLocale)
  521. {
  522.     m_stLocale = stLocale;
  523.     QueryLocaleSet();
  524. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top