Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * Fetches data from the BuildingOS API and caches it in the db
- *
- * @author Tim Robert-Fitzgerald
- */
// Feature-test macros; they have to stay ahead of every #include to take effect.
- #define _XOPEN_SOURCE // for strptime
- #define _GNU_SOURCE // for strptime
// Meter-selection queries. Each one returns at most one row (LIMIT 1) holding
// id, org_id, url and the *_last_updated column it sorts by (oldest first), and
// skips meters whose id appears in daemons.updating_meter.
// PRIORITY_METER additionally requires for_orb/orb_server usage and a
// live_last_updated value more than 300 seconds old.
// NOTE(review): the quarterhour/hour/month variants also filter daemons on
// target_res = 'live' -- confirm this is intended and not a copy-paste leftover.
- #define PRIORITY_METER "SELECT id, org_id, url, live_last_updated FROM meters WHERE source = 'buildingos' AND live_last_updated < (UNIX_TIMESTAMP() - 300) AND (for_orb > 0 OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY live_last_updated ASC LIMIT 1"
- #define LIVE_TARGET_METER "SELECT id, org_id, url, live_last_updated FROM meters WHERE source = 'buildingos' AND ((gauges_using > 0 OR for_orb > 0 OR timeseries_using > 0) OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY live_last_updated ASC LIMIT 1"
- #define QH_TARGET_METER "SELECT id, org_id, url, quarterhour_last_updated FROM meters WHERE source = 'buildingos' AND ((gauges_using > 0 OR for_orb > 0 OR timeseries_using > 0) OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY quarterhour_last_updated ASC LIMIT 1"
- #define HOUR_TARGET_METER "SELECT id, org_id, url, hour_last_updated FROM meters WHERE source = 'buildingos' AND ((gauges_using > 0 OR for_orb > 0 OR timeseries_using > 0) OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY hour_last_updated ASC LIMIT 1"
- #define MONTH_TARGET_METER "SELECT id, org_id, url, month_last_updated FROM meters WHERE source = 'buildingos' AND ((gauges_using > 0 OR for_orb > 0 OR timeseries_using > 0) OR bos_uuid IN (SELECT DISTINCT meter_uuid FROM relative_values WHERE permission = 'orb_server' AND meter_uuid != '')) AND id NOT IN (SELECT updating_meter FROM daemons WHERE target_res = 'live') ORDER BY month_last_updated ASC LIMIT 1"
// printf-style UPDATE templates: the first %d fills the new *_last_updated value,
// the second %d fills the meter id.
- #define UPDATE_LIVE_TIMESTAMP "UPDATE meters SET live_last_updated = %d WHERE id = %d"
- #define UPDATE_QH_TIMESTAMP "UPDATE meters SET quarterhour_last_updated = %d WHERE id = %d"
- #define UPDATE_HOUR_TIMESTAMP "UPDATE meters SET hour_last_updated = %d WHERE id = %d"
- #define UPDATE_MONTH_TIMESTAMP "UPDATE meters SET month_last_updated = %d WHERE id = %d"
- #define TOKEN_URL "https://api.buildingos.com/o/token/" // where to get the token from
// Time format patterns. ISO8601_FORMAT_EST is passed to strftime() by update_meter()
// and ends in a literal "-04:00" instead of %z.
// NOTE(review): the offset is therefore fixed at UTC-4 whatever the local offset
// is at the formatted time -- confirm this is acceptable.
- #define ISO8601_FORMAT "%Y-%m-%dT%H:%M:%S%z"
- #define ISO8601_FORMAT_EST "%Y-%m-%dT%H:%M:%S-04:00"
// File that update_meter() opens in append mode to buffer downloaded readings.
- #define BUFFER_FILE "/root/meter_data.csv"
// Sizes of the fixed char arrays used for queries, headers and request data.
- #define SMALL_CONTAINER 255 // small fixed-size container for arrays
- #define MED_CONTAINER 510 // just double SMALL_CONTAINER
// Retention periods per resolution, in seconds.
- #define LIVE_DATA_LIFESPAN 7200 // live data is stored for 2 hours i.e. 7200s
- #define QH_DATA_LIFESPAN 1209600 // 2 weeks
- #define HOUR_DATA_LIFESPAN 5184000 // 2 months
- #define MONTH_DATA_LIFESPAN 63113904 // 2 years
// Compile-time switches.
- #define UPDATE_CURRENT 1 // update the meters.current column with the current reading?
- #define READONLY_MODE 0 // if on (i.e. 1) the daemon will not make queries that update/insert/delete data by short circuiting if stmts
- #include <stdio.h>
- #include <string.h>
- #include <unistd.h>
- #include <time.h>
- #include <mysql.h>
- #include <sys/types.h>
- #include <sys/wait.h>
- #include <sys/prctl.h>
- #include <stdlib.h>
- #include <signal.h>
- #include <syslog.h>
- #include <sys/stat.h>
- #include <curl/curl.h> // install with `apt-get install libcurl4-openssl-dev`
- #include <curl/easy.h>
- #include "./lib/cJSON/cJSON.h"
- #include "db.h"
/* PID that cleanup() uses to delete this daemon's row from the `daemons` table. */
static pid_t buildingosd_pid;

/**
 * Growable in-memory buffer holding the response body downloaded by
 * http_request(); WriteMemoryCallback() appends to it.
 */
struct MemoryStruct {
    char *memory; /* heap buffer with the bytes received so far */
    size_t size;  /* number of bytes stored in `memory` */
};
/**
 * Returns a newly allocated copy of `orig` in which every non-overlapping
 * occurrence of `rep` (scanning left to right) is replaced by `with`.
 * Adapted from https://stackoverflow.com/a/779960/2624391
 *
 * @param orig string to search; it is not modified
 * @param rep  substring to replace; must not be empty
 * @param with replacement text; NULL is treated as ""
 * @return malloc()ed string the caller must free(), or NULL when `orig` or
 *         `rep` is NULL, `rep` is empty, the result length would overflow
 *         size_t, or the allocation fails
 */
char *str_replace(char *orig, char *rep, char *with) {
    if (!orig || !rep) {
        return NULL;
    }
    size_t len_rep = strlen(rep);
    if (len_rep == 0) {
        return NULL; // an empty pattern would match forever
    }
    if (!with) {
        with = "";
    }
    size_t len_with = strlen(with);
    size_t len_orig = strlen(orig);

    // count the replacements so the result can be sized exactly
    size_t count = 0;
    for (const char *scan = orig; (scan = strstr(scan, rep)) != NULL; scan += len_rep) {
        count++;
    }

    // length without the matches, then add the replacements (overflow-checked)
    size_t len_result = len_orig - count * len_rep;
    if (len_with != 0 && count > ((size_t)-1 - 1 - len_result) / len_with) {
        return NULL;
    }
    len_result += count * len_with;

    char *result = malloc(len_result + 1);
    if (!result) {
        return NULL;
    }

    char *out = result;       // next write position in result
    const char *rest = orig;  // unprocessed remainder of orig
    const char *hit;
    while ((hit = strstr(rest, rep)) != NULL) {
        size_t len_front = (size_t)(hit - rest);
        memcpy(out, rest, len_front);
        out += len_front;
        memcpy(out, with, len_with);
        out += len_with;
        rest = hit + len_rep; // continue after the matched text
    }
    // copy the tail together with its terminating NUL
    memcpy(out, rest, len_orig - (size_t)(rest - orig) + 1);
    return result;
}
/**
 * Signal handler: starts another copy of the daemon through the shell and
 * logs the number of the signal that was caught.
 *
 * NOTE(review): system() and syslog() are not async-signal-safe, so calling
 * them from a handler is formally unsafe.
 * NOTE(review): the message says "exiting" but the handler simply returns;
 * confirm whether the old process is expected to terminate here.
 *
 * @param signo number of the delivered signal
 */
static void catch_signal(int signo) {
    if (system("/var/www/html/oberlin/daemons/buildingosd -d") == -1) { // lol
        syslog(LOG_ERR, "Unable to relaunch self");
    }
    syslog(LOG_ERR, "Caught pipe #%d; exiting", signo);
}
- /**
- * Helper for http_request()
- */
- static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) {
- size_t realsize = size * nmemb;
- struct MemoryStruct *mem = (struct MemoryStruct *)userp;
- mem->memory = realloc(mem->memory, mem->size + realsize + 1);
- if (mem->memory == NULL) {
- fprintf(stderr, "not enough memory (realloc returned NULL)n");
- return 0;
- }
- memcpy(&(mem->memory[mem->size]), contents, realsize);
- mem->size += realsize;
- mem->memory[mem->size] = 0;
- return realsize;
- }
- /**
- * See https://curl.haxx.se/libcurl/c/postinmemory.html
- * @param url http://www.example.org/
- * @param post e.g. Field=1&Field=2&Field=3
- * @param custom_header 1 for a custom header, 0 for default
- * @param method 1 if POST, 0 if GET
- */
- struct MemoryStruct http_request(char *url, char *post, int custom_header, int method, char *api_token) {
- char header[SMALL_CONTAINER];
- if (custom_header) {
- snprintf(header, sizeof(header), "Authorization: Bearer %s", api_token);
- }
- CURL *curl;
- CURLcode res;
- struct MemoryStruct chunk;
- chunk.memory = malloc(1); /* will be grown as needed by realloc above */
- chunk.size = 0; /* no data at this point */
- curl_global_init(CURL_GLOBAL_ALL);
- curl = curl_easy_init();
- if (curl) {
- if (custom_header) {
- struct curl_slist *chunk = NULL; // https://curl.haxx.se/libcurl/c/httpcustomheader.html
- /* Add a custom header */
- chunk = curl_slist_append(chunk, header);
- res = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
- }
- if (method == 1) {
- curl_easy_setopt(curl, CURLOPT_URL, url);
- curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post); // if we don't provide POSTFIELDSIZE, libcurl will strlen() by itself
- curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)strlen(post)); // Perform the request, res will get the return code
- } else {
- char full_url[SMALL_CONTAINER];
- strcpy(full_url, url);
- strcat(full_url, "?");
- strcat(full_url, post);
- curl_easy_setopt(curl, CURLOPT_URL, full_url);
- curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
- }
- // curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); // https://curl.haxx.se/libcurl/c/CURLOPT_SSL_VERIFYPEER.html
- /* send all data to this function */
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
- /* we pass our 'chunk' struct to the callback function */
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&chunk);
- /* some servers don't like requests that are made without a user-agent
- field, so we provide one */
- curl_easy_setopt(curl, CURLOPT_USERAGENT, "libcurl-agent/1.0");
- res = curl_easy_perform(curl);
- /* Check for errors */
- if (res != CURLE_OK) {
- syslog(LOG_ERR, "curl_easy_perform() failed: %s", curl_easy_strerror(res));
- exit(1);
- }
- curl_easy_cleanup(curl);
- curl_global_cleanup();
- }
- return chunk;//chunk.memory;
- // free(chunk.memory);
- }
- /**
- * Execute before program termination
- */
- void cleanup(MYSQL *conn) {
- char query[SMALL_CONTAINER];
- snprintf(query, sizeof(query), "DELETE FROM daemons WHERE pid = %d", buildingosd_pid);
- if (READONLY_MODE == 0 && mysql_query(conn, query)) {
- syslog(LOG_ERR, "%s", mysql_error(conn));
- }
- closelog();
- mysql_close(conn);
- exit(1); // this might just kill the child, but since the mysql conn is closed, daemon will die
- }
- /**
- * Handle errors
- */
- void error(const char *msg, MYSQL *conn) {
- syslog(LOG_ERR, "%s", msg);
- cleanup(conn);
- }
- /**
- * Fetches a single record, terminating the program if there are no results
- */
- MYSQL_ROW fetch_row(MYSQL *conn, char *query) {
- MYSQL_RES *res;
- MYSQL_ROW row;
- if (mysql_query(conn, query)) {
- error(mysql_error(conn), conn);
- }
- res = mysql_store_result(conn);
- row = mysql_fetch_row(res);
- // mysql_free_result(res);
- if (row == NULL) {
- syslog(LOG_ERR, "QUERY '%s' RETURNED 0 ROWSn", query);
- cleanup(conn);
- }
- return row;
- }
- /**
- * Sets the API token, fetching a new one if necessary
- * @param conn
- * @param org_id to get API credentials for
- */
- char *set_api_token(MYSQL *conn, char *org_id) {
- char query[SMALL_CONTAINER];
- MYSQL_ROW row;
- snprintf(query, sizeof(query), "SELECT api_id FROM orgs WHERE id = %s", org_id);
- int api_id = atoi(fetch_row(conn, query)[0]);
- snprintf(query, sizeof(query), "SELECT token, token_updated FROM api WHERE id = %d", api_id);
- row = fetch_row(conn, query);
- int update_token_at = atoi(row[1]) + 3595;
- time_t epoch = time(NULL);
- struct tm *tm = localtime(&epoch);
- int time = (int) mktime(tm);
- if (update_token_at > time) { // token still not expired
- return row[0]; // Invalid read of size 8
- } else { // amortized cost; need to get new API token
- snprintf(query, sizeof(query), "SELECT client_id, client_secret, username, password FROM api WHERE id = '%d'", api_id);
- row = fetch_row(conn, query);
- char post_data[MED_CONTAINER];
- snprintf(post_data, sizeof(post_data), "client_id=%s&client_secret=%s&username=%s&password=%s&grant_type=password", row[0], row[1], row[2], row[3]);
- struct MemoryStruct response = http_request(TOKEN_URL, post_data, 0, 1, "");
- cJSON *root = cJSON_Parse(response.memory);
- cJSON *access_token = cJSON_GetObjectItem(root, "access_token");
- char *api_token = access_token->valuestring;
- snprintf(query, sizeof(query), "UPDATE api SET token = '%s', token_updated = %d WHERE id = %d", api_token, time, api_id);
- if (mysql_query(conn, query)) { // do this even if READONLY_MODE is on bc it cant hurt to update the api token
- error(mysql_error(conn), conn);
- }
- free(response.memory);
- cJSON_Delete(root);
- return api_token;
- }
- }
- /**
- * Updates a meter
- * this function has way too many parameters, but it's better than globals
- * @param conn
- * @param meter_id
- * @param meter_url
- * @param resolution
- * @param start_time the earlier date
- * @param end_time the later date
- */
- void update_meter(MYSQL *conn, int meter_id, char *meter_url, char *api_token, char *resolution, time_t start_time, time_t end_time, int verbose) {
- struct tm *ts;
- char iso8601_end_time[30];
- char iso8601_start_time[30];
- char query[SMALL_CONTAINER];
- ts = localtime(&end_time);
- strftime(iso8601_end_time, sizeof(iso8601_end_time), ISO8601_FORMAT_EST, ts);
- ts = localtime(&start_time);
- strftime(iso8601_start_time, sizeof(iso8601_start_time), ISO8601_FORMAT_EST, ts);
- // Make call to the API for meter data
- char post_data[SMALL_CONTAINER];
- char *encoded_iso8601_start_time = str_replace(iso8601_start_time, ":", "%3A");
- char *encoded_iso8601_end_time = str_replace(iso8601_end_time, ":", "%3A");
- snprintf(post_data, sizeof(post_data), "resolution=%s&start=%s&end=%s", resolution, encoded_iso8601_start_time, encoded_iso8601_end_time);
- free(encoded_iso8601_start_time);
- free(encoded_iso8601_end_time);
- struct MemoryStruct response = http_request(meter_url, post_data, 1, 0, api_token);
- cJSON *root = cJSON_Parse(response.memory);
- if (!cJSON_HasObjectItem(root, "data")) {
- error(response.memory, conn);
- }
- cJSON *data = cJSON_GetObjectItem(root, "data");
- // save new data
- FILE *buffer = fopen(BUFFER_FILE, "a");
- if (buffer == NULL) {
- error("Error opening meter_data buffer", conn);
- }
- int data_size = cJSON_GetArraySize(data);
- double last_non_null = -9999.0; // error value
- for (int i = 0; i < data_size; i++) {
- cJSON *data_point = cJSON_GetArrayItem(data, i);
- cJSON *data_point_val = cJSON_GetObjectItem(data_point, "value");
- cJSON *data_point_time = cJSON_GetObjectItem(data_point, "localtime");
- char val[10];
- if (data_point_val->type == 4) {
- val[0] = '\'; val[1] = 'N'; val[2] = '