Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <pthread.h>
#include <unistd.h>

#include <algorithm>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <map>
#include <string>
#include <vector>

#include <boost/lexical_cast.hpp>
#include <pqxx/pqxx>

#include "jcomms.h"

using namespace pqxx;
using namespace std;
using boost::bad_lexical_cast;
using boost::lexical_cast;
/* Maximum number of customer threads started by main(). A typed constant is
 * used instead of a macro so the name obeys normal C++ scoping; it is still a
 * constant expression and can be used as an array bound. */
const int MAX_THREADS = 100;
/**
 * Argument bundle handed to the worker thread of a single customer.
 */
struct cust_thread_data
{
    std::string customerDb; /* Name of the database the customer thread connects to */
};
- /**
- * Contains the data needed to be passed to each box for processing.
- */
- struct box_thread_data
- {
- jcomms *jcommsConnection;
- vector<std::string> parameters;
- std::string dbName;
- std::string schema;
- bool completed; /* Flag to tell controlling thread whether the box thread has completed it's work */
- };
- /**
- * Function that will be run in a new thread for each box. This is NOT threadsafe if you have two threads running simultaniously for a single box.
- *
- * @param threadarg An instance of the box_thread_data struct containing all the data we need.
- */
- void *processBox (void *threadarg)
- {
- struct box_thread_data *my_data;
- my_data = (struct box_thread_data *) threadarg;
- std::string schema = my_data->schema;
- jcomms &jcommsConn = *my_data->jcommsConnection;
- vector<std::string> wantedParams = my_data->parameters;
- std::string dbName = my_data->dbName;
- connection customerDb("dbname=" + dbName + " user=dexdyne password=mik4-sskqu hostaddr=127.0.0.1 port=5432");
- if (customerDb.is_open())
- {
- cout << "Connected to the customer database" << endl;
- }
- else
- {
- cout << "Something went wrong. Please don't panic and find the nearest exit." << endl;
- }
- std::cout << "Working on box: " << jcommsConn.url << endl;
- std::cout << "Params after jcomms connection made: " << jcommsConn.currentParams.size() << " URL: " <<
- jcommsConn.url << ":" << jcommsConn.port << std::endl;
- std::sort(wantedParams.begin(), wantedParams.end());
- std::vector<std::string> currentParams = jcommsConn.currentParams;
- std::sort(currentParams.begin(), currentParams.end());
- cout << "Seeing if we need to reregister" << endl;
- cout << wantedParams.size() << ":" << jcommsConn.currentParams.size() << endl;
- if (wantedParams.size() != jcommsConn.currentParams.size())
- {
- cout << "Registering Parameters" << endl;
- jcommsConn.registerParams(wantedParams);
- }
- else if (wantedParams != currentParams)
- {
- cout << "Registering Parameters" << endl;
- jcommsConn.registerParams(wantedParams);
- }
- cout << "Getting registered param values." << endl;
- std::map<std::string, double> params = jcommsConn.getRegisteredParamValues();
- typedef std::map<std::string, double>::iterator it_type;
- for (it_type iterator = params.begin(); iterator != params.end(); ++iterator) {
- std::cout << "Inserting Value: " << iterator->first << ":" << iterator->second << std::endl;
- string updateQuery = "UPDATE " + schema + ".\"liveValues\" SET last_written=NOW(), value=" +
- lexical_cast<std::string>(iterator->second) + " WHERE name='" + iterator->first + "'";
- work W(customerDb);
- W.exec(updateQuery);
- W.commit();
- }
- std::map<std::string, double> toSetValues;
- std::string sql;
- string myQuery = "SELECT table_name FROM information_schema.tables WHERE table_schema = '" +
- schema + "' AND table_name = 'toWriteValues'";
- nontransaction M(customerDb);
- result T(M.exec(myQuery));
- M.commit();
- if ((int) T.size() != 0)
- {
- sql = "SELECT \"name\", value FROM " + schema + ".\"toWriteValues\" WHERE inserted=FALSE";
- nontransaction N(customerDb);
- result R(N.exec(sql));
- N.commit();
- for (result::const_iterator c = R.begin(); c != R.end(); ++ c)
- {
- cout << "Processing insert value with name: " << c[0].as<string>() << endl;
- toSetValues[c[0].as<string>()] = c[1].as<double>();
- sql = "UPDATE " + schema + ".\"toWriteValues\" SET inserted=TRUE WHERE \"name\"='" + c[0].as<string>() + "'";
- work W(customerDb);
- W.exec(sql);
- W.commit();
- }
- jcommsConn.setParams(toSetValues);
- }
- std::cout << "Params at end of run: " << jcommsConn.currentParams.size() << std::endl;
- std::cout << "Disconencting from database." << endl;
- customerDb.disconnect();
- std::cout << "----------------------------------- Finished RUN ------------------------------" << std::endl;
- my_data->completed = true;
- pthread_detach(pthread_self());
- }
- /**
- * Function that will run in a separate thread for each customer. Not convinced this actually needs to be in a thread anymore though.
- *
- * @param threadarg An instance of the cust_thread_data struct containing all the data we need.
- */
- void *processCustomer (void *threadarg)
- {
- struct cust_thread_data *my_data;
- my_data = (struct cust_thread_data *) threadarg;
- string dbName = my_data->customerDb;
- cout << "Working on DB: " << dbName;
- const char *sql;
- int rc;
- std::map<std::string, pthread_t> threads;
- std::map<std::string, box_thread_data> td;
- int i = 0;
- std::map<std::string, jcomms> Connections;
- try
- {
- connection customerDb("dbname=" + dbName + " user=dexdyne password=mik4-sskqu hostaddr=127.0.0.1 port=5432");
- if (customerDb.is_open())
- {
- cout << "Connected to the customer database" << endl;
- }
- else
- {
- cout << "Something went wrong. Please don't panic and find the nearest exit." << endl;
- }
- while (true)
- {
- sql = "SELECT \"schemaName\", vpnip FROM installations";
- nontransaction N(customerDb);
- result R(N.exec(sql));
- N.commit();
- for (result::const_iterator c = R.begin(); c != R.end(); ++ c)
- {
- try
- {
- cout << "Processing box with schema: " << c[0].as<string>() << endl;
- // Decide if there are any params on the box that we are interested in.
- string myQuery = "SELECT table_name FROM information_schema.tables WHERE table_schema = '" +
- c[0].as<string>() + "' AND table_name = 'liveValues'";
- nontransaction M(customerDb);
- result T(M.exec(myQuery));
- M.commit();
- if ((int) T.size() == 0)
- {
- continue;
- }
- cout << "Table called liveValues? " << T.size() << endl;
- myQuery = "SELECT name FROM " + c[0].as<string>() +
- ".\"liveValues\" WHERE last_accessed > (NOW() - interval '3 minutes')";
- nontransaction N(customerDb);
- result R(N.exec(myQuery));
- N.commit();
- cout << "Params we are interested in: " << R.size() << endl;
- if ((int) R.size() > 0)
- {
- cout << "We are interested in this box." << endl;
- std::vector<std::string> wantedParams;
- for (result::const_iterator ci = R.begin(); ci != R.end(); ++ ci)
- {
- wantedParams.push_back(ci[0].as<string>());
- }
- if (Connections.find(c[1].as<string>()) == Connections.end())
- {
- Connections.insert(
- pair<std::string, jcomms>(c[1].as<string>(), jcomms(c[1].as<string>(), 80)));
- cout << "starting jcomms session" << endl;
- }
- else
- {
- cout << "jcomms session already started" << endl;
- }
- jcomms &jcommsConn = Connections.find(c[1].as<string>())->second;
- if ((td.find(c[1].as<string>()) == td.end()) || td[c[1].as<string>()].completed == true)
- {
- td[c[1].as<string>()].jcommsConnection = &jcommsConn;
- td[c[1].as<string>()].parameters = wantedParams;
- td[c[1].as<string>()].schema = c[0].as<string>();
- td[c[1].as<string>()].dbName = dbName;
- td[c[1].as<string>()].completed = false;
- cout << "Starting box process thread." << endl;
- rc = pthread_create(&threads[c[1].as<string>()], NULL, processBox,
- (void *) &td[c[1].as<string>()]);
- if (rc)
- {
- cout << "Error:unable to create thread," << rc << endl;
- exit(-1);
- }
- }
- else
- {
- cout << "Thread for box already running." << endl;
- }
- }
- else
- {
- cout << "We are not interested in this box." << endl;
- }
- }
- catch (int n)
- {
- cout << "There was an error." << n << endl;
- }
- }
- sleep(10);
- i++;
- }
- }
- catch (const std::exception &e)
- {
- cout << "There is a fire in the building!!! Please walk calmly to the exit and don't panic." << endl << endl;
- cerr << e.what() << std::endl;
- pthread_detach(pthread_self());
- }
- pthread_detach(pthread_self());
- }
- /**
- * Main entry function for c++.
- */
- int main ()
- {
- char const *sql;
- //pthread_t threads[NUM_THREADS];
- int rc;
- void *status;
- try
- {
- struct cust_thread_data td[100];
- connection C("dbname=enterprise user=dexdyne password=mik4-sskqu \
- hostaddr=127.0.0.1 port=5432");
- if (C.is_open())
- {
- cout << "Opened database successfully: " << C.dbname() << endl;
- }
- else
- {
- cout << "Can't open database" << endl;
- return 1;
- }
- // Create an SQL statement.
- sql = "SELECT name, \"middlewareName\" FROM customers";
- nontransaction N(C);
- result R(N.exec(sql));
- int i = 0;
- //cout << (int)R.size() << endl;
- pthread_t threads[MAX_THREADS];
- for (result::const_iterator c = R.begin(); c != R.end(); ++ c)
- {
- cout << "Processing customer: " << c[0].as<string>() << " With database: " << c[1].as<string>() << endl;
- td[i].customerDb = c[1].as<string>();
- rc = pthread_create(&threads[i], NULL, processCustomer, (void *) &td[0]);
- if (rc)
- {
- cout << "Error:unable to create thread," << rc << endl;
- exit(-1);
- }
- i++;
- }
- /**
- * @FIXME: MAH 03/12/15
- * There should really be some way here of detecting unexpectedly closing threads to re-start them.
- */
- pthread_join(threads[0], NULL);
- }
- catch (const std::exception &e)
- {
- cerr << e.what() << std::endl;
- return 1;
- }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement