Advertisement
CFlyPGunP

AsyncSQL.cpp

Feb 10th, 2019
623
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 11.02 KB | None | 0 0
  1. #ifndef __WIN32__
  2. #include <sys/time.h>
  3. #endif
  4.  
  5. #include <cstdlib>
  6. #include <cstring>
  7. #include <chrono>
  8. #include "AsyncSQL.h"
  9.  
  10. #ifndef __WIN32__
  11. void * AsyncSQLThread(CAsyncSQL* sql)
  12. #else
  13. unsigned int __stdcall AsyncSQLThread(CAsyncSQL* sql)
  14. #endif
  15. {
  16.  
  17.     if (!sql->Connect())
  18.         return NULL;
  19.  
  20.     sql->ChildLoop();
  21.     return NULL;
  22. }
  23.  
  24.  
  25. CAsyncSQL::CAsyncSQL()
  26.     : m_stHost(""), m_stUser(""), m_stPassword(""), m_stDB(""), m_stLocale(""),
  27.     m_iMsgCount(0), m_iPort(0), m_bEnd(false),
  28.     m_mtxQuery(), m_mtxResult(),
  29.     m_iQueryFinished(0), m_ulThreadID(0), m_bConnected(false),
  30.     m_iCopiedQuery(0),
  31.     m_pThread(nullptr)
  32. {
  33.     memset( &m_hDB, 0, sizeof(m_hDB) );
  34.  
  35.     m_aiPipe[0] = 0;
  36.     m_aiPipe[1] = 0;
  37. }
  38.  
  39. CAsyncSQL::~CAsyncSQL()
  40. {
  41.     Quit();
  42.     Destroy();
  43. }
  44.  
  45. void CAsyncSQL::Destroy()
  46. {
  47.     if (m_hDB.host)
  48.     {
  49.         sys_log(0, "AsyncSQL: closing mysql connection.");
  50.         mysql_close(&m_hDB);
  51.         m_hDB.host = NULL;
  52.     }
  53.     if(m_pThread)
  54.         delete m_pThread;
  55. }
  56.  
  57.  
  58. bool CAsyncSQL::QueryLocaleSet()
  59. {
  60.     if (0 == m_stLocale.length())
  61.     {
  62.         sys_err("m_stLocale == 0");
  63.         return true;
  64.     }
  65.  
  66.     else if (m_stLocale == "ascii")
  67.     {
  68.         sys_err("m_stLocale == ascii");
  69.         return true;
  70.     }
  71.  
  72.     if (mysql_set_character_set(&m_hDB, m_stLocale.c_str()))
  73.     {
  74.         sys_err("cannot set locale %s by 'mysql_set_character_set', errno %u %s", m_stLocale.c_str(), mysql_errno(&m_hDB) , mysql_error(&m_hDB));
  75.         return false;
  76.     }
  77.  
  78.     sys_log(0, "\t--mysql_set_character_set(%s)", m_stLocale.c_str());
  79.  
  80.     return true;
  81. }
  82.  
  83. bool CAsyncSQL::Connect()
  84. {
  85.     if (0 == mysql_init(&m_hDB))
  86.     {
  87.         fprintf(stderr, "mysql_init failed on [%s]\n", m_stDB.c_str());
  88.         return false;
  89.     }
  90.  
  91.     if (!m_stLocale.empty())
  92.     {
  93.         if (mysql_options(&m_hDB, MYSQL_SET_CHARSET_NAME, m_stLocale.c_str()) != 0)
  94.         {
  95.             fprintf(stderr, "mysql_option failed : MYSQL_SET_CHARSET_NAME %s ", mysql_error(&m_hDB));
  96.         }
  97.     }
  98.  
  99.     if (!mysql_real_connect(&m_hDB, m_stHost.c_str(), m_stUser.c_str(), m_stPassword.c_str(), m_stDB.c_str(), m_iPort, NULL, CLIENT_MULTI_STATEMENTS))
  100.     {
  101.         fprintf(stderr, "mysql_real_connect: %s\n", mysql_error(&m_hDB));
  102.         return false;
  103.     }
  104.  
  105.     bool reconnect = true;
  106.  
  107.     if (0 != mysql_options(&m_hDB, MYSQL_OPT_RECONNECT, &reconnect))
  108.         fprintf(stderr, "mysql_option: %s\n", mysql_error(&m_hDB));
  109.  
  110.     fprintf(stdout, "AsyncSQL: connected to %s (reconnect %d)\n", m_stHost.c_str(), m_hDB.reconnect);
  111.  
  112.     // The db cache finds the locale in the LOCALE table of the common db, and then modifies the character set.
  113.     // Therefore, when making the initial connection,
  114.     // it is forced to set the character set to "euckr" even though the character set can not be determined because the locale is not known.
  115.     // (If you uncomment the following, you will not be able to access the database where mysql does not have euckr installed.)
  116.     //while (!QueryLocaleSet());
  117.     m_ulThreadID = mysql_thread_id(&m_hDB);
  118.  
  119.     m_bConnected = true;
  120.     return true;
  121. }
  122.  
  123. bool CAsyncSQL::Setup(CAsyncSQL * sql, bool bNoThread)
  124. {
  125.     return Setup(sql->m_stHost.c_str(),
  126.             sql->m_stUser.c_str(),
  127.             sql->m_stPassword.c_str(),
  128.             sql->m_stDB.c_str(),
  129.             sql->m_stLocale.c_str(),
  130.             bNoThread,
  131.             sql->m_iPort);
  132. }
  133.  
  134. bool CAsyncSQL::Setup(const char * c_pszHost, const char * c_pszUser,
  135.                       const char * c_pszPassword, const char * c_pszDB,
  136.                       const char * c_pszLocale, bool bNoThread, int iPort)
  137. {
  138.     m_stHost = c_pszHost;
  139.     m_stUser = c_pszUser;
  140.     m_stPassword = c_pszPassword;
  141.     m_stDB = c_pszDB;
  142.     m_iPort = iPort;
  143.  
  144.     if (c_pszLocale)
  145.     {
  146.         m_stLocale = c_pszLocale;
  147.         sys_log(0, "AsyncSQL: locale %s", m_stLocale.c_str());
  148.     }
  149.  
  150.     if (!bNoThread)
  151.     {
  152.  
  153.         if (!mysql_thread_safe())
  154.         {
  155.             fprintf(stderr, "FATAL ERROR!! mysql client library was not compiled with thread safety\n");
  156.             return false;
  157.         }
  158.  
  159.         m_pThread = new std::thread(AsyncSQLThread, this);
  160.         return true;
  161.     }
  162.     else
  163.         return Connect();
  164. }
  165.  
  166. void CAsyncSQL::Quit()
  167. {
  168.     m_bEnd = true;
  169.     m_sem.Release();
  170.  
  171.     if(m_pThread)
  172.         m_pThread->join();
  173. }
  174.  
  175. SQLMsg * CAsyncSQL::DirectQuery(const char * c_pszQuery)
  176. {
  177.     if (m_ulThreadID != mysql_thread_id(&m_hDB))
  178.     {
  179.         sys_err("MySQL connection was reconnected. querying locale set");
  180.         while (!QueryLocaleSet());
  181.         m_ulThreadID = mysql_thread_id(&m_hDB);
  182.     }
  183.  
  184.     SQLMsg * p = new SQLMsg;
  185.  
  186.     p->m_pkSQL = &m_hDB;
  187.     p->iID = ++m_iMsgCount;
  188.     p->stQuery = c_pszQuery;
  189.  
  190.     if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length()))
  191.     {
  192.         char buf[1024];
  193.  
  194.         snprintf(buf, sizeof(buf),
  195.                 "AsyncSQL::DirectQuery : mysql_query error: %s\nquery: %s",
  196.                 mysql_error(&m_hDB), p->stQuery.c_str());
  197.  
  198.         sys_err(buf);
  199.         p->uiSQLErrno = mysql_errno(&m_hDB);
  200.     }
  201.  
  202.     p->Store();
  203.     return p;
  204. }
  205.  
  206. void CAsyncSQL::AsyncQuery(const char * c_pszQuery)
  207. {
  208.     SQLMsg * p = new SQLMsg;
  209.  
  210.     p->m_pkSQL = &m_hDB;
  211.     p->iID = ++m_iMsgCount;
  212.     p->stQuery = c_pszQuery;
  213.  
  214.     PushQuery(p);
  215. }
  216.  
  217. void CAsyncSQL::ReturnQuery(const char * c_pszQuery, void*  pvUserData)
  218. {
  219.     SQLMsg * p = new SQLMsg;
  220.  
  221.     p->m_pkSQL = &m_hDB;
  222.     p->iID = ++m_iMsgCount;
  223.     p->stQuery = c_pszQuery;
  224.     p->bReturn = true;
  225.     p->pvUserData = pvUserData;
  226.  
  227.     PushQuery(p);
  228. }
  229.  
  230. void CAsyncSQL::PushResult(SQLMsg * p)
  231. {
  232.     std::lock_guard<std::mutex> g(m_mtxResult);
  233.     m_queue_result.push(p);
  234. }
  235.  
  236. bool CAsyncSQL::PopResult(SQLMsg ** pp)
  237. {
  238.     std::lock_guard<std::mutex> g(m_mtxResult);
  239.     if (m_queue_result.empty())
  240.     {
  241.         return false;
  242.     }
  243.  
  244.     *pp = m_queue_result.front();
  245.     m_queue_result.pop();
  246.     return true;
  247. }
  248.  
  249. void CAsyncSQL::PushQuery(SQLMsg * p)
  250. {
  251.     std::lock_guard<std::mutex> g(m_mtxQuery);
  252.     m_queue_query.push(p);
  253.     m_sem.Release();
  254. }
  255.  
  256. bool CAsyncSQL::PeekQuery(SQLMsg ** pp)
  257. {
  258.     std::lock_guard<std::mutex> g(m_mtxQuery);
  259.  
  260.     if (m_queue_query.empty())
  261.     {
  262.         return false;
  263.     }
  264.  
  265.     *pp = m_queue_query.front();
  266.     return true;
  267. }
  268.  
  269. void CAsyncSQL::PopQuery(int iID)
  270. {
  271.     std::lock_guard<std::mutex> g(m_mtxQuery);
  272.     m_queue_query.pop();
  273. }
  274.  
  275. bool CAsyncSQL::PeekQueryFromCopyQueue(SQLMsg ** pp)
  276. {
  277.     if (m_queue_query_copy.empty())
  278.         return false;
  279.  
  280.     *pp = m_queue_query_copy.front();
  281.     return true;
  282. }
  283.  
  284. int CAsyncSQL::CopyQuery()
  285. {
  286.     std::lock_guard<std::mutex> g(m_mtxQuery);
  287.     if (m_queue_query.empty())
  288.         return -1;
  289.  
  290.     while (!m_queue_query.empty())
  291.     {
  292.         m_queue_query_copy.push(m_queue_query.front());
  293.         m_queue_query.pop();
  294.     }
  295.     return m_queue_query_copy.size();
  296. }
  297.  
  298. void CAsyncSQL::PopQueryFromCopyQueue()
  299. {
  300.     m_queue_query_copy.pop();
  301. }
  302. int CAsyncSQL::GetCopiedQueryCount()
  303. {
  304.     return m_iCopiedQuery;
  305. }
  306. void CAsyncSQL::ResetCopiedQueryCount()
  307. {
  308.     m_iCopiedQuery = 0;
  309. }
  310.  
  311. void CAsyncSQL::AddCopiedQueryCount(int iCopiedQuery)
  312. {
  313.     m_iCopiedQuery += iCopiedQuery;
  314. }
  315.  
  316. DWORD CAsyncSQL::CountQuery()
  317. {
  318.     return m_queue_query.size();
  319. }
  320.  
  321. DWORD CAsyncSQL::CountResult()
  322. {
  323.     return m_queue_result.size();
  324. }
  325.  
  326. class cProfiler
  327. {
  328. private:
  329.     std::chrono::time_point<std::chrono::steady_clock> m_start;
  330.     std::chrono::duration<float>& m_duration;
  331. public:
  332.     cProfiler(std::chrono::duration<float>& duration):
  333.         m_duration(duration)
  334.     {
  335.         m_start = std::chrono::steady_clock::now();
  336.     }
  337.  
  338.     ~cProfiler()
  339.     {
  340.         m_duration = (std::chrono::steady_clock::now() - m_start);
  341.     }
  342. };
  343.  
  344. void CAsyncSQL::ChildLoop()
  345. {
  346.     while (!m_bEnd)
  347.     {
  348.         m_sem.Wait();
  349.  
  350.         int count = CopyQuery();
  351.  
  352.         if (count <= 0)
  353.             continue;
  354.  
  355.         AddCopiedQueryCount(count);
  356.  
  357.         SQLMsg * p;
  358.         std::chrono::duration<float> timer;
  359.         while (count--)
  360.         {
  361.             {// Start time check
  362.                
  363.                 cProfiler profiler(timer);
  364.  
  365.                 if (!PeekQueryFromCopyQueue(&p))
  366.                     continue;
  367.  
  368.                 if (m_ulThreadID != mysql_thread_id(&m_hDB))
  369.                 {
  370.                     sys_err("MySQL connection was reconnected. querying locale set");
  371.                     while (!QueryLocaleSet());
  372.                     m_ulThreadID = mysql_thread_id(&m_hDB);
  373.                 }
  374.  
  375.                 if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length()))
  376.                 {
  377.                     p->uiSQLErrno = mysql_errno(&m_hDB);
  378.  
  379.                     sys_err("AsyncSQL: query failed: %s (query: %s errno: %d)",
  380.                             mysql_error(&m_hDB), p->stQuery.c_str(), p->uiSQLErrno);
  381.  
  382.                     switch (p->uiSQLErrno)
  383.                     {
  384.                         case CR_SOCKET_CREATE_ERROR:
  385.                         case CR_CONNECTION_ERROR:
  386.                         case CR_IPSOCK_ERROR:
  387.                         case CR_UNKNOWN_HOST:
  388.                         case CR_SERVER_GONE_ERROR:
  389.                         case CR_CONN_HOST_ERROR:
  390.                         case ER_NOT_KEYFILE:
  391.                         case ER_CRASHED_ON_USAGE:
  392.                         case ER_CANT_OPEN_FILE:
  393.                         case ER_HOST_NOT_PRIVILEGED:
  394.                         case ER_HOST_IS_BLOCKED:
  395.                         case ER_PASSWORD_NOT_ALLOWED:
  396.                         case ER_PASSWORD_NO_MATCH:
  397.                         case ER_CANT_CREATE_THREAD:
  398.                         case ER_INVALID_USE_OF_NULL:
  399.                             m_sem.Release();
  400.                             sys_err("AsyncSQL: retrying");
  401.                             continue;
  402.                     }
  403.                 }
  404.             }
  405.             // Output to Log if it takes more than 0.5 seconds
  406.             if (timer.count() > 0.5)
  407.                 sys_log(0, "[QUERY : LONG INTERVAL(OverSec %f)] : %s", static_cast<float>(timer.count()), p->stQuery.c_str());
  408.  
  409.             PopQueryFromCopyQueue();
  410.  
  411.             if (p->bReturn)
  412.             {
  413.                 p->Store();
  414.                 PushResult(p);
  415.             }
  416.             else
  417.                 delete p;
  418.  
  419.             ++m_iQueryFinished;
  420.         }
  421.     }
  422.  
  423.     SQLMsg * p;
  424.  
  425.     while (PeekQuery(&p))
  426.     {
  427.         if (m_ulThreadID != mysql_thread_id(&m_hDB))
  428.         {
  429.             sys_err("MySQL connection was reconnected. querying locale set");
  430.             while (!QueryLocaleSet());
  431.             m_ulThreadID = mysql_thread_id(&m_hDB);
  432.         }
  433.  
  434.         if (mysql_real_query(&m_hDB, p->stQuery.c_str(), p->stQuery.length()))
  435.         {
  436.             p->uiSQLErrno = mysql_errno(&m_hDB);
  437.  
  438.             sys_err("AsyncSQL::ChildLoop : mysql_query error: %s:\nquery: %s",
  439.                     mysql_error(&m_hDB), p->stQuery.c_str());
  440.  
  441.             switch (p->uiSQLErrno)
  442.             {
  443.                 case CR_SOCKET_CREATE_ERROR:
  444.                 case CR_CONNECTION_ERROR:
  445.                 case CR_IPSOCK_ERROR:
  446.                 case CR_UNKNOWN_HOST:
  447.                 case CR_SERVER_GONE_ERROR:
  448.                 case CR_CONN_HOST_ERROR:
  449.                 case ER_NOT_KEYFILE:
  450.                 case ER_CRASHED_ON_USAGE:
  451.                 case ER_CANT_OPEN_FILE:
  452.                 case ER_HOST_NOT_PRIVILEGED:
  453.                 case ER_HOST_IS_BLOCKED:
  454.                 case ER_PASSWORD_NOT_ALLOWED:
  455.                 case ER_PASSWORD_NO_MATCH:
  456.                 case ER_CANT_CREATE_THREAD:
  457.                 case ER_INVALID_USE_OF_NULL:
  458.                     continue;
  459.             }
  460.         }
  461.  
  462.         sys_log(0, "QUERY_FLUSH: %s", p->stQuery.c_str());
  463.  
  464.         PopQuery(p->iID);
  465.  
  466.         if (p->bReturn)
  467.         {
  468.             p->Store();
  469.             PushResult(p);
  470.         }
  471.         else
  472.             delete p;
  473.  
  474.         ++m_iQueryFinished;
  475.     }
  476. }
  477.  
  478. int CAsyncSQL::CountQueryFinished()
  479. {
  480.     return m_iQueryFinished;
  481. }
  482.  
  483. void CAsyncSQL::ResetQueryFinished()
  484. {
  485.     m_iQueryFinished = 0;
  486. }
  487.  
  488. MYSQL * CAsyncSQL::GetSQLHandle()
  489. {
  490.     return &m_hDB;
  491. }
  492.  
  493. size_t CAsyncSQL::EscapeString(char* dst, size_t dstSize, const char *src, size_t srcSize)
  494. {
  495.     if (0 == srcSize)
  496.     {
  497.         memset(dst, 0, dstSize);
  498.         return 0;
  499.     }
  500.  
  501.     if (0 == dstSize)
  502.         return 0;
  503.  
  504.     if (dstSize < srcSize * 2 + 1)
  505.     {
  506.         // Copy only 256 bytes to log when \0 is not attached
  507.         char tmp[256];
  508.         size_t tmpLen = sizeof(tmp) > srcSize ? srcSize : sizeof(tmp);
  509.         strlcpy(tmp, src, tmpLen);
  510.  
  511.         sys_err("FATAL ERROR!! not enough buffer size (dstSize %u srcSize %u src%s: %s)",
  512.                 dstSize, srcSize, tmpLen != srcSize ? "(trimmed to 255 characters)" : "", tmp);
  513.  
  514.         dst[0] = '\0';
  515.         return 0;
  516.     }
  517.  
  518.     return mysql_real_escape_string(GetSQLHandle(), dst, src, srcSize);
  519. }
  520.  
  521. void CAsyncSQL2::SetLocale(const std::string & stLocale)
  522. {
  523.     m_stLocale = stLocale;
  524.     QueryLocaleSet();
  525. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement