Advertisement
Guest User

Untitled

a guest
Aug 13th, 2017
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 24.84 KB | None | 0 0
  1. /**
  2. * Fetches data from the BuildingOS API and caches it in the db
  3. *
  4. * @author Tim Robert-Fitzgerald
  5. */
  6.  
  7. #define _XOPEN_SOURCE // for strptime
  8. #define _GNU_SOURCE // for strptime
  9. #define PRIORITY_METER "SELECT id, org_id, url, live_last_updated FROM meters WHERE source = 'buildingos' AND live_last_updated < (UNIX_TIMESTAMP() - 300) AND (for_orb > 0 OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY live_last_updated ASC LIMIT 1"
  10. #define LIVE_TARGET_METER "SELECT id, org_id, url, live_last_updated FROM meters WHERE source = 'buildingos' AND ((gauges_using > 0 OR for_orb > 0 OR timeseries_using > 0) OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY live_last_updated ASC LIMIT 1"
  11. #define QH_TARGET_METER "SELECT id, org_id, url, quarterhour_last_updated FROM meters WHERE source = 'buildingos' AND ((gauges_using > 0 OR for_orb > 0 OR timeseries_using > 0) OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY quarterhour_last_updated ASC LIMIT 1"
  12. #define HOUR_TARGET_METER "SELECT id, org_id, url, hour_last_updated FROM meters WHERE source = 'buildingos' AND ((gauges_using > 0 OR for_orb > 0 OR timeseries_using > 0) OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY hour_last_updated ASC LIMIT 1"
  13. #define MONTH_TARGET_METER "SELECT id, org_id, url, month_last_updated FROM meters WHERE source = 'buildingos' AND ((gauges_using > 0 OR for_orb > 0 OR timeseries_using > 0) OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY month_last_updated ASC LIMIT 1"
  14. #define UPDATE_LIVE_TIMESTAMP "UPDATE meters SET live_last_updated = %d WHERE id = %d"
  15. #define UPDATE_QH_TIMESTAMP "UPDATE meters SET quarterhour_last_updated = %d WHERE id = %d"
  16. #define UPDATE_HOUR_TIMESTAMP "UPDATE meters SET hour_last_updated = %d WHERE id = %d"
  17. #define UPDATE_MONTH_TIMESTAMP "UPDATE meters SET month_last_updated = %d WHERE id = %d"
  18. #define TOKEN_URL "https://api.buildingos.com/o/token/" // where to get the token from
  19. #define ISO8601_FORMAT "%Y-%m-%dT%H:%M:%S%z"
  20. #define ISO8601_FORMAT_EST "%Y-%m-%dT%H:%M:%S-04:00"
  21. #define BUFFER_FILE "/root/meter_data.csv"
  22. #define SMALL_CONTAINER 255 // small fixed-size container for arrays
  23. #define MED_CONTAINER 510 // just double SMALL_CONTAINER
  24. #define LIVE_DATA_LIFESPAN 7200 // live data is stored for 2 hours i.e. 7200s
  25. #define QH_DATA_LIFESPAN 1209600 // 2 weeks
  26. #define HOUR_DATA_LIFESPAN 5184000 // 2 months
  27. #define MONTH_DATA_LIFESPAN 63113904 // 2 years
  28. #define UPDATE_CURRENT 1 // update the meters.current column with the current reading?
  29. #define READONLY_MODE 0 // if on (i.e. 1) the daemon will not make queries that update/insert/delete data by short circuiting if stmts
  30.  
  31. #include <stdio.h>
  32. #include <string.h>
  33. #include <unistd.h>
  34. #include <time.h>
  35. #include <mysql.h>
  36. #include <sys/types.h>
  37. #include <sys/wait.h>
  38. #include <sys/prctl.h>
  39. #include <stdlib.h>
  40. #include <signal.h>
  41. #include <syslog.h>
  42. #include <sys/stat.h>
  43. #include <curl/curl.h> // install with `apt-get install libcurl4-openssl-dev`
  44. #include <curl/easy.h>
  45. #include "./lib/cJSON/cJSON.h"
  46. #include "db.h"
  47.  
  48. static pid_t buildingosd_pid;
  49. // Stores page downloaded by http_request()
  50. struct MemoryStruct {
  51. char *memory;
  52. size_t size;
  53. };
  54.  
  55. /**
  56. * Utility function copied from https://stackoverflow.com/a/779960/2624391
  57. */
  58. char *str_replace(char *orig, char *rep, char *with) {
  59. char *result; // the return string
  60. char *ins; // the next insert point
  61. char *tmp; // varies
  62. int len_rep; // length of rep (the string to remove)
  63. int len_with; // length of with (the string to replace rep with)
  64. int len_front; // distance between rep and end of last rep
  65. int count; // number of replacements
  66.  
  67. // sanity checks and initialization
  68. if (!orig || !rep) {
  69. return NULL;
  70. }
  71. len_rep = strlen(rep);
  72. if (len_rep == 0) {
  73. return NULL; // empty rep causes infinite loop during count
  74. }
  75. if (!with) {
  76. with = "";
  77. }
  78. len_with = strlen(with);
  79.  
  80. // count the number of replacements needed
  81. ins = orig;
  82. for (count = 0; (tmp = strstr(ins, rep)); ++count) {
  83. ins = tmp + len_rep;
  84. }
  85.  
  86. tmp = result = malloc(strlen(orig) + (len_with - len_rep) * count + 1);
  87.  
  88. if (!result) {
  89. return NULL;
  90. }
  91.  
  92. // first time through the loop, all the variable are set correctly
  93. // from here on,
  94. // tmp points to the end of the result string
  95. // ins points to the next occurrence of rep in orig
  96. // orig points to the remainder of orig after "end of rep"
  97. while (count--) {
  98. ins = strstr(orig, rep);
  99. len_front = ins - orig;
  100. tmp = strncpy(tmp, orig, len_front) + len_front;
  101. tmp = strcpy(tmp, with) + len_with;
  102. orig += len_front + len_rep; // move to next "end of rep"
  103. }
  104. strcpy(tmp, orig);
  105. return result;
  106. }
  107.  
  108. /**
  109. * Signal handler
  110. * @param signo [description]
  111. */
  112. static void catch_signal(int signo) {
  113. int success = system("/var/www/html/oberlin/daemons/buildingosd -d"); // lol
  114. if (success == -1) {
  115. syslog(LOG_ERR, "Unable to relaunch self");
  116. }
  117. syslog(LOG_ERR, "Caught pipe #%d; exiting", signo);
  118. }
  119.  
  120. /**
  121. * Helper for http_request()
  122. */
  123. static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) {
  124. size_t realsize = size * nmemb;
  125. struct MemoryStruct *mem = (struct MemoryStruct *)userp;
  126. mem->memory = realloc(mem->memory, mem->size + realsize + 1);
  127. if (mem->memory == NULL) {
  128. fprintf(stderr, "not enough memory (realloc returned NULL)n");
  129. return 0;
  130. }
  131. memcpy(&(mem->memory[mem->size]), contents, realsize);
  132. mem->size += realsize;
  133. mem->memory[mem->size] = 0;
  134. return realsize;
  135. }
  136.  
  137. /**
  138. * See https://curl.haxx.se/libcurl/c/postinmemory.html
  139. * @param url http://www.example.org/
  140. * @param post e.g. Field=1&Field=2&Field=3
  141. * @param custom_header 1 for a custom header, 0 for default
  142. * @param method 1 if POST, 0 if GET
  143. */
  144. struct MemoryStruct http_request(char *url, char *post, int custom_header, int method, char *api_token) {
  145. char header[SMALL_CONTAINER];
  146. if (custom_header) {
  147. snprintf(header, sizeof(header), "Authorization: Bearer %s", api_token);
  148. }
  149. CURL *curl;
  150. CURLcode res;
  151. struct MemoryStruct chunk;
  152. chunk.memory = malloc(1); /* will be grown as needed by realloc above */
  153. chunk.size = 0; /* no data at this point */
  154. curl_global_init(CURL_GLOBAL_ALL);
  155. curl = curl_easy_init();
  156. if (curl) {
  157. if (custom_header) {
  158. struct curl_slist *chunk = NULL; // https://curl.haxx.se/libcurl/c/httpcustomheader.html
  159. /* Add a custom header */
  160. chunk = curl_slist_append(chunk, header);
  161. res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
  162. }
  163. if (method == 1) {
  164. curl_easy_setopt(curl, CURLOPT_URL, url);
  165. curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post); // if we don't provide POSTFIELDSIZE, libcurl will strlen() by itself
  166. curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)strlen(post)); // Perform the request, res will get the return code
  167. } else {
  168. char full_url[SMALL_CONTAINER];
  169. strcpy(full_url, url);
  170. strcat(full_url, "?");
  171. strcat(full_url, post);
  172. curl_easy_setopt(curl, CURLOPT_URL, full_url);
  173. curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
  174. }
  175. // curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); // https://curl.haxx.se/libcurl/c/CURLOPT_SSL_VERIFYPEER.html
  176. /* send all data to this function */
  177. curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
  178. /* we pass our 'chunk' struct to the callback function */
  179. curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&chunk);
  180. /* some servers don't like requests that are made without a user-agent
  181. field, so we provide one */
  182. curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
  183. res = curl_easy_perform(curl);
  184. /* Check for errors */
  185. if (res != CURLE_OK) {
  186. syslog(LOG_ERR, "curl_easy_perform() failed: %s", curl_easy_strerror(res));
  187. exit(1);
  188. }
  189. curl_easy_cleanup(curl);
  190. curl_global_cleanup();
  191. }
  192. return chunk;//chunk.memory;
  193. // free(chunk.memory);
  194. }
  195.  
  196. /**
  197. * Execute before program termination
  198. */
  199. void cleanup(MYSQL *conn) {
  200. char query[SMALL_CONTAINER];
  201. snprintf(query, sizeof(query), "DELETE FROM daemons WHERE pid = %d", buildingosd_pid);
  202. if (READONLY_MODE == 0 && mysql_query(conn, query)) {
  203. syslog(LOG_ERR, "%s", mysql_error(conn));
  204. }
  205. closelog();
  206. mysql_close(conn);
  207. exit(1); // this might just kill the child, but since the mysql conn is closed, daemon will die
  208. }
  209.  
  210. /**
  211. * Handle errors
  212. */
  213. void error(const char *msg, MYSQL *conn) {
  214. syslog(LOG_ERR, "%s", msg);
  215. cleanup(conn);
  216. }
  217.  
  218. /**
  219. * Fetches a single record, terminating the program if there are no results
  220. */
  221. MYSQL_ROW fetch_row(MYSQL *conn, char *query) {
  222. MYSQL_RES *res;
  223. MYSQL_ROW row;
  224. if (mysql_query(conn, query)) {
  225. error(mysql_error(conn), conn);
  226. }
  227. res = mysql_store_result(conn);
  228. row = mysql_fetch_row(res);
  229. // mysql_free_result(res);
  230. if (row == NULL) {
  231. syslog(LOG_ERR, "QUERY '%s' RETURNED 0 ROWSn", query);
  232. cleanup(conn);
  233. }
  234. return row;
  235. }
  236.  
  237. /**
  238. * Sets the API token, fetching a new one if necessary
  239. * @param conn
  240. * @param org_id to get API credentials for
  241. */
  242. char *set_api_token(MYSQL *conn, char *org_id) {
  243. char query[SMALL_CONTAINER];
  244. MYSQL_ROW row;
  245. snprintf(query, sizeof(query), "SELECT api_id FROM orgs WHERE id = %s", org_id);
  246. int api_id = atoi(fetch_row(conn, query)[0]);
  247. snprintf(query, sizeof(query), "SELECT token, token_updated FROM api WHERE id = %d", api_id);
  248. row = fetch_row(conn, query);
  249. int update_token_at = atoi(row[1]) + 3595;
  250. time_t epoch = time(NULL);
  251. struct tm *tm = localtime(&epoch);
  252. int time = (int) mktime(tm);
  253. if (update_token_at > time) { // token still not expired
  254. return row[0]; // Invalid read of size 8
  255. } else { // amortized cost; need to get new API token
  256. snprintf(query, sizeof(query), "SELECT client_id, client_secret, username, password FROM api WHERE id = '%d'", api_id);
  257. row = fetch_row(conn, query);
  258. char post_data[MED_CONTAINER];
  259. snprintf(post_data, sizeof(post_data), "client_id=%s&client_secret=%s&username=%s&password=%s&grant_type=password", row[0], row[1], row[2], row[3]);
  260. struct MemoryStruct response = http_request(TOKEN_URL, post_data, 0, 1, "");
  261. cJSON *root = cJSON_Parse(response.memory);
  262. cJSON *access_token = cJSON_GetObjectItem(root, "access_token");
  263. char *api_token = access_token->valuestring;
  264. snprintf(query, sizeof(query), "UPDATE api SET token = '%s', token_updated = %d WHERE id = %d", api_token, time, api_id);
  265. if (mysql_query(conn, query)) { // do this even if READONLY_MODE is on bc it cant hurt to update the api token
  266. error(mysql_error(conn), conn);
  267. }
  268. free(response.memory);
  269. cJSON_Delete(root);
  270. return api_token;
  271. }
  272. }
  273.  
  274. /**
  275. * Updates a meter
  276. * this function has way too many parameters, but it's better than globals
  277. * @param conn
  278. * @param meter_id
  279. * @param meter_url
  280. * @param resolution
  281. * @param start_time the earlier date
  282. * @param end_time the later date
  283. */
  284. void update_meter(MYSQL *conn, int meter_id, char *meter_url, char *api_token, char *resolution, time_t start_time, time_t end_time, int verbose) {
  285. struct tm *ts;
  286. char iso8601_end_time[30];
  287. char iso8601_start_time[30];
  288. char query[SMALL_CONTAINER];
  289. ts = localtime(&end_time);
  290. strftime(iso8601_end_time, sizeof(iso8601_end_time), ISO8601_FORMAT_EST, ts);
  291. ts = localtime(&start_time);
  292. strftime(iso8601_start_time, sizeof(iso8601_start_time), ISO8601_FORMAT_EST, ts);
  293. // Make call to the API for meter data
  294. char post_data[SMALL_CONTAINER];
  295. char *encoded_iso8601_start_time = str_replace(iso8601_start_time, ":", "%3A");
  296. char *encoded_iso8601_end_time = str_replace(iso8601_end_time, ":", "%3A");
  297. snprintf(post_data, sizeof(post_data), "resolution=%s&start=%s&end=%s", resolution, encoded_iso8601_start_time, encoded_iso8601_end_time);
  298. free(encoded_iso8601_start_time);
  299. free(encoded_iso8601_end_time);
  300. struct MemoryStruct response = http_request(meter_url, post_data, 1, 0, api_token);
  301. cJSON *root = cJSON_Parse(response.memory);
  302. if (!cJSON_HasObjectItem(root, "data")) {
  303. error(response.memory, conn);
  304. }
  305. cJSON *data = cJSON_GetObjectItem(root, "data");
  306. // save new data
  307. FILE *buffer = fopen(BUFFER_FILE, "a");
  308. if (buffer == NULL) {
  309. error("Error opening meter_data buffer", conn);
  310. }
  311. int data_size = cJSON_GetArraySize(data);
  312. double last_non_null = -9999.0; // error value
  313. for (int i = 0; i < data_size; i++) {
  314. cJSON *data_point = cJSON_GetArrayItem(data, i);
  315. cJSON *data_point_val = cJSON_GetObjectItem(data_point, "value");
  316. cJSON *data_point_time = cJSON_GetObjectItem(data_point, "localtime");
  317. char val[10];
  318. if (data_point_val->type == 4) {
  319. val[0] = '\'; val[1] = 'N'; val[2] = ''; // https://stackoverflow.com/a/2675493
  320. } else {
  321. last_non_null = data_point_val->valuedouble;
  322. snprintf(val, sizeof(val), "%.3f", last_non_null);
  323. }
  324. // https://stackoverflow.com/a/1002631/2624391
  325. struct tm tm = {0};
  326. time_t epoch = 0;
  327. if (strptime(data_point_time->valuestring, ISO8601_FORMAT, &tm) != NULL) {
  328. tm.tm_isdst = -1; // Is DST on? 1 = yes, 0 = no, -1 = unknown
  329. epoch = mktime(&tm);
  330. } else {
  331. error("Unable to parse date", conn);
  332. }
  333. fprintf(buffer, "%d,%s,%d,"%s"n", meter_id, val, (int) epoch, resolution);
  334. if (verbose) {
  335. printf("%d,%s,%d,"%s"n", meter_id, val, (int) epoch, resolution);
  336. }
  337. }
  338. fclose(buffer);
  339. free(response.memory);
  340. cJSON_Delete(root);
  341. #if UPDATE_CURRENT == 1
  342. if (last_non_null != -9999.0 && strcmp(resolution, "live") == 0) {
  343. query[0] = '';
  344. snprintf(query, sizeof(query), "UPDATE meters SET current = %.3f WHERE id = %d", last_non_null, meter_id);
  345. if (READONLY_MODE == 0 && mysql_query(conn, query)) {
  346. error(mysql_error(conn), conn);
  347. }
  348. }
  349. #endif
  350. }
  351.  
  352. int main(int argc, char *argv[]) {
  353. int argv0size = strlen(argv[0]);
  354. // data fetched spans from start_time to end_time
  355. time_t end_time;
  356. time_t start_time;
  357. int opt;
  358. int data_lifespan;
  359. int move_back_amount;
  360. int secs_in_res;
  361. char *target_meter;
  362. char *update_timestamp_col;
  363. char tmp[SMALL_CONTAINER];
  364. // If the -o flag is set, the program will update a single meter instead of looping
  365. int o_flag = 0;
  366. // if the -d flag is set, the program will become a true daemon, disconnecting itself from the shell it was started in
  367. int d_flag = 0;
  368. // -v flag prints debugging information
  369. int v_flag = 0;
  370. // if the -r flag is set with one of "live", "quarterhour", "hour", or "month" the program will fetch the specified resolution
  371. // when fetching "live" data, the program will always fetch the newest data i.e. data spanning from the last recorded date in our db to now
  372. // when fetching other resolutions, the program will be checking that all of that data is there, because normally it is calculated based of min data by crons. if all the data is not there, it will fetch it
  373. char *r_flag = NULL;
  374. while ((opt = getopt (argc, argv, "r:odv")) != -1) {
  375. switch (opt) {
  376. case 'r': // "resolution"
  377. r_flag = optarg;
  378. break;
  379. case 'o': // run "once"
  380. o_flag = 1;
  381. break;
  382. case 'd': // "daemon"
  383. d_flag = 1;
  384. break;
  385. case 'v': // "verbose"
  386. v_flag = 1;
  387. break;
  388. }
  389. }
  390. // connect to db
  391. MYSQL *conn;
  392. conn = mysql_init(NULL);
  393. // Connect to database
  394. if (!mysql_real_connect(conn, DB_SERVER,
  395. DB_USER, DB_PASS, DB_NAME, 0, NULL, 0)) {
  396. error(mysql_error(conn), conn);
  397. }
  398. // interpret command line input
  399. if (d_flag) {
  400. if (v_flag) {
  401. printf("Can't use -d and -v at same time; ignoring -v flagn");
  402. v_flag = 0;
  403. }
  404. if (daemon(1, 0) == -1) { // http://man7.org/linux/man-pages/man3/daemon.3.html
  405. error("Failed to daemonize", conn);
  406. }
  407. }
  408. if (r_flag == NULL) {
  409. r_flag = "live";
  410. }
  411. int live_res = 0;
  412. if (strcmp(r_flag, "live") == 0) {
  413. target_meter = LIVE_TARGET_METER;
  414. update_timestamp_col = UPDATE_LIVE_TIMESTAMP;
  415. data_lifespan = LIVE_DATA_LIFESPAN;
  416. move_back_amount = 180; // meant to move meters back in the queue of what's being updated by update_meter() so they don't hold up everything if update_meter() keeps failing for some reason. note that if update_meter() does finish, it pushes the meter to the end of the queue by updating the last_updated_col to the current time otherwise the last_updated_col remains the current time minus this amount.
  417. secs_in_res = 60;
  418. live_res = 1;
  419. } else if (strcmp(r_flag, "quarterhour") == 0) {
  420. target_meter = QH_TARGET_METER;
  421. update_timestamp_col = UPDATE_QH_TIMESTAMP;
  422. data_lifespan = QH_DATA_LIFESPAN;
  423. move_back_amount = 480;
  424. secs_in_res = 900;
  425. } else if (strcmp(r_flag, "hour") == 0) {
  426. target_meter = HOUR_TARGET_METER;
  427. update_timestamp_col = UPDATE_HOUR_TIMESTAMP;
  428. data_lifespan = HOUR_DATA_LIFESPAN;
  429. move_back_amount = 900;
  430. secs_in_res = 3600;
  431. } else if (strcmp(r_flag, "month") == 0) {
  432. target_meter = MONTH_TARGET_METER;
  433. update_timestamp_col = UPDATE_MONTH_TIMESTAMP;
  434. data_lifespan = MONTH_DATA_LIFESPAN;
  435. move_back_amount = 86400;
  436. secs_in_res = 2592000;
  437. } else {
  438. printf("Please provide a proper resolution via the -r flagn");
  439. return 1;
  440. }
  441. buildingosd_pid = getpid(); // save this in a global so the children know
  442. // Insert record of daemon
  443. char query[SMALL_CONTAINER];
  444. snprintf(query, sizeof(query), "INSERT INTO daemons (pid, enabled, target_res) VALUES (%d, %d, '%s')", buildingosd_pid, 1, r_flag);
  445. if (READONLY_MODE == 0 && mysql_query(conn, query)) { // short circuit
  446. error(mysql_error(conn), conn);
  447. }
  448. openlog("buildingosd", LOG_PID, LOG_DAEMON);
  449. signal(SIGPIPE, catch_signal);
  450. snprintf(query, sizeof(query), "SELECT enabled FROM daemons WHERE pid = %d", buildingosd_pid); // dont modify query variable again
  451. while (1) {
  452. MYSQL_RES *res;
  453. MYSQL_ROW row;
  454. MYSQL_ROW meter;
  455. time_t now = time(NULL);
  456. if (READONLY_MODE == 0) {
  457. // if the daemon is 'enabled' in the db
  458. if (mysql_query(conn, query)) { // this line triggers a SIGPIPE?
  459. error(mysql_error(conn), conn);
  460. }
  461. res = mysql_use_result(conn);
  462. row = mysql_fetch_row(res);
  463. if (row == NULL) { // record of daemon does not exist
  464. error("I should not exist", conn);
  465. } else if (row[0][0] != '1') {
  466. // if enabled column turned off, exit
  467. if (d_flag) {
  468. error("Enabled column switched off", conn);
  469. } else {
  470. puts("Enabled column switched off");
  471. cleanup(conn);
  472. }
  473. }
  474. mysql_free_result(res);
  475. }
  476. // if (live_res) { // make sure the priority meters (i.e. the orbs) are always up to date
  477. // if (mysql_query(conn, PRIORITY_METER)) {
  478. // error(mysql_error(conn), conn);
  479. // }
  480. // res = mysql_store_result(conn);
  481. // meter = mysql_fetch_row(res);
  482. // mysql_free_result(res);
  483. // }
  484. // if (live_res == 0 || meter == NULL) { // if the orbs are up to date or we're collecting non-minute resolution data
  485. meter = fetch_row(conn, target_meter);
  486. // }
  487. char meter_url[SMALL_CONTAINER];
  488. meter_url[0] = '';
  489. int meter_id = atoi(meter[0]);
  490. char *org_id = meter[1];
  491. strcat(meter_url, meter[2]);
  492. strcat(meter_url, "/data");
  493. int last_updated = atoi(meter[3]);
  494. snprintf(tmp, sizeof(tmp), "UPDATE daemons SET updating_meter = %d WHERE pid = %d", meter_id, buildingosd_pid);
  495. if (READONLY_MODE == 0) {
  496. if (mysql_query(conn, tmp)) {
  497. error(mysql_error(conn), conn);
  498. }
  499. }
  500. if (last_updated > (now - secs_in_res)) { // if the least up to date meter was last updated secs_in_res seconds ago
  501. int sleep_time = (int) (secs_in_res / 10);
  502. now += sleep_time;
  503. if (live_res) {
  504. sleep(sleep_time);
  505. } else {
  506. return EXIT_SUCCESS;
  507. }
  508. }
  509. // Set start/end time
  510. if (live_res) {
  511. // if live res, fetch data spanning from the latest point recorded in the db to now
  512. end_time = now;
  513. snprintf(tmp, sizeof(tmp), "SELECT recorded FROM meter_data WHERE meter_id = %d AND resolution = '%s' ORDER BY recorded DESC LIMIT 1", meter_id, r_flag);
  514. if (mysql_query(conn, tmp)) {
  515. error(mysql_error(conn), conn);
  516. }
  517. res = mysql_store_result(conn);
  518. row = mysql_fetch_row(res);
  519. if (row == NULL) { // no data exists for this meter
  520. start_time = end_time - (time_t) data_lifespan;
  521. } else {
  522. start_time = (time_t) atoi(row[0]);
  523. }
  524. mysql_free_result(res);
  525. } else {
  526. // if other res, only make sure data goes back as far as it's supposed to
  527. // i.e. fetch data spanning from data_lifespan to the earliest point recorded in the db
  528. start_time = now - (time_t) data_lifespan;
  529. snprintf(tmp, sizeof(tmp), "SELECT recorded FROM meter_data WHERE meter_id = %d AND resolution = '%s' ORDER BY recorded ASC LIMIT 1", meter_id, r_flag);
  530. if (mysql_query(conn, tmp)) {
  531. error(mysql_error(conn), conn);
  532. }
  533. res = mysql_store_result(conn);
  534. row = mysql_fetch_row(res);
  535. if (row == NULL) { // no data exists for this meter
  536. end_time = now;
  537. mysql_free_result(res);
  538. } else {
  539. end_time = (time_t) atoi(row[0]);
  540. mysql_free_result(res);
  541. if (end_time < ((now - data_lifespan) + secs_in_res)) { // if the end time goes as far back as we store data for, mark meter as updated and continue
  542. snprintf(tmp, sizeof(tmp), update_timestamp_col, (int) now, meter_id);
  543. if (READONLY_MODE == 0 && mysql_query(conn, tmp)) {
  544. error(mysql_error(conn), conn);
  545. }
  546. continue;
  547. }
  548. }
  549. }
  550. pid_t childpid = fork();
  551. if (childpid == -1) {
  552. error("Failed to fork", conn);
  553. }
  554. else if (childpid > 0) {
  555. int status;
  556. waitpid(childpid, &status, 0);
  557. } else { // we are the child
  558. strncpy(argv[0], "bosd_child", argv0size);
  559. prctl(PR_SET_NAME, "bosd_child", NULL, NULL, NULL);
  560. signal(SIGPIPE, catch_signal);
  561. snprintf(tmp, sizeof(tmp), update_timestamp_col, (int) now - move_back_amount, meter_id);
  562. if (READONLY_MODE == 0 && mysql_query(conn, tmp)) {
  563. error(mysql_error(conn), conn);
  564. }
  565. update_meter(conn, meter_id, meter_url, set_api_token(conn, org_id), r_flag, start_time, end_time, v_flag);
  566. snprintf(tmp, sizeof(tmp), update_timestamp_col, (int) now, meter_id);
  567. if (READONLY_MODE == 0 && mysql_query(conn, tmp)) {
  568. error(mysql_error(conn), conn);
  569. }
  570. if (d_flag == 0) {
  571. printf("Updated meter %d (fetched data from %d to %d)n", meter_id, (int) start_time, (int) end_time);
  572. }
  573. exit(1);
  574. }
  575. if (o_flag == 1) {
  576. break;
  577. }
  578. }
  579. cleanup(conn);
  580. mysql_close(conn);
  581. return EXIT_SUCCESS;
  582. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement