From b6eefcaeb7e703e7c3c089681accc2c88273f10b Mon Sep 17 00:00:00 2001
From: Abhinaba Sarkar
Date: Wed, 4 Mar 2020 23:14:43 +0530
Subject: [PATCH] HBASE-23788 ROW_INDEX_V1 encoder should consider the
 secondary index size with the encoded data size tracking (#1219)

Signed-off-by: Anoop Sam John
---
 .../io/encoding/BufferedDataBlockEncoder.java      |  20 ++--
 .../hadoop/hbase/io/encoding/DataBlockEncoder.java |   5 +-
 .../hadoop/hbase/io/encoding/EncodingState.java    |  21 ++++
 .../hadoop/hbase/io/encoding/RowIndexCodecV1.java  |   4 +-
 .../hbase/io/encoding/RowIndexEncoderV1.java       |  10 +-
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   |  19 +---
 .../hbase/io/hfile/HFileDataBlockEncoder.java      |   3 +-
 .../hbase/io/hfile/HFileDataBlockEncoderImpl.java  |   4 +-
 .../hbase/io/hfile/NoOpDataBlockEncoder.java       |   8 +-
 .../hbase/io/hfile/TestRowIndexV1DataEncoder.java  | 115 +++++++++++++++++++++
 10 files changed, 167 insertions(+), 42 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestRowIndexV1DataEncoder.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 755647d627..bcac9deac4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -1122,21 +1122,16 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
       }
     }
     StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
-    blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
-  }
-
-  private static class BufferedDataBlockEncodingState extends EncodingState {
-    int unencodedDataSizeWritten = 0;
+    blkEncodingCtx.setEncodingState(new EncodingState());
   }
 
   @Override
-  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
       throws IOException {
-    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
-        .getEncodingState();
+    EncodingState state = encodingCtx.getEncodingState();
+    int posBeforeEncode = out.size();
     int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
-    state.unencodedDataSizeWritten += encodedKvSize;
-    return encodedKvSize;
+    state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
   }
 
   public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
@@ -1145,12 +1140,11 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
 
   @Override
   public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
       byte[] uncompressedBytesWithHeader) throws IOException {
-    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
-        .getEncodingState();
+    EncodingState state = encodingCtx.getEncodingState();
     // Write the unencodedDataSizeWritten (with header size)
     Bytes.putInt(uncompressedBytesWithHeader,
       HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
-      state.unencodedDataSizeWritten);
+      state.getUnencodedDataSizeWritten());
     postEncoding(encodingCtx);
   }
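Note: the hunks above invert the size-reporting contract. Instead of returning the
unencoded cell size from encode(), the base encoder now pushes both sizes into the
shared EncodingState. The method below restates the new encode() with explanatory
comments; it is the same code as the hunk and lives inside BufferedDataBlockEncoder,
where internalEncode() is the subclass hook:

    @Override
    public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
        throws IOException {
      EncodingState state = encodingCtx.getEncodingState();
      // DataOutputStream.size() is the number of bytes written so far, so the delta
      // around internalEncode() is the encoded (on-stream) size of this cell.
      int posBeforeEncode = out.size();
      int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
      // First argument: the raw KeyValue bytes; second: bytes the codec actually emitted.
      state.postCellEncode(encodedKvSize, out.size() - posBeforeEncode);
    }
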
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
index d3c41fbac1..a6aafead50 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
@@ -50,9 +50,10 @@ public interface DataBlockEncoder {
 
   /**
    * Encodes a KeyValue.
-   * @return unencoded kv size written
+   * After the encode, {@link EncodingState#postCellEncode(int, int)} needs to be called to keep
+   * track of the encoded and unencoded data size
    */
-  int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+  void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
       throws IOException;
 
   /**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
index e828e9811c..549987672d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
@@ -32,6 +32,14 @@ public class EncodingState {
    */
   protected Cell prevCell = null;
 
+  // Size of actual data being written. Not considering the block encoding/compression. This
+  // includes the header size also.
+  protected int unencodedDataSizeWritten = 0;
+
+  // Size of actual data being written. considering the block encoding. This
+  // includes the header size also.
+  protected int encodedDataSizeWritten = 0;
+
   public void beforeShipped() {
     if (this.prevCell != null) {
       // can't use KeyValueUtil#toNewKeyCell, because we need both key and value
@@ -39,4 +47,17 @@ public class EncodingState {
       this.prevCell = KeyValueUtil.copyToNewKeyValue(this.prevCell);
     }
   }
+
+  public void postCellEncode(int unencodedCellSizeWritten, int encodedCellSizeWritten) {
+    this.unencodedDataSizeWritten += unencodedCellSizeWritten;
+    this.encodedDataSizeWritten += encodedCellSizeWritten;
+  }
+
+  public int getUnencodedDataSizeWritten() {
+    return unencodedDataSizeWritten;
+  }
+
+  public int getEncodedDataSizeWritten() {
+    return encodedDataSizeWritten;
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
index 5de20ddbf5..a2dd9ceb9b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
@@ -79,12 +79,12 @@ public class RowIndexCodecV1 extends AbstractDataBlockEncoder {
   }
 
   @Override
-  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx,
+  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx,
       DataOutputStream out) throws IOException {
     RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
         .getEncodingState();
     RowIndexEncoderV1 encoder = state.encoder;
-    return encoder.write(cell);
+    encoder.write(cell);
   }
 
   @Override
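Note: with the counters held by EncodingState, a block writer no longer keeps its own
tallies; it reads them back to decide when a block is full. A schematic of that loop
follows. The shape is assumed for illustration only: HBase's real HFileBlock.Writer
wiring differs, and BlockBoundarySketch/finishBlock are hypothetical stand-ins for
its block rotation.

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
    import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;

    public class BlockBoundarySketch {
      static void writeBlocks(Iterator<Cell> cells, DataBlockEncoder encoder,
          HFileBlockEncodingContext ctx, DataOutputStream out, int blockSizeLimit,
          Runnable finishBlock) throws IOException {
        while (cells.hasNext()) {
          // The encoder reports (unencoded, encoded) sizes itself via postCellEncode().
          encoder.encode(cells.next(), ctx, out);
          if (ctx.getEncodingState().getEncodedDataSizeWritten() >= blockSizeLimit) {
            // The boundary check now sees the true on-disk size, row-index bytes included.
            // In HBase, starting the next block installs a fresh EncodingState via
            // startBlockEncoding(), which resets both counters.
            finishBlock.run();
          }
        }
      }
    }
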
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
index 711b9db30b..aa74bd2361 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
@@ -15,6 +15,7 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,16 +39,21 @@ public class RowIndexEncoderV1 {
     this.context = encodingCtx;
   }
 
-  public int write(Cell cell) throws IOException {
+  public void write(Cell cell) throws IOException {
     // checkRow uses comparator to check we are writing in order.
+    int extraBytesForRowIndex = 0;
+
     if (!checkRow(cell)) {
       if (startOffset < 0) {
         startOffset = out.size();
       }
       rowsOffsetBAOS.writeInt(out.size() - startOffset);
+      // added for the int written in the previous line
+      extraBytesForRowIndex = Bytes.SIZEOF_INT;
     }
     lastCell = cell;
-    return encoder.write(cell);
+    int size = encoder.write(cell);
+    context.getEncodingState().postCellEncode(size, size + extraBytesForRowIndex);
   }
 
   protected boolean checkRow(final Cell cell) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index ec317e6f8e..6b145718f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -798,14 +798,6 @@ public class HFileBlock implements Cacheable {
      */
     private DataOutputStream userDataStream;
 
-    // Size of actual data being written. Not considering the block encoding/compression. This
-    // includes the header size also.
-    private int unencodedDataSizeWritten;
-
-    // Size of actual data being written. considering the block encoding. This
-    // includes the header size also.
-    private int encodedDataSizeWritten;
-
     /**
      * Bytes to be written to the file system, including the header. Compressed
      * if compression is turned on. It also includes the checksum data that
@@ -911,8 +903,6 @@ public class HFileBlock implements Cacheable {
       if (newBlockType == BlockType.DATA) {
         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
       }
-      this.unencodedDataSizeWritten = 0;
-      this.encodedDataSizeWritten = 0;
       return userDataStream;
     }
 
@@ -921,10 +911,7 @@
      */
     void write(Cell cell) throws IOException{
       expectState(State.WRITING);
-      int posBeforeEncode = this.userDataStream.size();
-      this.unencodedDataSizeWritten +=
-          this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
-      this.encodedDataSizeWritten += this.userDataStream.size() - posBeforeEncode;
+      this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
     }
 
     /**
@@ -1155,7 +1142,7 @@
      * @return the number of bytes written
      */
     public int encodedBlockSizeWritten() {
-      return state != State.WRITING ? 0 : this.encodedDataSizeWritten;
+      return state != State.WRITING ? 0 : this.getEncodingState().getEncodedDataSizeWritten();
     }
 
     /**
@@ -1166,7 +1153,7 @@
      * @return the number of bytes written
      */
     int blockSizeWritten() {
-      return state != State.WRITING ? 0 : this.unencodedDataSizeWritten;
+      return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
     }
 
     /**
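Note: the RowIndexEncoderV1 hunk above is the heart of the fix. Each cell that starts
a new row appends a 4-byte offset to the side buffer rowsOffsetBAOS, and those offsets
only reach the block stream when the block ends, so a size delta measured around a
single cell write can never observe them; they must be charged explicitly, hence
postCellEncode(size, size + extraBytesForRowIndex). A self-contained toy (plain
java.io, not HBase code) showing that blind spot:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SideBufferBlindSpot {
      public static void main(String[] args) throws IOException {
        DataOutputStream block = new DataOutputStream(new ByteArrayOutputStream());
        ByteArrayOutputStream rowIndex = new ByteArrayOutputStream(); // like rowsOffsetBAOS

        int before = block.size();
        block.writeBytes("24-byte-cell-placeholder");       // cell bytes: visible in the delta
        new DataOutputStream(rowIndex).writeInt(before);    // index entry: invisible to the delta

        int measured = block.size() - before;               // 24
        int actualCost = measured + Integer.BYTES;          // 28: what the block really pays
        System.out.println(measured + " vs " + actualCost); // prints "24 vs 28"
      }
    }
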
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
index 22dd6c404e..3c118da258 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
@@ -52,10 +52,9 @@ public interface HFileDataBlockEncoder {
    * @param cell
    * @param encodingCtx
    * @param out
-   * @return unencoded kv size
    * @throws IOException
    */
-  int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+  void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
      throws IOException;
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
index 347b1f3c59..462064f7b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
@@ -91,9 +91,9 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
   }
 
   @Override
-  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
       throws IOException {
-    return this.encoding.getEncoder().encode(cell, encodingCtx, out);
+    this.encoding.getEncoder().encode(cell, encodingCtx, out);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
index 06cc3e16f0..467480f68d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
@@ -47,12 +47,13 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
   }
 
   @Override
-  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx,
+  public void encode(Cell cell, HFileBlockEncodingContext encodingCtx,
       DataOutputStream out) throws IOException {
     NoneEncodingState state = (NoneEncodingState) encodingCtx
         .getEncodingState();
     NoneEncoder encoder = state.encoder;
-    return encoder.write(cell);
+    int size = encoder.write(cell);
+    state.postCellEncode(size, size);
   }
 
   @Override
@@ -99,7 +100,8 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
           + "encoding context.");
     }
 
-    HFileBlockDefaultEncodingContext encodingCtx = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
+    HFileBlockDefaultEncodingContext encodingCtx =
+        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
     encodingCtx.prepareEncoding(out);
 
     NoneEncoder encoder = new NoneEncoder(out, encodingCtx);
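Note: the new test that follows pins the effect down numerically. The arithmetic below
re-derives its expected block count; it mirrors the comment inside the test, and only
the class and variable names here are invented:

    public class BlockCountEstimate {
      public static void main(String[] args) {
        final int entryCount = 10000;     // rows written by the test
        final int blockSizeLimit = 1024;  // withBlockSize(1024)
        // per-row encoded bytes: 4 (row index entry) + 24 (KeyValue) + 1 (MVCC) = 29
        final int encodedBytesPerRow = 4 + 24 + 1;

        // A block closes once the encoded counter reaches the limit:
        // ceil(1024 / 29) = 36 rows per block, i.e. 36 * 29 = 1044 block bytes (~2% over).
        int rowsPerBlock = (blockSizeLimit + encodedBytesPerRow - 1) / encodedBytesPerRow;
        int blocks = (entryCount + rowsPerBlock - 1) / rowsPerBlock;
        System.out.println(rowsPerBlock + " rows/block, " + blocks + " blocks"); // 36, 278
      }
    }
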
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestRowIndexV1DataEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestRowIndexV1DataEncoder.java
new file mode 100644
index 0000000000..8e172436b5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestRowIndexV1DataEncoder.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestRowIndexV1DataEncoder {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRowIndexV1DataEncoder.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private Configuration conf;
+  private FileSystem fs;
+  private DataBlockEncoding dataBlockEncoding;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = TEST_UTIL.getConfiguration();
+    fs = FileSystem.get(conf);
+    dataBlockEncoding = DataBlockEncoding.ROW_INDEX_V1;
+  }
+
+  @Test
+  public void testBlockCountWritten() throws IOException {
+    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), "testHFileFormatV3");
+    final int entryCount = 10000;
+    writeDataToHFile(hfilePath, entryCount);
+  }
+
+  private void writeDataToHFile(Path hfilePath, int entryCount) throws IOException {
+    HFileContext context =
+      new HFileContextBuilder().withBlockSize(1024).withDataBlockEncoding(dataBlockEncoding)
+        .withCellComparator(CellComparatorImpl.COMPARATOR).build();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    HFile.Writer writer =
+      new HFile.WriterFactory(conf, cacheConfig).withPath(fs, hfilePath).withFileContext(context)
+        .create();
+
+    List<KeyValue> keyValues = new ArrayList<>(entryCount);
+
+    writeKeyValues(entryCount, writer, keyValues);
+
+    FSDataInputStream fsdis = fs.open(hfilePath);
+
+    long fileSize = fs.getFileStatus(hfilePath).getLen();
+    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, fileSize);
+
+    // HBASE-23788
+    // kv size = 24 bytes, block size = 1024 bytes
+    // per row encoded data written = (4 (Row index) + 24 (Cell size) + 1 (MVCC)) bytes = 29 bytes
+    // creating block size of (29 * 36) bytes = 1044 bytes
+    // Number of blocks = ceil((29 * 10000) / 1044) = 278
+    // Without the patch it would have produced 244 blocks (each block of 1236 bytes)
+    // Earlier this would create blocks ~20% greater than the block size of 1024 bytes
+    // After this patch actual block size is ~2% greater than the block size of 1024 bytes
+    Assert.assertEquals(278, trailer.getDataIndexCount());
+  }
+
+  private void writeKeyValues(int entryCount, HFile.Writer writer, List<KeyValue> keyValues)
+    throws IOException {
+    for (int i = 0; i < entryCount; ++i) {
+      byte[] keyBytes = intToBytes(i);
+
+      byte[] valueBytes = new byte[0];
+      KeyValue keyValue = new KeyValue(keyBytes, null, null, valueBytes);
+
+      writer.append(keyValue);
+      keyValues.add(keyValue);
+    }
+    writer.close();
+  }
+
+  private byte[] intToBytes(final int i) {
+    ByteBuffer bb = ByteBuffer.allocate(4);
+    bb.putInt(i);
+    return bb.array();
+  }
+}
-- 
2.11.4.GIT