Advertisement
Guest User

Untitled

a guest
Sep 15th, 2019
175
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.63 KB | None | 0 0
  1. diff --git a/cpp/examples/parquet/parquet-arrow/reader-writer.cc b/cpp/examples/parquet/parquet-arrow/reader-writer.cc
  2. index a5f928b6d..68c1d988b 100644
  3. --- a/cpp/examples/parquet/parquet-arrow/reader-writer.cc
  4. +++ b/cpp/examples/parquet/parquet-arrow/reader-writer.cc
  5. @@ -16,12 +16,17 @@
  6. // under the License.
  7.  
  8. #include <iostream>
  9. +#include <chrono>
  10. #include <arrow/api.h>
  11. #include <arrow/io/api.h>
  12. #include <parquet/arrow/reader.h>
  13. #include <parquet/arrow/writer.h>
  14. #include <parquet/exception.h>
  15.  
  16. +#include <arrow/ipc/reader.h>
  17. +#include <arrow/ipc/writer.h>
  18. +#include <arrow/table.h>
  19. +
  20. // #0 Build dummy data to pass around
  21. // To have some input data, we first create an Arrow Table that holds
  22. // some data.
  23. @@ -46,18 +51,139 @@ std::shared_ptr<arrow::Table> generate_table() {
  24. return arrow::Table::Make(schema, {i64array, strarray});
  25. }
  26.  
  27. +int consume_float64(std::shared_ptr<arrow::Array> col) {
  28. + const double* const raw_val = std::dynamic_pointer_cast<arrow::DoubleArray>(col).get()->raw_values();
  29. + const unsigned long items = col.get()->length();
  30. + // get the raw data here
  31. + const std::shared_ptr<arrow::ArrayData> data = col.get()->data();
  32. + // get the raw bitmap
  33. + const uint8_t *bitmap = col.get()->null_bitmap_data();
  34. + // use the local variables instead of using the class
  35. + unsigned long intsx = 0, checksumx = 0;
  36. + if(bitmap == NULLPTR){
  37. + // fast path
  38. + for (unsigned long i = 0; i < items; i++){
  39. + checksumx+=raw_val[i];
  40. + }
  41. + intsx = items;
  42. + } else {
  43. + // slow path
  44. + for (unsigned long i = 0, j = data->offset; i < items; i++, j++) {
  45. + //if ((bitmap[j >> 3] & (1 << (j & 0x07))) != 0){
  46. + //if ((bitmap[j >> 3] & kBitmask[j & 0x07]) != 0){
  47. + if ((bitmap[j / 8] & arrow::BitUtil::kBitmask[j % 8]) != 0){ // gives 14.1 Gbps
  48. + //if ((bitmap[j / 8] & kBitmask[(j & 0x07)]) != 0){ // gives 18.08 Gbps
  49. + intsx++;
  50. + checksumx += raw_val[i];
  51. + }
  52. + }
  53. + }
  54. + return 0;
  55. +}
  56. +
  57. +
  58. // #1 Write out the data as a Parquet file
  59. void write_parquet_file(const arrow::Table& table) {
  60. +/*
  61. std::shared_ptr<arrow::io::FileOutputStream> outfile;
  62. PARQUET_THROW_NOT_OK(
  63. - arrow::io::FileOutputStream::Open("parquet-arrow-example.parquet", &outfile));
  64. + arrow::io::FileOutputStream::Open("parquet-arrow-example.arrow", &outfile));
  65. +
  66. + std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
  67. + arrow::ipc::RecordBatchFileWriter::Open(outfile.get(), table.schema(), &writer);
  68. +
  69. + arrow::TableBatchReader reader(table);
  70. +
  71. + std::shared_ptr<arrow::RecordBatch> batch;
  72. + while (true) {
  73. + reader.ReadNext(&batch);
  74. + if (batch == nullptr) {
  75. + break;
  76. + }
  77. + writer.get()->WriteRecordBatch(*batch);
  78. + }
  79. + writer.get()->Close();
  80. +
  81. + std::shared_ptr<arrow::io::FileOutputStream> pfile;
  82. + PARQUET_THROW_NOT_OK(
  83. + arrow::io::FileOutputStream::Open("parquet-arrow-example.parquet", &pfile));
  84. +
  85. +
  86. // The last argument to the function call is the size of the RowGroup in
  87. // the parquet file. Normally you would choose this to be rather large but
  88. // for the example, we use a small value to have multiple RowGroups.
  89. + //PARQUET_THROW_NOT_OK(
  90. + parquet::arrow::WriteTable(table, arrow::default_memory_pool(), pfile, 3);
  91. +*/
  92. +
  93. + auto start = std::chrono::high_resolution_clock::now();
  94. +
  95. + //std::shared_ptr<arrow::io::ReadableFile> infile;
  96. + //arrow::io::ReadableFile::Open("/tmp/low_entropy_UNCOMPRESSED.arrow", &infile);
  97. + std::shared_ptr<arrow::io::MemoryMappedFile> mfile;
  98. + arrow::io::MemoryMappedFile::Open("/tmp/low_entropy_UNCOMPRESSED.arrow", arrow::io::FileMode::READ, &mfile);
  99. + std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader;
  100. + //arrow::ipc::RecordBatchFileReader::Open(infile, &file_reader);
  101. + arrow::ipc::RecordBatchFileReader::Open(mfile, &file_reader);
  102. + auto tschema = file_reader.get()->schema();
  103. + auto num_blocks = file_reader.get()->num_record_batches();
  104. +
  105. + std::shared_ptr<arrow::RecordBatch> result;
  106. + file_reader->ReadRecordBatch(0, &result);
  107. + int num_cols = result.get()->num_columns();
  108. +
  109. + for(int i = 0; i < num_cols; i++){
  110. + std::shared_ptr<arrow::Array> col = result.get()->column(i);
  111. + // we need to get the type and consume
  112. + arrow::Type::type id = col.get()->type_id();
  113. + switch(id){
  114. + case arrow::Type::type::DOUBLE: consume_float64(col) ; break;
  115. + default: std::cout << "NYI \n"; break;
  116. + }
  117. + }
  118. +
  119. + auto end = std::chrono::high_resolution_clock::now();
  120. + std::cout << "arrow time cost: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms" << std::endl;
  121. +}
  122. +
  123. +void read_parquet_file() {
  124. + auto start = std::chrono::high_resolution_clock::now();
  125. +
  126. + std::shared_ptr<arrow::io::ReadableFile> infile;
  127. + arrow::io::ReadableFile::Open("/tmp/low_entropy_UNCOMPRESSED.parquet", arrow::default_memory_pool(), &infile);
  128. + std::unique_ptr<parquet::arrow::FileReader> reader;
  129. PARQUET_THROW_NOT_OK(
  130. - parquet::arrow::WriteTable(table, arrow::default_memory_pool(), outfile, 3));
  131. + parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
  132. + std::shared_ptr<arrow::Table> table;
  133. + PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
  134. +
  135. + //auto tschema = table->schema();
  136. + //auto num_rows = table->num_rows();
  137. + //auto num_cols = table->num_columns();
  138. + //std::cout << "rows: " << num_rows << " cols:" << num_cols << std::endl;
  139. +
  140. + //std::shared_ptr<arrow::RecordBatch> result;
  141. + //reader->ReadRecordBatch(0, &result);
  142. + //int num_cols = result.get()->num_columns();
  143. +
  144. + //std::shared_ptr<arrow::ChunkedArray> array;
  145. + //for(int i = 0; i < 16; i++){
  146. + // reader->ReadColumn(i, &array);
  147. +
  148. +
  149. + // //arrow::Type::type id = col.get()->type_id();
  150. + // //switch(id){
  151. + // // case arrow::Type::type::DOUBLE: consume_float64(col) ; break;
  152. + // // default: std::cout << "NYI \n"; break;
  153. + // //}
  154. + //}
  155. +
  156. + auto end = std::chrono::high_resolution_clock::now();
  157. + std::cout << "parquet time cost: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms" << std::endl;
  158. }
  159.  
  160. +
  161. +
  162. // #2: Fully read in the file
  163. void read_whole_file() {
  164. std::cout << "Reading parquet-arrow-example.parquet at once" << std::endl;
  165. @@ -128,8 +254,9 @@ void read_single_column_chunk() {
  166. int main(int argc, char** argv) {
  167. std::shared_ptr<arrow::Table> table = generate_table();
  168. write_parquet_file(*table);
  169. - read_whole_file();
  170. - read_single_rowgroup();
  171. - read_single_column();
  172. - read_single_column_chunk();
  173. + read_parquet_file();
  174. + //read_whole_file();
  175. + //read_single_rowgroup();
  176. + //read_single_column();
  177. + //read_single_column_chunk();
  178. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement