Advertisement
Guest User

Untitled

a guest
Apr 7th, 2016
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 11.82 KB | None | 0 0
  1.  
  #include <pthread.h>
  #include <unistd.h>

  #include <algorithm>
  #include <cstdlib>
  #include <exception>
  #include <iostream>
  #include <map>
  #include <string>
  #include <vector>

  #include <boost/lexical_cast.hpp>
  #include <pqxx/pqxx>

  #include "jcomms.h"

  using namespace pqxx;
  using namespace std;
  using boost::bad_lexical_cast;
  using boost::lexical_cast;
  /* Maximum number of threads of each type (customer and box). A typed constant
   * rather than a macro so it obeys scoping rules; it is still a constant
   * expression and can be used as an array bound. */
  static const int MAX_THREADS = 100;
  /**
   * Argument bundle handed to processCustomer() when a customer thread is started.
   */
  struct cust_thread_data
  {
      /* Name of the customer's database; used to build the "dbname=" part of the connection string. */
      std::string customerDb;
  };
  20. /**
  21.  * Contains the data needed to be passed to each box for processing.
  22.  */
  23. struct box_thread_data
  24. {
  25.     jcomms              *jcommsConnection;
  26.     vector<std::string> parameters;
  27.     std::string         dbName;
  28.     std::string         schema;
  29.     bool                completed; /* Flag to tell controlling thread whether the box thread has completed it's work */
  30. };
  31. /**
  32.  * Function that will be run in a new thread for each box. This is NOT threadsafe if you have two threads running simultaniously for a single box.
  33.  *
  34.  * @param threadarg    An instance of the box_thread_data struct containing all the data we need.
  35.  */
  36. void *processBox (void *threadarg)
  37. {
  38.     struct box_thread_data *my_data;
  39.     my_data = (struct box_thread_data *) threadarg;
  40.     std::string         schema       = my_data->schema;
  41.     jcomms              &jcommsConn  = *my_data->jcommsConnection;
  42.     vector<std::string> wantedParams = my_data->parameters;
  43.     std::string         dbName       = my_data->dbName;
  44.     connection customerDb("dbname=" + dbName + " user=dexdyne password=mik4-sskqu hostaddr=127.0.0.1 port=5432");
  45.     if (customerDb.is_open())
  46.     {
  47.         cout << "Connected to the customer database" << endl;
  48.     }
  49.     else
  50.     {
  51.         cout << "Something went wrong. Please don't panic and find the nearest exit." << endl;
  52.     }
  53.     std::cout << "Working on box: " << jcommsConn.url << endl;
  54.     std::cout << "Params after jcomms connection made: " << jcommsConn.currentParams.size() << " URL: " <<
  55.     jcommsConn.url << ":" << jcommsConn.port << std::endl;
  56.     std::sort(wantedParams.begin(), wantedParams.end());
  57.     std::vector<std::string> currentParams = jcommsConn.currentParams;
  58.     std::sort(currentParams.begin(), currentParams.end());
  59.     cout << "Seeing if we need to reregister" << endl;
  60.     cout << wantedParams.size() << ":" << jcommsConn.currentParams.size() << endl;
  61.     if (wantedParams.size() != jcommsConn.currentParams.size())
  62.     {
  63.         cout << "Registering Parameters" << endl;
  64.         jcommsConn.registerParams(wantedParams);
  65.     }
  66.     else if (wantedParams != currentParams)
  67.     {
  68.         cout << "Registering Parameters" << endl;
  69.         jcommsConn.registerParams(wantedParams);
  70.     }
  71.     cout << "Getting registered param values." << endl;
  72.     std::map<std::string, double> params = jcommsConn.getRegisteredParamValues();
  73.     typedef std::map<std::string, double>::iterator it_type;
  74.     for (it_type iterator = params.begin(); iterator != params.end(); ++iterator) {
  75.         std::cout << "Inserting Value: " << iterator->first << ":" << iterator->second << std::endl;
  76.         string updateQuery = "UPDATE " + schema + ".\"liveValues\" SET last_written=NOW(), value=" +
  77.                              lexical_cast<std::string>(iterator->second) + " WHERE name='" + iterator->first + "'";
  78.         work W(customerDb);
  79.         W.exec(updateQuery);
  80.         W.commit();
  81.     }
  82.     std::map<std::string, double> toSetValues;
  83.     std::string sql;
  84.     string myQuery = "SELECT table_name FROM   information_schema.tables WHERE  table_schema = '" +
  85.                      schema + "' AND    table_name = 'toWriteValues'";
  86.     nontransaction M(customerDb);
  87.     result T(M.exec(myQuery));
  88.     M.commit();
  89.     if ((int) T.size() != 0)
  90.     {
  91.         sql = "SELECT \"name\", value FROM " + schema + ".\"toWriteValues\" WHERE inserted=FALSE";
  92.         nontransaction N(customerDb);
  93.         result         R(N.exec(sql));
  94.         N.commit();
  95.         for (result::const_iterator c = R.begin(); c != R.end(); ++ c)
  96.         {
  97.             cout << "Processing insert value with name: " << c[0].as<string>() << endl;
  98.             toSetValues[c[0].as<string>()] = c[1].as<double>();
  99.             sql = "UPDATE " + schema + ".\"toWriteValues\" SET inserted=TRUE WHERE \"name\"='" + c[0].as<string>() + "'";
  100.             work W(customerDb);
  101.             W.exec(sql);
  102.             W.commit();
  103.         }
  104.         jcommsConn.setParams(toSetValues);
  105.     }
  106.     std::cout << "Params at end of run: " << jcommsConn.currentParams.size() << std::endl;
  107.     std::cout << "Disconencting from database." << endl;
  108.     customerDb.disconnect();
  109.     std::cout << "----------------------------------- Finished RUN ------------------------------" << std::endl;
  110.     my_data->completed = true;
  111.     pthread_detach(pthread_self());
  112. }
  113. /**
  114.  * Function that will run in a separate thread for each customer. Not convinced this actually needs to be in a thread anymore though.
  115.  *
  116.  * @param threadarg    An instance of the cust_thread_data struct containing all the data we need.
  117.  */
  118. void *processCustomer (void *threadarg)
  119. {
  120.     struct cust_thread_data *my_data;
  121.     my_data = (struct cust_thread_data *) threadarg;
  122.     string dbName = my_data->customerDb;
  123.     cout << "Working on DB: " << dbName;
  124.     const char *sql;
  125.     int        rc;
  126.     std::map<std::string, pthread_t>       threads;
  127.     std::map<std::string, box_thread_data> td;
  128.     int i = 0;
  129.     std::map<std::string, jcomms> Connections;
  130.     try
  131.     {
  132.         connection customerDb("dbname=" + dbName + " user=dexdyne password=mik4-sskqu hostaddr=127.0.0.1 port=5432");
  133.         if (customerDb.is_open())
  134.         {
  135.             cout << "Connected to the customer database" << endl;
  136.         }
  137.         else
  138.         {
  139.             cout << "Something went wrong. Please don't panic and find the nearest exit." << endl;
  140.         }
  141.         while (true)
  142.         {
  143.             sql = "SELECT \"schemaName\", vpnip FROM installations";
  144.             nontransaction N(customerDb);
  145.             result R(N.exec(sql));
  146.             N.commit();
  147.             for (result::const_iterator c = R.begin(); c != R.end(); ++ c)
  148.             {
  149.                 try
  150.                 {
  151.                     cout << "Processing box with schema: " << c[0].as<string>() << endl;
  152.                     // Decide if there are any params on the box that we are interested in.
  153.                     string myQuery = "SELECT table_name FROM   information_schema.tables WHERE  table_schema = '" +
  154.                                      c[0].as<string>() + "' AND    table_name = 'liveValues'";
  155.                     nontransaction M(customerDb);
  156.                     result T(M.exec(myQuery));
  157.                     M.commit();
  158.                     if ((int) T.size() == 0)
  159.                     {
  160.                         continue;
  161.                     }
  162.                     cout << "Table called liveValues? " << T.size() << endl;
  163.                     myQuery = "SELECT name FROM " + c[0].as<string>() +
  164.                               ".\"liveValues\" WHERE last_accessed > (NOW() - interval '3 minutes')";
  165.                     nontransaction N(customerDb);
  166.                     result         R(N.exec(myQuery));
  167.                     N.commit();
  168.                     cout << "Params we are interested in: " << R.size() << endl;
  169.                     if ((int) R.size() > 0)
  170.                     {
  171.                         cout << "We are interested in this box." << endl;
  172.                         std::vector<std::string> wantedParams;
  173.                         for (result::const_iterator ci = R.begin(); ci != R.end(); ++ ci)
  174.                         {
  175.                             wantedParams.push_back(ci[0].as<string>());
  176.                         }
  177.                         if (Connections.find(c[1].as<string>()) == Connections.end())
  178.                         {
  179.                             Connections.insert(
  180.                                     pair<std::string, jcomms>(c[1].as<string>(), jcomms(c[1].as<string>(), 80)));
  181.                             cout << "starting jcomms session" << endl;
  182.                         }
  183.                         else
  184.                         {
  185.                             cout << "jcomms session already started" << endl;
  186.                         }
  187.                         jcomms &jcommsConn = Connections.find(c[1].as<string>())->second;
  188.                         if ((td.find(c[1].as<string>()) == td.end()) || td[c[1].as<string>()].completed == true)
  189.                         {
  190.                             td[c[1].as<string>()].jcommsConnection = &jcommsConn;
  191.                             td[c[1].as<string>()].parameters       = wantedParams;
  192.                             td[c[1].as<string>()].schema           = c[0].as<string>();
  193.                             td[c[1].as<string>()].dbName           = dbName;
  194.                             td[c[1].as<string>()].completed        = false;
  195.                             cout << "Starting box process thread." << endl;
  196.                             rc = pthread_create(&threads[c[1].as<string>()], NULL, processBox,
  197.                                                 (void *) &td[c[1].as<string>()]);
  198.                             if (rc)
  199.                             {
  200.                                 cout << "Error:unable to create thread," << rc << endl;
  201.                                 exit(-1);
  202.                             }
  203.                         }
  204.                         else
  205.                         {
  206.                             cout << "Thread for box already running." << endl;
  207.                         }
  208.                     }
  209.                     else
  210.                     {
  211.                         cout << "We are not interested in this box." << endl;
  212.                     }
  213.                 }
  214.                 catch (int n)
  215.                 {
  216.                     cout << "There was an error." << n << endl;
  217.                 }
  218.             }
  219.             sleep(10);
  220.             i++;
  221.         }
  222.     }
  223.     catch (const std::exception &e)
  224.     {
  225.         cout << "There is a fire in the building!!! Please walk calmly to the exit and don't panic." << endl << endl;
  226.         cerr << e.what() << std::endl;
  227.         pthread_detach(pthread_self());
  228.     }
  229.     pthread_detach(pthread_self());
  230. }
  231. /**
  232.  * Main entry function for c++.
  233.  */
  234. int main ()
  235. {
  236.     char const *sql;
  237.     //pthread_t threads[NUM_THREADS];
  238.     int rc;
  239.     void *status;
  240.     try
  241.     {
  242.         struct cust_thread_data td[100];
  243.         connection C("dbname=enterprise user=dexdyne password=mik4-sskqu \
  244.                      hostaddr=127.0.0.1 port=5432");
  245.         if (C.is_open())
  246.         {
  247.             cout << "Opened database successfully: " << C.dbname() << endl;
  248.         }
  249.         else
  250.         {
  251.             cout << "Can't open database" << endl;
  252.             return 1;
  253.         }
  254.         // Create an SQL statement.
  255.         sql = "SELECT name, \"middlewareName\" FROM customers";
  256.         nontransaction N(C);
  257.         result R(N.exec(sql));
  258.         int i = 0;
  259.         //cout << (int)R.size() << endl;
  260.         pthread_t threads[MAX_THREADS];
  261.         for (result::const_iterator c = R.begin(); c != R.end(); ++ c)
  262.         {
  263.             cout << "Processing customer: " << c[0].as<string>() << " With database: " << c[1].as<string>() << endl;
  264.             td[i].customerDb = c[1].as<string>();
  265.             rc = pthread_create(&threads[i], NULL, processCustomer, (void *) &td[0]);
  266.             if (rc)
  267.             {
  268.                 cout << "Error:unable to create thread," << rc << endl;
  269.                 exit(-1);
  270.             }
  271.             i++;
  272.         }
  273.         /**
  274.          * @FIXME: MAH 03/12/15
  275.          *     There should really be some way here of detecting unexpectedly closing threads to re-start them.
  276.          */
  277.         pthread_join(threads[0], NULL);
  278.     }
  279.     catch (const std::exception &e)
  280.     {
  281.         cerr << e.what() << std::endl;
  282.         return 1;
  283.     }
  284.     return 0;
  285. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement