Created
October 9, 2018 14:35
-
-
Save wesm/e8e43aba036db747fb9c021d590be938 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/cpp/src/parquet/.parquetcppversion b/cpp/src/parquet/.parquetcppversion | |
index d65937f10..f825f7c7f 100644 | |
--- a/cpp/src/parquet/.parquetcppversion | |
+++ b/cpp/src/parquet/.parquetcppversion | |
@@ -1 +1 @@ | |
-1.4.1-SNAPSHOT | |
+1.5.1-SNAPSHOT | |
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc | |
index 5f4e12349..086672711 100644 | |
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc | |
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc | |
@@ -1391,6 +1391,16 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) { | |
ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10, | |
default_writer_properties(), coerce_millis)); | |
+ // OK to lose precision if we explicitly allow it | |
+ auto allow_truncation = (ArrowWriterProperties::Builder() | |
+ .coerce_timestamps(TimeUnit::MILLI) | |
+ ->allow_truncated_timestamps() | |
+ ->build()); | |
+ ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10, | |
+ default_writer_properties(), allow_truncation)); | |
+ ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10, | |
+ default_writer_properties(), allow_truncation)); | |
+ | |
// OK to write micros to micros | |
auto coerce_micros = | |
(ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build()); | |
@@ -2316,11 +2326,11 @@ TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) { | |
ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table)); | |
} | |
-class TestArrowReaderAdHocSpark | |
+class TestArrowReaderAdHocSparkAndHvr | |
: public ::testing::TestWithParam< | |
std::tuple<std::string, std::shared_ptr<::DataType>>> {}; | |
-TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) { | |
+TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) { | |
std::string path(test::get_data_dir()); | |
std::string filename; | |
@@ -2364,12 +2374,13 @@ TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) { | |
} | |
INSTANTIATE_TEST_CASE_P( | |
- ReadDecimals, TestArrowReaderAdHocSpark, | |
+ ReadDecimals, TestArrowReaderAdHocSparkAndHvr, | |
::testing::Values( | |
std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)), | |
std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)), | |
std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)), | |
- std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2)))); | |
+ std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2)), | |
+ std::make_tuple("byte_array_decimal.parquet", ::arrow::decimal(4, 2)))); | |
} // namespace arrow | |
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc | |
index 11fb20cd1..2006025ac 100644 | |
--- a/cpp/src/parquet/arrow/reader.cc | |
+++ b/cpp/src/parquet/arrow/reader.cc | |
@@ -18,26 +18,30 @@ | |
#include "parquet/arrow/reader.h" | |
#include <algorithm> | |
-#include <atomic> | |
-#include <chrono> | |
-#include <mutex> | |
-#include <queue> | |
+#include <climits> | |
+#include <cstring> | |
+#include <future> | |
+#include <ostream> | |
#include <string> | |
-#include <thread> | |
#include <type_traits> | |
#include <utility> | |
#include <vector> | |
#include "arrow/api.h" | |
#include "arrow/util/bit-util.h" | |
-#include "arrow/util/decimal.h" | |
#include "arrow/util/logging.h" | |
#include "arrow/util/thread-pool.h" | |
#include "parquet/arrow/record_reader.h" | |
#include "parquet/arrow/schema.h" | |
#include "parquet/column_reader.h" | |
+#include "parquet/exception.h" | |
+#include "parquet/file_reader.h" | |
+#include "parquet/metadata.h" | |
+#include "parquet/properties.h" | |
#include "parquet/schema.h" | |
+#include "parquet/types.h" | |
+#include "parquet/util/memory.h" | |
#include "parquet/util/schema-util.h" | |
using arrow::Array; | |
@@ -1221,6 +1225,64 @@ struct TransferFunctor<::arrow::Decimal128Type, FLBAType> { | |
} | |
}; | |
+/// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array | |
+/// We do this by: | |
+/// 1. Creating an arrow::BinaryArray from the RecordReader's builder | |
+/// 2. Allocating a buffer for the arrow::Decimal128Array | |
+/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers | |
+/// representing the high and low bits of each decimal value. | |
+template <> | |
+struct TransferFunctor<::arrow::Decimal128Type, ByteArrayType> { | |
+ Status operator()(RecordReader* reader, MemoryPool* pool, | |
+ const std::shared_ptr<::arrow::DataType>& type, | |
+ std::shared_ptr<Array>* out) { | |
+ DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL); | |
+ | |
+ // Finish the built data into a temporary array | |
+ std::shared_ptr<Array> array; | |
+ RETURN_NOT_OK(reader->builder()->Finish(&array)); | |
+ const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(*array); | |
+ | |
+ const int64_t length = binary_array.length(); | |
+ | |
+ const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type); | |
+ const int64_t type_length = decimal_type.byte_width(); | |
+ | |
+ std::shared_ptr<Buffer> data; | |
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); | |
+ | |
+ // raw bytes that we can write to | |
+ uint8_t* out_ptr = data->mutable_data(); | |
+ | |
+ const int64_t null_count = binary_array.null_count(); | |
+ | |
+ // convert each BinaryArray value to valid decimal bytes | |
+ for (int64_t i = 0; i < length; i++, out_ptr += type_length) { | |
+ int32_t record_len = 0; | |
+ const uint8_t* record_loc = binary_array.GetValue(i, &record_len); | |
+ | |
+ if ((record_len < 0) || (record_len > type_length)) { | |
+ return Status::Invalid("Invalid BYTE_ARRAY size"); | |
+ } | |
+ | |
+ auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr); | |
+ out_ptr_view[0] = 0; | |
+ out_ptr_view[1] = 0; | |
+ | |
+ // only convert rows that are not null if there are nulls, or | |
+ // all rows, if there are not | |
+ if (((null_count > 0) && !binary_array.IsNull(i)) || (null_count <= 0)) { | |
+ RawBytesToDecimalBytes(record_loc, record_len, out_ptr); | |
+ } | |
+ } | |
+ | |
+ *out = std::make_shared<::arrow::Decimal128Array>( | |
+ type, length, data, binary_array.null_bitmap(), null_count); | |
+ | |
+ return Status::OK(); | |
+ } | |
+}; | |
+ | |
/// \brief Convert an Int32 or Int64 array into a Decimal128Array | |
/// The parquet spec allows systems to write decimals in int32, int64 if the values are | |
/// small enough to fit in less 4 bytes or less than 8 bytes, respectively. | |
@@ -1353,12 +1415,16 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* | |
case ::parquet::Type::INT64: { | |
TRANSFER_DATA(::arrow::Decimal128Type, Int64Type); | |
} break; | |
+ case ::parquet::Type::BYTE_ARRAY: { | |
+ TRANSFER_DATA(::arrow::Decimal128Type, ByteArrayType); | |
+ } break; | |
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { | |
TRANSFER_DATA(::arrow::Decimal128Type, FLBAType); | |
} break; | |
default: | |
return Status::Invalid( | |
- "Physical type for decimal must be int32, int64, or fixed length binary"); | |
+ "Physical type for decimal must be int32, int64, byte array, or fixed " | |
+ "length binary"); | |
} | |
} break; | |
case ::arrow::Type::TIMESTAMP: { | |
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h | |
index 6eee0f6e2..2cd94ca28 100644 | |
--- a/cpp/src/parquet/arrow/reader.h | |
+++ b/cpp/src/parquet/arrow/reader.h | |
@@ -18,25 +18,32 @@ | |
#ifndef PARQUET_ARROW_READER_H | |
#define PARQUET_ARROW_READER_H | |
+#include <cstdint> | |
#include <memory> | |
#include <vector> | |
-#include "parquet/api/reader.h" | |
-#include "parquet/api/schema.h" | |
+#include "parquet/util/visibility.h" | |
#include "arrow/io/interfaces.h" | |
+#include "arrow/util/macros.h" | |
namespace arrow { | |
class Array; | |
class MemoryPool; | |
class RecordBatchReader; | |
+class Schema; | |
class Status; | |
class Table; | |
+ | |
} // namespace arrow | |
namespace parquet { | |
+class FileMetaData; | |
+class ParquetFileReader; | |
+class ReaderProperties; | |
+ | |
namespace arrow { | |
class ColumnChunkReader; | |
diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc | |
index 3fbdfd586..ce6fa2a5b 100644 | |
--- a/cpp/src/parquet/arrow/record_reader.cc | |
+++ b/cpp/src/parquet/arrow/record_reader.cc | |
@@ -19,21 +19,29 @@ | |
#include <algorithm> | |
#include <cstdint> | |
+#include <cstring> | |
#include <memory> | |
#include <sstream> | |
+#include <unordered_map> | |
#include <utility> | |
-#include <arrow/buffer.h> | |
-#include <arrow/memory_pool.h> | |
-#include <arrow/status.h> | |
-#include <arrow/util/bit-util.h> | |
-#include <arrow/util/rle-encoding.h> | |
+#include "arrow/buffer.h" | |
+#include "arrow/builder.h" | |
+#include "arrow/memory_pool.h" | |
+#include "arrow/status.h" | |
+#include "arrow/type.h" | |
+#include "arrow/util/bit-util.h" | |
+#include "arrow/util/logging.h" | |
+#include "arrow/util/rle-encoding.h" | |
#include "parquet/column_page.h" | |
#include "parquet/column_reader.h" | |
#include "parquet/encoding-internal.h" | |
+#include "parquet/encoding.h" | |
#include "parquet/exception.h" | |
#include "parquet/properties.h" | |
+#include "parquet/schema.h" | |
+#include "parquet/types.h" | |
using arrow::MemoryPool; | |
diff --git a/cpp/src/parquet/arrow/record_reader.h b/cpp/src/parquet/arrow/record_reader.h | |
index 4935713a2..8da070999 100644 | |
--- a/cpp/src/parquet/arrow/record_reader.h | |
+++ b/cpp/src/parquet/arrow/record_reader.h | |
@@ -19,22 +19,24 @@ | |
#define PARQUET_RECORD_READER_H | |
#include <cstdint> | |
-#include <cstring> | |
-#include <iostream> | |
#include <memory> | |
-#include <unordered_map> | |
-#include <vector> | |
-#include <arrow/buffer.h> | |
-#include <arrow/builder.h> | |
-#include <arrow/memory_pool.h> | |
-#include <arrow/util/bit-util.h> | |
+#include "arrow/memory_pool.h" | |
-#include "parquet/column_reader.h" | |
-#include "parquet/schema.h" | |
#include "parquet/util/macros.h" | |
+#include "parquet/util/memory.h" | |
+ | |
+namespace arrow { | |
+ | |
+class ArrayBuilder; | |
+ | |
+} // namespace arrow | |
namespace parquet { | |
+ | |
+class ColumnDescriptor; | |
+class PageReader; | |
+ | |
namespace internal { | |
/// \brief Stateful column reader that delimits semantic records for both flat | |
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h | |
index 3b212da7e..8e920850c 100644 | |
--- a/cpp/src/parquet/arrow/schema.h | |
+++ b/cpp/src/parquet/arrow/schema.h | |
@@ -18,14 +18,16 @@ | |
#ifndef PARQUET_ARROW_SCHEMA_H | |
#define PARQUET_ARROW_SCHEMA_H | |
+#include <cstdint> | |
#include <memory> | |
#include <vector> | |
#include "arrow/api.h" | |
-#include "parquet/api/schema.h" | |
-#include "parquet/api/writer.h" | |
#include "parquet/arrow/writer.h" | |
+#include "parquet/metadata.h" | |
+#include "parquet/schema.h" | |
+#include "parquet/util/visibility.h" | |
namespace arrow { | |
@@ -35,8 +37,12 @@ class Status; | |
namespace parquet { | |
+class WriterProperties; | |
+ | |
namespace arrow { | |
+class ArrowWriterProperties; | |
+ | |
PARQUET_EXPORT | |
::arrow::Status NodeToField(const schema::Node& node, | |
std::shared_ptr<::arrow::Field>* out); | |
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc | |
index 9247b84cf..923f13294 100644 | |
--- a/cpp/src/parquet/arrow/writer.cc | |
+++ b/cpp/src/parquet/arrow/writer.cc | |
@@ -366,8 +366,9 @@ class ArrowColumnWriter { | |
Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels, | |
const int16_t* rep_levels); | |
- Status WriteTimestampsCoerce(const Array& data, int64_t num_levels, | |
- const int16_t* def_levels, const int16_t* rep_levels); | |
+ Status WriteTimestampsCoerce(const bool truncated_timestamps_allowed, const Array& data, | |
+ int64_t num_levels, const int16_t* def_levels, | |
+ const int16_t* rep_levels); | |
template <typename ParquetType, typename ArrowType> | |
Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values, | |
@@ -626,7 +627,8 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level | |
// Casting is required. This covers several cases | |
// * Nanoseconds -> cast to microseconds | |
// * coerce_timestamps_enabled_, cast all timestamps to requested unit | |
- return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels); | |
+ return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values, | |
+ num_levels, def_levels, rep_levels); | |
} else { | |
// No casting of timestamps is required, take the fast path | |
return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels, | |
@@ -634,7 +636,8 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level | |
} | |
} | |
-Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels, | |
+Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_allowed, | |
+ const Array& array, int64_t num_levels, | |
const int16_t* def_levels, | |
const int16_t* rep_levels) { | |
int64_t* buffer; | |
@@ -652,7 +655,7 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_ | |
auto DivideBy = [&](const int64_t factor) { | |
for (int64_t i = 0; i < array.length(); i++) { | |
- if (!data.IsNull(i) && (values[i] % factor != 0)) { | |
+ if (!truncated_timestamps_allowed && !data.IsNull(i) && (values[i] % factor != 0)) { | |
std::stringstream ss; | |
ss << "Casting from " << type.ToString() << " to " << target_type->ToString() | |
<< " would lose data: " << values[i]; | |
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h | |
index ad6f1d52d..7e4b2287b 100644 | |
--- a/cpp/src/parquet/arrow/writer.h | |
+++ b/cpp/src/parquet/arrow/writer.h | |
@@ -44,7 +44,10 @@ class PARQUET_EXPORT ArrowWriterProperties { | |
public: | |
class Builder { | |
public: | |
- Builder() : write_nanos_as_int96_(false), coerce_timestamps_enabled_(false) {} | |
+ Builder() | |
+ : write_nanos_as_int96_(false), | |
+ coerce_timestamps_enabled_(false), | |
+ truncated_timestamps_allowed_(false) {} | |
virtual ~Builder() {} | |
Builder* disable_deprecated_int96_timestamps() { | |
@@ -63,9 +66,20 @@ class PARQUET_EXPORT ArrowWriterProperties { | |
return this; | |
} | |
+ Builder* allow_truncated_timestamps() { | |
+ truncated_timestamps_allowed_ = true; | |
+ return this; | |
+ } | |
+ | |
+ Builder* disallow_truncated_timestamps() { | |
+ truncated_timestamps_allowed_ = false; | |
+ return this; | |
+ } | |
+ | |
std::shared_ptr<ArrowWriterProperties> build() { | |
return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties( | |
- write_nanos_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_)); | |
+ write_nanos_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_, | |
+ truncated_timestamps_allowed_)); | |
} | |
private: | |
@@ -73,6 +87,7 @@ class PARQUET_EXPORT ArrowWriterProperties { | |
bool coerce_timestamps_enabled_; | |
::arrow::TimeUnit::type coerce_timestamps_unit_; | |
+ bool truncated_timestamps_allowed_; | |
}; | |
bool support_deprecated_int96_timestamps() const { return write_nanos_as_int96_; } | |
@@ -82,17 +97,22 @@ class PARQUET_EXPORT ArrowWriterProperties { | |
return coerce_timestamps_unit_; | |
} | |
+ bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; } | |
+ | |
private: | |
explicit ArrowWriterProperties(bool write_nanos_as_int96, | |
bool coerce_timestamps_enabled, | |
- ::arrow::TimeUnit::type coerce_timestamps_unit) | |
+ ::arrow::TimeUnit::type coerce_timestamps_unit, | |
+ bool truncated_timestamps_allowed) | |
: write_nanos_as_int96_(write_nanos_as_int96), | |
coerce_timestamps_enabled_(coerce_timestamps_enabled), | |
- coerce_timestamps_unit_(coerce_timestamps_unit) {} | |
+ coerce_timestamps_unit_(coerce_timestamps_unit), | |
+ truncated_timestamps_allowed_(truncated_timestamps_allowed) {} | |
const bool write_nanos_as_int96_; | |
const bool coerce_timestamps_enabled_; | |
const ::arrow::TimeUnit::type coerce_timestamps_unit_; | |
+ const bool truncated_timestamps_allowed_; | |
}; | |
std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties(); | |
diff --git a/cpp/src/parquet/bloom_filter-test.cc b/cpp/src/parquet/bloom_filter-test.cc | |
index 96d2e065f..945f80b7b 100644 | |
--- a/cpp/src/parquet/bloom_filter-test.cc | |
+++ b/cpp/src/parquet/bloom_filter-test.cc | |
@@ -17,13 +17,21 @@ | |
#include <gtest/gtest.h> | |
-#include <algorithm> | |
+#include <cstdint> | |
+#include <limits> | |
+#include <memory> | |
#include <random> | |
#include <string> | |
+#include <vector> | |
+#include "arrow/buffer.h" | |
#include "arrow/io/file.h" | |
+#include "arrow/status.h" | |
+ | |
#include "parquet/bloom_filter.h" | |
+#include "parquet/exception.h" | |
#include "parquet/murmur3.h" | |
+#include "parquet/types.h" | |
#include "parquet/util/memory.h" | |
#include "parquet/util/test-common.h" | |
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc | |
index 173292ecd..7fbf9babd 100644 | |
--- a/cpp/src/parquet/column_reader.cc | |
+++ b/cpp/src/parquet/column_reader.cc | |
@@ -52,7 +52,8 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, | |
num_bytes = *reinterpret_cast<const int32_t*>(data); | |
const uint8_t* decoder_data = data + sizeof(int32_t); | |
if (!rle_decoder_) { | |
- rle_decoder_.reset(new ::arrow::RleDecoder(decoder_data, num_bytes, bit_width_)); | |
+ rle_decoder_.reset( | |
+ new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_)); | |
} else { | |
rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); | |
} | |
@@ -62,7 +63,7 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, | |
num_bytes = | |
static_cast<int32_t>(BitUtil::BytesForBits(num_buffered_values * bit_width_)); | |
if (!bit_packed_decoder_) { | |
- bit_packed_decoder_.reset(new ::arrow::BitReader(data, num_bytes)); | |
+ bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes)); | |
} else { | |
bit_packed_decoder_->Reset(data, num_bytes); | |
} | |
@@ -123,7 +124,7 @@ class SerializedPageReader : public PageReader { | |
std::shared_ptr<Page> current_page_; | |
// Compression codec to use. | |
- std::unique_ptr<::arrow::Codec> decompressor_; | |
+ std::unique_ptr<::arrow::util::Codec> decompressor_; | |
std::shared_ptr<ResizableBuffer> decompression_buffer_; | |
// Maximum allowed page size | |
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h | |
index d1b4d2ef5..960f2107d 100644 | |
--- a/cpp/src/parquet/column_reader.h | |
+++ b/cpp/src/parquet/column_reader.h | |
@@ -44,8 +44,13 @@ | |
namespace arrow { | |
+namespace BitUtil { | |
class BitReader; | |
+} // namespace BitUtil | |
+ | |
+namespace util { | |
class RleDecoder; | |
+} // namespace util | |
} // namespace arrow | |
@@ -76,8 +81,8 @@ class PARQUET_EXPORT LevelDecoder { | |
int bit_width_; | |
int num_values_remaining_; | |
Encoding::type encoding_; | |
- std::unique_ptr<::arrow::RleDecoder> rle_decoder_; | |
- std::unique_ptr<::arrow::BitReader> bit_packed_decoder_; | |
+ std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_; | |
+ std::unique_ptr<::arrow::BitUtil::BitReader> bit_packed_decoder_; | |
}; | |
// Abstract page iterator interface. This way, we can feed column pages to the | |
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc | |
index 9c7a39bfe..a45613f1b 100644 | |
--- a/cpp/src/parquet/column_writer.cc | |
+++ b/cpp/src/parquet/column_writer.cc | |
@@ -34,8 +34,8 @@ | |
namespace parquet { | |
-using BitWriter = ::arrow::BitWriter; | |
-using RleEncoder = ::arrow::RleEncoder; | |
+using BitWriter = ::arrow::BitUtil::BitWriter; | |
+using RleEncoder = ::arrow::util::RleEncoder; | |
LevelEncoder::LevelEncoder() {} | |
LevelEncoder::~LevelEncoder() {} | |
@@ -271,7 +271,7 @@ class SerializedPageWriter : public PageWriter { | |
int64_t total_compressed_size_; | |
// Compression codec to use. | |
- std::unique_ptr<::arrow::Codec> compressor_; | |
+ std::unique_ptr<::arrow::util::Codec> compressor_; | |
}; | |
// This implementation of the PageWriter writes to the final sink on Close. | |
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h | |
index e3bfcf0ae..457c532bb 100644 | |
--- a/cpp/src/parquet/column_writer.h | |
+++ b/cpp/src/parquet/column_writer.h | |
@@ -34,8 +34,13 @@ | |
namespace arrow { | |
+namespace BitUtil { | |
class BitWriter; | |
+} // namespace BitUtil | |
+ | |
+namespace util { | |
class RleEncoder; | |
+} // namespace util | |
} // namespace arrow | |
@@ -67,8 +72,8 @@ class PARQUET_EXPORT LevelEncoder { | |
int bit_width_; | |
int rle_length_; | |
Encoding::type encoding_; | |
- std::unique_ptr<::arrow::RleEncoder> rle_encoder_; | |
- std::unique_ptr<::arrow::BitWriter> bit_packed_encoder_; | |
+ std::unique_ptr<::arrow::util::RleEncoder> rle_encoder_; | |
+ std::unique_ptr<::arrow::BitUtil::BitWriter> bit_packed_encoder_; | |
}; | |
class PageWriter { | |
diff --git a/cpp/src/parquet/encoding-internal.h b/cpp/src/parquet/encoding-internal.h | |
index 0bfd26fbd..93d499300 100644 | |
--- a/cpp/src/parquet/encoding-internal.h | |
+++ b/cpp/src/parquet/encoding-internal.h | |
@@ -143,7 +143,7 @@ class PlainDecoder<BooleanType> : public Decoder<BooleanType> { | |
virtual void SetData(int num_values, const uint8_t* data, int len) { | |
num_values_ = num_values; | |
- bit_reader_ = ::arrow::BitReader(data, len); | |
+ bit_reader_ = BitUtil::BitReader(data, len); | |
} | |
// Two flavors of bool decoding | |
@@ -175,7 +175,7 @@ class PlainDecoder<BooleanType> : public Decoder<BooleanType> { | |
} | |
private: | |
- ::arrow::BitReader bit_reader_; | |
+ BitUtil::BitReader bit_reader_; | |
}; | |
// ---------------------------------------------------------------------- | |
@@ -210,7 +210,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> { | |
bits_available_(kInMemoryDefaultCapacity * 8), | |
bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), | |
values_sink_(new InMemoryOutputStream(pool)) { | |
- bit_writer_.reset(new ::arrow::BitWriter(bits_buffer_->mutable_data(), | |
+ bit_writer_.reset(new BitUtil::BitWriter(bits_buffer_->mutable_data(), | |
static_cast<int>(bits_buffer_->size()))); | |
} | |
@@ -274,7 +274,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> { | |
protected: | |
int bits_available_; | |
- std::unique_ptr<::arrow::BitWriter> bit_writer_; | |
+ std::unique_ptr<BitUtil::BitWriter> bit_writer_; | |
std::shared_ptr<ResizableBuffer> bits_buffer_; | |
std::unique_ptr<InMemoryOutputStream> values_sink_; | |
}; | |
@@ -341,7 +341,7 @@ class DictionaryDecoder : public Decoder<Type> { | |
uint8_t bit_width = *data; | |
++data; | |
--len; | |
- idx_decoder_ = ::arrow::RleDecoder(data, len, bit_width); | |
+ idx_decoder_ = ::arrow::util::RleDecoder(data, len, bit_width); | |
} | |
int Decode(T* buffer, int max_values) override { | |
@@ -376,7 +376,7 @@ class DictionaryDecoder : public Decoder<Type> { | |
// pointers). | |
std::shared_ptr<ResizableBuffer> byte_array_data_; | |
- ::arrow::RleDecoder idx_decoder_; | |
+ ::arrow::util::RleDecoder idx_decoder_; | |
}; | |
template <typename Type> | |
@@ -468,7 +468,7 @@ class DictEncoder : public Encoder<DType> { | |
dict_encoded_size_(0), | |
type_length_(desc->type_length()) { | |
hash_slots_.Assign(hash_table_size_, HASH_SLOT_EMPTY); | |
- cpu_info_ = ::arrow::CpuInfo::GetInstance(); | |
+ cpu_info_ = ::arrow::internal::CpuInfo::GetInstance(); | |
} | |
~DictEncoder() override { DCHECK(buffered_indices_.empty()); } | |
@@ -487,9 +487,9 @@ class DictEncoder : public Encoder<DType> { | |
// an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used | |
// but not reserving them would cause the encoder to fail. | |
return 1 + | |
- ::arrow::RleEncoder::MaxBufferSize( | |
+ ::arrow::util::RleEncoder::MaxBufferSize( | |
bit_width(), static_cast<int>(buffered_indices_.size())) + | |
- ::arrow::RleEncoder::MinBufferSize(bit_width()); | |
+ ::arrow::util::RleEncoder::MinBufferSize(bit_width()); | |
} | |
/// The minimum bit width required to encode the currently buffered indices. | |
@@ -580,7 +580,7 @@ class DictEncoder : public Encoder<DType> { | |
// For ByteArray / FixedLenByteArray data. Not owned | |
ChunkedAllocator* pool_; | |
- ::arrow::CpuInfo* cpu_info_; | |
+ ::arrow::internal::CpuInfo* cpu_info_; | |
/// Size of the table. Must be a power of 2. | |
int hash_table_size_; | |
@@ -791,7 +791,7 @@ inline int DictEncoder<DType>::WriteIndices(uint8_t* buffer, int buffer_len) { | |
++buffer; | |
--buffer_len; | |
- ::arrow::RleEncoder encoder(buffer, buffer_len, bit_width()); | |
+ ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width()); | |
for (int index : buffered_indices_) { | |
if (!encoder.Put(index)) return -1; | |
} | |
@@ -819,7 +819,7 @@ class DeltaBitPackDecoder : public Decoder<DType> { | |
virtual void SetData(int num_values, const uint8_t* data, int len) { | |
num_values_ = num_values; | |
- decoder_ = ::arrow::BitReader(data, len); | |
+ decoder_ = BitUtil::BitReader(data, len); | |
values_current_block_ = 0; | |
values_current_mini_block_ = 0; | |
} | |
@@ -885,7 +885,7 @@ class DeltaBitPackDecoder : public Decoder<DType> { | |
} | |
::arrow::MemoryPool* pool_; | |
- ::arrow::BitReader decoder_; | |
+ BitUtil::BitReader decoder_; | |
int32_t values_current_block_; | |
int32_t num_mini_blocks_; | |
uint64_t values_per_mini_block_; | |
diff --git a/cpp/src/parquet/file-deserialize-test.cc b/cpp/src/parquet/file-deserialize-test.cc | |
index b766eedf5..17dfe387f 100644 | |
--- a/cpp/src/parquet/file-deserialize-test.cc | |
+++ b/cpp/src/parquet/file-deserialize-test.cc | |
@@ -17,16 +17,11 @@ | |
#include <gtest/gtest.h> | |
-#include <algorithm> | |
#include <cstdint> | |
-#include <cstdlib> | |
#include <cstring> | |
-#include <exception> | |
#include <memory> | |
-#include <string> | |
-#include <vector> | |
-#include "parquet/column_reader.h" | |
+#include "parquet/column_page.h" | |
#include "parquet/exception.h" | |
#include "parquet/file_reader.h" | |
#include "parquet/thrift.h" | |
@@ -34,6 +29,8 @@ | |
#include "parquet/util/memory.h" | |
#include "parquet/util/test-common.h" | |
+#include "arrow/io/memory.h" | |
+#include "arrow/status.h" | |
#include "arrow/util/compression.h" | |
namespace parquet { | |
@@ -196,7 +193,7 @@ TEST_F(TestPageSerde, Compression) { | |
test::random_bytes(page_size, 0, &faux_data[i]); | |
} | |
for (auto codec_type : codec_types) { | |
- std::unique_ptr<::arrow::Codec> codec = GetCodecFromArrow(codec_type); | |
+ auto codec = GetCodecFromArrow(codec_type); | |
std::vector<uint8_t> buffer; | |
for (int i = 0; i < num_pages; ++i) { | |
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc | |
index ea518fd98..5be1a8623 100644 | |
--- a/cpp/src/parquet/file_reader.cc | |
+++ b/cpp/src/parquet/file_reader.cc | |
@@ -41,12 +41,6 @@ | |
using std::string; | |
-namespace arrow { | |
- | |
-class Codec; | |
- | |
-} // namespace arrow | |
- | |
namespace parquet { | |
// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file | |
diff --git a/cpp/src/parquet/metadata-test.cc b/cpp/src/parquet/metadata-test.cc | |
index 53653bd78..bcf911eab 100644 | |
--- a/cpp/src/parquet/metadata-test.cc | |
+++ b/cpp/src/parquet/metadata-test.cc | |
@@ -16,9 +16,12 @@ | |
// under the License. | |
#include "parquet/metadata.h" | |
+ | |
#include <gtest/gtest.h> | |
+ | |
#include "parquet/schema.h" | |
#include "parquet/statistics.h" | |
+#include "parquet/thrift.h" | |
#include "parquet/types.h" | |
namespace parquet { | |
@@ -219,12 +222,36 @@ TEST(ApplicationVersion, Basics) { | |
ASSERT_EQ(true, version.VersionLt(version1)); | |
- ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::UNKNOWN)); | |
- ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, SortOrder::SIGNED)); | |
- ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED)); | |
- ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED)); | |
+ EncodedStatistics stats; | |
+ ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, stats, SortOrder::UNKNOWN)); | |
+ ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, stats, SortOrder::SIGNED)); | |
+ ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, stats, SortOrder::SIGNED)); | |
+ ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats, SortOrder::SIGNED)); | |
+ ASSERT_FALSE( | |
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats, SortOrder::UNSIGNED)); | |
+ ASSERT_TRUE(version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY, stats, | |
+ SortOrder::SIGNED)); | |
+ | |
+ // Check that the old stats are correct if min and max are the same | |
+ // regardless of sort order | |
+ EncodedStatistics stats_str; | |
+ stats_str.set_min("a").set_max("b"); | |
+ ASSERT_FALSE( | |
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats_str, SortOrder::UNSIGNED)); | |
+ stats_str.set_max("a"); | |
+ ASSERT_TRUE( | |
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats_str, SortOrder::UNSIGNED)); | |
+ | |
+ // Check that the same holds true for ints | |
+ int32_t int_min = 100, int_max = 200; | |
+ EncodedStatistics stats_int; | |
+ stats_int.set_min(std::string(reinterpret_cast<const char*>(&int_min), 4)) | |
+ .set_max(std::string(reinterpret_cast<const char*>(&int_max), 4)); | |
+ ASSERT_FALSE( | |
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats_int, SortOrder::UNSIGNED)); | |
+ stats_int.set_max(std::string(reinterpret_cast<const char*>(&int_min), 4)); | |
ASSERT_TRUE( | |
- version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY, SortOrder::SIGNED)); | |
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats_int, SortOrder::UNSIGNED)); | |
} | |
} // namespace metadata | |
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc | |
index 9c66c7aab..f49393b60 100644 | |
--- a/cpp/src/parquet/metadata.cc | |
+++ b/cpp/src/parquet/metadata.cc | |
@@ -55,7 +55,8 @@ static std::shared_ptr<RowGroupStatistics> MakeTypedColumnStats( | |
return std::make_shared<TypedRowGroupStatistics<DType>>( | |
descr, metadata.statistics.min_value, metadata.statistics.max_value, | |
metadata.num_values - metadata.statistics.null_count, | |
- metadata.statistics.null_count, metadata.statistics.distinct_count, true); | |
+ metadata.statistics.null_count, metadata.statistics.distinct_count, | |
+ metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value); | |
} | |
// Default behavior | |
return std::make_shared<TypedRowGroupStatistics<DType>>( | |
@@ -100,7 +101,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { | |
for (auto encoding : meta_data.encodings) { | |
encodings_.push_back(FromThrift(encoding)); | |
} | |
- stats_ = nullptr; | |
+ possible_stats_ = nullptr; | |
} | |
~ColumnChunkMetaDataImpl() {} | |
@@ -125,15 +126,22 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { | |
// Eg: UTF8 | |
inline bool is_stats_set() const { | |
DCHECK(writer_version_ != nullptr); | |
- return column_->meta_data.__isset.statistics && | |
- writer_version_->HasCorrectStatistics(type(), descr_->sort_order()); | |
+ // If the column statistics don't exist or column sort order is unknown | |
+ // we cannot use the column stats | |
+ if (!column_->meta_data.__isset.statistics || | |
+ descr_->sort_order() == SortOrder::UNKNOWN) { | |
+ return false; | |
+ } | |
+ if (possible_stats_ == nullptr) { | |
+ possible_stats_ = MakeColumnStats(column_->meta_data, descr_); | |
+ } | |
+ EncodedStatistics encodedStatistics = possible_stats_->Encode(); | |
+ return writer_version_->HasCorrectStatistics(type(), encodedStatistics, | |
+ descr_->sort_order()); | |
} | |
inline std::shared_ptr<RowGroupStatistics> statistics() const { | |
- if (stats_ == nullptr && is_stats_set()) { | |
- stats_ = MakeColumnStats(column_->meta_data, descr_); | |
- } | |
- return stats_; | |
+ return is_stats_set() ? possible_stats_ : nullptr; | |
} | |
inline Compression::type compression() const { | |
@@ -169,7 +177,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { | |
} | |
private: | |
- mutable std::shared_ptr<RowGroupStatistics> stats_; | |
+ mutable std::shared_ptr<RowGroupStatistics> possible_stats_; | |
std::vector<Encoding::type> encodings_; | |
const format::ColumnChunk* column_; | |
const ColumnDescriptor* descr_; | |
@@ -530,11 +538,16 @@ bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) cons | |
// parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java | |
// PARQUET-686 has more disussion on statistics | |
bool ApplicationVersion::HasCorrectStatistics(Type::type col_type, | |
+ EncodedStatistics& statistics, | |
SortOrder::type sort_order) const { | |
// Parquet cpp version 1.3.0 onwards stats are computed correctly for all types | |
if ((application_ != "parquet-cpp") || (VersionLt(PARQUET_CPP_FIXED_STATS_VERSION()))) { | |
- // Only SIGNED are valid | |
- if (SortOrder::SIGNED != sort_order) { | |
+ // Only SIGNED are valid unless max and min are the same | |
+ // (in which case the sort order does not matter) | |
+ bool max_equals_min = statistics.has_min && statistics.has_max | |
+ ? statistics.min() == statistics.max() | |
+ : false; | |
+ if (SortOrder::SIGNED != sort_order && !max_equals_min) { | |
return false; | |
} | |
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h | |
index 79f4fdb35..7e29fe91a 100644 | |
--- a/cpp/src/parquet/metadata.h | |
+++ b/cpp/src/parquet/metadata.h | |
@@ -85,7 +85,7 @@ class ApplicationVersion { | |
bool VersionEq(const ApplicationVersion& other_version) const; | |
// Checks if the Version has the correct statistics for a given column | |
- bool HasCorrectStatistics(Type::type primitive, | |
+ bool HasCorrectStatistics(Type::type primitive, EncodedStatistics& statistics, | |
SortOrder::type sort_order = SortOrder::SIGNED) const; | |
}; | |
diff --git a/cpp/src/parquet/public-api-test.cc b/cpp/src/parquet/public-api-test.cc | |
index 958e97016..c0ef97a70 100644 | |
--- a/cpp/src/parquet/public-api-test.cc | |
+++ b/cpp/src/parquet/public-api-test.cc | |
@@ -17,10 +17,10 @@ | |
#include <gtest/gtest.h> | |
-#include "parquet/api/io.h" | |
-#include "parquet/api/reader.h" | |
-#include "parquet/api/schema.h" | |
-#include "parquet/api/writer.h" | |
+#include "parquet/api/io.h" // IWYU pragma: keep | |
+#include "parquet/api/reader.h" // IWYU pragma: keep | |
+#include "parquet/api/schema.h" // IWYU pragma: keep | |
+#include "parquet/api/writer.h" // IWYU pragma: keep | |
TEST(TestPublicAPI, DoesNotIncludeThrift) { | |
#ifdef _THRIFT_THRIFT_H_ | |
diff --git a/cpp/src/parquet/thrift.h b/cpp/src/parquet/thrift.h | |
index 217cc76c0..9c665acfa 100644 | |
--- a/cpp/src/parquet/thrift.h | |
+++ b/cpp/src/parquet/thrift.h | |
@@ -44,7 +44,7 @@ | |
#include "parquet/exception.h" | |
#include "parquet/util/memory.h" | |
-#include "parquet/parquet_types.h" | |
+#include "parquet/parquet_types.h" // IWYU pragma: export | |
namespace parquet { | |
diff --git a/cpp/src/parquet/util/memory.cc b/cpp/src/parquet/util/memory.cc | |
index 5c76cd8a6..fde424aaf 100644 | |
--- a/cpp/src/parquet/util/memory.cc | |
+++ b/cpp/src/parquet/util/memory.cc | |
@@ -32,31 +32,32 @@ | |
#include "parquet/types.h" | |
using arrow::MemoryPool; | |
+using arrow::util::Codec; | |
namespace parquet { | |
-std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::type codec) { | |
- std::unique_ptr<::arrow::Codec> result; | |
+std::unique_ptr<Codec> GetCodecFromArrow(Compression::type codec) { | |
+ std::unique_ptr<Codec> result; | |
switch (codec) { | |
case Compression::UNCOMPRESSED: | |
break; | |
case Compression::SNAPPY: | |
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::SNAPPY, &result)); | |
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::SNAPPY, &result)); | |
break; | |
case Compression::GZIP: | |
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::GZIP, &result)); | |
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::GZIP, &result)); | |
break; | |
case Compression::LZO: | |
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::LZO, &result)); | |
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZO, &result)); | |
break; | |
case Compression::BROTLI: | |
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::BROTLI, &result)); | |
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::BROTLI, &result)); | |
break; | |
case Compression::LZ4: | |
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::LZ4, &result)); | |
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZ4, &result)); | |
break; | |
case Compression::ZSTD: | |
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::ZSTD, &result)); | |
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::ZSTD, &result)); | |
break; | |
default: | |
break; | |
diff --git a/cpp/src/parquet/util/memory.h b/cpp/src/parquet/util/memory.h | |
index 2eadb3326..cccafe8cb 100644 | |
--- a/cpp/src/parquet/util/memory.h | |
+++ b/cpp/src/parquet/util/memory.h | |
@@ -37,15 +37,17 @@ | |
#include "parquet/util/visibility.h" | |
namespace arrow { | |
+namespace util { | |
class Codec; | |
+} // namespace util | |
} // namespace arrow | |
namespace parquet { | |
PARQUET_EXPORT | |
-std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::type codec); | |
+std::unique_ptr<::arrow::util::Codec> GetCodecFromArrow(Compression::type codec); | |
static constexpr int64_t kInMemoryDefaultCapacity = 1024; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment