Skip to content

Commit 2334ea3

Browse files
committed
Add feature flag for direct Avro decoder
- Add ReaderProperties::kAvroUseDirectDecoder config flag (default: true) - Direct decoder is now the default for better performance - Legacy GenericDatum decoder returns NotImplemented error - Users can disable direct decoder by setting avro.use-direct-decoder=false (will get deprecation error message) This addresses feedback from wgtmac to add a feature flag as a safety measure in case of bugs in the new direct decoder implementation.
1 parent e54929d commit 2334ea3

File tree

2 files changed

+95
-31
lines changed

2 files changed

+95
-31
lines changed

src/iceberg/avro/avro_reader.cc

Lines changed: 91 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ struct ReadContext {
6767
std::shared_ptr<::arrow::Schema> arrow_schema_;
6868
// The builder to build the record batch.
6969
std::shared_ptr<::arrow::ArrayBuilder> builder_;
70+
// GenericDatum for legacy path (only used if direct decoder is disabled)
71+
std::unique_ptr<::avro::GenericDatum> datum_;
7072
};
7173

7274
// TODO(gang.wu): there are a lot to do to make this reader work.
@@ -82,6 +84,8 @@ class AvroReader::Impl {
8284
}
8385

8486
batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
87+
use_direct_decoder_ =
88+
options.properties->Get(ReaderProperties::kAvroUseDirectDecoder);
8589
read_schema_ = options.projection;
8690

8791
// Open the input stream and adapt to the avro interface.
@@ -90,10 +94,21 @@ class AvroReader::Impl {
9094
ICEBERG_ASSIGN_OR_RAISE(auto input_stream,
9195
CreateInputStream(options, kDefaultBufferSize));
9296

93-
// Create a base reader without setting reader schema to enable projection.
94-
auto base_reader =
95-
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
96-
::avro::ValidSchema file_schema = base_reader->dataSchema();
97+
::avro::ValidSchema file_schema;
98+
99+
if (use_direct_decoder_) {
100+
// New path: Create base reader for direct decoder access
101+
auto base_reader =
102+
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
103+
file_schema = base_reader->dataSchema();
104+
base_reader_ = std::move(base_reader);
105+
} else {
106+
// Legacy path: Create DataFileReader<GenericDatum>
107+
auto datum_reader = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
108+
std::move(input_stream));
109+
file_schema = datum_reader->dataSchema();
110+
datum_reader_ = std::move(datum_reader);
111+
}
97112

98113
// Validate field ids in the file schema.
99114
HasIdVisitor has_id_visitor;
@@ -121,14 +136,21 @@ class AvroReader::Impl {
121136
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(),
122137
/*prune_source=*/false));
123138

124-
// Initialize the base reader with the file schema
125-
base_reader->init(file_schema);
126-
reader_ = std::move(base_reader);
127-
128-
if (options.split) {
129-
reader_->sync(options.split->offset);
130-
split_end_ = options.split->offset + options.split->length;
139+
if (use_direct_decoder_) {
140+
// Initialize the base reader with the file schema
141+
base_reader_->init(file_schema);
142+
if (options.split) {
143+
base_reader_->sync(options.split->offset);
144+
split_end_ = options.split->offset + options.split->length;
145+
}
146+
} else {
147+
// The datum reader is already initialized during construction
148+
if (options.split) {
149+
datum_reader_->sync(options.split->offset);
150+
split_end_ = options.split->offset + options.split->length;
151+
}
131152
}
153+
132154
return {};
133155
}
134156

@@ -138,27 +160,48 @@ class AvroReader::Impl {
138160
}
139161

140162
while (context_->builder_->length() < batch_size_) {
141-
if (split_end_ && reader_->pastSync(split_end_.value())) {
142-
break;
143-
}
144-
if (!reader_->hasMore()) {
145-
break;
163+
if (use_direct_decoder_) {
164+
// New path: Use direct decoder
165+
if (split_end_ && base_reader_->pastSync(split_end_.value())) {
166+
break;
167+
}
168+
if (!base_reader_->hasMore()) {
169+
break;
170+
}
171+
base_reader_->decr();
172+
173+
ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder(
174+
base_reader_->readerSchema().root(), base_reader_->decoder(), projection_,
175+
*read_schema_, context_->builder_.get()));
176+
} else {
177+
// Legacy path: Use GenericDatum
178+
if (split_end_ && datum_reader_->pastSync(split_end_.value())) {
179+
break;
180+
}
181+
if (!datum_reader_->read(*context_->datum_)) {
182+
break;
183+
}
184+
185+
ICEBERG_RETURN_UNEXPECTED(
186+
AppendDatumToBuilder(datum_reader_->readerSchema().root(), *context_->datum_,
187+
projection_, *read_schema_, context_->builder_.get()));
146188
}
147-
reader_->decr();
148-
149-
// Use direct decoder instead of GenericDatum
150-
ICEBERG_RETURN_UNEXPECTED(
151-
DecodeAvroToBuilder(reader_->readerSchema().root(), reader_->decoder(),
152-
projection_, *read_schema_, context_->builder_.get()));
153189
}
154190

155191
return ConvertBuilderToArrowArray();
156192
}
157193

158194
Status Close() {
159-
if (reader_ != nullptr) {
160-
reader_->close();
161-
reader_.reset();
195+
if (use_direct_decoder_) {
196+
if (base_reader_ != nullptr) {
197+
base_reader_->close();
198+
base_reader_.reset();
199+
}
200+
} else {
201+
if (datum_reader_ != nullptr) {
202+
datum_reader_->close();
203+
datum_reader_.reset();
204+
}
162205
}
163206
context_.reset();
164207
return {};
@@ -178,12 +221,19 @@ class AvroReader::Impl {
178221
}
179222

180223
Result<std::unordered_map<std::string, std::string>> Metadata() {
181-
if (reader_ == nullptr) {
182-
return Invalid("Reader is not opened");
224+
::avro::Metadata metadata;
225+
if (use_direct_decoder_) {
226+
if (base_reader_ == nullptr) {
227+
return Invalid("Reader is not opened");
228+
}
229+
metadata = base_reader_->metadata();
230+
} else {
231+
if (datum_reader_ == nullptr) {
232+
return Invalid("Reader is not opened");
233+
}
234+
metadata = datum_reader_->metadata();
183235
}
184236

185-
const auto& metadata = reader_->metadata();
186-
187237
std::unordered_map<std::string, std::string> metadata_map;
188238
metadata_map.reserve(metadata.size());
189239

@@ -217,6 +267,12 @@ class AvroReader::Impl {
217267
}
218268
context_->builder_ = builder_result.MoveValueUnsafe();
219269

270+
// Initialize GenericDatum for legacy path
271+
if (!use_direct_decoder_) {
272+
context_->datum_ =
273+
std::make_unique<::avro::GenericDatum>(datum_reader_->readerSchema());
274+
}
275+
220276
return {};
221277
}
222278

@@ -244,14 +300,18 @@ class AvroReader::Impl {
244300
private:
245301
// Max number of rows in the record batch to read.
246302
int64_t batch_size_{};
303+
// Whether to use direct decoder (true) or GenericDatum-based decoder (false).
304+
bool use_direct_decoder_{true};
247305
// The end of the split to read and used to terminate the reading.
248306
std::optional<int64_t> split_end_;
249307
// The schema to read.
250308
std::shared_ptr<::iceberg::Schema> read_schema_;
251309
// The projection result to apply to the read schema.
252310
SchemaProjection projection_;
253-
// The avro reader base - provides direct access to decoder.
254-
std::unique_ptr<::avro::DataFileReaderBase> reader_;
311+
// The avro reader base - provides direct access to decoder (new path).
312+
std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
313+
// The datum reader for GenericDatum-based decoding (legacy path).
314+
std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
255315
// The context to keep track of the reading progress.
256316
std::unique_ptr<ReadContext> context_;
257317
};

src/iceberg/file_reader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ class ReaderProperties : public ConfigBase<ReaderProperties> {
7676
/// \brief The batch size to read.
7777
inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
7878

79+
/// \brief Use direct Avro decoder (true) or GenericDatum-based decoder (false).
80+
/// Default: true (use direct decoder for better performance).
81+
inline static Entry<bool> kAvroUseDirectDecoder{"avro.use-direct-decoder", true};
82+
7983
/// \brief Create a default ReaderProperties instance.
8084
static std::unique_ptr<ReaderProperties> default_properties();
8185

0 commit comments

Comments
 (0)