Advertisement
Guest User

Untitled

a guest
Sep 29th, 2016
134
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 10.05 KB | None | 0 0
  1. #include "sampleConnector.h"
  2.  
  3. using namespace std;
  4.  
  5. dfESPconnectorInfo connectorInfo = {
  6.     type_BOTH,
  7.     sampleConnector::initialize,
  8.     sampleConnector::subRequiredConfig,
  9.     sampleConnector::sizeofSubReqConfig,
  10.     sampleConnector::pubRequiredConfig,
  11.     sampleConnector::sizeofPubReqConfig,
  12.     sampleConnector::subOptionalConfig,
  13.     sampleConnector::sizeofSubOptConfig,
  14.     sampleConnector::pubOptionalConfig,
  15.     sampleConnector::sizeofPubOptConfig,
  16. };
  17.  
  18. dfESPconnectorInfo *getConnectorInfo() {
  19.     return &connectorInfo;
  20. }
  21.  
  22. dfESPconnectorParmInfo_t sampleConnector::subRequiredConfig[] = {
  23.     {"type", "sub", dfESPconnector::sizeofPubSubValues, dfESPconnector::pubsubValues},
  24.     {"project", "", 0, NULL},
  25.     {"continuousquery", "", 0, NULL},
  26.     {"window", "", 0, NULL},
  27.     {"filename", "", 0, NULL}
  28. };
  29. size_t sampleConnector::sizeofSubReqConfig =
  30.     sizeof(sampleConnector::subRequiredConfig)/sizeof(dfESPconnectorParmInfo_t);
  31.  
  32. dfESPconnectorParmInfo_t sampleConnector::pubRequiredConfig[] = {
  33.     {"type", "pub", dfESPconnector::sizeofPubSubValues, dfESPconnector::pubsubValues},
  34.     {"project", "", 0, NULL},
  35.     {"continuousquery", "", 0, NULL},
  36.     {"window", "", 0, NULL},
  37.     {"filename", "", 0, NULL}
  38. };
  39. size_t sampleConnector::sizeofPubReqConfig =
  40.     sizeof(sampleConnector::pubRequiredConfig)/sizeof(dfESPconnectorParmInfo_t);
  41.  
  42. dfESPconnectorParmInfo_t sampleConnector::subOptionalConfig[] = {
  43.     {"host", "", 0, NULL},
  44.     {"port", "", 0, NULL},
  45.     {"snapshot", "true", dfESPconnector::sizeofTrueFalseValues, dfESPconnector::trueFalseValues}
  46. };
  47. size_t sampleConnector::sizeofSubOptConfig =
  48.     sizeof(sampleConnector::subOptionalConfig)/sizeof(dfESPconnectorParmInfo_t);
  49.  
  50. dfESPconnectorParmInfo_t sampleConnector::pubOptionalConfig[] = {
  51.     {"host", "", 0, NULL},
  52.     {"port", "", 0, NULL},
  53.     {"transactional", "false", dfESPconnector::sizeofTrueFalseValues, dfESPconnector::trueFalseValues},
  54.     {"blocksize", "1", 0, NULL}
  55. };
  56. size_t sampleConnector::sizeofPubOptConfig =
  57.     sizeof(sampleConnector::pubOptionalConfig)/sizeof(dfESPconnectorParmInfo_t);
  58.  
  59. bool sampleConnector::callbackFunction(dfESPeventblockPtr eventBlock, dfESPschema *schema) {
  60.     int32_t eventCnt = eventBlock->getSize();
  61.     int32_t eventIndx;
  62.     char *csvBuf;
  63.     bool error = false;
  64.  
  65.     for (eventIndx=0; eventIndx < eventCnt; eventIndx++) {
  66.         dfESPeventPtr event = eventBlock->getData(eventIndx);
  67.         csvBuf = dfESPconnector::getCSV(event, schema);
  68.         if (!csvBuf) {
  69.             error = true;
  70.             break;
  71.         }
  72.         _writeLock->lock();
  73.         if (!_writeStream) {
  74.             // stream not open
  75.             _writeLock->unlock();
  76.             gLOG_ERROR(dfESPlogServices::Connectors0012,
  77.                        "sampleConnector::csvCallback()", "stream");
  78.             error = true;
  79.             break;
  80.         }
  81.         *_writeStream << (char *)csvBuf << endl;
  82.         _writeLock->unlock();
  83.         free(csvBuf);
  84.         csvBuf = NULL;
  85.     }
  86.     if (error) {
  87.         if (csvBuf) {
  88.             free(csvBuf);
  89.         }
  90.         return false;
  91.     }
  92.     return true;
  93. }
  94.  
  95. void sampleConnector::errorCallbackFunction(dfESPpubsubFailures_t failure, dfESPpubsubFailureCodes_t code) {
  96.     gLOG_ERROR(dfESPlogServices::Connectors0019,
  97.                "sampleConnector::errorCallbackFunction()",
  98.                C_dfESPdecodePubSubFailure((C_dfESPpubsubFailures)failure),
  99.                C_dfESPdecodePubSubFailureCode((C_dfESPpubsubFailureCodes)code));
  100.     if (_errorCallback) {
  101.         // call application callback
  102.         _errorCallback(failure, code, _ctx);
  103.     }
  104. }
  105.  
  106. bool sampleConnector::setupCallbackFunction(dfESPschema *schema,
  107.                                             bool winIsAutogen) {
  108.     // schema related setup, nothing to do
  109.     _schema = schema;
  110.     _winIsAutogen = winIsAutogen;
  111.     return true;
  112. }
  113.  
  114. sampleConnector::sampleConnector(dfESPengine *engine, dfESPpsLib_t psLib,
  115.                                  dfESPstring name) {
  116.     init(psLib, engine, name);
  117.     _readStream = NULL;
  118.     _writeStream = NULL;
  119.     _writeLock = dfESPthreadUtils::mutex::mutex_create();
  120. }
  121.  
  122. dfESPconnector *sampleConnector::initialize(dfESPengine *engine, dfESPpsLib_t psLib,
  123.                                             dfESPstring name) {
  124.     return new sampleConnector(engine, psLib, name);
  125. }
  126.  
  127. sampleConnector::~sampleConnector() {
  128.     stop();
  129. }
  130.  
  131. bool sampleConnector::start() {
  132.     if (_started || _client) {
  133.         gLOG_ERROR(dfESPlogServices::Connectors0001, "sampleConnector::start()");
  134.         return false;
  135.     }
  136.     if (_type == type_SUB) {
  137.         if (!checkConfig(subRequiredConfig, sizeofSubReqConfig, subOptionalConfig,
  138.                          sizeofSubOptConfig)) {
  139.             return false;
  140.         }
  141.         if (!startSub()) {
  142.             return false;
  143.         }
  144.     } else if (_type == type_PUB) {
  145.         if (!checkConfig(pubRequiredConfig, sizeofPubReqConfig, pubOptionalConfig,
  146.                          sizeofPubOptConfig)) {
  147.             return false;
  148.         }
  149.         if (!startPub()) {
  150.             return false;
  151.         }
  152.     } else {
  153.         gLOG_ERROR(dfESPlogServices::Connectors0005, "sampleConnector::start()",
  154.                    "type");
  155.         return false;
  156.     }
  157.     _started = true;
  158.     dfESPconnector::setState(state_RUNNING);
  159.     return true;
  160. }
  161.  
  162. void sampleConnector::closeWriteStreams() {
  163.     // NOTE: _writeLock must be obtained around this call
  164.     if (_writeStream) {
  165.         _writeStream->flush();
  166.         delete _writeStream;
  167.         _writeStream = NULL;
  168.     }
  169. }
  170.  
  171. void sampleConnector::closeReadStreams() {
  172.     if (_readStream) {
  173.         delete _readStream;
  174.         _readStream = NULL;
  175.     }
  176. }
  177.  
  178. bool sampleConnector::stop() {
  179.     if (_pubThread) {
  180.         _pubThreadStop.inc();
  181.         _pubThread->join();
  182.         delete _pubThread;
  183.         _pubThread = NULL;
  184.     }
  185.     dfESPconnector::stop();
  186.     _writeLock->lock();
  187.     closeWriteStreams();
  188.     closeReadStreams();
  189.     _writeLock->unlock();
  190.  
  191.     return true;
  192. }
  193.  
  194. bool sampleConnector::openWriteStreams() {
  195.     // NOTE: _writeLock must be obtained around this call
  196.     dfESPstring filename = getParameter("filename");
  197.     if (!_writeStream) {
  198.         _writeStream = new ofstream(filename.c_str());
  199.         if (_writeStream == NULL) {
  200.             gLOG_ERROR(dfESPlogServices::Connectors0002,
  201.                        "sampleConnector::openWriteFilesAndStreams()", "stream",
  202.                        filename.c_str());
  203.             stop();
  204.             return false;
  205.         }
  206.     }
  207.     return true;
  208. }
  209.  
  210. bool sampleConnector::startSub() {
  211.     _writeLock->lock();
  212.     if (!openWriteStreams()) {
  213.         _writeLock->unlock();
  214.         return false;
  215.     }
  216.     _writeLock->unlock();
  217.     // connect to ESP server
  218.     if (!dfESPconnector::start()) {
  219.         return false;
  220.     }
  221.     return true;
  222. }
  223.  
  224. void pubThread(void *ctx) {
  225.     sampleConnector *fileConnector = (sampleConnector *)ctx;
  226.     fileConnector->publisherThread();
  227. }
  228.  
  229. void sampleConnector::publisherThread() {
  230.     int32_t blocksize = atoi(getParameter("blocksize").c_str());
  231.     bool transactional = (getParameter("transactional") == "true");
  232.     dfESPeventPtr event;
  233.     dfESPeventblockPtr eventBlock;
  234.     dfESPptrVect<dfESPeventPtr> trans;
  235.     int count = 0;
  236.     bool eof = false;
  237.     string valueStr;
  238.  
  239.     while (0 == _pubThreadStop.get()) {
  240.         if ((count >= blocksize) || (count && eof)) {
  241.             eventBlock = dfESPeventblock::newEventBlock(&trans,
  242.                                                         (transactional ? dfESPeventblock::ebt_TRANS :
  243.                                                          dfESPeventblock::ebt_NORMAL));
  244.             trans.free();
  245.             C_dfESPeventblock eb =
  246.                 reinterpret_cast<C_dfESPeventblock>(eventBlock);
  247.             int rc = C_dfESPpublisherInject(_client, eb);
  248.             if (!rc) {
  249.                 gLOG_ERROR(dfESPlogServices::Connectors0015,
  250.                            "sampleConnector::publisherThread()");
  251.                 break;
  252.             }
  253.             count=0;
  254.         }
  255.         if (eof) {
  256.             break;
  257.         }
  258.         event = NULL;
  259.         string line;
  260.         std::getline(*_readStream, line);
  261.         eof = _readStream->fail();
  262.         if (!eof) {
  263.             bool failure;
  264.             event = new dfESPevent(_schema, (char *)line.c_str(), failure);
  265.             if (failure) {
  266.                 gLOG_ERROR(dfESPlogServices::Connectors0003,
  267.                            "sampleConnector::publisherThread()", "dfESPevent");
  268.                 break;
  269.             }
  270.         }
  271.         if (event) {
  272.             trans.push_back(event);
  273.             count++;
  274.         }
  275.     }
  276.     if (count) {
  277.         trans.free();
  278.     }
  279.     dfESPconnector::setState(dfESPabsConnector::state_FINISHED);
  280.     _started = false;
  281. }
  282.  
  283. bool sampleConnector::startPub() {
  284.     dfESPstring filename = getParameter("filename");
  285.     if (!_readStream) {
  286.         _readStream = new ifstream(filename.c_str());
  287.         if (_readStream == NULL) {
  288.             gLOG_ERROR(dfESPlogServices::Connectors0002,
  289.                        "sampleConnector::startPub()", "stream", filename.c_str());
  290.             stop();
  291.             return false;
  292.         }
  293.     }
  294.     // connect to ESP server
  295.     if (!dfESPconnector::start()) {
  296.         return false;
  297.     }
  298.     if (!_schema) {
  299.         gLOG_ERROR(dfESPlogServices::Connectors0005,
  300.                    "sampleConnector::startPub()", "schema");
  301.         stop();
  302.         return false;
  303.     }
  304.     _pubThread = dfESPthreadUtils::thread::thread_create(pubThread);
  305.     if (!_pubThread) {
  306.         gLOG_ERROR(dfESPlogServices::Connectors0006,
  307.                    "sampleConnector::startPub()", "pubThread");
  308.         stop();
  309.         return false;
  310.     }
  311.     _pubThreadStop.set(0);
  312.     _pubThread->setArgs((void *)this);
  313.     int rc = _pubThread->start();
  314.     if (rc < 0) {
  315.         gLOG_ERROR(dfESPlogServices::Connectors0007,
  316.                    "sampleConnector::startPub()", "pubThread");
  317.         stop();
  318.         return false;
  319.     }
  320.     return true;
  321. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement