diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index fad14184fa5..d2ab14f5aab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -144,7 +144,7 @@ public int getRecordCount() { private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema) { Preconditions.checkArgument(batchStatus.batch.getSchema().getFieldCount() == container.getSchema().getFieldCount(), - "Input batch and output batch have different field counthas!"); + "Input batch and output batch have different field counts!"); if (newSchema) { createUnionAller(batchStatus.batch); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java index d2c864683af..c109b573f59 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.resolver; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -751,6 +752,17 @@ public static MinorType getLeastRestrictiveType(MinorType... types) { return result; } + public static MajorType getLeastRestrictiveMajorType(MajorType... majorTypes) { + MinorType[] minorTypes = Arrays.stream(majorTypes).map(MajorType::getMinorType).toArray(MinorType[]::new); + DataMode[] dataModes = Arrays.stream(majorTypes).map(MajorType::getMode).toArray(DataMode[]::new); + MinorType leastRestrictiveMinorType = getLeastRestrictiveType(minorTypes); + DataMode leastRestrictiveDataMode = getLeastRestrictiveDataMode(dataModes); + return MajorType.newBuilder() + .setMinorType(leastRestrictiveMinorType) + .setMode(leastRestrictiveDataMode) + .build(); + } + /** * Finds the type in a given set that has the cheapest cast from a given * starting type. 
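A minimal usage sketch (not from the patch) of the new getLeastRestrictiveMajorType helper above, assuming Drill's existing implicit-cast precedence where INT widens to BIGINT and OPTIONAL absorbs REQUIRED:

import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.resolver.TypeCastRules;

public class LeastRestrictiveMajorTypeSketch {
  public static void main(String[] args) {
    MajorType requiredInt = MajorType.newBuilder()
        .setMinorType(MinorType.INT)
        .setMode(DataMode.REQUIRED)
        .build();
    MajorType optionalBigInt = MajorType.newBuilder()
        .setMinorType(MinorType.BIGINT)
        .setMode(DataMode.OPTIONAL)
        .build();
    // Minor types resolve through getLeastRestrictiveType (INT + BIGINT -> BIGINT) and
    // data modes through getLeastRestrictiveDataMode (REQUIRED + OPTIONAL -> OPTIONAL),
    // so the merged table-level type is expected to be OPTIONAL BIGINT.
    MajorType merged = TypeCastRules.getLeastRestrictiveMajorType(requiredInt, optionalBigInt);
    System.out.println(merged.getMinorType() + " " + merged.getMode());
  }
}

The helper simply splits the incoming MajorTypes into their minor types and data modes and resolves each dimension independently, which is what lets the metadata merge below widen a column's type and relax its mode at the same time.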
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java index 9bc53d6e788..50921554fca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java @@ -335,7 +335,9 @@ private Map createReaderAndImplicitColumns(ExecutorFragmentConte ccf, footer, rowGroupScan.getColumns(), - containsCorruptDates); + containsCorruptDates, + // each parquet SubScan shares the same table schema constructed by a GroupScan + rowGroupScan.getSchema()); } logger.debug("Query {} uses {}", diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java index 72420c29fd2..b26ff86b9cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java @@ -19,6 +19,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.planner.common.DrillStatsTable; import org.apache.drill.exec.record.SchemaUtil; import org.apache.drill.metastore.metadata.BaseTableMetadata; @@ -52,6 +53,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; import com.google.common.primitives.Longs; import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; @@ -661,6 +663,12 @@ static Map resolveFields(MetadataBase.ParquetT // row groups in the file have the same schema, so using the first one Map fileColumns = getFileFields(parquetTableMetadata, file); fileColumns.forEach((columnPath, type) -> putType(columns, columnPath, type)); + // If at least 1 parquet file to read doesn't contain a column, enforce this column + // DataMode to OPTIONAL in the overall table schema + for (SchemaPath column: Sets.symmetricDifference(columns.keySet(), fileColumns.keySet())) { + TypeProtos.MinorType minorType = columns.get(column).getMinorType(); + columns.put(column, Types.optional(minorType)); + } } return columns; } @@ -680,13 +688,7 @@ private static void putType(Map columns, Schem if (majorType == null) { columns.put(columnPath, type); } else if (!majorType.equals(type)) { - TypeProtos.MinorType leastRestrictiveType = TypeCastRules.getLeastRestrictiveType( - majorType.getMinorType(), - type.getMinorType() - ); - if (leastRestrictiveType != majorType.getMinorType()) { - columns.put(columnPath, type); - } + columns.put(columnPath, TypeCastRules.getLeastRestrictiveMajorType(majorType, type)); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index d22c07d7bcd..aae079d01ff 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -18,6 +18,7 @@ package 
org.apache.drill.exec.store.parquet.columnreaders; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.BitVector; @@ -73,7 +74,10 @@ static ColumnReader createFixedColumnReader(ParquetRecordReader recordReader, ConvertedType convertedType = schemaElement.getConverted_type(); // if the column is required, or repeated (in which case we just want to use this to generate our appropriate // ColumnReader for actually transferring data into the data vector inside of our repeated vector - if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) { + // Choose a reader based on the ValueVector's DataMode since we might want to put + // a parquet REQUIRED column into Drill's OPTIONAL ValueVector + // see ParquetSchema#tableSchema for details + if (v.getField().getDataMode() != TypeProtos.DataMode.OPTIONAL) { return getColumnReader(recordReader, fixedLength, descriptor, columnChunkMetaData, v, schemaElement, convertedType); } else { // if the column is nullable return getNullableColumnReader(recordReader, descriptor, @@ -86,8 +90,11 @@ static VarLengthValuesColumn getReader(ParquetRecordReader parentReader, Colu SchemaElement schemaElement ) throws ExecutionSetupException { ConvertedType convertedType = schemaElement.getConverted_type(); - switch (descriptor.getMaxDefinitionLevel()) { - case 0: + // Choose a reader based on the ValueVector's DataMode since we might want to put + // a parquet REQUIRED column into Drill's OPTIONAL ValueVector + // see ParquetSchema#tableSchema for details + switch (v.getField().getDataMode()) { + case REQUIRED: if (convertedType == null) { return new VarLengthColumnReaders.VarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 738d13c4a82..b6af0528c16 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -488,6 +488,11 @@ protected int decodeLevels() throws IOException { ValuesReader dlReader = dlEncoding.getValuesReader(columnDescriptor, ValuesType.DEFINITION_LEVEL); dlReader.initFromPage(pageValueCount, dataStream); this.definitionLevels = new ValuesReaderIntIterator(dlReader); + } else { + // Even if all values in a page are REQUIRED, still initialize definitionLevels this way + // to be able to read such a column with NullableColumnReader and treat each value's + // definition level as 1 + this.definitionLevels = () -> 1; } dataOffset = (int) dataStream.position(); @@ -511,6 +516,11 @@ protected int decodeLevels() throws IOException { maxDefLevel, BytesInput.from(pageData.nioBuffer(repLevelLen, defLevelLen)) ); + } else { + // Even if all values in a page are REQUIRED, still initialize definitionLevels this way + // to be able to read such a column with NullableColumnReader and treat each value's + // definition level as 1 + this.definitionLevels = () -> 1; } dataOffset = repLevelLen + defLevelLen; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java index 62b3b56c38c..549b725848a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java @@ -56,10 +56,10 @@ public ParquetColumnMetadata(ColumnDescriptor column) { this.column = column; } - public void resolveDrillType(Map schemaElements, OptionManager options) { + public void resolveDrillType(Map schemaElements, OptionManager options, boolean isEnforcedOptional) { se = schemaElements.get(ParquetReaderUtility.getFullColumnPath(column)); type = ParquetToDrillTypeConverter.toMajorType(column.getType(), column.getTypeLength(), - getDataMode(column), se, options); + isEnforcedOptional ? DataMode.OPTIONAL : getDataMode(column), se, options); field = MaterializedField.create(toFieldName(column.getPath()).getLastSegment().getNameSegment().getPath(), type); length = getDataTypeLength(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index eeca2cabfe6..56aea74aebb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.CommonParquetRecordReader; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; @@ -72,6 +73,11 @@ public class ParquetRecordReader extends CommonParquetRecordReader { private final boolean useBulkReader; + /** + * See {@link ParquetSchema#tableSchema} + */ + private final TupleMetadata tableSchema; + public ParquetRecordReader(FragmentContext fragmentContext, Path path, int rowGroupIndex, @@ -80,8 +86,8 @@ public ParquetRecordReader(FragmentContext fragmentContext, CompressionCodecFactory codecFactory, ParquetMetadata footer, List columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { - this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus, TupleMetadata tableSchema) { + this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus, tableSchema); } public ParquetRecordReader(FragmentContext fragmentContext, @@ -91,9 +97,9 @@ public ParquetRecordReader(FragmentContext fragmentContext, CompressionCodecFactory codecFactory, ParquetMetadata footer, List columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus, TupleMetadata tableSchema) { this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), path, rowGroupIndex, fs, codecFactory, - footer, columns, dateCorruptionStatus); + footer, columns, dateCorruptionStatus, tableSchema); } public ParquetRecordReader( @@ -105,13 +111,14 @@ public ParquetRecordReader( CompressionCodecFactory codecFactory, ParquetMetadata footer, List columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { + 
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus, TupleMetadata tableSchema) { super(footer, fragmentContext); this.hadoopPath = path; this.fileSystem = fs; this.codecFactory = codecFactory; this.rowGroupIndex = rowGroupIndex; this.dateCorruptionStatus = dateCorruptionStatus; + this.tableSchema = tableSchema; this.numRecordsToRead = initNumRecordsToRead(numRecordsToRead, rowGroupIndex, footer); this.useAsyncColReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val; this.useAsyncPageReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val; @@ -185,7 +192,7 @@ public ReadState getReadState() { @Override public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException { this.operatorContext = operatorContext; - ParquetSchema schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns()); + ParquetSchema schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns(), tableSchema); batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead, new RecordBatchStatsContext(fragmentContext, operatorContext)); logger.debug("Reading {} records from row group({}) in file {}.", numRecordsToRead, rowGroupIndex, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java index 53bced71012..db59ddc6e1a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java @@ -32,9 +32,10 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; -import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.ValueVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -64,6 +65,19 @@ public final class ParquetSchema { private final int rowGroupIndex; private final ParquetMetadata footer; + /** + * Schema for the whole table constructed by a GroupScan from all the parquet files to read. + * If a selected column is not found in our parquet file, the type for the null-filled vector + * to create is looked up in this schema. That is, if some other parquet file contains + * the column, we take its type. Otherwise, we default to Nullable Int. + * Also, if at least 1 file does not contain the selected column, then the overall table schema + * should have this field with OPTIONAL data mode. GroupScan catches this case and sets the + * appropriate data mode in this schema. Our job here is to enforce that OPTIONAL mode in our + * output schema, even if the particular parquet file we're reading from has this field as REQUIRED, + * to provide consistency across all scan batches. + */ + private final TupleMetadata tableSchema; + /** * List of metadata for selected columns. This list does two things.
* First, it identifies the Parquet columns we wish to select. Second, it @@ -91,11 +105,12 @@ public final class ParquetSchema { * this is a SELECT * query */ - public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection selectedCols) { + public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection selectedCols, TupleMetadata tableSchema) { this.options = options; this.rowGroupIndex = rowGroupIndex; this.selectedCols = selectedCols; this.footer = footer; + this.tableSchema = tableSchema; if (selectedCols == null) { columnsFound = null; } else { @@ -127,7 +142,7 @@ private void loadParquetSchema() { // loop to add up the length of the fixed width columns and build the schema for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) { ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column); - columnMetadata.resolveDrillType(schemaElements, options); + columnMetadata.resolveDrillType(schemaElements, options, shouldEnforceOptional(column)); if (!columnSelected(column)) { continue; } @@ -135,6 +150,15 @@ private void loadParquetSchema() { } } + private boolean shouldEnforceOptional(ColumnDescriptor column) { + String columnName = SchemaPath.getCompoundPath(column.getPath()).getAsUnescapedPath(); + MaterializedField tableField; + if (tableSchema == null || (tableField = tableSchema.column(columnName)) == null) { + return false; + } + return tableField.getDataMode() == DataMode.OPTIONAL; + } + /** * Fixed-width fields are the easiest to plan. We know the size of each column, * making it easy to determine the total length of each vector, once we know @@ -206,7 +230,7 @@ private boolean columnSelected(ColumnDescriptor column) { * @throws SchemaChangeException should not occur */ - public void createNonExistentColumns(OutputMutator output, List nullFilledVectors) throws SchemaChangeException { + public void createNonExistentColumns(OutputMutator output, List nullFilledVectors) throws SchemaChangeException { List projectedColumns = Lists.newArrayList(selectedCols); for (int i = 0; i < columnsFound.length; i++) { SchemaPath col = projectedColumns.get(i); @@ -227,12 +251,14 @@ public void createNonExistentColumns(OutputMutator output, List buildChunkMap(BlockMetaData rowGroupMetadata) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java index 1325ddd5e51..f6b30d7a77a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java @@ -25,7 +25,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.parquet.ParquetReaderStats; import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager; -import org.apache.drill.exec.vector.NullableIntVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -51,7 +50,7 @@ public class ReadState { * at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors * that need only have their value count set at the end of each call to next(), as the values default to null. 
*/ - private List nullFilledVectors; + private List nullFilledVectors; private List> fixedLenColumnReaders = new ArrayList<>(); private final long totalNumRecordsToRead; // number of records to read @@ -229,4 +228,4 @@ public void close() { varLengthReader = null; } } -} \ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index e9850dae3ad..cf718e8b18d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -54,6 +54,7 @@ import java.io.OutputStream; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -279,15 +280,16 @@ private Pair createMetaFilesR } else { for (ColumnTypeMetadata_v4.Key key : subTableColumnTypeInfo.keySet()) { ColumnTypeMetadata_v4 columnTypeMetadata_v4 = columnTypeInfoSet.get(key); - if (columnTypeMetadata_v4 == null) { - columnTypeMetadata_v4 = subTableColumnTypeInfo.get(key); + ColumnTypeMetadata_v4 subTableColumnTypeMetadata_v4 = subTableColumnTypeInfo.get(key); + if (columnTypeMetadata_v4 == null || columnTypeMetadata_v4.repetition.isMoreRestrictiveThan(subTableColumnTypeMetadata_v4.repetition)) { + columnTypeMetadata_v4 = subTableColumnTypeMetadata_v4; } else { // If the existing total null count or the null count of the child file is unknown(-1), update the total null count // as unknown - if (subTableColumnTypeInfo.get(key).totalNullCount < 0 || columnTypeMetadata_v4.totalNullCount < 0) { + if (subTableColumnTypeMetadata_v4.totalNullCount < 0 || columnTypeMetadata_v4.totalNullCount < 0) { columnTypeMetadata_v4.totalNullCount = NULL_COUNT_NOT_EXISTS; } else { - columnTypeMetadata_v4.totalNullCount = columnTypeMetadata_v4.totalNullCount + subTableColumnTypeInfo.get(key).totalNullCount; + columnTypeMetadata_v4.totalNullCount = columnTypeMetadata_v4.totalNullCount + subTableColumnTypeMetadata_v4.totalNullCount; } } columnTypeInfoSet.put(key, columnTypeMetadata_v4); @@ -516,10 +518,29 @@ public static ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTa FileMetadataCollector metadataCollector = new FileMetadataCollector(metadata, file, fs, allColumnsInteresting, skipNonInteresting, columnSet, readerConfig); - parquetTableMetadata.metadataSummary.columnTypeInfo.putAll(metadataCollector.getColumnTypeInfo()); + mergeColumns(parquetTableMetadata.metadataSummary.columnTypeInfo, metadataCollector.getColumnTypeInfo()); return metadataCollector.getFileMetadata(); } + /** + * Merges myColumns into resultColumns map with the least restrictive repetition resolution + * @param resultColumns - overall columns map from all the files + * @param myColumns - columns from a particular file to merge into resultColumns + */ + private static synchronized void mergeColumns(Map resultColumns, + Map myColumns) { + Map columnsToMerge = new HashMap<>(myColumns); + for (ColumnTypeMetadata_v4.Key key: columnsToMerge.keySet()) { + ColumnTypeMetadata_v4 columnToMerge = columnsToMerge.get(key); + ColumnTypeMetadata_v4 resultColumn = resultColumns.get(key); + if (resultColumn != null && columnToMerge.repetition.isMoreRestrictiveThan(resultColumn.repetition)) { + columnToMerge.repetition = resultColumn.repetition; + } + } + 
resultColumns.putAll(columnsToMerge); } + + /** * Serialize parquet metadata to json and write to a file. * diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestParquetMissingColumns.java b/exec/java-exec/src/test/java/org/apache/drill/TestParquetMissingColumns.java new file mode 100644 index 00000000000..c914d87c17b --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/TestParquetMissingColumns.java @@ -0,0 +1,91 @@ +package org.apache.drill; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchemaBuilder; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Covers selecting completely missing columns from a parquet table. Should create Nullable Int + * ValueVector in that case since there is no chance to guess the correct data type here. + */ +public class TestParquetMissingColumns extends ClusterTest { + + private static final TypeProtos.MajorType NULLABLE_INT_TYPE = Types.optional(TypeProtos.MinorType.INT); + + @BeforeClass + public static void setup() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher)); + } + + @Test + public void testCoalesceOnNotExistentColumns() throws Exception { + String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` limit 5"; + SchemaBuilder schemaBuilder = new SchemaBuilder() + .add("coal", NULLABLE_INT_TYPE); + BatchSchema expectedSchema = new BatchSchemaBuilder() + .withSchemaBuilder(schemaBuilder) + .build(); + + testBuilder() + .sqlQuery(query) + .schemaBaseLine(expectedSchema) + .go(); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("coal") + .baselineValuesForSingleColumn(null, null, null, null, null) + .go(); + } + + @Test + public void testCoalesceOnNotExistentColumnsWithGroupBy() throws Exception { + String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` group by 1"; + SchemaBuilder schemaBuilder = new SchemaBuilder() + .add("coal", NULLABLE_INT_TYPE); + BatchSchema expectedSchema = new BatchSchemaBuilder() + .withSchemaBuilder(schemaBuilder) + .build(); + + testBuilder() + .sqlQuery(query) + .schemaBaseLine(expectedSchema) + .go(); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("coal") + .baselineValuesForSingleColumn(new Object[] {null}) + .go(); + } + + @Test + public void testCoalesceOnNotExistentColumnsWithOrderBy() throws Exception { + String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` order by 1 limit 5"; + SchemaBuilder schemaBuilder = new SchemaBuilder() + .add("coal", NULLABLE_INT_TYPE); + BatchSchema expectedSchema = new BatchSchemaBuilder() + .withSchemaBuilder(schemaBuilder) + .build(); + + testBuilder() + .sqlQuery(query) + .schemaBaseLine(expectedSchema) + .go(); + + testBuilder() + .sqlQuery(query) + .unOrdered() + .baselineColumns("coal") + .baselineValuesForSingleColumn(null, null, null, null, null) + .go(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java b/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java new file mode 100644 index 00000000000..7e316fdf57a --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/TestParquetPartiallyMissingColumns.java @@ -0,0
+1,128 @@ +package org.apache.drill; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchemaBuilder; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.file.Paths; + +/** + * Covers querying a table in which some parquet files do contain selected columns, and + * others do not (or have them as OPTIONALs). + * + * Expected behavior for the missing columns is the following: + * 1) If at least 1 parquet file to be read has the column, take the minor type from there. + * Otherwise, default to INT. + * 2) If at least 1 parquet file to be read doesn't have the column, or has it as OPTIONAL, + * enforce the overall scan output schema to have it as OPTIONAL. + * + * We need to control the ordering of scan batches to cover the different problematic cases, and we assume + * that parquet files in a table are read in alphabetical order (not a real use case though). So + * we name our files 0.parquet and 1.parquet expecting that they would be scanned in that order + * (not guaranteed though, but seems to work). We use the following tables for these scenarios: + * + * - parquet/partially_missing/o_m -- optional, then missing + * - parquet/partially_missing/m_o -- missing, then optional + * - parquet/partially_missing/r_m -- required, then missing + * - parquet/partially_missing/r_o -- required, then optional + * + * These tables contain parquet files with the following schemas: + * + * - parquet/partially_missing/o_m/0.parquet: id | name | age + * - parquet/partially_missing/o_m/1.parquet: id + * + * - parquet/partially_missing/m_o/0.parquet: id + * - parquet/partially_missing/m_o/1.parquet: id | name | age + * + * - parquet/partially_missing/r_m/0.parquet: id | name | age + * - parquet/partially_missing/r_m/1.parquet: id + * + * - parquet/partially_missing/r_o/0.parquet: id | name | age + * - parquet/partially_missing/r_o/1.parquet: id | name | age + * + * So, by querying the "age" or "name" columns we trigger both the 0.parquet reader to read the data and + * the 1.parquet reader to create the missing column vector.
+ */ +public class TestParquetPartiallyMissingColumns extends ClusterTest { + + private static final SchemaBuilder ageSchema = + new SchemaBuilder().add("age", Types.optional(TypeProtos.MinorType.INT)); + private static final SchemaBuilder nameSchema = + new SchemaBuilder().add("name", Types.optional(TypeProtos.MinorType.VARCHAR)); + + @BeforeClass + public static void setup() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher)); + dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "partially_missing")); + } + + /* + Field name for the missing column MUST NOT be quoted with back-ticks, so we should have ONLY ONE + column for that field (unquoted) + */ + + @Test + public void testMissingColumnNamingWithOrderBy() throws Exception { + test("SELECT age FROM dfs.`parquet/partially_missing/o_m` ORDER BY age", ageSchema); + } + + @Test + public void testMissingColumnNamingWithUnionAll() throws Exception { + test("SELECT age FROM dfs.`parquet/partially_missing/o_m` UNION ALL (VALUES (1))", ageSchema); + } + + /* + If at least 1 file in the table has the selected column, the overall scan output schema should + take the MinorType for the column from there (and not default to Int) + */ + + @Test + public void testMissingColumnTypeGuessWithOrderBy() throws Exception { + test("SELECT name FROM dfs.`parquet/partially_missing/o_m` ORDER BY name", nameSchema); + } + + @Test + public void testMissingColumnTypeGuessWithUnionAll() throws Exception { + test("SELECT name FROM dfs.`parquet/partially_missing/m_o` UNION ALL (VALUES ('Bob'))", nameSchema); + } + + /* + If at least 1 file in the table doesn't have the selected column, or has it as OPTIONAL, + the overall scan output schema should have this column as OPTIONAL + */ + + @Test + public void testEnforcingOptionalWithOrderBy() throws Exception { + test("SELECT age FROM dfs.`parquet/partially_missing/r_o` ORDER BY age", ageSchema); + test("SELECT age FROM dfs.`parquet/partially_missing/r_m` ORDER BY age", ageSchema); + test("SELECT name FROM dfs.`parquet/partially_missing/r_o` ORDER BY name", nameSchema); + test("SELECT name FROM dfs.`parquet/partially_missing/r_m` ORDER BY name", nameSchema); + } + + @Test + public void testEnforcingOptionalWithUnionAll() throws Exception { + test("SELECT age FROM dfs.`parquet/partially_missing/r_o` UNION ALL (VALUES (1))", ageSchema); + test("SELECT age FROM dfs.`parquet/partially_missing/r_m` UNION ALL (VALUES (1))", ageSchema); + test("SELECT name FROM dfs.`parquet/partially_missing/r_o` UNION ALL (VALUES ('Bob'))", nameSchema); + test("SELECT name FROM dfs.`parquet/partially_missing/r_m` UNION ALL (VALUES ('Bob'))", nameSchema); + } + + // Runs the query and verifies the result schema against the expected schema + private void test(String query, SchemaBuilder expectedSchemaBuilder) throws Exception { + BatchSchema expectedSchema = new BatchSchemaBuilder() + .withSchemaBuilder(expectedSchemaBuilder) + .build(); + + testBuilder() + .sqlQuery(query) + .schemaBaseLine(expectedSchema) + .go(); + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java index bb707c64af4..d959334a6c7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestUntypedNull.java @@ -18,12 +18,7 @@ package org.apache.drill; import org.apache.drill.categories.SqlFunctionTest; -import org.apache.drill.common.types.TypeProtos; -import 
org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.BatchSchemaBuilder; -import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterFixtureBuilder; import org.apache.drill.test.ClusterTest; @@ -38,8 +33,6 @@ @Category(SqlFunctionTest.class) public class TestUntypedNull extends ClusterTest { - private static final TypeProtos.MajorType UNTYPED_NULL_TYPE = Types.optional(TypeProtos.MinorType.NULL); - @BeforeClass public static void setup() throws Exception { ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); @@ -121,72 +114,6 @@ public void testTypeAndMode() throws Exception { assertEquals(0, summary.recordCount()); } - @Test - public void testCoalesceOnNotExistentColumns() throws Exception { - String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` limit 5"; - SchemaBuilder schemaBuilder = new SchemaBuilder() - .add("coal", UNTYPED_NULL_TYPE); - BatchSchema expectedSchema = new BatchSchemaBuilder() - .withSchemaBuilder(schemaBuilder) - .build(); - - testBuilder() - .sqlQuery(query) - .schemaBaseLine(expectedSchema) - .go(); - - testBuilder() - .sqlQuery(query) - .unOrdered() - .baselineColumns("coal") - .baselineValuesForSingleColumn(null, null, null, null, null) - .go(); - } - - @Test - public void testCoalesceOnNotExistentColumnsWithGroupBy() throws Exception { - String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` group by 1"; - SchemaBuilder schemaBuilder = new SchemaBuilder() - .add("coal", UNTYPED_NULL_TYPE); - BatchSchema expectedSchema = new BatchSchemaBuilder() - .withSchemaBuilder(schemaBuilder) - .build(); - - testBuilder() - .sqlQuery(query) - .schemaBaseLine(expectedSchema) - .go(); - - testBuilder() - .sqlQuery(query) - .unOrdered() - .baselineColumns("coal") - .baselineValuesForSingleColumn(new Object[] {null}) - .go(); - } - - @Test - public void testCoalesceOnNotExistentColumnsWithOrderBy() throws Exception { - String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` order by 1 limit 5"; - SchemaBuilder schemaBuilder = new SchemaBuilder() - .add("coal", UNTYPED_NULL_TYPE); - BatchSchema expectedSchema = new BatchSchemaBuilder() - .withSchemaBuilder(schemaBuilder) - .build(); - - testBuilder() - .sqlQuery(query) - .schemaBaseLine(expectedSchema) - .go(); - - testBuilder() - .sqlQuery(query) - .unOrdered() - .baselineColumns("coal") - .baselineValuesForSingleColumn(null, null, null, null, null) - .go(); - } - @Test public void testCoalesceOnNotExistentColumnsWithCoalesceInWhereClause() throws Exception { String query = "select coalesce(unk1, unk2) as coal from cp.`tpch/nation.parquet` where coalesce(unk1, unk2) > 10"; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java index 784a1820292..6f8c2f36f90 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java @@ -460,7 +460,8 @@ private RecordBatch getScanBatch() throws Exception { ccf, footer, columnsToRead, - ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION)); + ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION, + 
null)); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestCaseNullableTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestCaseNullableTypes.java index cf598275e99..02f8210d955 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestCaseNullableTypes.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestCaseNullableTypes.java @@ -157,25 +157,4 @@ public void testCaseNullableTypesDecimal() throws Exception { resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY); } } - - // Coalesce is being transformed to if-else cases - @Test - public void testCoalesceWithUntypedNullValues() throws Exception { - testBuilder() - .sqlQuery("select coalesce(coalesce(n_name1, n_name2, n_name), coalesce(n_name3, n_name4), n_name3) res from cp.`tpch/nation.parquet` limit 1") - .ordered() - .baselineColumns("res") - .baselineValues("ALGERIA") - .go(); - } - - @Test - public void testCoalesceWithUntypedNullValues_2() throws Exception { - testBuilder() - .sqlQuery("select coalesce(coalesce(n_name1, n_name2), n_name) res from cp.`tpch/nation.parquet` limit 1") - .ordered() - .baselineColumns("res") - .baselineValues("ALGERIA") - .go(); - } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 5f06d8bed01..5e625417c40 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -640,7 +640,7 @@ public void testPerformance() throws Exception { ); final ParquetRecordReader rr = new ParquetRecordReader(context, fileName, 0, fs, ccf, - f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION); + f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION, null); final TestOutputMutator mutator = new TestOutputMutator(allocator); rr.setup(null, mutator); final Stopwatch watch = Stopwatch.createStarted(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java index 6ab7bbe6744..00c3be6a687 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java @@ -550,7 +550,7 @@ public void testBooleanPredicate() throws Exception { final String queryNotEqualFalse = "select col_bln from dfs.`parquetFilterPush/blnTbl` where not col_bln = false"; testParquetFilterPD(queryNotEqualFalse, 4, 2, false); - final String queryEqualTrueWithAnd = "select col_bln from dfs.`parquetFilterPush/blnTbl` where col_bln = true and unk_col = 'a'"; + final String queryEqualTrueWithAnd = "select col_bln from dfs.`parquetFilterPush/blnTbl` where col_bln = true and convert_to(unk_col, 'UTF8') = 'a'"; testParquetFilterPD(queryEqualTrueWithAnd, 0, 2, false); // File ff1.parquet has column with the values: false, null, false. 
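The table-schema merge performed by ParquetTableMetadataUtils.resolveFields above can be pictured with a small, self-contained sketch (simplified: plain String keys instead of SchemaPath, only two files, and assuming the Types.required/Types.optional helpers from drill-common):

import java.util.HashMap;
import java.util.Map;

import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.resolver.TypeCastRules;

import com.google.common.collect.Sets;

public class ResolveFieldsSketch {

  // Merge one file's columns into the running table schema.
  static void mergeFile(Map<String, MajorType> table, Map<String, MajorType> file) {
    // Known columns widen to the least restrictive major type, new columns are added as-is.
    file.forEach((name, type) -> table.merge(name, type, TypeCastRules::getLeastRestrictiveMajorType));
    // A column the current file lacks is missing from at least one file,
    // so its table-level data mode must become OPTIONAL.
    for (String name : Sets.symmetricDifference(table.keySet(), file.keySet())) {
      table.put(name, Types.optional(table.get(name).getMinorType()));
    }
  }

  public static void main(String[] args) {
    Map<String, MajorType> file0 = new HashMap<>();
    file0.put("id", Types.required(MinorType.BIGINT));
    file0.put("name", Types.required(MinorType.VARCHAR));
    Map<String, MajorType> file1 = new HashMap<>();
    file1.put("id", Types.required(MinorType.BIGINT));

    Map<String, MajorType> table = new HashMap<>();
    mergeFile(table, file0);
    mergeFile(table, file1);
    // Expected: id -> REQUIRED BIGINT, name -> OPTIONAL VARCHAR
    table.forEach((name, type) -> System.out.println(name + " -> " + type.getMode() + " " + type.getMinorType()));
  }
}

This mirrors why the r_m and r_o test tables above must come out with OPTIONAL "age" and "name" columns even though 0.parquet declares them REQUIRED.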
diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/0.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/0.parquet new file mode 100644 index 00000000000..543249e0b3d Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/0.parquet differ diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/1.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/1.parquet new file mode 100644 index 00000000000..75fd2db8ec9 Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/partially_missing/m_o/1.parquet differ diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/o_m/0.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/o_m/0.parquet new file mode 100644 index 00000000000..75fd2db8ec9 Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/partially_missing/o_m/0.parquet differ diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/o_m/1.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/o_m/1.parquet new file mode 100644 index 00000000000..543249e0b3d Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/partially_missing/o_m/1.parquet differ diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/0.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/0.parquet new file mode 100644 index 00000000000..4dfdc2a4978 Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/0.parquet differ diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/1.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/1.parquet new file mode 100644 index 00000000000..543249e0b3d Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/partially_missing/r_m/1.parquet differ diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/0.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/0.parquet new file mode 100644 index 00000000000..4dfdc2a4978 Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/0.parquet differ diff --git a/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/1.parquet b/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/1.parquet new file mode 100644 index 00000000000..5f0812d395a Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/partially_missing/r_o/1.parquet differ
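Finally, the "() -> 1" assignment added to PageReader earlier can be read as follows: a page of a REQUIRED column carries no definition-level stream to decode, so a constant iterator makes every value look defined to the nullable reader. A stand-in sketch (DefLevelIterator is a hypothetical placeholder for the reader's actual definition-level iterator type):

interface DefLevelIterator {
  int nextInt();
}

public class ConstantDefLevelSketch {

  // Mimics what a nullable reader does per value: a definition level equal to the
  // max definition level (1 here) means "value present", 0 would mean null.
  static boolean isSet(DefLevelIterator defLevels, int maxDefLevel) {
    return defLevels.nextInt() == maxDefLevel;
  }

  public static void main(String[] args) {
    // For a REQUIRED column the patch substitutes a constant iterator:
    // every value reports level 1 and is treated as set.
    DefLevelIterator definitionLevels = () -> 1;
    for (int i = 0; i < 3; i++) {
      System.out.println("value " + i + " set = " + isSet(definitionLevels, 1));
    }
  }
}

With every definition level reported as 1, the nullable reader fills the OPTIONAL vector with all values set and no nulls, which is the behavior the enforced-OPTIONAL schema needs.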