Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
Expand Down Expand Up @@ -174,6 +175,29 @@ public DefaultMobStoreCompactor(Configuration conf, HStore store) {
MobConstants.DEFAULT_MOB_COMPACTION_READ_CACHE_BLOCKS);
}

/**
* Resolves a MOB reference cell to its backing MOB value and returns an independent,
* heap-resident copy of the resolved cell.
* <p>
* A MOB cell resolved from a MOB file is backed by a {@code StoreFileScanner}; closing the
* {@link MobCell} closes that scanner and may release/recycle the NIO buffers referenced by the
* returned cell. We close the {@link MobCell} here to avoid leaking scanners/buffers while
* compacting many reference cells.
* <p>
* The {@link KeyValueUtil#copyToNewKeyValue(ExtendedCell)} call is required by this ownership
* model: HFile writers and encoders may retain references to appended cells (e.g.
* {@code lastCell}, {@code firstCellInBlock}, and the data block encoder's {@code prevCell})
* until {@code beforeShipped()}. Returning the scanner-backed cell directly would let those later
* reads access released buffers. Removing this copy would require changing the caller to retain
* each {@link MobCell} and close it only after the writers have shipped their retained
* references.
*/
protected ExtendedCell resolveMobCell(ExtendedCell reference) throws IOException {
try (MobCell mobCell = mobStore.resolve(reference, cacheMobBlocksOnCompaction, false)) {
Comment thread
liuxiaocs7 marked this conversation as resolved.
return KeyValueUtil.copyToNewKeyValue(mobCell.getCell());
}
}

@Override
public List<Path> compact(CompactionRequestImpl request,
ThroughputController throughputController, User user) throws IOException {
Expand Down Expand Up @@ -379,7 +403,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
String fName = MobUtils.getMobFileName(c);
// Added to support migration
try {
mobCell = mobStore.resolve(c, cacheMobBlocksOnCompaction, false).getCell();
mobCell = resolveMobCell(c);
} catch (DoNotRetryIOException e) {
if (
discardMobMiss && e.getCause() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
String fName = MobUtils.getMobFileName(c);
// Added to support migration
try {
mobCell = mobStore.resolve(c, cacheMobBlocksOnCompaction, false).getCell();
mobCell = resolveMobCell(c);
} catch (DoNotRetryIOException e) {
if (
discardMobMiss && e.getCause() != null
Expand Down Expand Up @@ -269,7 +269,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
} else {
// If the value is not larger than the threshold, it's not regarded a mob. Retrieve
// the mob cell from the mob file, and write it back to the store file.
mobCell = mobStore.resolve(c, cacheMobBlocksOnCompaction, false).getCell();
mobCell = resolveMobCell(c);
if (mobCell.getValueLength() != 0) {
// put the mob data back to the store file
PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@
package org.apache.hadoop.hbase.mob;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

Expand All @@ -49,6 +55,27 @@ public void testCacheMobBlocksOnCompactionCanBeDisabled() {
assertFalse(compactor.cacheMobBlocksOnCompaction);
}

@Test
public void testResolveMobCellClosesMobCellAndReturnsIndependentCopy() throws Exception {
Configuration conf = new Configuration();
DefaultMobStoreCompactor compactor = newCompactor(conf);
ExtendedCell reference = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
Bytes.toBytes("qualifier"), Bytes.toBytes("mob-reference"));
ExtendedCell resolved = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
Bytes.toBytes("qualifier"), Bytes.toBytes("mob-value"));
MobCell mobCell = mock(MobCell.class);
when(mobCell.getCell()).thenReturn(resolved);
when(compactor.mobStore.resolve(reference, compactor.cacheMobBlocksOnCompaction, false))
.thenReturn(mobCell);

ExtendedCell copied = compactor.resolveMobCell(reference);

assertNotSame(resolved, copied);
assertTrue(CellUtil.matchingValue(resolved, copied));
verify(compactor.mobStore).resolve(reference, compactor.cacheMobBlocksOnCompaction, false);
verify(mobCell).close();
}

private DefaultMobStoreCompactor newCompactor(Configuration conf) {
HMobStore store = mock(HMobStore.class);
ColumnFamilyDescriptor family = mock(ColumnFamilyDescriptor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.hadoop.hbase.io.crypto.MockAesKeyProvider;
import org.apache.hadoop.hbase.io.crypto.aes.AES;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mob.MobCell;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.MobUtils;
Expand Down Expand Up @@ -435,13 +436,17 @@ public void testResolve() throws Exception {
Path targetPath = new Path(store.getPath(), targetPathName);
store.commitFile(mobFilePath, targetPath);
// resolve
Cell resultCell1 = store.resolve(seekKey1, false).getCell();
Cell resultCell2 = store.resolve(seekKey2, false).getCell();
Cell resultCell3 = store.resolve(seekKey3, false).getCell();
// compare
assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell1)));
assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell2)));
assertEquals(Bytes.toString(value2), Bytes.toString(CellUtil.cloneValue(resultCell3)));
try (MobCell resultCell1 = store.resolve(seekKey1, false);
MobCell resultCell2 = store.resolve(seekKey2, false);
MobCell resultCell3 = store.resolve(seekKey3, false)) {
// compare
assertEquals(Bytes.toString(value),
Bytes.toString(CellUtil.cloneValue(resultCell1.getCell())));
assertEquals(Bytes.toString(value),
Bytes.toString(CellUtil.cloneValue(resultCell2.getCell())));
assertEquals(Bytes.toString(value2),
Bytes.toString(CellUtil.cloneValue(resultCell3.getCell())));
}
}

@Test
Expand Down