Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/cpp/examples/parquet/parquet-arrow/reader-writer.cc b/cpp/examples/parquet/parquet-arrow/reader-writer.cc
- index a5f928b6d..68c1d988b 100644
- --- a/cpp/examples/parquet/parquet-arrow/reader-writer.cc
- +++ b/cpp/examples/parquet/parquet-arrow/reader-writer.cc
- @@ -16,12 +16,17 @@
- // under the License.
- #include <iostream>
- +#include <chrono>
- #include <arrow/api.h>
- #include <arrow/io/api.h>
- #include <parquet/arrow/reader.h>
- #include <parquet/arrow/writer.h>
- #include <parquet/exception.h>
- +#include <arrow/ipc/reader.h>
- +#include <arrow/ipc/writer.h>
- +#include <arrow/table.h>
- +
- // #0 Build dummy data to pass around
- // To have some input data, we first create an Arrow Table that holds
- // some data.
- @@ -46,18 +51,139 @@ std::shared_ptr<arrow::Table> generate_table() {
- return arrow::Table::Make(schema, {i64array, strarray});
- }
- +// Scans one float64 column, counting its non-null slots and summing their values.
- +//
- +// The totals are written to volatile sinks so that the scan loops cannot be
- +// optimized away when this function is used for timing.
- +//
- +// Returns 0 on success, or -1 when `col` is null or is not an arrow::DoubleArray.
- +int consume_float64(std::shared_ptr<arrow::Array> col) {
- +  const auto doubles = std::dynamic_pointer_cast<arrow::DoubleArray>(col);
- +  if (doubles == nullptr) {
- +    // Wrong (or missing) array type: nothing sensible to scan.
- +    return -1;
- +  }
- +
- +  // Values are indexed from 0; the validity bitmap is indexed from the array offset.
- +  const double* const values = doubles->raw_values();
- +  const int64_t length = doubles->length();
- +  const uint8_t* const validity = doubles->null_bitmap_data();
- +
- +  int64_t valid_count = 0;
- +  // Accumulate in double: an integer accumulator would truncate every addend.
- +  double checksum = 0.0;
- +  if (validity == nullptr) {
- +    // Fast path: no validity bitmap, so every slot is summed.
- +    for (int64_t i = 0; i < length; ++i) {
- +      checksum += values[i];
- +    }
- +    valid_count = length;
- +  } else {
- +    // Slow path: test the validity bit of each slot before summing it.
- +    // Benchmark notes kept from the original: indexing with `/ 8` and `% 8`
- +    // gave 14.1 Gbps, the `& 0x07` variant gave 18.08 Gbps.
- +    const int64_t offset = doubles->data()->offset;
- +    for (int64_t i = 0; i < length; ++i) {
- +      const int64_t bit = offset + i;
- +      if ((validity[bit >> 3] & arrow::BitUtil::kBitmask[bit & 0x07]) != 0) {
- +        ++valid_count;
- +        checksum += values[i];
- +      }
- +    }
- +  }
- +
- +  // Make the results observable so the compiler has to keep the loops above.
- +  static volatile double checksum_sink = 0.0;
- +  static volatile int64_t valid_count_sink = 0;
- +  checksum_sink = checksum;
- +  valid_count_sink = valid_count;
- +  return 0;
- +}
- +
- +
- +
- +
- // #1 Write out the data as a Parquet file
- +// NOTE: the original write path below is disabled by the block comment, so `table`
- +// is currently unused. The function instead times opening the Arrow IPC file at
- +// /tmp/low_entropy_UNCOMPRESSED.arrow and scanning the columns of its first record
- +// batch, then prints the elapsed milliseconds to stdout.
- +// Throws parquet::ParquetException if the file cannot be opened or read, or if it
- +// contains no record batches.
- void write_parquet_file(const arrow::Table& table) {
- +/*
- std::shared_ptr<arrow::io::FileOutputStream> outfile;
- PARQUET_THROW_NOT_OK(
- - arrow::io::FileOutputStream::Open("parquet-arrow-example.parquet", &outfile));
- + arrow::io::FileOutputStream::Open("parquet-arrow-example.arrow", &outfile));
- +
- + std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
- + arrow::ipc::RecordBatchFileWriter::Open(outfile.get(), table.schema(), &writer);
- +
- + arrow::TableBatchReader reader(table);
- +
- + std::shared_ptr<arrow::RecordBatch> batch;
- + while (true) {
- + reader.ReadNext(&batch);
- + if (batch == nullptr) {
- + break;
- + }
- + writer.get()->WriteRecordBatch(*batch);
- + }
- + writer.get()->Close();
- +
- + std::shared_ptr<arrow::io::FileOutputStream> pfile;
- + PARQUET_THROW_NOT_OK(
- + arrow::io::FileOutputStream::Open("parquet-arrow-example.parquet", &pfile));
- +
- +
- // The last argument to the function call is the size of the RowGroup in
- // the parquet file. Normally you would choose this to be rather large but
- // for the example, we use a small value to have multiple RowGroups.
- + //PARQUET_THROW_NOT_OK(
- + parquet::arrow::WriteTable(table, arrow::default_memory_pool(), pfile, 3);
- +*/
- +
- +  const auto start = std::chrono::high_resolution_clock::now();
- +
- +  // Alternative kept from the original: plain ReadableFile instead of a memory map.
- +  //std::shared_ptr<arrow::io::ReadableFile> infile;
- +  //arrow::io::ReadableFile::Open("/tmp/low_entropy_UNCOMPRESSED.arrow", &infile);
- +  std::shared_ptr<arrow::io::MemoryMappedFile> mfile;
- +  // Check every Status: on failure the out-pointers stay null and would be dereferenced.
- +  PARQUET_THROW_NOT_OK(arrow::io::MemoryMappedFile::Open(
- +      "/tmp/low_entropy_UNCOMPRESSED.arrow", arrow::io::FileMode::READ, &mfile));
- +  std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader;
- +  //arrow::ipc::RecordBatchFileReader::Open(infile, &file_reader);
- +  PARQUET_THROW_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(mfile, &file_reader));
- +
- +  // Batch 0 is read unconditionally below, so make sure it exists first.
- +  if (file_reader->num_record_batches() < 1) {
- +    throw parquet::ParquetException("Arrow file contains no record batches");
- +  }
- +
- +  // NOTE(review): only the first record batch is scanned, while read_parquet_file()
- +  // reads the whole table - confirm the two timings are meant to be comparable.
- +  std::shared_ptr<arrow::RecordBatch> result;
- +  PARQUET_THROW_NOT_OK(file_reader->ReadRecordBatch(0, &result));
- +  const int num_cols = result->num_columns();
- +
- +  for (int i = 0; i < num_cols; i++) {
- +    const std::shared_ptr<arrow::Array> col = result->column(i);
- +    // Dispatch on the column's type; only float64 columns are consumed so far.
- +    switch (col->type_id()) {
- +      case arrow::Type::DOUBLE:
- +        consume_float64(col);
- +        break;
- +      default:
- +        std::cout << "NYI \n";
- +        break;
- +    }
- +  }
- +
- +  const auto end = std::chrono::high_resolution_clock::now();
- +  std::cout << "arrow time cost: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms" << std::endl;
- +}
- +
- +// Times opening /tmp/low_entropy_UNCOMPRESSED.parquet and reading it into an
- +// arrow::Table, then prints the elapsed milliseconds to stdout.
- +// Throws parquet::ParquetException if the file cannot be opened or read.
- +// NOTE(review): the table's columns are not consumed here (that code is commented
- +// out below), unlike the Arrow IPC path - confirm this is intended for the comparison.
- +void read_parquet_file() {
- +  const auto start = std::chrono::high_resolution_clock::now();
- +
- +  std::shared_ptr<arrow::io::ReadableFile> infile;
- +  // Check the Status: on failure `infile` stays null and must not reach OpenFile.
- +  PARQUET_THROW_NOT_OK(arrow::io::ReadableFile::Open(
- +      "/tmp/low_entropy_UNCOMPRESSED.parquet", arrow::default_memory_pool(), &infile));
- + std::unique_ptr<parquet::arrow::FileReader> reader;
- PARQUET_THROW_NOT_OK(
- - parquet::arrow::WriteTable(table, arrow::default_memory_pool(), outfile, 3));
- + parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
- + std::shared_ptr<arrow::Table> table;
- + PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
- +
- + //auto tschema = table->schema();
- + //auto num_rows = table->num_rows();
- + //auto num_cols = table->num_columns();
- + //std::cout << "rows: " << num_rows << " cols:" << num_cols << std::endl;
- +
- + //std::shared_ptr<arrow::RecordBatch> result;
- + //reader->ReadRecordBatch(0, &result);
- + //int num_cols = result.get()->num_columns();
- +
- + //std::shared_ptr<arrow::ChunkedArray> array;
- + //for(int i = 0; i < 16; i++){
- + // reader->ReadColumn(i, &array);
- +
- +
- + // //arrow::Type::type id = col.get()->type_id();
- + // //switch(id){
- + // // case arrow::Type::type::DOUBLE: consume_float64(col) ; break;
- + // // default: std::cout << "NYI \n"; break;
- + // //}
- + //}
- +
- +  const auto end = std::chrono::high_resolution_clock::now();
- +  std::cout << "parquet time cost: " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms" << std::endl;
- }
- +
- +
- // #2: Fully read in the file
- void read_whole_file() {
- std::cout << "Reading parquet-arrow-example.parquet at once" << std::endl;
- @@ -128,8 +254,9 @@ void read_single_column_chunk() {
- int main(int argc, char** argv) {
- std::shared_ptr<arrow::Table> table = generate_table();
- + // Benchmark driver: write_parquet_file() currently has its write path commented
- + // out and instead times an Arrow IPC read; read_parquet_file() times the Parquet
- + // read. Each prints its own elapsed time.
- write_parquet_file(*table);
- - read_whole_file();
- - read_single_rowgroup();
- - read_single_column();
- - read_single_column_chunk();
- + read_parquet_file();
- + // The original example readers are disabled while benchmarking.
- + //read_whole_file();
- + //read_single_rowgroup();
- + //read_single_column();
- + //read_single_column_chunk();
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement