Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "sampleConnector.h"
- using namespace std;
- dfESPconnectorInfo connectorInfo = {
- type_BOTH,
- sampleConnector::initialize,
- sampleConnector::subRequiredConfig,
- sampleConnector::sizeofSubReqConfig,
- sampleConnector::pubRequiredConfig,
- sampleConnector::sizeofPubReqConfig,
- sampleConnector::subOptionalConfig,
- sampleConnector::sizeofSubOptConfig,
- sampleConnector::pubOptionalConfig,
- sampleConnector::sizeofPubOptConfig,
- };
- dfESPconnectorInfo *getConnectorInfo() {
- return &connectorInfo;
- }
- dfESPconnectorParmInfo_t sampleConnector::subRequiredConfig[] = {
- {"type", "sub", dfESPconnector::sizeofPubSubValues, dfESPconnector::pubsubValues},
- {"project", "", 0, NULL},
- {"continuousquery", "", 0, NULL},
- {"window", "", 0, NULL},
- {"filename", "", 0, NULL}
- };
- size_t sampleConnector::sizeofSubReqConfig =
- sizeof(sampleConnector::subRequiredConfig)/sizeof(dfESPconnectorParmInfo_t);
- dfESPconnectorParmInfo_t sampleConnector::pubRequiredConfig[] = {
- {"type", "pub", dfESPconnector::sizeofPubSubValues, dfESPconnector::pubsubValues},
- {"project", "", 0, NULL},
- {"continuousquery", "", 0, NULL},
- {"window", "", 0, NULL},
- {"filename", "", 0, NULL}
- };
- size_t sampleConnector::sizeofPubReqConfig =
- sizeof(sampleConnector::pubRequiredConfig)/sizeof(dfESPconnectorParmInfo_t);
- dfESPconnectorParmInfo_t sampleConnector::subOptionalConfig[] = {
- {"host", "", 0, NULL},
- {"port", "", 0, NULL},
- {"snapshot", "true", dfESPconnector::sizeofTrueFalseValues, dfESPconnector::trueFalseValues}
- };
- size_t sampleConnector::sizeofSubOptConfig =
- sizeof(sampleConnector::subOptionalConfig)/sizeof(dfESPconnectorParmInfo_t);
- dfESPconnectorParmInfo_t sampleConnector::pubOptionalConfig[] = {
- {"host", "", 0, NULL},
- {"port", "", 0, NULL},
- {"transactional", "false", dfESPconnector::sizeofTrueFalseValues, dfESPconnector::trueFalseValues},
- {"blocksize", "1", 0, NULL}
- };
- size_t sampleConnector::sizeofPubOptConfig =
- sizeof(sampleConnector::pubOptionalConfig)/sizeof(dfESPconnectorParmInfo_t);
- bool sampleConnector::callbackFunction(dfESPeventblockPtr eventBlock, dfESPschema *schema) {
- int32_t eventCnt = eventBlock->getSize();
- int32_t eventIndx;
- char *csvBuf;
- bool error = false;
- for (eventIndx=0; eventIndx < eventCnt; eventIndx++) {
- dfESPeventPtr event = eventBlock->getData(eventIndx);
- csvBuf = dfESPconnector::getCSV(event, schema);
- if (!csvBuf) {
- error = true;
- break;
- }
- _writeLock->lock();
- if (!_writeStream) {
- // stream not open
- _writeLock->unlock();
- gLOG_ERROR(dfESPlogServices::Connectors0012,
- "sampleConnector::csvCallback()", "stream");
- error = true;
- break;
- }
- *_writeStream << (char *)csvBuf << endl;
- _writeLock->unlock();
- free(csvBuf);
- csvBuf = NULL;
- }
- if (error) {
- if (csvBuf) {
- free(csvBuf);
- }
- return false;
- }
- return true;
- }
- void sampleConnector::errorCallbackFunction(dfESPpubsubFailures_t failure, dfESPpubsubFailureCodes_t code) {
- gLOG_ERROR(dfESPlogServices::Connectors0019,
- "sampleConnector::errorCallbackFunction()",
- C_dfESPdecodePubSubFailure((C_dfESPpubsubFailures)failure),
- C_dfESPdecodePubSubFailureCode((C_dfESPpubsubFailureCodes)code));
- if (_errorCallback) {
- // call application callback
- _errorCallback(failure, code, _ctx);
- }
- }
- bool sampleConnector::setupCallbackFunction(dfESPschema *schema,
- bool winIsAutogen) {
- // schema related setup, nothing to do
- _schema = schema;
- _winIsAutogen = winIsAutogen;
- return true;
- }
- sampleConnector::sampleConnector(dfESPengine *engine, dfESPpsLib_t psLib,
- dfESPstring name) {
- init(psLib, engine, name);
- _readStream = NULL;
- _writeStream = NULL;
- _writeLock = dfESPthreadUtils::mutex::mutex_create();
- }
- dfESPconnector *sampleConnector::initialize(dfESPengine *engine, dfESPpsLib_t psLib,
- dfESPstring name) {
- return new sampleConnector(engine, psLib, name);
- }
- sampleConnector::~sampleConnector() {
- stop();
- }
- bool sampleConnector::start() {
- if (_started || _client) {
- gLOG_ERROR(dfESPlogServices::Connectors0001, "sampleConnector::start()");
- return false;
- }
- if (_type == type_SUB) {
- if (!checkConfig(subRequiredConfig, sizeofSubReqConfig, subOptionalConfig,
- sizeofSubOptConfig)) {
- return false;
- }
- if (!startSub()) {
- return false;
- }
- } else if (_type == type_PUB) {
- if (!checkConfig(pubRequiredConfig, sizeofPubReqConfig, pubOptionalConfig,
- sizeofPubOptConfig)) {
- return false;
- }
- if (!startPub()) {
- return false;
- }
- } else {
- gLOG_ERROR(dfESPlogServices::Connectors0005, "sampleConnector::start()",
- "type");
- return false;
- }
- _started = true;
- dfESPconnector::setState(state_RUNNING);
- return true;
- }
- void sampleConnector::closeWriteStreams() {
- // NOTE: _writeLock must be obtained around this call
- if (_writeStream) {
- _writeStream->flush();
- delete _writeStream;
- _writeStream = NULL;
- }
- }
- void sampleConnector::closeReadStreams() {
- if (_readStream) {
- delete _readStream;
- _readStream = NULL;
- }
- }
- bool sampleConnector::stop() {
- if (_pubThread) {
- _pubThreadStop.inc();
- _pubThread->join();
- delete _pubThread;
- _pubThread = NULL;
- }
- dfESPconnector::stop();
- _writeLock->lock();
- closeWriteStreams();
- closeReadStreams();
- _writeLock->unlock();
- return true;
- }
- bool sampleConnector::openWriteStreams() {
- // NOTE: _writeLock must be obtained around this call
- dfESPstring filename = getParameter("filename");
- if (!_writeStream) {
- _writeStream = new ofstream(filename.c_str());
- if (_writeStream == NULL) {
- gLOG_ERROR(dfESPlogServices::Connectors0002,
- "sampleConnector::openWriteFilesAndStreams()", "stream",
- filename.c_str());
- stop();
- return false;
- }
- }
- return true;
- }
- bool sampleConnector::startSub() {
- _writeLock->lock();
- if (!openWriteStreams()) {
- _writeLock->unlock();
- return false;
- }
- _writeLock->unlock();
- // connect to ESP server
- if (!dfESPconnector::start()) {
- return false;
- }
- return true;
- }
- void pubThread(void *ctx) {
- sampleConnector *fileConnector = (sampleConnector *)ctx;
- fileConnector->publisherThread();
- }
- void sampleConnector::publisherThread() {
- int32_t blocksize = atoi(getParameter("blocksize").c_str());
- bool transactional = (getParameter("transactional") == "true");
- dfESPeventPtr event;
- dfESPeventblockPtr eventBlock;
- dfESPptrVect<dfESPeventPtr> trans;
- int count = 0;
- bool eof = false;
- string valueStr;
- while (0 == _pubThreadStop.get()) {
- if ((count >= blocksize) || (count && eof)) {
- eventBlock = dfESPeventblock::newEventBlock(&trans,
- (transactional ? dfESPeventblock::ebt_TRANS :
- dfESPeventblock::ebt_NORMAL));
- trans.free();
- C_dfESPeventblock eb =
- reinterpret_cast<C_dfESPeventblock>(eventBlock);
- int rc = C_dfESPpublisherInject(_client, eb);
- if (!rc) {
- gLOG_ERROR(dfESPlogServices::Connectors0015,
- "sampleConnector::publisherThread()");
- break;
- }
- count=0;
- }
- if (eof) {
- break;
- }
- event = NULL;
- string line;
- std::getline(*_readStream, line);
- eof = _readStream->fail();
- if (!eof) {
- bool failure;
- event = new dfESPevent(_schema, (char *)line.c_str(), failure);
- if (failure) {
- gLOG_ERROR(dfESPlogServices::Connectors0003,
- "sampleConnector::publisherThread()", "dfESPevent");
- break;
- }
- }
- if (event) {
- trans.push_back(event);
- count++;
- }
- }
- if (count) {
- trans.free();
- }
- dfESPconnector::setState(dfESPabsConnector::state_FINISHED);
- _started = false;
- }
- bool sampleConnector::startPub() {
- dfESPstring filename = getParameter("filename");
- if (!_readStream) {
- _readStream = new ifstream(filename.c_str());
- if (_readStream == NULL) {
- gLOG_ERROR(dfESPlogServices::Connectors0002,
- "sampleConnector::startPub()", "stream", filename.c_str());
- stop();
- return false;
- }
- }
- // connect to ESP server
- if (!dfESPconnector::start()) {
- return false;
- }
- if (!_schema) {
- gLOG_ERROR(dfESPlogServices::Connectors0005,
- "sampleConnector::startPub()", "schema");
- stop();
- return false;
- }
- _pubThread = dfESPthreadUtils::thread::thread_create(pubThread);
- if (!_pubThread) {
- gLOG_ERROR(dfESPlogServices::Connectors0006,
- "sampleConnector::startPub()", "pubThread");
- stop();
- return false;
- }
- _pubThreadStop.set(0);
- _pubThread->setArgs((void *)this);
- int rc = _pubThread->start();
- if (rc < 0) {
- gLOG_ERROR(dfESPlogServices::Connectors0007,
- "sampleConnector::startPub()", "pubThread");
- stop();
- return false;
- }
- return true;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement