Advertisement
Guest User

Untitled

a guest
Mar 17th, 2017
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.58 KB | None | 0 0
  1. Class:
  2. ==========
  3. PackageBuffer: PackageStream
  4.  
  5. Interface:
  6. ==========
  7. PackageStream
  8.  
  9. #ifndef THORS_ANVIL_MYSQL_PACKAGE_BUFFER_H
  10. #define THORS_ANVIL_MYSQL_PACKAGE_BUFFER_H
  11.  
  12. #include "PackageStream.h"
  13. #include <vector>
  14. #include <string>
  15. #include <cstddef>
  16.  
  17. namespace ThorsAnvil
  18. {
  19. namespace MySQL
  20. {
  21.  
  22. template<typename T>
  23. class PackageBuffer: public PackageStream
  24. {
  25. T& stream;
  26. std::size_t readCurrentPacketSize;
  27. std::size_t readCurrentPacketPosition;
  28. unsigned char currentPacketSequenceID;
  29. bool hasMore;
  30. bool flushed;
  31. std::vector<char> sendBuffer;
  32.  
  33. private:
  34. void nextPacket();
  35. void writePackageHeader(std::size_t size);
  36. void writeStream(char const* buffer, std::size_t len);
  37.  
  38. public:
  39. PackageBuffer(T& stream, bool flushed = false);
  40. virtual void read(char* buffer, std::size_t len) override;
  41. virtual void write(char const* buffer, std::size_t len) override;
  42. virtual bool isEmpty() override;
  43. virtual void startNewConversation() override;
  44. virtual void flush() override;
  45. virtual void drop() override;
  46. virtual void reset() override;
  47. virtual std::string readRemainingData() override;
  48. };
  49.  
  50. }
  51. }
  52.  
  53. #ifndef COVERAGE_MySQL
  54. #include "PackageBuffer.tpp"
  55. #endif
  56.  
  57. #endif
  58.  
  59. #include "ThorSQL/SQLUtil.h"
  60. #include <iomanip>
  61.  
  62. namespace ThorsAnvil
  63. {
  64. namespace MySQL
  65. {
  66.  
  67. template<typename T>
  68. PackageBuffer<T>::PackageBuffer(T& stream, bool flushed)
  69. : stream(stream)
  70. , readCurrentPacketSize(0)
  71. , readCurrentPacketPosition(0)
  72. , currentPacketSequenceID(-1)
  73. , hasMore(true)
  74. , flushed(flushed)
  75. {
  76. sendBuffer.reserve(0xFFFFFF/*16MByte*/);
  77. }
  78.  
  79. template<typename T>
  80. inline void PackageBuffer<T>::read(char* buffer, std::size_t len)
  81. {
  82. std::size_t retrieved = 0;
  83. do
  84. {
  85. std::size_t remaining = readCurrentPacketSize - readCurrentPacketPosition;
  86. std::size_t getFromPacket = std::min((len - retrieved), remaining);
  87. stream.read(buffer + retrieved, getFromPacket);
  88. retrieved += getFromPacket;
  89. readCurrentPacketPosition += getFromPacket;
  90. if (retrieved != len && remaining == 0)
  91. { nextPacket();
  92. }
  93. }
  94. while (len != retrieved);
  95. }
  96.  
  97. template<typename T>
  98. bool PackageBuffer<T>::isEmpty()
  99. {
  100. long remaining = readCurrentPacketSize - readCurrentPacketPosition;
  101. if ((remaining == 0) && hasMore)
  102. {
  103. nextPacket();
  104. remaining = readCurrentPacketSize - readCurrentPacketPosition;
  105. }
  106. return remaining == 0;
  107. }
  108.  
  109. template<typename T>
  110. std::string PackageBuffer<T>::readRemainingData()
  111. {
  112. std::string dst;
  113. do
  114. {
  115. long retrieved = dst.size();
  116. long remaining = readCurrentPacketSize - readCurrentPacketPosition;
  117. dst.resize(retrieved + remaining);
  118. read(&dst[retrieved], remaining);
  119. if (hasMore)
  120. {
  121. nextPacket();
  122. continue;
  123. }
  124. break;
  125. }
  126. while (true);
  127.  
  128. return dst;
  129. }
  130.  
  131. template<typename T>
  132. void PackageBuffer<T>::nextPacket()
  133. {
  134. if (!hasMore)
  135. {
  136. throw std::domain_error(
  137. errorMsg("ThorsAnvil::MySQL::PackageBuffer::nextPacket: ",
  138. "No more data expected from server"
  139. ));
  140. }
  141. readCurrentPacketSize = 0;
  142. readCurrentPacketPosition = 0;
  143. currentPacketSequenceID++;
  144.  
  145. std::uint32_t packetBufferSize = 0;
  146. // TODO FIX only works on little endian
  147. stream.read(reinterpret_cast<char*>(&packetBufferSize), 3);
  148. readCurrentPacketSize = packetBufferSize;
  149.  
  150.  
  151. char actualSequenceID;
  152. stream.read(&actualSequenceID, 1);
  153.  
  154. if (currentPacketSequenceID != actualSequenceID)
  155. {
  156. throw std::domain_error(
  157. errorMsg("ThorsAnvil::MySQL::PackageBuffer::nextPacket: ",
  158. "currentPacketSequenceID(", currentPacketSequenceID, ")",
  159. " != actual sequence on input stream(", actualSequenceID, ")"
  160. ));
  161. }
  162.  
  163. hasMore = readCurrentPacketSize == 0xFFFFFF;
  164. }
  165.  
  166. template<typename T>
  167. void PackageBuffer<T>::startNewConversation()
  168. {
  169. currentPacketSequenceID = -1;
  170. }
  171.  
  172. template<typename T>
  173. void PackageBuffer<T>::write(char const* buffer, std::size_t len)
  174. {
  175. if (flushed)
  176. {
  177. throw std::domain_error(
  178. bugReport("ThorsAnvil::MySQL::PackageBuffer::write: ",
  179. "Writting to a flushed buffer"
  180. ));
  181. }
  182. std::size_t currentSize = sendBuffer.size();
  183. if (currentSize + len >= 0xFFFFFF)
  184. {
  185. std::size_t available = 0xFFFFFF - currentSize;
  186. writePackageHeader(0xFFFFFF);
  187. writeStream(&sendBuffer[0], currentSize);
  188. writeStream(buffer, available);
  189. sendBuffer.clear();
  190.  
  191. buffer += available;
  192. len -= available;
  193.  
  194. while (len > 0xFFFFFF)
  195. {
  196. writePackageHeader(0xFFFFFF);
  197. writeStream(buffer, 0xFFFFFF);
  198.  
  199. buffer += 0xFFFFFF;
  200. len -= 0xFFFFFF;
  201. }
  202. }
  203. sendBuffer.insert(sendBuffer.end(), buffer, buffer + len);
  204. }
  205.  
  206. template<typename T>
  207. void PackageBuffer<T>::flush()
  208. {
  209. if (flushed)
  210. {
  211. throw std::domain_error(
  212. bugReport("ThorsAnvil::MySQL::PackageBuffer<T>::flush: ",
  213. "Already flushed"
  214. ));
  215. }
  216. flushed = true;
  217. std::size_t currentSize = sendBuffer.size();
  218.  
  219. writePackageHeader(currentSize);
  220. writeStream(&sendBuffer[0], currentSize);
  221. sendBuffer.clear();
  222. }
  223.  
  224. template<typename T>
  225. void PackageBuffer<T>::drop()
  226. {
  227. std::size_t dataLeft;
  228. std::vector<char> drop;
  229. do
  230. {
  231. std::size_t readDataAvailable = readCurrentPacketSize - readCurrentPacketPosition;
  232. drop.resize(readDataAvailable);
  233. read(&drop[0], readDataAvailable);
  234.  
  235. dataLeft = 0;
  236. if (hasMore)
  237. {
  238. nextPacket();
  239. dataLeft = readCurrentPacketSize - readCurrentPacketPosition;
  240. }
  241. }
  242. while (dataLeft != 0);
  243. }
  244.  
  245. template<typename T>
  246. void PackageBuffer<T>::reset()
  247. {
  248. std::size_t readDataAvailable = readCurrentPacketSize - readCurrentPacketPosition;
  249. if (readDataAvailable == 0 && hasMore)
  250. {
  251. nextPacket();
  252. readDataAvailable = readCurrentPacketSize - readCurrentPacketPosition;
  253. }
  254. if (readDataAvailable != 0)
  255. {
  256. std::stringstream extraData;
  257. for (std::size_t loop=0; loop < readDataAvailable; ++loop)
  258. {
  259. char x;
  260. read(&x, 1);
  261. extraData << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(x) << "(" << x << ") ";
  262. }
  263. throw std::domain_error(
  264. bugReport("ThorsAnvil::MySQL::PackageBuffer<T>::reset: ",
  265. "reset() before message was read:",
  266. extraData.str()
  267. ));
  268. }
  269.  
  270. flushed = false;
  271. hasMore = true; // Will allow us to start reading the next packet
  272. }
  273.  
  274. template<typename T>
  275. void PackageBuffer<T>::writePackageHeader(std::size_t size)
  276. {
  277. ++currentPacketSequenceID;
  278. stream.write(reinterpret_cast<char const*>(&size), 3);
  279. stream.write(reinterpret_cast<char const*>(&currentPacketSequenceID), 1);
  280. }
  281.  
  282. template<typename T>
  283. void PackageBuffer<T>::writeStream(char const* buffer, std::size_t len)
  284. {
  285. stream.write(buffer, len);
  286. }
  287.  
  288. }
  289. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement