Guest User

Untitled

a guest
Jun 19th, 2018
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.91 KB | None | 0 0
  1. #pragma once
  2.  
  3. #include "xxxxxxxxxProtocol.h"
  4. #include "xxxxxxxxxTools.h"
  5.  
  6. #define ZSTD_STATIC_LINKING_ONLY /* Enable advanced API */
  7. #include "thirdparty/zstd/zstd.h" // Zstd
  8. #include "thirdparty/zstd/zstd_errors.h"
  9.  
  10. namespace xxx {
  11.  
  12.  
  13. //------------------------------------------------------------------------------
  14. // Compression Constants
  15.  
  16. /// Zstd compression level
  17. static const int kCompressionLevel = 1;
  18.  
  19. /// Compression history buffer size
  20. static const unsigned kCompressionDictBytes = 24 * 1000;
  21.  
  22. /// Bytes allocated per packet
  23. static const unsigned kCompressionAllocateBytes = \
  24. protocol::kMaxPossibleDatagramByteLimit - protocol::kMaxOverheadBytes;
  25.  
  26.  
  27. //------------------------------------------------------------------------------
  28. // Ring Buffer
  29.  
  30. template<size_t kBufferBytes>
  31. class RingBuffer
  32. {
  33. public:
  34. /// Get a contiguous region that is at least `bytes` in size
  35. XXXX_FORCE_INLINE void* Allocate(unsigned bytes)
  36. {
  37. if (NextWriteOffset + bytes > kBufferBytes) {
  38. NextWriteOffset = 0;
  39. }
  40. return Buffer + NextWriteOffset;
  41. }
  42.  
  43. /// Commit some number of bytes up to allocated bytes
  44. XXXX_FORCE_INLINE void Commit(unsigned bytes)
  45. {
  46. XXXX_DEBUG_ASSERT(NextWriteOffset + bytes <= kBufferBytes);
  47. NextWriteOffset += bytes;
  48. }
  49.  
  50. protected:
  51. /// Ring buffer that eats its own tail
  52. uint8_t Buffer[kBufferBytes];
  53.  
  54. /// Next offset to write to
  55. unsigned NextWriteOffset = 0;
  56. };
  57.  
  58.  
  59. //------------------------------------------------------------------------------
  60. // MessageCompressor
  61.  
  62. class MessageCompressor
  63. {
  64. public:
  65. Result Initialize();
  66.  
  67. ~MessageCompressor();
  68.  
  69. /// Compress data to the destination buffer `destBuffer`.
  70. /// Returns the number of bytes written in `writtenBytes`.
  71. /// Returns writtenBytes = 0 if data should not be compressed
  72. Result Compress(
  73. const uint8_t* data,
  74. unsigned bytes,
  75. uint8_t* dest,
  76. unsigned& writtenBytes);
  77.  
  78. protected:
  79. /// Dictionary history used by decompressor
  80. RingBuffer<kCompressionDictBytes> History;
  81.  
  82. /// Zstd context object used to compress packets
  83. ZSTD_CCtx* CCtx = nullptr;
  84. };
  85.  
  86.  
  87. //------------------------------------------------------------------------------
  88. // MessageDecompressor
  89.  
  90. struct Decompressed
  91. {
  92. const uint8_t* Data;
  93. unsigned Bytes;
  94. };
  95.  
  96. class MessageDecompressor
  97. {
  98. public:
  99. Result Initialize();
  100.  
  101. ~MessageDecompressor();
  102.  
  103. /// Decompress and handle a block of messages
  104. Result Decompress(
  105. const void* data,
  106. unsigned bytes,
  107. Decompressed& decompressed);
  108.  
  109. /// Insert uncompressed reliable datagram
  110. void InsertUncompressed(
  111. const uint8_t* data,
  112. unsigned bytes);
  113.  
  114. protected:
  115. /// Dictionary history used by decompressor
  116. RingBuffer<kCompressionDictBytes> History;
  117.  
  118. /// Zstd context object used to decompress packets
  119. ZSTD_DCtx* DCtx = nullptr;
  120. };
  121.  
  122.  
  123. //------------------------------------------------------------------------------
  124. // MessageCompressor
  125.  
  126. Result MessageCompressor::Initialize()
  127. {
  128. CCtx = ZSTD_createCCtx();
  129.  
  130. if (!CCtx) {
  131. return Result("SessionOutgoing::Initialize", "ZSTD_createCCtx failed", ErrorType::Zstd);
  132. }
  133.  
  134. const size_t estimatedPacketSize = kCompressionAllocateBytes;
  135.  
  136. ZSTD_parameters zParams;
  137.  
  138. zParams.cParams = ZSTD_getCParams(
  139. kCompressionLevel,
  140. estimatedPacketSize,
  141. kCompressionDictBytes);
  142.  
  143. zParams.fParams.checksumFlag = 0;
  144. zParams.fParams.contentSizeFlag = 0;
  145. zParams.fParams.noDictIDFlag = 1;
  146.  
  147. const size_t icsResult = ZSTD_compressBegin_advanced(
  148. CCtx,
  149. nullptr,
  150. 0,
  151. zParams,
  152. ZSTD_CONTENTSIZE_UNKNOWN);
  153.  
  154. if (0 != ZSTD_isError(icsResult)) {
  155. XXXX_DEBUG_BREAK();
  156. return Result("SessionOutgoing::Initialize", "ZSTD_compressBegin_advanced failed", ErrorType::Zstd, icsResult);
  157. }
  158.  
  159. const size_t blockSizeBytes = ZSTD_getBlockSize(CCtx);
  160. if (blockSizeBytes < kCompressionAllocateBytes) {
  161. return Result("SessionOutgoing::Initialize", "Zstd block size is too small", ErrorType::Zstd);
  162. }
  163.  
  164. return Result::Success();
  165. }
  166.  
  167. MessageCompressor::~MessageCompressor()
  168. {
  169. if (CCtx) {
  170. ZSTD_freeCCtx(CCtx);
  171. }
  172. }
  173.  
  174. Result MessageCompressor::Compress(
  175. const uint8_t* data,
  176. unsigned bytes,
  177. uint8_t* destBuffer,
  178. unsigned& writtenBytes)
  179. {
  180. XXXX_DEBUG_ASSERT(bytes >= protocol::kMessageFrameBytes);
  181.  
  182. writtenBytes = 0;
  183.  
  184. // Insert data into history ring buffer
  185. XXXX_DEBUG_ASSERT(kCompressionAllocateBytes >= bytes);
  186. void* history = History.Allocate(kCompressionAllocateBytes);
  187. memcpy(history, data, bytes);
  188. History.Commit(bytes);
  189.  
  190. // Compress into scratch buffer, leaving room for a frame header
  191. const size_t result = ZSTD_compressBlock(
  192. CCtx,
  193. destBuffer + protocol::kMessageFrameBytes,
  194. kCompressionAllocateBytes,
  195. history,
  196. bytes);
  197.  
  198. // If no data to compress, or would require too much space,
  199. // or did not produce a small enough result:
  200. if (0 == result ||
  201. (size_t)-ZSTD_error_dstSize_tooSmall == result ||
  202. protocol::kMessageFrameBytes + result >= bytes)
  203. {
  204. // Note: Input data was accumulated into history ring buffer
  205. return Result::Success();
  206. }
  207.  
  208. // If compression failed:
  209. if (0 != ZSTD_isError(result))
  210. {
  211. std::string reason = "ZSTD_compressBlock failed: ";
  212. reason += ZSTD_getErrorName(result);
  213. XXXX_DEBUG_BREAK();
  214. return Result("SessionOutgoing::compress", reason, ErrorType::Zstd, result);
  215. }
  216.  
  217. const unsigned compressedBytes = static_cast<unsigned>(result);
  218.  
  219. // Write Compressed frame header
  220. protocol::WriteMessageFrameHeader(
  221. destBuffer,
  222. protocol::MessageType_Compressed,
  223. compressedBytes);
  224.  
  225. // Compressed bytes includes the frame header
  226. writtenBytes = protocol::kMessageFrameBytes + compressedBytes;
  227. XXXX_DEBUG_ASSERT(writtenBytes <= kCompressionAllocateBytes);
  228.  
  229. return Result::Success();
  230. }
  231.  
  232.  
  233. //------------------------------------------------------------------------------
  234. // MessageDecompressor
  235.  
  236. Result MessageDecompressor::Initialize()
  237. {
  238. DCtx = ZSTD_createDCtx();
  239.  
  240. if (!DCtx) {
  241. return Result("SessionIncoming::Initialize", "ZSTD_createDCtx failed", ErrorType::Zstd);
  242. }
  243.  
  244. const size_t beginResult = ZSTD_decompressBegin(DCtx);
  245.  
  246. if (0 != ZSTD_isError(beginResult)) {
  247. return Result("SessionIncoming::Initialize", "ZSTD_decompressBegin failed", ErrorType::Zstd, beginResult);
  248. }
  249.  
  250. return Result::Success();
  251. }
  252.  
  253. MessageDecompressor::~MessageDecompressor()
  254. {
  255. if (DCtx) {
  256. ZSTD_freeDCtx(DCtx);
  257. }
  258. }
  259.  
  260. void MessageDecompressor::InsertUncompressed(
  261. const uint8_t* data,
  262. unsigned bytes)
  263. {
  264. if (bytes > kCompressionAllocateBytes) {
  265. XXXX_DEBUG_BREAK(); // Invalid input
  266. return;
  267. }
  268.  
  269. void* history = History.Allocate(kCompressionAllocateBytes);
  270. memcpy(history, data, bytes);
  271. ZSTD_insertBlock(DCtx, history, bytes);
  272. History.Commit(bytes);
  273. }
  274.  
  275. Result MessageDecompressor::Decompress(
  276. const void* data,
  277. unsigned bytes,
  278. Decompressed& decompressed)
  279. {
  280. // Decompress data into history ring buffer
  281. void* history = History.Allocate(kCompressionAllocateBytes);
  282.  
  283. const size_t result = ZSTD_decompressBlock(
  284. DCtx,
  285. history,
  286. kCompressionAllocateBytes,
  287. data,
  288. bytes);
  289.  
  290. // If decompression failed:
  291. if (0 == result || 0 != ZSTD_isError(result))
  292. {
  293. std::string reason = "ZSTD_decompressBlock failed: ";
  294. reason += ZSTD_getErrorName(result);
  295. XXXX_DEBUG_BREAK();
  296. return Result("SessionOutgoing::decompress", reason, ErrorType::Zstd, result);
  297. }
  298.  
  299. const uint8_t* datagramData = reinterpret_cast<uint8_t*>(history);
  300. const unsigned datagramBytes = static_cast<unsigned>(result);
  301.  
  302. History.Commit(datagramBytes);
  303.  
  304. decompressed.Data = datagramData;
  305. decompressed.Bytes = datagramBytes;
  306.  
  307. return Result::Success();
  308. }
Add Comment
Please, Sign In to add comment