2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.COLUMNS
;
21 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.fam1
;
22 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.fam2
;
23 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.fam3
;
24 import static org
.junit
.Assert
.assertArrayEquals
;
25 import static org
.junit
.Assert
.assertEquals
;
26 import static org
.junit
.Assert
.assertFalse
;
27 import static org
.junit
.Assert
.assertNotNull
;
28 import static org
.junit
.Assert
.assertNull
;
29 import static org
.junit
.Assert
.assertTrue
;
30 import static org
.junit
.Assert
.fail
;
31 import static org
.mockito
.ArgumentMatchers
.any
;
32 import static org
.mockito
.ArgumentMatchers
.anyLong
;
33 import static org
.mockito
.Mockito
.doThrow
;
34 import static org
.mockito
.Mockito
.mock
;
35 import static org
.mockito
.Mockito
.never
;
36 import static org
.mockito
.Mockito
.spy
;
37 import static org
.mockito
.Mockito
.times
;
38 import static org
.mockito
.Mockito
.verify
;
39 import static org
.mockito
.Mockito
.when
;
41 import java
.io
.IOException
;
42 import java
.io
.InterruptedIOException
;
43 import java
.math
.BigDecimal
;
44 import java
.nio
.charset
.StandardCharsets
;
45 import java
.security
.PrivilegedExceptionAction
;
46 import java
.util
.ArrayList
;
47 import java
.util
.Arrays
;
48 import java
.util
.Collection
;
49 import java
.util
.List
;
51 import java
.util
.NavigableMap
;
52 import java
.util
.Objects
;
53 import java
.util
.TreeMap
;
54 import java
.util
.concurrent
.Callable
;
55 import java
.util
.concurrent
.CountDownLatch
;
56 import java
.util
.concurrent
.ExecutorService
;
57 import java
.util
.concurrent
.Executors
;
58 import java
.util
.concurrent
.Future
;
59 import java
.util
.concurrent
.TimeUnit
;
60 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
61 import java
.util
.concurrent
.atomic
.AtomicInteger
;
62 import java
.util
.concurrent
.atomic
.AtomicReference
;
63 import org
.apache
.commons
.lang3
.RandomStringUtils
;
64 import org
.apache
.hadoop
.conf
.Configuration
;
65 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
66 import org
.apache
.hadoop
.fs
.FileStatus
;
67 import org
.apache
.hadoop
.fs
.FileSystem
;
68 import org
.apache
.hadoop
.fs
.Path
;
69 import org
.apache
.hadoop
.hbase
.ArrayBackedTag
;
70 import org
.apache
.hadoop
.hbase
.Cell
;
71 import org
.apache
.hadoop
.hbase
.Cell
.Type
;
72 import org
.apache
.hadoop
.hbase
.CellBuilderFactory
;
73 import org
.apache
.hadoop
.hbase
.CellBuilderType
;
74 import org
.apache
.hadoop
.hbase
.CellUtil
;
75 import org
.apache
.hadoop
.hbase
.CompareOperator
;
76 import org
.apache
.hadoop
.hbase
.CompatibilitySingletonFactory
;
77 import org
.apache
.hadoop
.hbase
.DroppedSnapshotException
;
78 import org
.apache
.hadoop
.hbase
.ExtendedCellBuilderFactory
;
79 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
80 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
81 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
82 import org
.apache
.hadoop
.hbase
.HColumnDescriptor
;
83 import org
.apache
.hadoop
.hbase
.HConstants
;
84 import org
.apache
.hadoop
.hbase
.HConstants
.OperationStatusCode
;
85 import org
.apache
.hadoop
.hbase
.HDFSBlocksDistribution
;
86 import org
.apache
.hadoop
.hbase
.HRegionInfo
;
87 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
88 import org
.apache
.hadoop
.hbase
.KeyValue
;
89 import org
.apache
.hadoop
.hbase
.MiniHBaseCluster
;
90 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
;
91 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.RepeatingTestThread
;
92 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestThread
;
93 import org
.apache
.hadoop
.hbase
.NotServingRegionException
;
94 import org
.apache
.hadoop
.hbase
.PrivateCellUtil
;
95 import org
.apache
.hadoop
.hbase
.RegionTooBusyException
;
96 import org
.apache
.hadoop
.hbase
.ServerName
;
97 import org
.apache
.hadoop
.hbase
.StartMiniClusterOption
;
98 import org
.apache
.hadoop
.hbase
.TableName
;
99 import org
.apache
.hadoop
.hbase
.TagType
;
100 import org
.apache
.hadoop
.hbase
.Waiter
;
101 import org
.apache
.hadoop
.hbase
.client
.Append
;
102 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
103 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
104 import org
.apache
.hadoop
.hbase
.client
.Delete
;
105 import org
.apache
.hadoop
.hbase
.client
.Durability
;
106 import org
.apache
.hadoop
.hbase
.client
.Get
;
107 import org
.apache
.hadoop
.hbase
.client
.Increment
;
108 import org
.apache
.hadoop
.hbase
.client
.Mutation
;
109 import org
.apache
.hadoop
.hbase
.client
.Put
;
110 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
111 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
112 import org
.apache
.hadoop
.hbase
.client
.Result
;
113 import org
.apache
.hadoop
.hbase
.client
.RowMutations
;
114 import org
.apache
.hadoop
.hbase
.client
.Scan
;
115 import org
.apache
.hadoop
.hbase
.client
.Table
;
116 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
117 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
118 import org
.apache
.hadoop
.hbase
.coprocessor
.CoprocessorHost
;
119 import org
.apache
.hadoop
.hbase
.exceptions
.FailedSanityCheckException
;
120 import org
.apache
.hadoop
.hbase
.filter
.BigDecimalComparator
;
121 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
122 import org
.apache
.hadoop
.hbase
.filter
.ColumnCountGetFilter
;
123 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
124 import org
.apache
.hadoop
.hbase
.filter
.FilterBase
;
125 import org
.apache
.hadoop
.hbase
.filter
.FilterList
;
126 import org
.apache
.hadoop
.hbase
.filter
.NullComparator
;
127 import org
.apache
.hadoop
.hbase
.filter
.PrefixFilter
;
128 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueExcludeFilter
;
129 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueFilter
;
130 import org
.apache
.hadoop
.hbase
.filter
.SubstringComparator
;
131 import org
.apache
.hadoop
.hbase
.filter
.ValueFilter
;
132 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFile
;
133 import org
.apache
.hadoop
.hbase
.monitoring
.MonitoredRPCHandler
;
134 import org
.apache
.hadoop
.hbase
.monitoring
.MonitoredTask
;
135 import org
.apache
.hadoop
.hbase
.monitoring
.TaskMonitor
;
136 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
.MutationBatchOperation
;
137 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
.RegionScannerImpl
;
138 import org
.apache
.hadoop
.hbase
.regionserver
.Region
.RowLock
;
139 import org
.apache
.hadoop
.hbase
.regionserver
.TestHStore
.FaultyFileSystem
;
140 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionRequestImpl
;
141 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.FSHLog
;
142 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.MetricsWALSource
;
143 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.WALUtil
;
144 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationObserver
;
145 import org
.apache
.hadoop
.hbase
.security
.User
;
146 import org
.apache
.hadoop
.hbase
.test
.MetricsAssertHelper
;
147 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
148 import org
.apache
.hadoop
.hbase
.testclassification
.VerySlowRegionServerTests
;
149 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
150 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
151 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
152 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManagerTestHelper
;
153 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
154 import org
.apache
.hadoop
.hbase
.util
.HFileArchiveUtil
;
155 import org
.apache
.hadoop
.hbase
.util
.IncrementingEnvironmentEdge
;
156 import org
.apache
.hadoop
.hbase
.util
.ManualEnvironmentEdge
;
157 import org
.apache
.hadoop
.hbase
.util
.Threads
;
158 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
159 import org
.apache
.hadoop
.hbase
.wal
.FaultyFSLog
;
160 import org
.apache
.hadoop
.hbase
.wal
.NettyAsyncFSWALConfigHelper
;
161 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
162 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
163 import org
.apache
.hadoop
.hbase
.wal
.WALFactory
;
164 import org
.apache
.hadoop
.hbase
.wal
.WALKeyImpl
;
165 import org
.apache
.hadoop
.hbase
.wal
.WALProvider
;
166 import org
.apache
.hadoop
.hbase
.wal
.WALProvider
.Writer
;
167 import org
.apache
.hadoop
.hbase
.wal
.WALSplitUtil
;
168 import org
.junit
.After
;
169 import org
.junit
.Assert
;
170 import org
.junit
.Before
;
171 import org
.junit
.ClassRule
;
172 import org
.junit
.Rule
;
173 import org
.junit
.Test
;
174 import org
.junit
.experimental
.categories
.Category
;
175 import org
.junit
.rules
.ExpectedException
;
176 import org
.junit
.rules
.TestName
;
177 import org
.mockito
.ArgumentCaptor
;
178 import org
.mockito
.ArgumentMatcher
;
179 import org
.mockito
.Mockito
;
180 import org
.mockito
.invocation
.InvocationOnMock
;
181 import org
.mockito
.stubbing
.Answer
;
182 import org
.slf4j
.Logger
;
183 import org
.slf4j
.LoggerFactory
;
185 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
186 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ByteString
;
187 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.EventLoopGroup
;
188 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.nio
.NioEventLoopGroup
;
189 import org
.apache
.hbase
.thirdparty
.io
.netty
.channel
.socket
.nio
.NioSocketChannel
;
191 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
192 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.CompactionDescriptor
;
193 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.FlushDescriptor
;
194 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.FlushDescriptor
.FlushAction
;
195 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.FlushDescriptor
.StoreFlushDescriptor
;
196 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.RegionEventDescriptor
;
197 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.StoreDescriptor
;
/**
 * Basic stand-alone testing of HRegion. No clusters!
 *
 * A lot of the meta information for an HRegion now lives inside other HRegions
 * or in the HBaseMaster, so only basic testing is possible.
 */
205 @Category({VerySlowRegionServerTests
.class, LargeTests
.class})
206 @SuppressWarnings("deprecation")
207 public class TestHRegion
{
210 public static final HBaseClassTestRule CLASS_RULE
=
211 HBaseClassTestRule
.forClass(TestHRegion
.class);
213 // Do not spin up clusters in here. If you need to spin up a cluster, do it
214 // over in TestHRegionOnCluster.
215 private static final Logger LOG
= LoggerFactory
.getLogger(TestHRegion
.class);
217 public TestName name
= new TestName();
218 @Rule public final ExpectedException thrown
= ExpectedException
.none();
220 private static final String COLUMN_FAMILY
= "MyCF";
221 private static final byte [] COLUMN_FAMILY_BYTES
= Bytes
.toBytes(COLUMN_FAMILY
);
222 private static final EventLoopGroup GROUP
= new NioEventLoopGroup();
224 HRegion region
= null;
225 // Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack)
226 protected static HBaseTestingUtility TEST_UTIL
;
227 public static Configuration CONF
;
229 private final int MAX_VERSIONS
= 2;
232 protected TableName tableName
;
233 protected String method
;
234 protected final byte[] qual
= Bytes
.toBytes("qual");
235 protected final byte[] qual1
= Bytes
.toBytes("qual1");
236 protected final byte[] qual2
= Bytes
.toBytes("qual2");
237 protected final byte[] qual3
= Bytes
.toBytes("qual3");
238 protected final byte[] value
= Bytes
.toBytes("value");
239 protected final byte[] value1
= Bytes
.toBytes("value1");
240 protected final byte[] value2
= Bytes
.toBytes("value2");
241 protected final byte[] row
= Bytes
.toBytes("rowA");
242 protected final byte[] row2
= Bytes
.toBytes("rowB");
244 protected final MetricsAssertHelper metricsAssertHelper
= CompatibilitySingletonFactory
245 .getInstance(MetricsAssertHelper
.class);
248 public void setup() throws IOException
{
249 TEST_UTIL
= HBaseTestingUtility
.createLocalHTU();
250 CONF
= TEST_UTIL
.getConfiguration();
251 NettyAsyncFSWALConfigHelper
.setEventLoopConfig(CONF
, GROUP
, NioSocketChannel
.class);
252 dir
= TEST_UTIL
.getDataTestDir("TestHRegion").toString();
253 method
= name
.getMethodName();
254 tableName
= TableName
.valueOf(method
);
255 CONF
.set(CompactingMemStore
.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY
, String
.valueOf(0.09));
259 public void tearDown() throws IOException
{
260 // Region may have been closed, but it is still no harm if we close it again here using HTU.
261 HBaseTestingUtility
.closeRegionAndWAL(region
);
262 EnvironmentEdgeManagerTestHelper
.reset();
263 LOG
.info("Cleaning test directory: " + TEST_UTIL
.getDataTestDir());
264 TEST_UTIL
.cleanupTestDir();
268 * Test that I can use the max flushed sequence id after the close.
269 * @throws IOException
272 public void testSequenceId() throws IOException
{
273 region
= initHRegion(tableName
, method
, CONF
, COLUMN_FAMILY_BYTES
);
274 assertEquals(HConstants
.NO_SEQNUM
, region
.getMaxFlushedSeqId());
275 // Weird. This returns 0 if no store files or no edits. Afraid to change it.
276 assertEquals(0, (long)region
.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES
));
277 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
278 assertEquals(HConstants
.NO_SEQNUM
, region
.getMaxFlushedSeqId());
279 assertEquals(0, (long)region
.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES
));
280 // Open region again.
281 region
= initHRegion(tableName
, method
, CONF
, COLUMN_FAMILY_BYTES
);
282 byte [] value
= Bytes
.toBytes(method
);
283 // Make a random put against our cf.
284 Put put
= new Put(value
);
285 put
.addColumn(COLUMN_FAMILY_BYTES
, null, value
);
287 // No flush yet so init numbers should still be in place.
288 assertEquals(HConstants
.NO_SEQNUM
, region
.getMaxFlushedSeqId());
289 assertEquals(0, (long)region
.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES
));
291 long max
= region
.getMaxFlushedSeqId();
292 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
293 assertEquals(max
, region
.getMaxFlushedSeqId());
298 * Test for Bug 2 of HBASE-10466.
299 * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
300 * is smaller than a certain value, or when region close starts a flush is ongoing, the first
301 * flush is skipped and only the second flush takes place. However, two flushes are required in
302 * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
303 * in current memstore. The fix is removing all conditions except abort check so we ensure 2
304 * flushes for region close."
305 * @throws IOException
308 public void testCloseCarryingSnapshot() throws IOException
{
309 region
= initHRegion(tableName
, method
, CONF
, COLUMN_FAMILY_BYTES
);
310 HStore store
= region
.getStore(COLUMN_FAMILY_BYTES
);
311 // Get some random bytes.
312 byte [] value
= Bytes
.toBytes(method
);
313 // Make a random put against our cf.
314 Put put
= new Put(value
);
315 put
.addColumn(COLUMN_FAMILY_BYTES
, null, value
);
316 // First put something in current memstore, which will be in snapshot after flusher.prepare()
318 StoreFlushContext storeFlushCtx
= store
.createFlushContext(12345, FlushLifeCycleTracker
.DUMMY
);
319 storeFlushCtx
.prepare();
320 // Second put something in current memstore
321 put
.addColumn(COLUMN_FAMILY_BYTES
, Bytes
.toBytes("abc"), value
);
323 // Close with something in memstore and something in the snapshot. Make sure all is cleared.
324 HBaseTestingUtility
.closeRegionAndWAL(region
);
325 assertEquals(0, region
.getMemStoreDataSize());
330 * This test is for verifying memstore snapshot size is correctly updated in case of rollback
334 public void testMemstoreSnapshotSize() throws IOException
{
335 class MyFaultyFSLog
extends FaultyFSLog
{
336 StoreFlushContext storeFlushCtx
;
337 public MyFaultyFSLog(FileSystem fs
, Path rootDir
, String logName
, Configuration conf
)
339 super(fs
, rootDir
, logName
, conf
);
342 void setStoreFlushCtx(StoreFlushContext storeFlushCtx
) {
343 this.storeFlushCtx
= storeFlushCtx
;
347 public void sync(long txid
) throws IOException
{
348 storeFlushCtx
.prepare();
353 FileSystem fs
= FileSystem
.get(CONF
);
354 Path rootDir
= new Path(dir
+ "testMemstoreSnapshotSize");
355 MyFaultyFSLog faultyLog
= new MyFaultyFSLog(fs
, rootDir
, "testMemstoreSnapshotSize", CONF
);
357 region
= initHRegion(tableName
, null, null, false, Durability
.SYNC_WAL
, faultyLog
,
358 COLUMN_FAMILY_BYTES
);
360 HStore store
= region
.getStore(COLUMN_FAMILY_BYTES
);
361 // Get some random bytes.
362 byte [] value
= Bytes
.toBytes(method
);
363 faultyLog
.setStoreFlushCtx(store
.createFlushContext(12345, FlushLifeCycleTracker
.DUMMY
));
365 Put put
= new Put(value
);
366 put
.addColumn(COLUMN_FAMILY_BYTES
, Bytes
.toBytes("abc"), value
);
367 faultyLog
.setFailureType(FaultyFSLog
.FailureType
.SYNC
);
368 boolean threwIOE
= false;
371 } catch (IOException ioe
) {
374 assertTrue("The regionserver should have thrown an exception", threwIOE
);
376 MemStoreSize mss
= store
.getFlushableSize();
377 assertTrue("flushable size should be zero, but it is " + mss
,
378 mss
.getDataSize() == 0);
382 * Create a WAL outside of the usual helper in
383 * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
384 * doesn't play nicely with FaultyFileSystem. Call this method before overriding
385 * {@code fs.file.impl}.
386 * @param callingMethod a unique component for the path, probably the name of the test method.
388 private static WAL
createWALCompatibleWithFaultyFileSystem(String callingMethod
,
389 Configuration conf
, TableName tableName
) throws IOException
{
390 final Path logDir
= TEST_UTIL
.getDataTestDirOnTestFS(callingMethod
+ ".log");
391 final Configuration walConf
= new Configuration(conf
);
392 FSUtils
.setRootDir(walConf
, logDir
);
393 return new WALFactory(walConf
, callingMethod
)
394 .getWAL(RegionInfoBuilder
.newBuilder(tableName
).build());
398 public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException
{
399 String testName
= "testMemstoreSizeAccountingWithFailedPostBatchMutate";
400 FileSystem fs
= FileSystem
.get(CONF
);
401 Path rootDir
= new Path(dir
+ testName
);
402 FSHLog hLog
= new FSHLog(fs
, rootDir
, testName
, CONF
);
404 region
= initHRegion(tableName
, null, null, false, Durability
.SYNC_WAL
, hLog
,
405 COLUMN_FAMILY_BYTES
);
406 HStore store
= region
.getStore(COLUMN_FAMILY_BYTES
);
407 assertEquals(0, region
.getMemStoreDataSize());
410 byte [] value
= Bytes
.toBytes(method
);
411 Put put
= new Put(value
);
412 put
.addColumn(COLUMN_FAMILY_BYTES
, Bytes
.toBytes("abc"), value
);
414 long onePutSize
= region
.getMemStoreDataSize();
415 assertTrue(onePutSize
> 0);
417 RegionCoprocessorHost mockedCPHost
= Mockito
.mock(RegionCoprocessorHost
.class);
418 doThrow(new IOException())
419 .when(mockedCPHost
).postBatchMutate(Mockito
.<MiniBatchOperationInProgress
<Mutation
>>any());
420 region
.setCoprocessorHost(mockedCPHost
);
422 put
= new Put(value
);
423 put
.addColumn(COLUMN_FAMILY_BYTES
, Bytes
.toBytes("dfg"), value
);
426 fail("Should have failed with IOException");
427 } catch (IOException expected
) {
429 long expectedSize
= onePutSize
* 2;
430 assertEquals("memstoreSize should be incremented",
431 expectedSize
, region
.getMemStoreDataSize());
432 assertEquals("flushable size should be incremented",
433 expectedSize
, store
.getFlushableSize().getDataSize());
435 region
.setCoprocessorHost(null);
439 * A test case of HBASE-21041
440 * @throws Exception Exception
443 public void testFlushAndMemstoreSizeCounting() throws Exception
{
444 byte[] family
= Bytes
.toBytes("family");
445 this.region
= initHRegion(tableName
, method
, CONF
, family
);
446 final WALFactory wals
= new WALFactory(CONF
, method
);
448 for (byte[] row
: HBaseTestingUtility
.ROWS
) {
449 Put put
= new Put(row
);
450 put
.addColumn(family
, family
, row
);
454 // After flush, data size should be zero
455 assertEquals(0, region
.getMemStoreDataSize());
456 // After flush, a new active mutable segment is created, so the heap size
457 // should equal to MutableSegment.DEEP_OVERHEAD
458 assertEquals(MutableSegment
.DEEP_OVERHEAD
, region
.getMemStoreHeapSize());
459 // After flush, offheap should be zero
460 assertEquals(0, region
.getMemStoreOffHeapSize());
462 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
469 * Test we do not lose data if we fail a flush and then close.
470 * Part of HBase-10466. Tests the following from the issue description:
471 * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
472 * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
473 * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
474 * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
475 * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
476 * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
477 * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
478 * much smaller than expected. In extreme case, if the error accumulates to even bigger than
479 * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
480 * if memstoreSize is not larger than 0."
484 public void testFlushSizeAccounting() throws Exception
{
485 final Configuration conf
= HBaseConfiguration
.create(CONF
);
486 final WAL wal
= createWALCompatibleWithFaultyFileSystem(method
, conf
, tableName
);
488 conf
.setInt("hbase.hstore.flush.retries.number", 1);
490 User
.createUserForTesting(conf
, method
, new String
[]{"foo"});
491 // Inject our faulty LocalFileSystem
492 conf
.setClass("fs.file.impl", FaultyFileSystem
.class, FileSystem
.class);
493 user
.runAs(new PrivilegedExceptionAction
<Object
>() {
495 public Object
run() throws Exception
{
496 // Make sure it worked (above is sensitive to caching details in hadoop core)
497 FileSystem fs
= FileSystem
.get(conf
);
498 Assert
.assertEquals(FaultyFileSystem
.class, fs
.getClass());
499 FaultyFileSystem ffs
= (FaultyFileSystem
)fs
;
500 HRegion region
= null;
503 region
= initHRegion(tableName
, null, null, false, Durability
.SYNC_WAL
, wal
,
504 COLUMN_FAMILY_BYTES
);
505 long size
= region
.getMemStoreDataSize();
506 Assert
.assertEquals(0, size
);
507 // Put one item into memstore. Measure the size of one item in memstore.
508 Put p1
= new Put(row
);
509 p1
.add(new KeyValue(row
, COLUMN_FAMILY_BYTES
, qual1
, 1, (byte[]) null));
511 final long sizeOfOnePut
= region
.getMemStoreDataSize();
512 // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
514 LOG
.info("Flushing");
516 Assert
.fail("Didn't bubble up IOE!");
517 } catch (DroppedSnapshotException dse
) {
518 // What we are expecting
519 region
.closing
.set(false); // this is needed for the rest of the test to work
521 // Make it so all writes succeed from here on out
522 ffs
.fault
.set(false);
523 // Check sizes. Should still be the one entry.
524 Assert
.assertEquals(sizeOfOnePut
, region
.getMemStoreDataSize());
525 // Now add two entries so that on this next flush that fails, we can see if we
526 // subtract the right amount, the snapshot size only.
527 Put p2
= new Put(row
);
528 p2
.add(new KeyValue(row
, COLUMN_FAMILY_BYTES
, qual2
, 2, (byte[])null));
529 p2
.add(new KeyValue(row
, COLUMN_FAMILY_BYTES
, qual3
, 3, (byte[])null));
531 long expectedSize
= sizeOfOnePut
* 3;
532 Assert
.assertEquals(expectedSize
, region
.getMemStoreDataSize());
533 // Do a successful flush. It will clear the snapshot only. Thats how flushes work.
534 // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
537 // Make sure our memory accounting is right.
538 Assert
.assertEquals(sizeOfOnePut
* 2, region
.getMemStoreDataSize());
540 HBaseTestingUtility
.closeRegionAndWAL(region
);
545 FileSystem
.closeAllForUGI(user
.getUGI());
549 public void testCloseWithFailingFlush() throws Exception
{
550 final Configuration conf
= HBaseConfiguration
.create(CONF
);
551 final WAL wal
= createWALCompatibleWithFaultyFileSystem(method
, conf
, tableName
);
553 conf
.setInt("hbase.hstore.flush.retries.number", 1);
555 User
.createUserForTesting(conf
, this.method
, new String
[]{"foo"});
556 // Inject our faulty LocalFileSystem
557 conf
.setClass("fs.file.impl", FaultyFileSystem
.class, FileSystem
.class);
558 user
.runAs(new PrivilegedExceptionAction
<Object
>() {
560 public Object
run() throws Exception
{
561 // Make sure it worked (above is sensitive to caching details in hadoop core)
562 FileSystem fs
= FileSystem
.get(conf
);
563 Assert
.assertEquals(FaultyFileSystem
.class, fs
.getClass());
564 FaultyFileSystem ffs
= (FaultyFileSystem
)fs
;
565 HRegion region
= null;
568 region
= initHRegion(tableName
, null, null, false,
569 Durability
.SYNC_WAL
, wal
, COLUMN_FAMILY_BYTES
);
570 long size
= region
.getMemStoreDataSize();
571 Assert
.assertEquals(0, size
);
572 // Put one item into memstore. Measure the size of one item in memstore.
573 Put p1
= new Put(row
);
574 p1
.add(new KeyValue(row
, COLUMN_FAMILY_BYTES
, qual1
, 1, (byte[])null));
576 // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
577 HStore store
= region
.getStore(COLUMN_FAMILY_BYTES
);
578 StoreFlushContext storeFlushCtx
=
579 store
.createFlushContext(12345, FlushLifeCycleTracker
.DUMMY
);
580 storeFlushCtx
.prepare();
581 // Now add two entries to the foreground memstore.
582 Put p2
= new Put(row
);
583 p2
.add(new KeyValue(row
, COLUMN_FAMILY_BYTES
, qual2
, 2, (byte[])null));
584 p2
.add(new KeyValue(row
, COLUMN_FAMILY_BYTES
, qual3
, 3, (byte[])null));
586 // Now try close on top of a failing flush.
587 HBaseTestingUtility
.closeRegionAndWAL(region
);
590 } catch (DroppedSnapshotException dse
) {
592 LOG
.info("Expected DroppedSnapshotException");
594 // Make it so all writes succeed from here on out so can close clean
595 ffs
.fault
.set(false);
596 HBaseTestingUtility
.closeRegionAndWAL(region
);
601 FileSystem
.closeAllForUGI(user
.getUGI());
605 public void testCompactionAffectedByScanners() throws Exception
{
606 byte[] family
= Bytes
.toBytes("family");
607 this.region
= initHRegion(tableName
, method
, CONF
, family
);
609 Put put
= new Put(Bytes
.toBytes("r1"));
610 put
.addColumn(family
, Bytes
.toBytes("q1"), Bytes
.toBytes("v1"));
614 Scan scan
= new Scan();
615 scan
.setMaxVersions(3);
616 // open the first scanner
617 RegionScanner scanner1
= region
.getScanner(scan
);
619 Delete delete
= new Delete(Bytes
.toBytes("r1"));
620 region
.delete(delete
);
623 // open the second scanner
624 RegionScanner scanner2
= region
.getScanner(scan
);
626 List
<Cell
> results
= new ArrayList
<>();
628 System
.out
.println("Smallest read point:" + region
.getSmallestReadPoint());
630 // make a major compaction
631 region
.compact(true);
633 // open the third scanner
634 RegionScanner scanner3
= region
.getScanner(scan
);
636 // get data from scanner 1, 2, 3 after major compaction
637 scanner1
.next(results
);
638 System
.out
.println(results
);
639 assertEquals(1, results
.size());
642 scanner2
.next(results
);
643 System
.out
.println(results
);
644 assertEquals(0, results
.size());
647 scanner3
.next(results
);
648 System
.out
.println(results
);
649 assertEquals(0, results
.size());
653 public void testToShowNPEOnRegionScannerReseek() throws Exception
{
654 byte[] family
= Bytes
.toBytes("family");
655 this.region
= initHRegion(tableName
, method
, CONF
, family
);
657 Put put
= new Put(Bytes
.toBytes("r1"));
658 put
.addColumn(family
, Bytes
.toBytes("q1"), Bytes
.toBytes("v1"));
660 put
= new Put(Bytes
.toBytes("r2"));
661 put
.addColumn(family
, Bytes
.toBytes("q1"), Bytes
.toBytes("v1"));
665 Scan scan
= new Scan();
666 scan
.setMaxVersions(3);
667 // open the first scanner
668 RegionScanner scanner1
= region
.getScanner(scan
);
670 System
.out
.println("Smallest read point:" + region
.getSmallestReadPoint());
672 region
.compact(true);
674 scanner1
.reseek(Bytes
.toBytes("r2"));
675 List
<Cell
> results
= new ArrayList
<>();
676 scanner1
.next(results
);
677 Cell keyValue
= results
.get(0);
678 Assert
.assertTrue(Bytes
.compareTo(CellUtil
.cloneRow(keyValue
), Bytes
.toBytes("r2")) == 0);
683 public void testArchiveRecoveredEditsReplay() throws Exception
{
684 byte[] family
= Bytes
.toBytes("family");
685 this.region
= initHRegion(tableName
, method
, CONF
, family
);
686 final WALFactory wals
= new WALFactory(CONF
, method
);
688 Path regiondir
= region
.getRegionFileSystem().getRegionDir();
689 FileSystem fs
= region
.getRegionFileSystem().getFileSystem();
690 byte[] regionName
= region
.getRegionInfo().getEncodedNameAsBytes();
692 Path recoveredEditsDir
= WALSplitUtil
.getRegionDirRecoveredEditsDir(regiondir
);
694 long maxSeqId
= 1050;
695 long minSeqId
= 1000;
697 for (long i
= minSeqId
; i
<= maxSeqId
; i
+= 10) {
698 Path recoveredEdits
= new Path(recoveredEditsDir
, String
.format("%019d", i
));
699 fs
.create(recoveredEdits
);
700 WALProvider
.Writer writer
= wals
.createRecoveredEditsWriter(fs
, recoveredEdits
);
702 long time
= System
.nanoTime();
703 WALEdit edit
= new WALEdit();
704 edit
.add(new KeyValue(row
, family
, Bytes
.toBytes(i
), time
, KeyValue
.Type
.Put
, Bytes
706 writer
.append(new WAL
.Entry(new WALKeyImpl(regionName
, tableName
, i
, time
,
707 HConstants
.DEFAULT_CLUSTER_ID
), edit
));
711 MonitoredTask status
= TaskMonitor
.get().createStatus(method
);
712 Map
<byte[], Long
> maxSeqIdInStores
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
713 for (HStore store
: region
.getStores()) {
714 maxSeqIdInStores
.put(Bytes
.toBytes(store
.getColumnFamilyName()), minSeqId
- 1);
716 CONF
.set("hbase.region.archive.recovered.edits", "true");
717 CONF
.set(CommonFSUtils
.HBASE_WAL_DIR
, "/custom_wal_dir");
718 long seqId
= region
.replayRecoveredEditsIfAny(maxSeqIdInStores
, null, status
);
719 assertEquals(maxSeqId
, seqId
);
720 region
.getMVCC().advanceTo(seqId
);
721 String fakeFamilyName
= recoveredEditsDir
.getName();
722 Path rootDir
= new Path(CONF
.get(HConstants
.HBASE_DIR
));
723 Path storeArchiveDir
= HFileArchiveUtil
.getStoreArchivePathForRootDir(rootDir
,
724 region
.getRegionInfo(), Bytes
.toBytes(fakeFamilyName
));
725 FileStatus
[] list
= TEST_UTIL
.getTestFileSystem().listStatus(storeArchiveDir
);
726 assertEquals(6, list
.length
);
728 CONF
.set("hbase.region.archive.recovered.edits", "false");
729 CONF
.set(CommonFSUtils
.HBASE_WAL_DIR
, "");
730 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
// Replays recovered edits where none of them should be skipped.
// Writes one recovered-edits file per sequence id from minSeqId to maxSeqId (step 10),
// each holding one WAL entry with a Put cell for `row` whose qualifier is Bytes.toBytes(i).
// It then calls replayRecoveredEditsIfAny with every store's max sequence id set to
// minSeqId - 1, and asserts that the returned sequence id is maxSeqId and that each
// qualifier reads back as exactly one cell whose value is Bytes.toBytes(i).
// NOTE(review): the embedded original line numbers skip values in this view, so some short
// lines (e.g. try/finally, closing braces, the writer close) are not visible here.
737 public void testSkipRecoveredEditsReplay() throws Exception
{
738 byte[] family
= Bytes
.toBytes("family");
739 this.region
= initHRegion(tableName
, method
, CONF
, family
);
740 final WALFactory wals
= new WALFactory(CONF
, method
);
742 Path regiondir
= region
.getRegionFileSystem().getRegionDir();
743 FileSystem fs
= region
.getRegionFileSystem().getFileSystem();
744 byte[] regionName
= region
.getRegionInfo().getEncodedNameAsBytes();
746 Path recoveredEditsDir
= WALSplitUtil
.getRegionDirRecoveredEditsDir(regiondir
);
748 long maxSeqId
= 1050;
749 long minSeqId
= 1000;
// One recovered-edits file per sequence id; the file name is the zero-padded sequence id.
751 for (long i
= minSeqId
; i
<= maxSeqId
; i
+= 10) {
752 Path recoveredEdits
= new Path(recoveredEditsDir
, String
.format("%019d", i
));
753 fs
.create(recoveredEdits
);
754 WALProvider
.Writer writer
= wals
.createRecoveredEditsWriter(fs
, recoveredEdits
);
756 long time
= System
.nanoTime();
757 WALEdit edit
= new WALEdit();
758 edit
.add(new KeyValue(row
, family
, Bytes
.toBytes(i
), time
, KeyValue
.Type
.Put
, Bytes
760 writer
.append(new WAL
.Entry(new WALKeyImpl(regionName
, tableName
, i
, time
,
761 HConstants
.DEFAULT_CLUSTER_ID
), edit
));
765 MonitoredTask status
= TaskMonitor
.get().createStatus(method
);
// Seed the per-store max sequence id map with minSeqId - 1 for every store of the region.
766 Map
<byte[], Long
> maxSeqIdInStores
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
767 for (HStore store
: region
.getStores()) {
768 maxSeqIdInStores
.put(Bytes
.toBytes(store
.getColumnFamilyName()), minSeqId
- 1);
770 long seqId
= region
.replayRecoveredEditsIfAny(maxSeqIdInStores
, null, status
);
771 assertEquals(maxSeqId
, seqId
);
772 region
.getMVCC().advanceTo(seqId
);
// Every edit written above must be readable: one cell per qualifier, value == qualifier.
773 Get get
= new Get(row
);
774 Result result
= region
.get(get
);
775 for (long i
= minSeqId
; i
<= maxSeqId
; i
+= 10) {
776 List
<Cell
> kvs
= result
.getColumnCells(family
, Bytes
.toBytes(i
));
777 assertEquals(1, kvs
.size());
778 assertArrayEquals(Bytes
.toBytes(i
), CellUtil
.cloneValue(kvs
.get(0)));
781 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
// Replays recovered edits where only part of them should be applied.
// Writes the same recovered-edits files as testSkipRecoveredEditsReplay (sequence ids
// minSeqId..maxSeqId, step 10), but seeds every store's max sequence id with
// recoverSeqId - 1. After the replay it asserts that the returned sequence id is maxSeqId
// and that qualifiers with i < recoverSeqId yield no cells.
// NOTE(review): the embedded original line numbers skip values in this view; the size-1 and
// value assertions below presumably sit in an else branch that is not visible - confirm.
788 public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception
{
789 byte[] family
= Bytes
.toBytes("family");
790 this.region
= initHRegion(tableName
, method
, CONF
, family
);
791 final WALFactory wals
= new WALFactory(CONF
, method
);
793 Path regiondir
= region
.getRegionFileSystem().getRegionDir();
794 FileSystem fs
= region
.getRegionFileSystem().getFileSystem();
795 byte[] regionName
= region
.getRegionInfo().getEncodedNameAsBytes();
797 Path recoveredEditsDir
= WALSplitUtil
.getRegionDirRecoveredEditsDir(regiondir
);
799 long maxSeqId
= 1050;
800 long minSeqId
= 1000;
// One recovered-edits file per sequence id; the file name is the zero-padded sequence id.
802 for (long i
= minSeqId
; i
<= maxSeqId
; i
+= 10) {
803 Path recoveredEdits
= new Path(recoveredEditsDir
, String
.format("%019d", i
));
804 fs
.create(recoveredEdits
);
805 WALProvider
.Writer writer
= wals
.createRecoveredEditsWriter(fs
, recoveredEdits
);
807 long time
= System
.nanoTime();
808 WALEdit edit
= new WALEdit();
809 edit
.add(new KeyValue(row
, family
, Bytes
.toBytes(i
), time
, KeyValue
.Type
.Put
, Bytes
811 writer
.append(new WAL
.Entry(new WALKeyImpl(regionName
, tableName
, i
, time
,
812 HConstants
.DEFAULT_CLUSTER_ID
), edit
));
// Stores are seeded with recoverSeqId - 1, which lies inside the written range.
816 long recoverSeqId
= 1030;
817 MonitoredTask status
= TaskMonitor
.get().createStatus(method
);
818 Map
<byte[], Long
> maxSeqIdInStores
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
819 for (HStore store
: region
.getStores()) {
820 maxSeqIdInStores
.put(Bytes
.toBytes(store
.getColumnFamilyName()), recoverSeqId
- 1);
822 long seqId
= region
.replayRecoveredEditsIfAny(maxSeqIdInStores
, null, status
);
823 assertEquals(maxSeqId
, seqId
);
824 region
.getMVCC().advanceTo(seqId
);
825 Get get
= new Get(row
);
826 Result result
= region
.get(get
);
827 for (long i
= minSeqId
; i
<= maxSeqId
; i
+= 10) {
828 List
<Cell
> kvs
= result
.getColumnCells(family
, Bytes
.toBytes(i
));
// Edits below recoverSeqId must not have been applied.
829 if (i
< recoverSeqId
) {
830 assertEquals(0, kvs
.size());
832 assertEquals(1, kvs
.size());
833 assertArrayEquals(Bytes
.toBytes(i
), CellUtil
.cloneValue(kvs
.get(0)));
837 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
// Replays recovered edits where every file is named below the stores' max sequence id.
// Creates files named 1000, 1010, ... 1040 plus one named minSeqId - 1 in the
// recovered-edits directory, seeds every store's max sequence id with minSeqId (2000),
// and asserts that replayRecoveredEditsIfAny returns minSeqId.
// NOTE(review): the embedded original line numbers skip values in this view, so whatever
// is written to (or done with) the `dos` streams is not visible here.
844 public void testSkipRecoveredEditsReplayAllIgnored() throws Exception
{
845 byte[] family
= Bytes
.toBytes("family");
846 this.region
= initHRegion(tableName
, method
, CONF
, family
);
847 Path regiondir
= region
.getRegionFileSystem().getRegionDir();
848 FileSystem fs
= region
.getRegionFileSystem().getFileSystem();
850 Path recoveredEditsDir
= WALSplitUtil
.getRegionDirRecoveredEditsDir(regiondir
);
// Files named after sequence ids 1000..1040 (the loop bound 1050 is exclusive).
851 for (int i
= 1000; i
< 1050; i
+= 10) {
852 Path recoveredEdits
= new Path(recoveredEditsDir
, String
.format("%019d", i
));
853 FSDataOutputStream dos
= fs
.create(recoveredEdits
);
// One more file, named just below the sequence id the stores will be seeded with.
857 long minSeqId
= 2000;
858 Path recoveredEdits
= new Path(recoveredEditsDir
, String
.format("%019d", minSeqId
- 1));
859 FSDataOutputStream dos
= fs
.create(recoveredEdits
);
862 Map
<byte[], Long
> maxSeqIdInStores
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
863 for (HStore store
: region
.getStores()) {
864 maxSeqIdInStores
.put(Bytes
.toBytes(store
.getColumnFamilyName()), minSeqId
);
// No reporter and no status task are passed to the replay in this test.
866 long seqId
= region
.replayRecoveredEditsIfAny(maxSeqIdInStores
, null, null);
867 assertEquals(minSeqId
, seqId
);
// Replays recovered edits where the edit is either a compaction marker or a plain Put.
// Starts from a region with no store files, writes one recovered-edits file per sequence
// id (minSeqId..maxSeqId, step 10), replays with every store's max sequence id set to
// recoverSeqId - 1, and asserts that the replay returns maxSeqId and leaves exactly one
// store file.
// NOTE(review): the embedded original line numbers skip values in this view; the condition
// choosing between the createCompaction(...) edit and the Put edit is not visible. The
// method name suggests the compaction marker goes into the last file - confirm.
871 public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception
{
872 byte[] family
= Bytes
.toBytes("family");
873 this.region
= initHRegion(tableName
, method
, CONF
, family
);
874 final WALFactory wals
= new WALFactory(CONF
, method
);
876 Path regiondir
= region
.getRegionFileSystem().getRegionDir();
877 FileSystem fs
= region
.getRegionFileSystem().getFileSystem();
878 byte[] regionName
= region
.getRegionInfo().getEncodedNameAsBytes();
879 byte[][] columns
= region
.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
// Precondition: the freshly initialised region has no store files.
881 assertEquals(0, region
.getStoreFileList(columns
).size());
883 Path recoveredEditsDir
= WALSplitUtil
.getRegionDirRecoveredEditsDir(regiondir
);
885 long maxSeqId
= 1050;
886 long minSeqId
= 1000;
888 for (long i
= minSeqId
; i
<= maxSeqId
; i
+= 10) {
889 Path recoveredEdits
= new Path(recoveredEditsDir
, String
.format("%019d", i
));
890 fs
.create(recoveredEdits
);
891 WALProvider
.Writer writer
= wals
.createRecoveredEditsWriter(fs
, recoveredEdits
);
893 long time
= System
.nanoTime();
// Compaction-marker form of the edit; note the family name is set from regionName.
896 edit
= WALEdit
.createCompaction(region
.getRegionInfo(),
897 CompactionDescriptor
.newBuilder()
898 .setTableName(ByteString
.copyFrom(tableName
.getName()))
899 .setFamilyName(ByteString
.copyFrom(regionName
))
900 .setEncodedRegionName(ByteString
.copyFrom(regionName
))
901 .setStoreHomeDirBytes(ByteString
.copyFrom(Bytes
.toBytes(regiondir
.toString())))
902 .setRegionName(ByteString
.copyFrom(region
.getRegionInfo().getRegionName()))
// Plain Put form of the edit.
905 edit
= new WALEdit();
906 edit
.add(new KeyValue(row
, family
, Bytes
.toBytes(i
), time
, KeyValue
.Type
.Put
, Bytes
909 writer
.append(new WAL
.Entry(new WALKeyImpl(regionName
, tableName
, i
, time
,
910 HConstants
.DEFAULT_CLUSTER_ID
), edit
));
914 long recoverSeqId
= 1030;
915 Map
<byte[], Long
> maxSeqIdInStores
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
916 MonitoredTask status
= TaskMonitor
.get().createStatus(method
);
917 for (HStore store
: region
.getStores()) {
918 maxSeqIdInStores
.put(Bytes
.toBytes(store
.getColumnFamilyName()), recoverSeqId
- 1);
920 long seqId
= region
.replayRecoveredEditsIfAny(maxSeqIdInStores
, null, status
);
921 assertEquals(maxSeqId
, seqId
);
923 // assert that the files are flushed
924 assertEquals(1, region
.getStoreFileList(columns
).size());
927 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
934 public void testRecoveredEditsReplayCompaction() throws Exception
{
935 testRecoveredEditsReplayCompaction(false);
936 testRecoveredEditsReplayCompaction(true);
// Shared body of the compaction-replay test.
// Switches REGION_IMPL to HRegionForTesting, loads rows, asserts three store files, and
// runs compactStores() with "hbase.hstore.compaction.complete" set to false so the store
// file count stays at three. It then commits the compacted file from the region temp
// directory, builds a CompactionDescriptor, writes it as a compaction marker to the WAL
// and into a recovered-edits file, and closes and reopens the region.
// After reopening, the temp directory must be empty and every row must read back.
// REGION_IMPL is set back to HRegion at the end.
//
// mismatchedRegionName: when true, the descriptor is built with an altered encoded region
// name (every byte incremented by one) instead of null; the single-store-file assertion is
// only made when it is false.
// NOTE(review): the embedded original line numbers skip values in this view, so the
// minSeqId/maxSeqId declarations, the put/flush calls and the try/finally lines are not
// visible here.
939 public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName
) throws Exception
{
940 CONF
.setClass(HConstants
.REGION_IMPL
, HRegionForTesting
.class, Region
.class);
941 byte[] family
= Bytes
.toBytes("family");
942 this.region
= initHRegion(tableName
, method
, CONF
, family
);
943 final WALFactory wals
= new WALFactory(CONF
, method
);
945 Path regiondir
= region
.getRegionFileSystem().getRegionDir();
946 FileSystem fs
= region
.getRegionFileSystem().getFileSystem();
947 byte[] regionName
= region
.getRegionInfo().getEncodedNameAsBytes();
// Row key, qualifier and value are all Bytes.toBytes(i).
952 for (long i
= minSeqId
; i
< maxSeqId
; i
++) {
953 Put put
= new Put(Bytes
.toBytes(i
));
954 put
.addColumn(family
, Bytes
.toBytes(i
), Bytes
.toBytes(i
));
959 // this will create a region with 3 files
960 assertEquals(3, region
.getStore(family
).getStorefilesCount());
// Remember the pre-compaction store file paths; they are passed to the descriptor below.
961 List
<Path
> storeFiles
= new ArrayList
<>(3);
962 for (HStoreFile sf
: region
.getStore(family
).getStorefiles()) {
963 storeFiles
.add(sf
.getPath());
966 // disable compaction completion
967 CONF
.setBoolean("hbase.hstore.compaction.complete", false);
968 region
.compactStores();
970 // ensure that nothing changed
971 assertEquals(3, region
.getStore(family
).getStorefilesCount());
973 // now find the compacted file, and manually add it to the recovered edits
974 Path tmpDir
= new Path(region
.getRegionFileSystem().getTempDir(), Bytes
.toString(family
));
975 FileStatus
[] files
= FSUtils
.listStatus(fs
, tmpDir
);
976 String errorMsg
= "Expected to find 1 file in the region temp directory "
977 + "from the compaction, could not find any";
978 assertNotNull(errorMsg
, files
);
979 assertEquals(errorMsg
, 1, files
.length
);
980 // move the file inside region dir
981 Path newFile
= region
.getRegionFileSystem().commitStoreFile(Bytes
.toString(family
),
// Derive an encoded name that differs from the real one in every byte.
984 byte[] encodedNameAsBytes
= this.region
.getRegionInfo().getEncodedNameAsBytes();
985 byte[] fakeEncodedNameAsBytes
= new byte [encodedNameAsBytes
.length
];
986 for (int i
=0; i
< encodedNameAsBytes
.length
; i
++) {
987 // Mix the byte array to have a new encodedName
988 fakeEncodedNameAsBytes
[i
] = (byte) (encodedNameAsBytes
[i
] + 1);
// The fake name is only used for the mismatched run; otherwise null is passed.
991 CompactionDescriptor compactionDescriptor
= ProtobufUtil
.toCompactionDescriptor(this.region
992 .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes
: null, family
,
993 storeFiles
, Lists
.newArrayList(newFile
),
994 region
.getRegionFileSystem().getStoreDir(Bytes
.toString(family
)));
996 WALUtil
.writeCompactionMarker(region
.getWAL(), this.region
.getReplicationScope(),
997 this.region
.getRegionInfo(), compactionDescriptor
, region
.getMVCC());
999 Path recoveredEditsDir
= WALSplitUtil
.getRegionDirRecoveredEditsDir(regiondir
);
// The same compaction descriptor is also written as a recovered edit, in a file named 1000.
1001 Path recoveredEdits
= new Path(recoveredEditsDir
, String
.format("%019d", 1000));
1002 fs
.create(recoveredEdits
);
1003 WALProvider
.Writer writer
= wals
.createRecoveredEditsWriter(fs
, recoveredEdits
);
1005 long time
= System
.nanoTime();
1007 writer
.append(new WAL
.Entry(new WALKeyImpl(regionName
, tableName
, 10, time
,
1008 HConstants
.DEFAULT_CLUSTER_ID
), WALEdit
.createCompaction(region
.getRegionInfo(),
1009 compactionDescriptor
)));
1012 // close the region now, and reopen again
1013 region
.getTableDescriptor();
1014 region
.getRegionInfo();
1015 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
1017 region
= HRegion
.openHRegion(region
, null);
1018 } catch (WrongRegionException wre
) {
1019 fail("Matching encoded region name should not have produced WrongRegionException");
1022 // now check whether we have only one store file, the compacted one
1023 Collection
<HStoreFile
> sfs
= region
.getStore(family
).getStorefiles();
1024 for (HStoreFile sf
: sfs
) {
1025 LOG
.info(Objects
.toString(sf
.getPath()));
1027 if (!mismatchedRegionName
) {
1028 assertEquals(1, region
.getStore(family
).getStorefilesCount());
// The temp directory must no longer hold the compaction output.
1030 files
= FSUtils
.listStatus(fs
, tmpDir
);
1031 assertTrue("Expected to find 0 files inside " + tmpDir
, files
== null || files
.length
== 0);
// All rows written at the start must still be readable.
1033 for (long i
= minSeqId
; i
< maxSeqId
; i
++) {
1034 Get get
= new Get(Bytes
.toBytes(i
));
1035 Result result
= region
.get(get
);
1036 byte[] value
= result
.getValue(family
, Bytes
.toBytes(i
));
1037 assertArrayEquals(Bytes
.toBytes(i
), value
);
1040 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
// Restore the default region implementation for subsequent tests.
1043 CONF
.setClass(HConstants
.REGION_IMPL
, HRegion
.class, Region
.class);
// Verifies that flush markers are written to the WAL and can be replayed as recovered edits.
// Creates a WAL under its own root directory, loads rows, asserts three store files,
// then reads the current WAL file back and checks the fields of each flush descriptor it
// finds (table name, sequence number ordering, encoded region name, store family/home dir,
// flush output). It expects 3 * 2 descriptors in total, writes those entries into a
// recovered-edits file, closes and reopens the region, and reads every row back.
// NOTE(review): the embedded original line numbers skip values in this view, so the
// minSeqId/maxSeqId declarations, the flush calls, the read loop header and the
// try/finally lines are not visible here.
1048 public void testFlushMarkers() throws Exception
{
1049 // tests that flush markers are written to WAL and handled at recovered edits
1050 byte[] family
= Bytes
.toBytes("family");
// The WAL gets its own configuration whose root dir points at a per-method log directory.
1051 Path logDir
= TEST_UTIL
.getDataTestDirOnTestFS(method
+ ".log");
1052 final Configuration walConf
= new Configuration(TEST_UTIL
.getConfiguration());
1053 FSUtils
.setRootDir(walConf
, logDir
);
1054 final WALFactory wals
= new WALFactory(walConf
, method
);
1055 final WAL wal
= wals
.getWAL(RegionInfoBuilder
.newBuilder(tableName
).build());
1057 this.region
= initHRegion(tableName
, HConstants
.EMPTY_START_ROW
,
1058 HConstants
.EMPTY_END_ROW
, false, Durability
.USE_DEFAULT
, wal
, family
);
1060 Path regiondir
= region
.getRegionFileSystem().getRegionDir();
1061 FileSystem fs
= region
.getRegionFileSystem().getFileSystem();
1062 byte[] regionName
= region
.getRegionInfo().getEncodedNameAsBytes();
// Row key, qualifier and value are all Bytes.toBytes(i).
1067 for (long i
= minSeqId
; i
< maxSeqId
; i
++) {
1068 Put put
= new Put(Bytes
.toBytes(i
));
1069 put
.addColumn(family
, Bytes
.toBytes(i
), Bytes
.toBytes(i
));
1074 // this will create a region with 3 files from flush
1075 assertEquals(3, region
.getStore(family
).getStorefilesCount());
// Collect the store file names; flush outputs in the markers are checked against them.
1076 List
<String
> storeFiles
= new ArrayList
<>(3);
1077 for (HStoreFile sf
: region
.getStore(family
).getStorefiles()) {
1078 storeFiles
.add(sf
.getPath().getName());
1081 // now verify that the flush markers are written
1083 WAL
.Reader reader
= WALFactory
.createReader(fs
, AbstractFSWALProvider
.getCurrentFileName(wal
),
1084 TEST_UTIL
.getConfiguration());
1086 List
<WAL
.Entry
> flushDescriptors
= new ArrayList
<>();
1087 long lastFlushSeqId
= -1;
1089 WAL
.Entry entry
= reader
.next();
1090 if (entry
== null) {
// Only the first cell of each entry is inspected.
1093 Cell cell
= entry
.getEdit().getCells().get(0);
1094 if (WALEdit
.isMetaEditFamily(cell
)) {
1095 FlushDescriptor flushDesc
= WALEdit
.getFlushDescriptor(cell
);
1096 assertNotNull(flushDesc
);
1097 assertArrayEquals(tableName
.getName(), flushDesc
.getTableName().toByteArray());
// START_FLUSH must advance the sequence number; COMMIT_FLUSH must repeat the last one.
1098 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
1099 assertTrue(flushDesc
.getFlushSequenceNumber() > lastFlushSeqId
);
1100 } else if (flushDesc
.getAction() == FlushAction
.COMMIT_FLUSH
) {
1101 assertTrue(flushDesc
.getFlushSequenceNumber() == lastFlushSeqId
);
1103 lastFlushSeqId
= flushDesc
.getFlushSequenceNumber();
1104 assertArrayEquals(regionName
, flushDesc
.getEncodedRegionName().toByteArray());
1105 assertEquals(1, flushDesc
.getStoreFlushesCount()); //only one store
1106 StoreFlushDescriptor storeFlushDesc
= flushDesc
.getStoreFlushes(0);
1107 assertArrayEquals(family
, storeFlushDesc
.getFamilyName().toByteArray());
1108 assertEquals("family", storeFlushDesc
.getStoreHomeDir());
1109 if (flushDesc
.getAction() == FlushAction
.START_FLUSH
) {
1110 assertEquals(0, storeFlushDesc
.getFlushOutputCount());
1112 assertEquals(1, storeFlushDesc
.getFlushOutputCount()); //only one file from flush
1113 assertTrue(storeFiles
.contains(storeFlushDesc
.getFlushOutput(0)));
1116 flushDescriptors
.add(entry
);
1120 assertEquals(3 * 2, flushDescriptors
.size()); // START_FLUSH and COMMIT_FLUSH per flush
1122 // now write those markers to the recovered edits again.
1124 Path recoveredEditsDir
= WALSplitUtil
.getRegionDirRecoveredEditsDir(regiondir
);
1126 Path recoveredEdits
= new Path(recoveredEditsDir
, String
.format("%019d", 1000));
1127 fs
.create(recoveredEdits
);
1128 WALProvider
.Writer writer
= wals
.createRecoveredEditsWriter(fs
, recoveredEdits
);
1130 for (WAL
.Entry entry
: flushDescriptors
) {
1131 writer
.append(entry
);
1135 if (null != reader
) {
// A failure while closing is only logged, not rethrown.
1138 } catch (IOException exception
) {
1139 LOG
.warn("Problem closing wal: " + exception
.getMessage());
1140 LOG
.debug("exception details", exception
);
1145 // close the region now, and reopen again
1146 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
1147 region
= HRegion
.openHRegion(region
, null);
1149 // now check whether we can read back the data from region
1150 for (long i
= minSeqId
; i
< maxSeqId
; i
++) {
1151 Get get
= new Get(Bytes
.toBytes(i
));
1152 Result result
= region
.get(get
);
1153 byte[] value
= result
.getValue(family
, Bytes
.toBytes(i
));
1154 assertArrayEquals(Bytes
.toBytes(i
), value
);
1157 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
// Mockito ArgumentMatcher over WALEdit that looks for flush markers.
// matches() inspects the first cell of the edit; when that cell belongs to the meta edit
// family it decodes the FlushDescriptor and compares its action against each of the
// configured FlushActions. The configured actions can be replaced later through set().
// NOTE(review): the embedded original line numbers skip values in this view, so the return
// statements of matches() and set() are not visible; the comments below describe only the
// visible checks.
1163 static class IsFlushWALMarker
implements ArgumentMatcher
<WALEdit
> {
// volatile: the field is reassigned through set() after construction.
1164 volatile FlushAction
[] actions
;
1165 public IsFlushWALMarker(FlushAction
... actions
) {
1166 this.actions
= actions
;
1169 public boolean matches(WALEdit edit
) {
1170 List
<Cell
> cells
= edit
.getCells();
1171 if (cells
.isEmpty()) {
1174 if (WALEdit
.isMetaEditFamily(cells
.get(0))) {
1175 FlushDescriptor desc
;
1177 desc
= WALEdit
.getFlushDescriptor(cells
.get(0));
// A descriptor that cannot be decoded is logged rather than propagated.
1178 } catch (IOException e
) {
1179 LOG
.warn(e
.toString(), e
);
1183 for (FlushAction action
: actions
) {
1184 if (desc
.getAction() == action
) {
// Replaces the actions this matcher looks for; the declared return type is the matcher.
1192 public IsFlushWALMarker
set(FlushAction
... actions
) {
1193 this.actions
= actions
;
// Exercises flushes when appending flush markers to the WAL fails.
// A local FSHLog subclass wraps the real writer; its append() throws an IOException
// whenever the entry's first cell is a flush descriptor whose action is listed in
// flushActions. The test then runs three scenarios against it, numbered 1-3 below.
// NOTE(review): the embedded original line numbers skip values in this view, so the flush
// calls, the put loop header, the delegating writer methods and most try/finally lines are
// not visible here.
1199 public void testFlushMarkersWALFail() throws Exception
{
1200 // test the cases where the WAL append for flush markers fail.
1201 byte[] family
= Bytes
.toBytes("family");
1203 // spy an actual WAL implementation to throw exception (was not able to mock)
1204 Path logDir
= TEST_UTIL
.getDataTestDirOnTestFS(method
+ "log");
1206 final Configuration walConf
= new Configuration(TEST_UTIL
.getConfiguration());
1207 FSUtils
.setRootDir(walConf
, logDir
);
1208 // Make up a WAL that we can manipulate at append time.
1209 class FailAppendFlushMarkerWAL
extends FSHLog
{
// Actions whose flush markers should fail to append; reassigned by the test body.
1210 volatile FlushAction
[] flushActions
= null;
1212 public FailAppendFlushMarkerWAL(FileSystem fs
, Path root
, String logDir
, Configuration conf
)
1213 throws IOException
{
1214 super(fs
, root
, logDir
, conf
);
// Wraps the writer created by the superclass in an anonymous Writer.
1218 protected Writer
createWriterInstance(Path path
) throws IOException
{
1219 final Writer w
= super.createWriterInstance(path
);
1220 return new Writer() {
1222 public void close() throws IOException
{
1227 public void sync(boolean forceSync
) throws IOException
{
1232 public void append(Entry entry
) throws IOException
{
1233 List
<Cell
> cells
= entry
.getEdit().getCells();
1234 if (WALEdit
.isMetaEditFamily(cells
.get(0))) {
1235 FlushDescriptor desc
= WALEdit
.getFlushDescriptor(cells
.get(0));
// Fail the append when the marker's action is one of the configured ones.
1237 for (FlushAction flushAction
: flushActions
) {
1238 if (desc
.getAction().equals(flushAction
)) {
1239 throw new IOException("Failed to append flush marker! " + flushAction
);
1248 public long getLength() {
1249 return w
.getLength();
1254 FailAppendFlushMarkerWAL wal
=
1255 new FailAppendFlushMarkerWAL(FileSystem
.get(walConf
), FSUtils
.getRootDir(walConf
),
1258 this.region
= initHRegion(tableName
, HConstants
.EMPTY_START_ROW
,
1259 HConstants
.EMPTY_END_ROW
, false, Durability
.USE_DEFAULT
, wal
, family
);
1261 Put put
= new Put(Bytes
.toBytes(i
));
1262 put
.setDurability(Durability
.SKIP_WAL
); // have to skip mocked wal
1263 put
.addColumn(family
, Bytes
.toBytes(i
), Bytes
.toBytes(i
));
1266 // 1. Test case where START_FLUSH throws exception
1267 wal
.flushActions
= new FlushAction
[] {FlushAction
.START_FLUSH
};
1269 // start cache flush will throw exception
1272 fail("This should have thrown exception");
1273 } catch (DroppedSnapshotException unexpected
) {
1274 // this should not be a dropped snapshot exception. Meaning that RS will not abort
1276 } catch (IOException expected
) {
1279 // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1280 // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1284 // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
// NOTE(review): flushActions is assigned on the previous wal instance here, and wal is
// replaced by a new instance in the very next statement - confirm this order is intended.
1285 wal
.flushActions
= new FlushAction
[] {FlushAction
.COMMIT_FLUSH
};
1286 wal
= new FailAppendFlushMarkerWAL(FileSystem
.get(walConf
), FSUtils
.getRootDir(walConf
),
1289 this.region
= initHRegion(tableName
, HConstants
.EMPTY_START_ROW
,
1290 HConstants
.EMPTY_END_ROW
, false, Durability
.USE_DEFAULT
, wal
, family
);
1292 // 3. Test case where ABORT_FLUSH will throw exception.
1293 // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
1294 // DroppedSnapshotException. Below COMMIT_FLUSH will cause flush to abort
1295 wal
.flushActions
= new FlushAction
[] {FlushAction
.COMMIT_FLUSH
, FlushAction
.ABORT_FLUSH
};
1299 fail("This should have thrown exception");
1300 } catch (DroppedSnapshotException expected
) {
1301 // we expect this exception, since we were able to write the snapshot, but failed to
1302 // write the flush marker to WAL
1303 } catch (IOException unexpected
) {
// Checks that concurrent gets do not hit a NullPointerException while a region is closing.
// Loads rows into three families, creates half of the getter threads, sets the region's
// closing flag directly, creates the remaining getters, closes the region, and finally
// asserts for each thread that the exception it recorded (if any) is not a
// NullPointerException.
// NOTE(review): the embedded original line numbers skip values in this view, so the
// numRows declaration, the thread start()/join() calls, the flip of `done` and the
// try/finally lines are not visible here.
1309 public void testGetWhileRegionClose() throws IOException
{
1310 Configuration hc
= initSplit();
1312 byte[][] families
= { fam1
, fam2
, fam3
};
1314 // Setting up region
1315 this.region
= initHRegion(tableName
, method
, hc
, families
);
1316 // Put data in region
1317 final int startRow
= 100;
1318 putData(startRow
, numRows
, qual1
, families
);
1319 putData(startRow
, numRows
, qual2
, families
);
1320 putData(startRow
, numRows
, qual3
, families
);
// Shared state handed to every getter thread: the stop flag and the successful-get counter.
1321 final AtomicBoolean done
= new AtomicBoolean(false);
1322 final AtomicInteger gets
= new AtomicInteger(0);
1323 GetTillDoneOrException
[] threads
= new GetTillDoneOrException
[10];
1325 // Set ten threads running concurrently getting from the region.
// First half of the threads is created before the closing flag is set.
1326 for (int i
= 0; i
< threads
.length
/ 2; i
++) {
1327 threads
[i
] = new GetTillDoneOrException(i
, Bytes
.toBytes("" + startRow
), done
, gets
);
1328 threads
[i
].setDaemon(true);
1331 // Artificially make the condition by setting closing flag explicitly.
1332 // I can't make the issue happen with a call to region.close().
1333 this.region
.closing
.set(true);
// Second half of the threads is created after the closing flag is set.
1334 for (int i
= threads
.length
/ 2; i
< threads
.length
; i
++) {
1335 threads
[i
] = new GetTillDoneOrException(i
, Bytes
.toBytes("" + startRow
), done
, gets
);
1336 threads
[i
].setDaemon(true);
1340 if (this.region
!= null) {
1341 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
1346 for (GetTillDoneOrException t
: threads
) {
1349 } catch (InterruptedException e
) {
1350 e
.printStackTrace();
// Any recorded exception is acceptable except a NullPointerException.
1353 LOG
.info("Exception=" + t
.e
);
1354 assertFalse("Found a NPE in " + t
.getName(), t
.e
instanceof NullPointerException
);
1360 * Thread that repeatedly gets a single row until the 'done' flag is flipped. If an
1361 * exception causes it to fail, it records the exception.
// Getter thread used by testGetWhileRegionClose.
// It keeps issuing the same Get against the enclosing test's region while the shared
// `done` flag is false, asserting that each result is non-empty and bumping the shared
// counter after each successful get. The thread is named "getter." + i.
// NOTE(review): the embedded original line numbers skip values in this view, so the
// remaining constructor assignments, the run() header and the body of the catch block are
// not visible here; the field `e` is presumably where a caught exception is stored.
1363 class GetTillDoneOrException
extends Thread
{
1364 private final Get g
;
1365 private final AtomicBoolean done
;
1366 private final AtomicInteger count
;
// Read by the test after the thread finishes (see the t.e accesses in the test).
1367 private Exception e
;
1369 GetTillDoneOrException(final int i
, final byte[] r
, final AtomicBoolean d
,
1370 final AtomicInteger c
) {
1371 super("getter." + i
);
1372 this.g
= new Get(r
);
1379 while (!this.done
.get()) {
1381 assertTrue(region
.get(g
).size() > 0);
1382 this.count
.incrementAndGet();
1383 } catch (Exception e
) {
1392 * An involved filter test. Has multiple column families and deletes in the mix.
1395 public void testWeirdCacheBehaviour() throws Exception
{
1396 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
1397 byte[][] FAMILIES
= new byte[][] { Bytes
.toBytes("trans-blob"), Bytes
.toBytes("trans-type"),
1398 Bytes
.toBytes("trans-date"), Bytes
.toBytes("trans-tags"), Bytes
.toBytes("trans-group") };
1399 this.region
= initHRegion(tableName
, method
, CONF
, FAMILIES
);
1400 String value
= "this is the value";
1401 String value2
= "this is some other value";
1402 String keyPrefix1
= "prefix1";
1403 String keyPrefix2
= "prefix2";
1404 String keyPrefix3
= "prefix3";
1405 putRows(this.region
, 3, value
, keyPrefix1
);
1406 putRows(this.region
, 3, value
, keyPrefix2
);
1407 putRows(this.region
, 3, value
, keyPrefix3
);
1408 putRows(this.region
, 3, value2
, keyPrefix1
);
1409 putRows(this.region
, 3, value2
, keyPrefix2
);
1410 putRows(this.region
, 3, value2
, keyPrefix3
);
1411 System
.out
.println("Checking values for key: " + keyPrefix1
);
1412 assertEquals("Got back incorrect number of rows from scan", 3,
1413 getNumberOfRows(keyPrefix1
, value2
, this.region
));
1414 System
.out
.println("Checking values for key: " + keyPrefix2
);
1415 assertEquals("Got back incorrect number of rows from scan", 3,
1416 getNumberOfRows(keyPrefix2
, value2
, this.region
));
1417 System
.out
.println("Checking values for key: " + keyPrefix3
);
1418 assertEquals("Got back incorrect number of rows from scan", 3,
1419 getNumberOfRows(keyPrefix3
, value2
, this.region
));
1420 deleteColumns(this.region
, value2
, keyPrefix1
);
1421 deleteColumns(this.region
, value2
, keyPrefix2
);
1422 deleteColumns(this.region
, value2
, keyPrefix3
);
1423 System
.out
.println("Starting important checks.....");
1424 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1
, 0,
1425 getNumberOfRows(keyPrefix1
, value2
, this.region
));
1426 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2
, 0,
1427 getNumberOfRows(keyPrefix2
, value2
, this.region
));
1428 assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3
, 0,
1429 getNumberOfRows(keyPrefix3
, value2
, this.region
));
1433 public void testAppendWithReadOnlyTable() throws Exception
{
1434 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
1435 this.region
= initHRegion(tableName
, method
, CONF
, true, Bytes
.toBytes("somefamily"));
1436 boolean exceptionCaught
= false;
1437 Append append
= new Append(Bytes
.toBytes("somerow"));
1438 append
.setDurability(Durability
.SKIP_WAL
);
1439 append
.addColumn(Bytes
.toBytes("somefamily"), Bytes
.toBytes("somequalifier"),
1440 Bytes
.toBytes("somevalue"));
1442 region
.append(append
);
1443 } catch (IOException e
) {
1444 exceptionCaught
= true;
1446 assertTrue(exceptionCaught
== true);
1450 public void testIncrWithReadOnlyTable() throws Exception
{
1451 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
1452 this.region
= initHRegion(tableName
, method
, CONF
, true, Bytes
.toBytes("somefamily"));
1453 boolean exceptionCaught
= false;
1454 Increment inc
= new Increment(Bytes
.toBytes("somerow"));
1455 inc
.setDurability(Durability
.SKIP_WAL
);
1456 inc
.addColumn(Bytes
.toBytes("somefamily"), Bytes
.toBytes("somequalifier"), 1L);
1458 region
.increment(inc
);
1459 } catch (IOException e
) {
1460 exceptionCaught
= true;
1462 assertTrue(exceptionCaught
== true);
// Deletes the "trans-tags:qual2" column from the rows that buildScanner(keyPrefix, value, r)
// returns, then asserts that exactly three deletes were performed.
//   r         - region to scan and delete from
//   value     - column value the scanner filters on
//   keyPrefix - row-key prefix the scanner filters on
// NOTE(review): the embedded original line numbers skip values in this view, so the counter
// declaration, the loop construct, the statement applying the Delete and the scanner
// cleanup are not visible here.
1465 private void deleteColumns(HRegion r
, String value
, String keyPrefix
) throws IOException
{
1466 InternalScanner scanner
= buildScanner(keyPrefix
, value
, r
);
1468 boolean more
= false;
1469 List
<Cell
> results
= new ArrayList
<>();
1471 more
= scanner
.next(results
);
1472 if (results
!= null && !results
.isEmpty())
// The row to delete from is taken from the first cell of the current scan batch.
1476 Delete delete
= new Delete(CellUtil
.cloneRow(results
.get(0)));
1477 delete
.addColumn(Bytes
.toBytes("trans-tags"), Bytes
.toBytes("qual2"));
1481 assertEquals("Did not perform correct number of deletes", 3, count
);
// Counts the results produced by buildScanner(keyPrefix, value, r) and prints every cell
// it sees to standard output.
//   keyPrefix - row-key prefix the scanner filters on
//   value     - column value the scanner filters on
//   r         - region to scan
// Returns numberOfResults.
// NOTE(review): the embedded original line numbers skip values in this view, so the loop
// construct and the statement that increments numberOfResults are not visible here;
// presumably it is bumped once per non-empty scanner batch - confirm.
1484 private int getNumberOfRows(String keyPrefix
, String value
, HRegion r
) throws Exception
{
1485 InternalScanner resultScanner
= buildScanner(keyPrefix
, value
, r
);
1486 int numberOfResults
= 0;
1487 List
<Cell
> results
= new ArrayList
<>();
1488 boolean more
= false;
1490 more
= resultScanner
.next(results
);
1491 if (results
!= null && !results
.isEmpty())
// Dump each returned cell together with its value rendered as a string.
1495 for (Cell kv
: results
) {
1496 System
.out
.println("kv=" + kv
.toString() + ", " + Bytes
.toString(CellUtil
.cloneValue(kv
)));
1500 return numberOfResults
;
1503 private InternalScanner
buildScanner(String keyPrefix
, String value
, HRegion r
)
1504 throws IOException
{
1505 // Defaults FilterList.Operator.MUST_PASS_ALL.
1506 FilterList allFilters
= new FilterList();
1507 allFilters
.addFilter(new PrefixFilter(Bytes
.toBytes(keyPrefix
)));
1508 // Only return rows where this column value exists in the row.
1509 SingleColumnValueFilter filter
= new SingleColumnValueFilter(Bytes
.toBytes("trans-tags"),
1510 Bytes
.toBytes("qual2"), CompareOperator
.EQUAL
, Bytes
.toBytes(value
));
1511 filter
.setFilterIfMissing(true);
1512 allFilters
.addFilter(filter
);
1513 Scan scan
= new Scan();
1514 scan
.addFamily(Bytes
.toBytes("trans-blob"));
1515 scan
.addFamily(Bytes
.toBytes("trans-type"));
1516 scan
.addFamily(Bytes
.toBytes("trans-date"));
1517 scan
.addFamily(Bytes
.toBytes("trans-tags"));
1518 scan
.addFamily(Bytes
.toBytes("trans-group"));
1519 scan
.setFilter(allFilters
);
1520 return r
.getScanner(scan
);
// Builds numRows Puts with row keys key + "_" + i (i from 0) and fills each of the five
// "trans-*" families; only "trans-tags:qual2" carries the caller-supplied value.
//   r       - target region
//   numRows - number of rows to build
//   value   - value stored in trans-tags:qual2
//   key     - row-key prefix
// NOTE(review): the embedded original line numbers skip values in this view, so the
// statement that applies each Put to `r` is not visible here.
1523 private void putRows(HRegion r
, int numRows
, String value
, String key
) throws IOException
{
1524 for (int i
= 0; i
< numRows
; i
++) {
1525 String row
= key
+ "_" + i
/* UUID.randomUUID().toString() */;
1526 System
.out
.println(String
.format("Saving row: %s, with value %s", row
, value
));
1527 Put put
= new Put(Bytes
.toBytes(row
));
1528 put
.setDurability(Durability
.SKIP_WAL
);
// The other four families use a null qualifier and fixed values.
1529 put
.addColumn(Bytes
.toBytes("trans-blob"), null, Bytes
.toBytes("value for blob"));
1530 put
.addColumn(Bytes
.toBytes("trans-type"), null, Bytes
.toBytes("statement"));
1531 put
.addColumn(Bytes
.toBytes("trans-date"), null, Bytes
.toBytes("20090921010101999"));
1532 put
.addColumn(Bytes
.toBytes("trans-tags"), Bytes
.toBytes("qual2"), Bytes
.toBytes(value
));
1533 put
.addColumn(Bytes
.toBytes("trans-group"), null, Bytes
.toBytes("adhocTransactionGroupId"));
// Creates a region with family COLUMN_FAMILY and builds a Put whose family, qualifier and
// value are all COLUMN_FAMILY + ":"; the test asserts that the `exception` flag ends up
// true, with NoSuchColumnFamilyException being the exception type that is caught.
// NOTE(review): the embedded original line numbers skip values in this view, so the call
// made inside the try and the assignment in the catch block are not visible here.
1539 public void testFamilyWithAndWithoutColon() throws Exception
{
1540 byte[] cf
= Bytes
.toBytes(COLUMN_FAMILY
);
1541 this.region
= initHRegion(tableName
, method
, CONF
, cf
);
1542 Put p
= new Put(tableName
.toBytes());
// Same bytes are used as family, qualifier and value.
1543 byte[] cfwithcolon
= Bytes
.toBytes(COLUMN_FAMILY
+ ":");
1544 p
.addColumn(cfwithcolon
, cfwithcolon
, cfwithcolon
);
1545 boolean exception
= false;
1548 } catch (NoSuchColumnFamilyException e
) {
1551 assertTrue(exception
);
// Batch-mutates ten puts with no row locks held.
// The first batch must report SUCCESS for all ten puts and raise the "syncTimeNumOps"
// counter to syncs + 1. After a column in family "BAD_CF" is added to puts[5], the second
// batch must report BAD_FAMILY for index 5 and SUCCESS for the rest, and raise the counter
// to syncs + 2.
// NOTE(review): the embedded original line numbers skip values in this view; the skipped
// lines are presumably blank lines and closing braces - confirm.
1555 public void testBatchPut_whileNoRowLocksHeld() throws IOException
{
1556 final Put
[] puts
= new Put
[10];
1557 MetricsWALSource source
= CompatibilitySingletonFactory
.getInstance(MetricsWALSource
.class);
// `syncs` is the baseline that the counter assertions below are relative to.
1558 long syncs
= prepareRegionForBachPut(puts
, source
, false);
1560 OperationStatus
[] codes
= this.region
.batchMutate(puts
);
1561 assertEquals(10, codes
.length
);
1562 for (int i
= 0; i
< 10; i
++) {
1563 assertEquals(OperationStatusCode
.SUCCESS
, codes
[i
].getOperationStatusCode());
1565 metricsAssertHelper
.assertCounter("syncTimeNumOps", syncs
+ 1, source
);
1567 LOG
.info("Next a batch put with one invalid family");
1568 puts
[5].addColumn(Bytes
.toBytes("BAD_CF"), qual
, value
);
1569 codes
= this.region
.batchMutate(puts
);
1570 assertEquals(10, codes
.length
);
// Only the put with the unknown family may fail; the others must still succeed.
1571 for (int i
= 0; i
< 10; i
++) {
1572 assertEquals((i
== 5) ? OperationStatusCode
.BAD_FAMILY
: OperationStatusCode
.SUCCESS
,
1573 codes
[i
].getOperationStatusCode());
1576 metricsAssertHelper
.assertCounter("syncTimeNumOps", syncs
+ 2, source
);
// Batch-mutates ten puts from a worker thread while this thread holds several row locks,
// and closes the region from a second thread in the meantime.
// puts[5] is given a column in family "BAD_CF" up front. After the put thread and the
// close thread have finished, the statuses returned by batchMutate must be BAD_FAMILY for
// index 5 and SUCCESS for every other put.
// NOTE(review): the embedded original line numbers skip values in this view, so the row
// lock releases, the start/stop of the TestContext, the sleep in the close thread and the
// try lines are not visible here.
1580 public void testBatchPut_whileMultipleRowLocksHeld() throws Exception
{
1581 final Put
[] puts
= new Put
[10];
1582 MetricsWALSource source
= CompatibilitySingletonFactory
.getInstance(MetricsWALSource
.class);
1583 long syncs
= prepareRegionForBachPut(puts
, source
, false);
1585 puts
[5].addColumn(Bytes
.toBytes("BAD_CF"), qual
, value
);
1587 LOG
.info("batchPut will have to break into four batches to avoid row locks");
// Locks on row_2, row_1 and row_3; row_3 is locked a second time with the boolean flag set.
1588 RowLock rowLock1
= region
.getRowLock(Bytes
.toBytes("row_2"));
1589 RowLock rowLock2
= region
.getRowLock(Bytes
.toBytes("row_1"));
1590 RowLock rowLock3
= region
.getRowLock(Bytes
.toBytes("row_3"));
1591 RowLock rowLock4
= region
.getRowLock(Bytes
.toBytes("row_3"), true);
1593 MultithreadedTestUtil
.TestContext ctx
= new MultithreadedTestUtil
.TestContext(CONF
);
// Receives the statuses returned by batchMutate on the put thread.
1594 final AtomicReference
<OperationStatus
[]> retFromThread
= new AtomicReference
<>();
1595 final CountDownLatch startingPuts
= new CountDownLatch(1);
1596 final CountDownLatch startingClose
= new CountDownLatch(1);
1597 TestThread putter
= new TestThread(ctx
) {
1599 public void doWork() throws IOException
{
1600 startingPuts
.countDown();
1601 retFromThread
.set(region
.batchMutate(puts
));
1604 LOG
.info("...starting put thread while holding locks");
1605 ctx
.addThread(putter
);
1608 // Now attempt to close the region from another thread. Prior to HBASE-12565
1609 // this would cause the in-progress batchMutate operation to fail with
1610 // exception because it used to release and re-acquire the close-guard lock
1611 // between batches. Caller then didn't get status indicating which writes succeeded.
1612 // We now expect this thread to block until the batchMutate call finishes.
1613 Thread regionCloseThread
= new TestThread(ctx
) {
1615 public void doWork() {
1617 startingPuts
.await();
1618 // Give some time for the batch mutate to get in.
1619 // We don't want to race with the mutate
1621 startingClose
.countDown();
1622 HBaseTestingUtility
.closeRegionAndWAL(region
);
// Checked exceptions are rethrown unchecked because doWork() here declares none.
1624 } catch (IOException e
) {
1625 throw new RuntimeException(e
);
1626 } catch (InterruptedException e
) {
1627 throw new RuntimeException(e
);
1631 regionCloseThread
.start();
// Wait until both worker threads have signalled their latches.
1633 startingClose
.await();
1634 startingPuts
.await();
1636 LOG
.info("...releasing row lock 1, which should let put thread continue");
1640 waitForCounter(source
, "syncTimeNumOps", syncs
+ 1);
1642 LOG
.info("...joining on put thread");
1644 regionCloseThread
.join();
1646 OperationStatus
[] codes
= retFromThread
.get();
1647 for (int i
= 0; i
< codes
.length
; i
++) {
1648 assertEquals((i
== 5) ? OperationStatusCode
.BAD_FAMILY
: OperationStatusCode
.SUCCESS
,
1649 codes
[i
].getOperationStatusCode());
/**
 * Polls {@code metricsAssertHelper.getCounter(metricName, source)} until the value is at
 * least {@code expectedCount}, failing the test once more than 10000 ms have elapsed since
 * the wait began.
 *
 * @param source        metrics source the counter is read from
 * @param metricName    name of the counter to poll
 * @param expectedCount minimum counter value that ends the wait
 * @throws InterruptedException declared by the method; the interruptible call is not visible
 *                              in this excerpt
 */
1654 private void waitForCounter(MetricsWALSource source
, String metricName
, long expectedCount
)
1655 throws InterruptedException
{
1656 long startWait
= System
.currentTimeMillis();
// Re-read the counter on every iteration; the loop ends as soon as it reaches the target.
1658 while ((currentCount
= metricsAssertHelper
.getCounter(metricName
, source
)) < expectedCount
) {
// Give up after ten seconds and report the last value seen.
1660 if (System
.currentTimeMillis() - startWait
> 10000) {
1661 fail(String
.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName
,
1662 expectedCount
, currentCount
));
/**
 * Exercises batchMutate with a MutationBatchOperation built with {@code true} as its third
 * constructor argument (NOTE(review): presumably the "atomic" flag -- confirm) in three
 * situations: a plain batch of valid puts, a batch issued from another thread while this
 * thread holds the lock on row_3, and a batch containing a put for the unknown family
 * "BAD_CF", for which NoSuchColumnFamilyException is expected.
 */
1668 public void testAtomicBatchPut() throws IOException
{
1669 final Put
[] puts
= new Put
[10];
1670 MetricsWALSource source
= CompatibilitySingletonFactory
.getInstance(MetricsWALSource
.class);
1671 long syncs
= prepareRegionForBachPut(puts
, source
, false);
1673 // 1. Straight forward case, should succeed
1674 MutationBatchOperation batchOp
= new MutationBatchOperation(region
, puts
, true,
1675 HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
1676 OperationStatus
[] codes
= this.region
.batchMutate(batchOp
);
1677 assertEquals(10, codes
.length
);
1678 for (int i
= 0; i
< 10; i
++) {
1679 assertEquals(OperationStatusCode
.SUCCESS
, codes
[i
].getOperationStatusCode());
// One successful batch => exactly one additional sync.
1681 metricsAssertHelper
.assertCounter("syncTimeNumOps", syncs
+ 1, source
);
1683 // 2. Failed to get lock
1684 RowLock lock
= region
.getRowLock(Bytes
.toBytes("row_" + 3));
1685 // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
1686 // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
1687 MultithreadedTestUtil
.TestContext ctx
= new MultithreadedTestUtil
.TestContext(CONF
);
// retFromThread receives the IOException raised inside the putter thread, if any;
// finishedPuts is counted down once the putter's work is over.
1688 final AtomicReference
<IOException
> retFromThread
= new AtomicReference
<>();
1689 final CountDownLatch finishedPuts
= new CountDownLatch(1);
1690 final MutationBatchOperation finalBatchOp
= new MutationBatchOperation(region
, puts
, true,
1693 HConstants
.NO_NONCE
);
1694 TestThread putter
= new TestThread(ctx
) {
1696 public void doWork() throws IOException
{
1698 region
.batchMutate(finalBatchOp
);
1699 } catch (IOException ioe
) {
1700 LOG
.error("test failed!", ioe
);
1701 retFromThread
.set(ioe
);
1703 finishedPuts
.countDown();
1706 LOG
.info("...starting put thread while holding locks");
1707 ctx
.addThread(putter
);
1709 LOG
.info("...waiting for batch puts while holding locks");
1711 finishedPuts
.await();
// NOTE(review): the interrupt is only logged here; the thread's interrupt flag is not
// restored in the visible lines.
1712 } catch (InterruptedException e
) {
1713 LOG
.error("Interrupted!", e
);
// The putter must have recorded an IOException, and no further sync may have happened.
1719 assertNotNull(retFromThread
.get());
1720 metricsAssertHelper
.assertCounter("syncTimeNumOps", syncs
+ 1, source
);
1722 // 3. Exception thrown in validation
1723 LOG
.info("Next a batch put with one invalid family");
1724 puts
[5].addColumn(Bytes
.toBytes("BAD_CF"), qual
, value
);
1725 batchOp
= new MutationBatchOperation(region
, puts
, true, HConstants
.NO_NONCE
,
1726 HConstants
.NO_NONCE
);
// Register the expected exception type before issuing the failing call.
1727 thrown
.expect(NoSuchColumnFamilyException
.class);
1728 this.region
.batchMutate(batchOp
);
1732 public void testBatchPutWithTsSlop() throws Exception
{
1733 // add data with a timestamp that is too recent for range. Ensure assert
1734 CONF
.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1735 final Put
[] puts
= new Put
[10];
1736 MetricsWALSource source
= CompatibilitySingletonFactory
.getInstance(MetricsWALSource
.class);
1738 long syncs
= prepareRegionForBachPut(puts
, source
, true);
1740 OperationStatus
[] codes
= this.region
.batchMutate(puts
);
1741 assertEquals(10, codes
.length
);
1742 for (int i
= 0; i
< 10; i
++) {
1743 assertEquals(OperationStatusCode
.SANITY_CHECK_FAILURE
, codes
[i
].getOperationStatusCode());
1745 metricsAssertHelper
.assertCounter("syncTimeNumOps", syncs
, source
);
1749 * @return syncs initial syncTimeNumOps
1751 private long prepareRegionForBachPut(final Put
[] puts
, final MetricsWALSource source
,
1752 boolean slop
) throws IOException
{
1753 this.region
= initHRegion(tableName
, method
, CONF
, COLUMN_FAMILY_BYTES
);
1755 LOG
.info("First a batch put with all valid puts");
1756 for (int i
= 0; i
< puts
.length
; i
++) {
1757 puts
[i
] = slop ?
new Put(Bytes
.toBytes("row_" + i
), Long
.MAX_VALUE
- 100) :
1758 new Put(Bytes
.toBytes("row_" + i
));
1759 puts
[i
].addColumn(COLUMN_FAMILY_BYTES
, qual
, value
);
1762 long syncs
= metricsAssertHelper
.getCounter("syncTimeNumOps", source
);
1763 metricsAssertHelper
.assertCounter("syncTimeNumOps", syncs
, source
);
1767 // ////////////////////////////////////////////////////////////////////////////
1768 // checkAndMutate tests
1769 // ////////////////////////////////////////////////////////////////////////////
/**
 * Drives a sequence of checkAndMutate calls against row1 / fam1:qualifier in which the guard
 * is an EQUAL comparison against an empty value, val1, val2 and finally a NullComparator,
 * using both Put and Delete mutations. Each outcome is stored in {@code res}.
 */
1771 public void testCheckAndMutate_WithEmptyRowValue() throws IOException
{
1772 byte[] row1
= Bytes
.toBytes("row1");
1773 byte[] fam1
= Bytes
.toBytes("fam1");
1774 byte[] qf1
= Bytes
.toBytes("qualifier");
1775 byte[] emptyVal
= new byte[] {};
1776 byte[] val1
= Bytes
.toBytes("value1");
1777 byte[] val2
= Bytes
.toBytes("value2");
1779 // Setting up region
1780 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
1781 // Putting empty data in key
1782 Put put
= new Put(row1
);
1783 put
.addColumn(fam1
, qf1
, emptyVal
);
1785 // checkAndPut with empty value
1786 boolean res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1787 new BinaryComparator(emptyVal
), put
);
1790 // Putting data in key
1791 put
= new Put(row1
);
1792 put
.addColumn(fam1
, qf1
, val1
);
1794 // checkAndPut with correct value
1795 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1796 new BinaryComparator(emptyVal
), put
);
// Same guard and same put issued a second time.
1799 // not empty anymore
1800 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1801 new BinaryComparator(emptyVal
), put
);
// Column delete guarded by the empty-value comparison.
1804 Delete delete
= new Delete(row1
);
1805 delete
.addColumn(fam1
, qf1
);
1806 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1807 new BinaryComparator(emptyVal
), delete
);
1810 put
= new Put(row1
);
1811 put
.addColumn(fam1
, qf1
, val2
);
1812 // checkAndPut with correct value
1813 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1814 new BinaryComparator(val1
), put
);
1817 // checkAndDelete with correct value
1818 delete
= new Delete(row1
);
// The same column is added to the delete twice.
1819 delete
.addColumn(fam1
, qf1
);
1820 delete
.addColumn(fam1
, qf1
);
1821 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1822 new BinaryComparator(val2
), delete
);
// Whole-row delete (no column added) guarded by the empty-value comparison.
1825 delete
= new Delete(row1
);
1826 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1827 new BinaryComparator(emptyVal
), delete
);
1830 // checkAndPut looking for a null value
1831 put
= new Put(row1
);
1832 put
.addColumn(fam1
, qf1
, val1
);
1835 .checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
, new NullComparator(), put
);
1840 public void testCheckAndMutate_WithWrongValue() throws IOException
{
1841 byte[] row1
= Bytes
.toBytes("row1");
1842 byte[] fam1
= Bytes
.toBytes("fam1");
1843 byte[] qf1
= Bytes
.toBytes("qualifier");
1844 byte[] val1
= Bytes
.toBytes("value1");
1845 byte[] val2
= Bytes
.toBytes("value2");
1846 BigDecimal bd1
= new BigDecimal(Double
.MAX_VALUE
);
1847 BigDecimal bd2
= new BigDecimal(Double
.MIN_VALUE
);
1849 // Setting up region
1850 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
1851 // Putting data in key
1852 Put put
= new Put(row1
);
1853 put
.addColumn(fam1
, qf1
, val1
);
1856 // checkAndPut with wrong value
1857 boolean res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1858 new BinaryComparator(val2
), put
);
1859 assertEquals(false, res
);
1861 // checkAndDelete with wrong value
1862 Delete delete
= new Delete(row1
);
1863 delete
.addFamily(fam1
);
1864 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1865 new BinaryComparator(val2
), put
);
1866 assertEquals(false, res
);
1868 // Putting data in key
1869 put
= new Put(row1
);
1870 put
.addColumn(fam1
, qf1
, Bytes
.toBytes(bd1
));
1873 // checkAndPut with wrong value
1875 region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1876 new BigDecimalComparator(bd2
), put
);
1877 assertEquals(false, res
);
1879 // checkAndDelete with wrong value
1880 delete
= new Delete(row1
);
1881 delete
.addFamily(fam1
);
1883 region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1884 new BigDecimalComparator(bd2
), put
);
1885 assertEquals(false, res
);
/**
 * Verifies that checkAndMutate applies the mutation when the guard matches: the assertions
 * visible here all expect {@code true}, first with a BinaryComparator on val1 and then with a
 * BigDecimalComparator after the cell has been rewritten with the bytes of bd1.
 */
1889 public void testCheckAndMutate_WithCorrectValue() throws IOException
{
1890 byte[] row1
= Bytes
.toBytes("row1");
1891 byte[] fam1
= Bytes
.toBytes("fam1");
1892 byte[] qf1
= Bytes
.toBytes("qualifier");
1893 byte[] val1
= Bytes
.toBytes("value1");
1894 BigDecimal bd1
= new BigDecimal(Double
.MIN_VALUE
);
1896 // Setting up region
1897 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
1898 // Putting data in key
1899 Put put
= new Put(row1
);
1900 put
.addColumn(fam1
, qf1
, val1
);
1903 // checkAndPut with correct value
1904 boolean res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
1905 new BinaryComparator(val1
), put
);
1906 assertEquals(true, res
);
1908 // checkAndDelete with correct value
1909 Delete delete
= new Delete(row1
);
1910 delete
.addColumn(fam1
, qf1
);
1911 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
, new BinaryComparator(val1
),
1913 assertEquals(true, res
);
// Second half: same checks, but the stored value is the byte encoding of a BigDecimal.
1915 // Putting data in key
1916 put
= new Put(row1
);
1917 put
.addColumn(fam1
, qf1
, Bytes
.toBytes(bd1
));
1920 // checkAndPut with correct value
1922 region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
, new BigDecimalComparator(
1924 assertEquals(true, res
);
1926 // checkAndDelete with correct value
1927 delete
= new Delete(row1
);
1928 delete
.addColumn(fam1
, qf1
);
1930 region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
, new BigDecimalComparator(
1932 assertEquals(true, res
);
1936 public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException
{
1937 byte[] row1
= Bytes
.toBytes("row1");
1938 byte[] fam1
= Bytes
.toBytes("fam1");
1939 byte[] qf1
= Bytes
.toBytes("qualifier");
1940 byte[] val1
= Bytes
.toBytes("value1");
1941 byte[] val2
= Bytes
.toBytes("value2");
1942 byte[] val3
= Bytes
.toBytes("value3");
1943 byte[] val4
= Bytes
.toBytes("value4");
1945 // Setting up region
1946 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
1947 // Putting val3 in key
1948 Put put
= new Put(row1
);
1949 put
.addColumn(fam1
, qf1
, val3
);
1952 // Test CompareOp.LESS: original = val3, compare with val3, fail
1953 boolean res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.LESS
,
1954 new BinaryComparator(val3
), put
);
1955 assertEquals(false, res
);
1957 // Test CompareOp.LESS: original = val3, compare with val4, fail
1958 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.LESS
,
1959 new BinaryComparator(val4
), put
);
1960 assertEquals(false, res
);
1962 // Test CompareOp.LESS: original = val3, compare with val2,
1963 // succeed (now value = val2)
1964 put
= new Put(row1
);
1965 put
.addColumn(fam1
, qf1
, val2
);
1966 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.LESS
,
1967 new BinaryComparator(val2
), put
);
1968 assertEquals(true, res
);
1970 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1971 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.LESS_OR_EQUAL
,
1972 new BinaryComparator(val3
), put
);
1973 assertEquals(false, res
);
1975 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1976 // succeed (value still = val2)
1977 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.LESS_OR_EQUAL
,
1978 new BinaryComparator(val2
), put
);
1979 assertEquals(true, res
);
1981 // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1982 // succeed (now value = val3)
1983 put
= new Put(row1
);
1984 put
.addColumn(fam1
, qf1
, val3
);
1985 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.LESS_OR_EQUAL
,
1986 new BinaryComparator(val1
), put
);
1987 assertEquals(true, res
);
1989 // Test CompareOp.GREATER: original = val3, compare with val3, fail
1990 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.GREATER
,
1991 new BinaryComparator(val3
), put
);
1992 assertEquals(false, res
);
1994 // Test CompareOp.GREATER: original = val3, compare with val2, fail
1995 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.GREATER
,
1996 new BinaryComparator(val2
), put
);
1997 assertEquals(false, res
);
1999 // Test CompareOp.GREATER: original = val3, compare with val4,
2000 // succeed (now value = val2)
2001 put
= new Put(row1
);
2002 put
.addColumn(fam1
, qf1
, val2
);
2003 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.GREATER
,
2004 new BinaryComparator(val4
), put
);
2005 assertEquals(true, res
);
2007 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
2008 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.GREATER_OR_EQUAL
,
2009 new BinaryComparator(val1
), put
);
2010 assertEquals(false, res
);
2012 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
2013 // succeed (value still = val2)
2014 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.GREATER_OR_EQUAL
,
2015 new BinaryComparator(val2
), put
);
2016 assertEquals(true, res
);
2018 // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
2019 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.GREATER_OR_EQUAL
,
2020 new BinaryComparator(val3
), put
);
2021 assertEquals(true, res
);
2025 public void testCheckAndPut_ThatPutWasWritten() throws IOException
{
2026 byte[] row1
= Bytes
.toBytes("row1");
2027 byte[] fam1
= Bytes
.toBytes("fam1");
2028 byte[] fam2
= Bytes
.toBytes("fam2");
2029 byte[] qf1
= Bytes
.toBytes("qualifier");
2030 byte[] val1
= Bytes
.toBytes("value1");
2031 byte[] val2
= Bytes
.toBytes("value2");
2033 byte[][] families
= { fam1
, fam2
};
2035 // Setting up region
2036 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2037 // Putting data in the key to check
2038 Put put
= new Put(row1
);
2039 put
.addColumn(fam1
, qf1
, val1
);
2042 // Creating put to add
2043 long ts
= System
.currentTimeMillis();
2044 KeyValue kv
= new KeyValue(row1
, fam2
, qf1
, ts
, KeyValue
.Type
.Put
, val2
);
2045 put
= new Put(row1
);
2048 // checkAndPut with wrong value
2049 boolean res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
2050 new BinaryComparator(val1
), put
);
2051 assertEquals(true, res
);
2053 Get get
= new Get(row1
);
2054 get
.addColumn(fam2
, qf1
);
2055 Cell
[] actual
= region
.get(get
).rawCells();
2057 Cell
[] expected
= { kv
};
2059 assertEquals(expected
.length
, actual
.length
);
2060 for (int i
= 0; i
< actual
.length
; i
++) {
2061 assertEquals(expected
[i
], actual
[i
]);
/**
 * Calls checkAndMutate with a check row ({@code row}) that differs from the row of the
 * submitted Put ({@code row2}); a DoNotRetryIOException is the expected outcome.
 */
2066 public void testCheckAndPut_wrongRowInPut() throws IOException
{
2067 this.region
= initHRegion(tableName
, method
, CONF
, COLUMNS
);
// The put targets row2 ...
2068 Put put
= new Put(row2
);
2069 put
.addColumn(fam1
, qual1
, value1
);
// ... while the check is issued against row.
2071 region
.checkAndMutate(row
, fam1
, qual1
, CompareOperator
.EQUAL
,
2072 new BinaryComparator(value2
), put
);
2074 } catch (org
.apache
.hadoop
.hbase
.DoNotRetryIOException expected
) {
2075 // expected exception.
/**
 * Confirms that deletes submitted through a passing checkAndMutate are really applied, in
 * three steps of increasing scope: a multi-column delete, a delete of family fam2, and a
 * delete of the whole row. After each step the row is read back and the number of remaining
 * cells is asserted (2, then 1, then 0).
 */
2080 public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException
{
2081 byte[] row1
= Bytes
.toBytes("row1");
2082 byte[] fam1
= Bytes
.toBytes("fam1");
2083 byte[] fam2
= Bytes
.toBytes("fam2");
2084 byte[] qf1
= Bytes
.toBytes("qualifier1");
2085 byte[] qf2
= Bytes
.toBytes("qualifier2");
2086 byte[] qf3
= Bytes
.toBytes("qualifier3");
2087 byte[] val1
= Bytes
.toBytes("value1");
2088 byte[] val2
= Bytes
.toBytes("value2");
2089 byte[] val3
= Bytes
.toBytes("value3");
2090 byte[] emptyVal
= new byte[] {};
2092 byte[][] families
= { fam1
, fam2
};
2094 // Setting up region
2095 this.region
= initHRegion(tableName
, method
, CONF
, families
);
// First put: a single cell fam1:qf1 = val1.
2097 Put put
= new Put(row1
);
2098 put
.addColumn(fam1
, qf1
, val1
);
// Second put: five cells spread over both families; fam1:qf1 is given val2 this time.
2102 put
= new Put(row1
);
2103 put
.addColumn(fam1
, qf1
, val2
);
2104 put
.addColumn(fam2
, qf1
, val3
);
2105 put
.addColumn(fam2
, qf2
, val2
);
2106 put
.addColumn(fam2
, qf3
, val1
);
2107 put
.addColumn(fam1
, qf3
, val1
);
2110 // Multi-column delete
2111 Delete delete
= new Delete(row1
);
2112 delete
.addColumn(fam1
, qf1
);
2113 delete
.addColumn(fam2
, qf1
);
2114 delete
.addColumn(fam1
, qf3
);
2115 boolean res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
,
2116 new BinaryComparator(val2
), delete
);
2117 assertEquals(true, res
);
// Read back three of the columns; two cells are expected, with fam1:qf1 now showing val1.
2119 Get get
= new Get(row1
);
2120 get
.addColumn(fam1
, qf1
);
2121 get
.addColumn(fam1
, qf3
);
2122 get
.addColumn(fam2
, qf2
);
2123 Result r
= region
.get(get
);
2124 assertEquals(2, r
.size());
2125 assertArrayEquals(val1
, r
.getValue(fam1
, qf1
));
2126 assertArrayEquals(val2
, r
.getValue(fam2
, qf2
));
// Family-wide delete of fam2, guarded by an empty-value comparison on fam2:qf1.
2129 delete
= new Delete(row1
);
2130 delete
.addFamily(fam2
);
2131 res
= region
.checkAndMutate(row1
, fam2
, qf1
, CompareOperator
.EQUAL
,
2132 new BinaryComparator(emptyVal
), delete
);
2133 assertEquals(true, res
);
// Only fam1:qf1 = val1 is expected to remain.
2135 get
= new Get(row1
);
2136 r
= region
.get(get
);
2137 assertEquals(1, r
.size());
2138 assertArrayEquals(val1
, r
.getValue(fam1
, qf1
));
// Whole-row delete (no column or family added), guarded on fam1:qf1 == val1.
2141 delete
= new Delete(row1
);
2142 res
= region
.checkAndMutate(row1
, fam1
, qf1
, CompareOperator
.EQUAL
, new BinaryComparator(val1
),
2144 assertEquals(true, res
);
2145 get
= new Get(row1
);
2146 r
= region
.get(get
);
2147 assertEquals(0, r
.size());
2150 // ////////////////////////////////////////////////////////////////////////////
2152 // ////////////////////////////////////////////////////////////////////////////
2154 public void testDelete_multiDeleteColumn() throws IOException
{
2155 byte[] row1
= Bytes
.toBytes("row1");
2156 byte[] fam1
= Bytes
.toBytes("fam1");
2157 byte[] qual
= Bytes
.toBytes("qualifier");
2158 byte[] value
= Bytes
.toBytes("value");
2160 Put put
= new Put(row1
);
2161 put
.addColumn(fam1
, qual
, 1, value
);
2162 put
.addColumn(fam1
, qual
, 2, value
);
2164 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
2167 // We do support deleting more than 1 'latest' version
2168 Delete delete
= new Delete(row1
);
2169 delete
.addColumn(fam1
, qual
);
2170 delete
.addColumn(fam1
, qual
);
2171 region
.delete(delete
);
2173 Get get
= new Get(row1
);
2174 get
.addFamily(fam1
);
2175 Result r
= region
.get(get
);
2176 assertEquals(0, r
.size());
2180 public void testDelete_CheckFamily() throws IOException
{
2181 byte[] row1
= Bytes
.toBytes("row1");
2182 byte[] fam1
= Bytes
.toBytes("fam1");
2183 byte[] fam2
= Bytes
.toBytes("fam2");
2184 byte[] fam3
= Bytes
.toBytes("fam3");
2185 byte[] fam4
= Bytes
.toBytes("fam4");
2187 // Setting up region
2188 this.region
= initHRegion(tableName
, method
, CONF
, fam1
, fam2
, fam3
);
2189 List
<Cell
> kvs
= new ArrayList
<>();
2190 kvs
.add(new KeyValue(row1
, fam4
, null, null));
2192 // testing existing family
2193 byte[] family
= fam2
;
2194 NavigableMap
<byte[], List
<Cell
>> deleteMap
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
2195 deleteMap
.put(family
, kvs
);
2196 region
.delete(deleteMap
, Durability
.SYNC_WAL
);
2198 // testing non existing family
2202 deleteMap
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
2203 deleteMap
.put(family
, kvs
);
2204 region
.delete(deleteMap
, Durability
.SYNC_WAL
);
2205 } catch (Exception e
) {
2208 assertTrue("Family " + new String(family
, StandardCharsets
.UTF_8
) + " does exist", ok
);
/**
 * Interleaves puts, column deletes and a whole-row delete on one row of family "info" and
 * checks after each step how many cells a Get returns. An IncrementingEnvironmentEdge is
 * injected right after the region is opened.
 */
2212 public void testDelete_mixed() throws IOException
, InterruptedException
{
2213 byte[] fam
= Bytes
.toBytes("info");
2214 byte[][] families
= { fam
};
2215 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2216 EnvironmentEdgeManagerTestHelper
.injectEdge(new IncrementingEnvironmentEdge());
2218 byte[] row
= Bytes
.toBytes("table_name");
// Three qualifiers used throughout the test.
2220 byte[] serverinfo
= Bytes
.toBytes("serverinfo");
2221 byte[] splitA
= Bytes
.toBytes("splitA");
2222 byte[] splitB
= Bytes
.toBytes("splitB");
2225 Put put
= new Put(row
);
2226 put
.addColumn(fam
, splitA
, Bytes
.toBytes("reference_A"));
2230 put
.addColumn(fam
, splitB
, Bytes
.toBytes("reference_B"));
2234 put
.addColumn(fam
, serverinfo
, Bytes
.toBytes("ip_address"));
2237 // ok now delete a split:
2238 Delete delete
= new Delete(row
);
2239 delete
.addColumns(fam
, splitA
);
2240 region
.delete(delete
);
2242 // assert some things:
// serverinfo is still there, splitA is gone, splitB is still there.
2243 Get get
= new Get(row
).addColumn(fam
, serverinfo
);
2244 Result result
= region
.get(get
);
2245 assertEquals(1, result
.size());
2247 get
= new Get(row
).addColumn(fam
, splitA
);
2248 result
= region
.get(get
);
2249 assertEquals(0, result
.size());
2251 get
= new Get(row
).addColumn(fam
, splitB
);
2252 result
= region
.get(get
);
2253 assertEquals(1, result
.size());
2255 // Assert that after a delete, I can put.
2257 put
.addColumn(fam
, splitA
, Bytes
.toBytes("reference_A"));
2260 result
= region
.get(get
);
2261 assertEquals(3, result
.size());
2263 // Now delete all... then test I can add stuff back
2264 delete
= new Delete(row
);
2265 region
.delete(delete
);
2266 assertEquals(0, region
.get(get
).size());
2268 region
.put(new Put(row
).addColumn(fam
, splitA
, Bytes
.toBytes("reference_A")));
2269 result
= region
.get(get
);
2270 assertEquals(1, result
.size());
2274 public void testDeleteRowWithFutureTs() throws IOException
{
2275 byte[] fam
= Bytes
.toBytes("info");
2276 byte[][] families
= { fam
};
2277 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2278 byte[] row
= Bytes
.toBytes("table_name");
2280 byte[] serverinfo
= Bytes
.toBytes("serverinfo");
2282 // add data in the far future
2283 Put put
= new Put(row
);
2284 put
.addColumn(fam
, serverinfo
, HConstants
.LATEST_TIMESTAMP
- 5, Bytes
.toBytes("value"));
2287 // now delete something in the present
2288 Delete delete
= new Delete(row
);
2289 region
.delete(delete
);
2291 // make sure we still see our data
2292 Get get
= new Get(row
).addColumn(fam
, serverinfo
);
2293 Result result
= region
.get(get
);
2294 assertEquals(1, result
.size());
2296 // delete the future row
2297 delete
= new Delete(row
, HConstants
.LATEST_TIMESTAMP
- 3);
2298 region
.delete(delete
);
2300 // make sure it is gone
2301 get
= new Get(row
).addColumn(fam
, serverinfo
);
2302 result
= region
.get(get
);
2303 assertEquals(0, result
.size());
2307 * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2308 * the actual timestamp
2311 public void testPutWithLatestTS() throws IOException
{
2312 byte[] fam
= Bytes
.toBytes("info");
2313 byte[][] families
= { fam
};
2314 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2315 byte[] row
= Bytes
.toBytes("row1");
2317 byte[] qual
= Bytes
.toBytes("qual");
2319 // add data with LATEST_TIMESTAMP, put without WAL
2320 Put put
= new Put(row
);
2321 put
.addColumn(fam
, qual
, HConstants
.LATEST_TIMESTAMP
, Bytes
.toBytes("value"));
2324 // Make sure it shows up with an actual timestamp
2325 Get get
= new Get(row
).addColumn(fam
, qual
);
2326 Result result
= region
.get(get
);
2327 assertEquals(1, result
.size());
2328 Cell kv
= result
.rawCells()[0];
2329 LOG
.info("Got: " + kv
);
2330 assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2331 kv
.getTimestamp() != HConstants
.LATEST_TIMESTAMP
);
2333 // Check same with WAL enabled (historically these took different
2334 // code paths, so check both)
2335 row
= Bytes
.toBytes("row2");
2337 put
.addColumn(fam
, qual
, HConstants
.LATEST_TIMESTAMP
, Bytes
.toBytes("value"));
2340 // Make sure it shows up with an actual timestamp
2341 get
= new Get(row
).addColumn(fam
, qual
);
2342 result
= region
.get(get
);
2343 assertEquals(1, result
.size());
2344 kv
= result
.rawCells()[0];
2345 LOG
.info("Got: " + kv
);
2346 assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2347 kv
.getTimestamp() != HConstants
.LATEST_TIMESTAMP
);
2351 * Tests that there is server-side filtering for invalid timestamp upper
2352 * bound. Note that the timestamp lower bound is automatically handled for us
2356 public void testPutWithTsSlop() throws IOException
{
2357 byte[] fam
= Bytes
.toBytes("info");
2358 byte[][] families
= { fam
};
2360 // add data with a timestamp that is too recent for range. Ensure assert
2361 CONF
.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2362 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2363 boolean caughtExcep
= false;
2365 // no TS specified == use latest. should not error
2366 region
.put(new Put(row
).addColumn(fam
, Bytes
.toBytes("qual"), Bytes
.toBytes("value")));
2367 // TS out of range. should error
2368 region
.put(new Put(row
).addColumn(fam
, Bytes
.toBytes("qual"),
2369 System
.currentTimeMillis() + 2000, Bytes
.toBytes("value")));
2370 fail("Expected IOE for TS out of configured timerange");
2371 } catch (FailedSanityCheckException ioe
) {
2372 LOG
.debug("Received expected exception", ioe
);
2375 assertTrue("Should catch FailedSanityCheckException", caughtExcep
);
2379 public void testScanner_DeleteOneFamilyNotAnother() throws IOException
{
2380 byte[] fam1
= Bytes
.toBytes("columnA");
2381 byte[] fam2
= Bytes
.toBytes("columnB");
2382 this.region
= initHRegion(tableName
, method
, CONF
, fam1
, fam2
);
2383 byte[] rowA
= Bytes
.toBytes("rowA");
2384 byte[] rowB
= Bytes
.toBytes("rowB");
2386 byte[] value
= Bytes
.toBytes("value");
2388 Delete delete
= new Delete(rowA
);
2389 delete
.addFamily(fam1
);
2391 region
.delete(delete
);
2394 Put put
= new Put(rowA
);
2395 put
.addColumn(fam2
, null, value
);
2398 put
= new Put(rowB
);
2399 put
.addColumn(fam1
, null, value
);
2400 put
.addColumn(fam2
, null, value
);
2403 Scan scan
= new Scan();
2404 scan
.addFamily(fam1
).addFamily(fam2
);
2405 InternalScanner s
= region
.getScanner(scan
);
2406 List
<Cell
> results
= new ArrayList
<>();
2408 assertTrue(CellUtil
.matchingRows(results
.get(0), rowA
));
2412 assertTrue(CellUtil
.matchingRows(results
.get(0), rowB
));
/**
 * Builds a region backed by its own FSHLog, creates two cells (originalCell and addCell) and
 * calls the four-argument testDataInMemoryWithoutWAL helper once for each combination of
 * SKIP_WAL / SYNC_WAL durability on the original put and on the added put.
 */
2416 public void testDataInMemoryWithoutWAL() throws IOException
{
2417 FileSystem fs
= FileSystem
.get(CONF
);
2418 Path rootDir
= new Path(dir
+ "testDataInMemoryWithoutWAL");
2419 FSHLog hLog
= new FSHLog(fs
, rootDir
, "testDataInMemoryWithoutWAL", CONF
);
2421 // This chunk creation is done throughout the code base. Do we want to move it into core?
2422 // It is missing from this test. W/o it we NPE.
2423 region
= initHRegion(tableName
, null, null, false, Durability
.SYNC_WAL
, hLog
,
2424 COLUMN_FAMILY_BYTES
);
// Cell carried by the "original" put.
2426 Cell originalCell
= ExtendedCellBuilderFactory
.create(CellBuilderType
.DEEP_COPY
)
2428 .setFamily(COLUMN_FAMILY_BYTES
)
2429 .setQualifier(qual1
)
2430 .setTimestamp(System
.currentTimeMillis())
2431 .setType(KeyValue
.Type
.Put
.getCode())
2434 final long originalSize
= originalCell
.getSerializedSize();
// Cell carried by the put that the helper's mocked coprocessor host adds; same family and
// qualifier, value "xxxxxxxxxx".
2436 Cell addCell
= ExtendedCellBuilderFactory
.create(CellBuilderType
.DEEP_COPY
)
2438 .setFamily(COLUMN_FAMILY_BYTES
)
2439 .setQualifier(qual1
)
2440 .setTimestamp(System
.currentTimeMillis())
2441 .setType(KeyValue
.Type
.Put
.getCode())
2442 .setValue(Bytes
.toBytes("xxxxxxxxxx"))
2444 final long addSize
= addCell
.getSerializedSize();
2446 LOG
.info("originalSize:" + originalSize
2447 + ", addSize:" + addSize
);
2448 // start test. We expect that the addPut's durability will be replaced
2449 // by originalPut's durability.
// original SKIP_WAL, added SKIP_WAL: expected growth is originalSize + addSize.
2452 testDataInMemoryWithoutWAL(region
,
2453 new Put(row
).add(originalCell
).setDurability(Durability
.SKIP_WAL
),
2454 new Put(row
).add(addCell
).setDurability(Durability
.SKIP_WAL
),
2455 originalSize
+ addSize
);
// original SKIP_WAL, added SYNC_WAL: expected growth is again originalSize + addSize.
2458 testDataInMemoryWithoutWAL(region
,
2459 new Put(row
).add(originalCell
).setDurability(Durability
.SKIP_WAL
),
2460 new Put(row
).add(addCell
).setDurability(Durability
.SYNC_WAL
),
2461 originalSize
+ addSize
);
// original SYNC_WAL, added SKIP_WAL.
2464 testDataInMemoryWithoutWAL(region
,
2465 new Put(row
).add(originalCell
).setDurability(Durability
.SYNC_WAL
),
2466 new Put(row
).add(addCell
).setDurability(Durability
.SKIP_WAL
),
// original SYNC_WAL, added SYNC_WAL.
2470 testDataInMemoryWithoutWAL(region
,
2471 new Put(row
).add(originalCell
).setDurability(Durability
.SYNC_WAL
),
2472 new Put(row
).add(addCell
).setDurability(Durability
.SYNC_WAL
),
2476 private static void testDataInMemoryWithoutWAL(HRegion region
, Put originalPut
,
2477 final Put addPut
, long delta
) throws IOException
{
2478 final long initSize
= region
.getDataInMemoryWithoutWAL();
2479 // save normalCPHost and replaced by mockedCPHost
2480 RegionCoprocessorHost normalCPHost
= region
.getCoprocessorHost();
2481 RegionCoprocessorHost mockedCPHost
= Mockito
.mock(RegionCoprocessorHost
.class);
2482 // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
2483 // do below format (from Mockito doc).
2484 Mockito
.doAnswer(new Answer
<Void
>() {
2486 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
2487 MiniBatchOperationInProgress
<Mutation
> mb
= invocation
.getArgument(0);
2488 mb
.addOperationsFromCP(0, new Mutation
[]{addPut
});
2491 }).when(mockedCPHost
).preBatchMutate(Mockito
.isA(MiniBatchOperationInProgress
.class));
2492 ColumnFamilyDescriptorBuilder builder
= ColumnFamilyDescriptorBuilder
.
2493 newBuilder(COLUMN_FAMILY_BYTES
);
2494 ScanInfo info
= new ScanInfo(CONF
, builder
.build(), Long
.MAX_VALUE
,
2495 Long
.MAX_VALUE
, region
.getCellComparator());
2496 Mockito
.when(mockedCPHost
.preFlushScannerOpen(Mockito
.any(HStore
.class),
2497 Mockito
.any())).thenReturn(info
);
2498 Mockito
.when(mockedCPHost
.preFlush(Mockito
.any(), Mockito
.any(StoreScanner
.class),
2499 Mockito
.any())).thenAnswer(i
-> i
.getArgument(1));
2500 region
.setCoprocessorHost(mockedCPHost
);
2502 region
.put(originalPut
);
2503 region
.setCoprocessorHost(normalCPHost
);
2504 final long finalSize
= region
.getDataInMemoryWithoutWAL();
2505 assertEquals("finalSize:" + finalSize
+ ", initSize:"
2506 + initSize
+ ", delta:" + delta
,finalSize
, initSize
+ delta
);
2510 public void testDeleteColumns_PostInsert() throws IOException
, InterruptedException
{
2511 Delete delete
= new Delete(row
);
2512 delete
.addColumns(fam1
, qual1
);
2513 doTestDelete_AndPostInsert(delete
);
2517 public void testaddFamily_PostInsert() throws IOException
, InterruptedException
{
2518 Delete delete
= new Delete(row
);
2519 delete
.addFamily(fam1
);
2520 doTestDelete_AndPostInsert(delete
);
2523 public void doTestDelete_AndPostInsert(Delete delete
) throws IOException
, InterruptedException
{
2524 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
2525 EnvironmentEdgeManagerTestHelper
.injectEdge(new IncrementingEnvironmentEdge());
2526 Put put
= new Put(row
);
2527 put
.addColumn(fam1
, qual1
, value1
);
2530 // now delete the value:
2531 region
.delete(delete
);
2535 put
.addColumn(fam1
, qual1
, value2
);
2539 Get get
= new Get(row
);
2540 get
.addColumn(fam1
, qual1
);
2542 Result r
= region
.get(get
);
2543 assertEquals(1, r
.size());
2544 assertArrayEquals(value2
, r
.getValue(fam1
, qual1
));
2547 Scan scan
= new Scan(row
);
2548 scan
.addColumn(fam1
, qual1
);
2549 InternalScanner s
= region
.getScanner(scan
);
2551 List
<Cell
> results
= new ArrayList
<>();
2552 assertEquals(false, s
.next(results
));
2553 assertEquals(1, results
.size());
2554 Cell kv
= results
.get(0);
2556 assertArrayEquals(value2
, CellUtil
.cloneValue(kv
));
2557 assertArrayEquals(fam1
, CellUtil
.cloneFamily(kv
));
2558 assertArrayEquals(qual1
, CellUtil
.cloneQualifier(kv
));
2559 assertArrayEquals(row
, CellUtil
.cloneRow(kv
));
2563 public void testDelete_CheckTimestampUpdated() throws IOException
{
2564 byte[] row1
= Bytes
.toBytes("row1");
2565 byte[] col1
= Bytes
.toBytes("col1");
2566 byte[] col2
= Bytes
.toBytes("col2");
2567 byte[] col3
= Bytes
.toBytes("col3");
2569 // Setting up region
2570 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
2571 // Building checkerList
2572 List
<Cell
> kvs
= new ArrayList
<>();
2573 kvs
.add(new KeyValue(row1
, fam1
, col1
, null));
2574 kvs
.add(new KeyValue(row1
, fam1
, col2
, null));
2575 kvs
.add(new KeyValue(row1
, fam1
, col3
, null));
2577 NavigableMap
<byte[], List
<Cell
>> deleteMap
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
2578 deleteMap
.put(fam1
, kvs
);
2579 region
.delete(deleteMap
, Durability
.SYNC_WAL
);
2581 // extract the key values out of the memstore:
2582 // This is kinda hacky, but better than nothing...
2583 long now
= System
.currentTimeMillis();
2584 AbstractMemStore memstore
= (AbstractMemStore
)region
.getStore(fam1
).memstore
;
2585 Cell firstCell
= memstore
.getActive().first();
2586 assertTrue(firstCell
.getTimestamp() <= now
);
2587 now
= firstCell
.getTimestamp();
2588 for (Cell cell
: memstore
.getActive().getCellSet()) {
2589 assertTrue(cell
.getTimestamp() <= now
);
2590 now
= cell
.getTimestamp();
2594 // ////////////////////////////////////////////////////////////////////////////
2596 // ////////////////////////////////////////////////////////////////////////////
2598 public void testGet_FamilyChecker() throws IOException
{
2599 byte[] row1
= Bytes
.toBytes("row1");
2600 byte[] fam1
= Bytes
.toBytes("fam1");
2601 byte[] fam2
= Bytes
.toBytes("False");
2602 byte[] col1
= Bytes
.toBytes("col1");
2604 // Setting up region
2605 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
2606 Get get
= new Get(row1
);
2607 get
.addColumn(fam2
, col1
);
2612 fail("Expecting DoNotRetryIOException in get but did not get any");
2613 } catch (org
.apache
.hadoop
.hbase
.DoNotRetryIOException e
) {
2614 LOG
.info("Got expected DoNotRetryIOException successfully");
2619 public void testGet_Basic() throws IOException
{
2620 byte[] row1
= Bytes
.toBytes("row1");
2621 byte[] fam1
= Bytes
.toBytes("fam1");
2622 byte[] col1
= Bytes
.toBytes("col1");
2623 byte[] col2
= Bytes
.toBytes("col2");
2624 byte[] col3
= Bytes
.toBytes("col3");
2625 byte[] col4
= Bytes
.toBytes("col4");
2626 byte[] col5
= Bytes
.toBytes("col5");
2628 // Setting up region
2629 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
2631 Put put
= new Put(row1
);
2632 put
.addColumn(fam1
, col1
, null);
2633 put
.addColumn(fam1
, col2
, null);
2634 put
.addColumn(fam1
, col3
, null);
2635 put
.addColumn(fam1
, col4
, null);
2636 put
.addColumn(fam1
, col5
, null);
2639 Get get
= new Get(row1
);
2640 get
.addColumn(fam1
, col2
);
2641 get
.addColumn(fam1
, col4
);
2643 KeyValue kv1
= new KeyValue(row1
, fam1
, col2
);
2644 KeyValue kv2
= new KeyValue(row1
, fam1
, col4
);
2645 KeyValue
[] expected
= { kv1
, kv2
};
2648 Result res
= region
.get(get
);
2649 assertEquals(expected
.length
, res
.size());
2650 for (int i
= 0; i
< res
.size(); i
++) {
2651 assertTrue(CellUtil
.matchingRows(expected
[i
], res
.rawCells()[i
]));
2652 assertTrue(CellUtil
.matchingFamily(expected
[i
], res
.rawCells()[i
]));
2653 assertTrue(CellUtil
.matchingQualifier(expected
[i
], res
.rawCells()[i
]));
2656 // Test using a filter on a Get
2657 Get g
= new Get(row1
);
2658 final int count
= 2;
2659 g
.setFilter(new ColumnCountGetFilter(count
));
2660 res
= region
.get(g
);
2661 assertEquals(count
, res
.size());
2665 public void testGet_Empty() throws IOException
{
2666 byte[] row
= Bytes
.toBytes("row");
2667 byte[] fam
= Bytes
.toBytes("fam");
2669 this.region
= initHRegion(tableName
, method
, CONF
, fam
);
2670 Get get
= new Get(row
);
2672 Result r
= region
.get(get
);
2674 assertTrue(r
.isEmpty());
2678 public void testGetWithFilter() throws IOException
, InterruptedException
{
2679 byte[] row1
= Bytes
.toBytes("row1");
2680 byte[] fam1
= Bytes
.toBytes("fam1");
2681 byte[] col1
= Bytes
.toBytes("col1");
2682 byte[] value1
= Bytes
.toBytes("value1");
2683 byte[] value2
= Bytes
.toBytes("value2");
2685 final int maxVersions
= 3;
2686 HColumnDescriptor hcd
= new HColumnDescriptor(fam1
);
2687 hcd
.setMaxVersions(maxVersions
);
2688 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf("testFilterAndColumnTracker"));
2690 ChunkCreator
.initialize(MemStoreLABImpl
.CHUNK_SIZE_DEFAULT
, false, 0, 0, 0, null);
2691 HRegionInfo info
= new HRegionInfo(htd
.getTableName(), null, null, false);
2692 Path logDir
= TEST_UTIL
.getDataTestDirOnTestFS(method
+ ".log");
2693 final WAL wal
= HBaseTestingUtility
.createWal(TEST_UTIL
.getConfiguration(), logDir
, info
);
2694 this.region
= TEST_UTIL
.createLocalHRegion(info
, htd
, wal
);
2696 // Put 4 versions into the memstore
2698 Put put
= new Put(row1
, ts
);
2699 put
.addColumn(fam1
, col1
, value1
);
2701 put
= new Put(row1
, ts
+ 1);
2702 put
.addColumn(fam1
, col1
, Bytes
.toBytes("filter1"));
2704 put
= new Put(row1
, ts
+ 2);
2705 put
.addColumn(fam1
, col1
, Bytes
.toBytes("filter2"));
2707 put
= new Put(row1
, ts
+ 3);
2708 put
.addColumn(fam1
, col1
, value2
);
2711 Get get
= new Get(row1
);
2712 get
.readAllVersions();
2713 Result res
= region
.get(get
);
2714 // Get 3 versions, the oldest version has gone from user view
2715 assertEquals(maxVersions
, res
.size());
2717 get
.setFilter(new ValueFilter(CompareOperator
.EQUAL
, new SubstringComparator("value")));
2718 res
= region
.get(get
);
2719 // When using a value filter, the oldest version should still be gone from the user view and
2720 // it should only return one key value
2721 assertEquals(1, res
.size());
2722 assertTrue(CellUtil
.matchingValue(new KeyValue(row1
, fam1
, col1
, value2
), res
.rawCells()[0]));
2723 assertEquals(ts
+ 3, res
.rawCells()[0].getTimestamp());
2726 region
.compact(true);
2728 res
= region
.get(get
);
2729 // After flush and compact, the result should be consistent with previous result
2730 assertEquals(1, res
.size());
2731 assertTrue(CellUtil
.matchingValue(new KeyValue(row1
, fam1
, col1
, value2
), res
.rawCells()[0]));
2734 // ////////////////////////////////////////////////////////////////////////////
2736 // ////////////////////////////////////////////////////////////////////////////
2738 public void testGetScanner_WithOkFamilies() throws IOException
{
2739 byte[] fam1
= Bytes
.toBytes("fam1");
2740 byte[] fam2
= Bytes
.toBytes("fam2");
2742 byte[][] families
= { fam1
, fam2
};
2744 // Setting up region
2745 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2746 Scan scan
= new Scan();
2747 scan
.addFamily(fam1
);
2748 scan
.addFamily(fam2
);
2750 region
.getScanner(scan
);
2751 } catch (Exception e
) {
2752 assertTrue("Families could not be found in Region", false);
2757 public void testGetScanner_WithNotOkFamilies() throws IOException
{
2758 byte[] fam1
= Bytes
.toBytes("fam1");
2759 byte[] fam2
= Bytes
.toBytes("fam2");
2761 byte[][] families
= { fam1
};
2763 // Setting up region
2764 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2765 Scan scan
= new Scan();
2766 scan
.addFamily(fam2
);
2769 region
.getScanner(scan
);
2770 } catch (Exception e
) {
2773 assertTrue("Families could not be found in Region", ok
);
2777 public void testGetScanner_WithNoFamilies() throws IOException
{
2778 byte[] row1
= Bytes
.toBytes("row1");
2779 byte[] fam1
= Bytes
.toBytes("fam1");
2780 byte[] fam2
= Bytes
.toBytes("fam2");
2781 byte[] fam3
= Bytes
.toBytes("fam3");
2782 byte[] fam4
= Bytes
.toBytes("fam4");
2784 byte[][] families
= { fam1
, fam2
, fam3
, fam4
};
2786 // Setting up region
2787 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2788 // Putting data in Region
2789 Put put
= new Put(row1
);
2790 put
.addColumn(fam1
, null, null);
2791 put
.addColumn(fam2
, null, null);
2792 put
.addColumn(fam3
, null, null);
2793 put
.addColumn(fam4
, null, null);
2797 HRegion
.RegionScannerImpl is
= null;
2799 // Testing to see how many scanners are produced by getScanner,
2801 // with known number, 2 - current = 1
2803 scan
.addFamily(fam2
);
2804 scan
.addFamily(fam4
);
2805 is
= region
.getScanner(scan
);
2806 assertEquals(1, is
.storeHeap
.getHeap().size());
2809 is
= region
.getScanner(scan
);
2810 assertEquals(families
.length
- 1, is
.storeHeap
.getHeap().size());
2814 * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2816 * @throws IOException
2819 public void testGetScanner_WithRegionClosed() throws IOException
{
2820 byte[] fam1
= Bytes
.toBytes("fam1");
2821 byte[] fam2
= Bytes
.toBytes("fam2");
2823 byte[][] families
= { fam1
, fam2
};
2825 // Setting up region
2827 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2828 } catch (IOException e
) {
2829 e
.printStackTrace();
2830 fail("Got IOException during initHRegion, " + e
.getMessage());
2832 region
.closed
.set(true);
2834 region
.getScanner(null);
2835 fail("Expected to get an exception during getScanner on a region that is closed");
2836 } catch (NotServingRegionException e
) {
2837 // this is the correct exception that is expected
2838 } catch (IOException e
) {
2839 fail("Got wrong type of exception - should be a NotServingRegionException, " +
2840 "but was an IOException: "
2846 public void testRegionScanner_Next() throws IOException
{
2847 byte[] row1
= Bytes
.toBytes("row1");
2848 byte[] row2
= Bytes
.toBytes("row2");
2849 byte[] fam1
= Bytes
.toBytes("fam1");
2850 byte[] fam2
= Bytes
.toBytes("fam2");
2851 byte[] fam3
= Bytes
.toBytes("fam3");
2852 byte[] fam4
= Bytes
.toBytes("fam4");
2854 byte[][] families
= { fam1
, fam2
, fam3
, fam4
};
2855 long ts
= System
.currentTimeMillis();
2857 // Setting up region
2858 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2859 // Putting data in Region
2861 put
= new Put(row1
);
2862 put
.addColumn(fam1
, (byte[]) null, ts
, null);
2863 put
.addColumn(fam2
, (byte[]) null, ts
, null);
2864 put
.addColumn(fam3
, (byte[]) null, ts
, null);
2865 put
.addColumn(fam4
, (byte[]) null, ts
, null);
2868 put
= new Put(row2
);
2869 put
.addColumn(fam1
, (byte[]) null, ts
, null);
2870 put
.addColumn(fam2
, (byte[]) null, ts
, null);
2871 put
.addColumn(fam3
, (byte[]) null, ts
, null);
2872 put
.addColumn(fam4
, (byte[]) null, ts
, null);
2875 Scan scan
= new Scan();
2876 scan
.addFamily(fam2
);
2877 scan
.addFamily(fam4
);
2878 InternalScanner is
= region
.getScanner(scan
);
2880 List
<Cell
> res
= null;
2883 List
<Cell
> expected1
= new ArrayList
<>();
2884 expected1
.add(new KeyValue(row1
, fam2
, null, ts
, KeyValue
.Type
.Put
, null));
2885 expected1
.add(new KeyValue(row1
, fam4
, null, ts
, KeyValue
.Type
.Put
, null));
2887 res
= new ArrayList
<>();
2889 for (int i
= 0; i
< res
.size(); i
++) {
2890 assertTrue(PrivateCellUtil
.equalsIgnoreMvccVersion(expected1
.get(i
), res
.get(i
)));
2894 List
<Cell
> expected2
= new ArrayList
<>();
2895 expected2
.add(new KeyValue(row2
, fam2
, null, ts
, KeyValue
.Type
.Put
, null));
2896 expected2
.add(new KeyValue(row2
, fam4
, null, ts
, KeyValue
.Type
.Put
, null));
2898 res
= new ArrayList
<>();
2900 for (int i
= 0; i
< res
.size(); i
++) {
2901 assertTrue(PrivateCellUtil
.equalsIgnoreMvccVersion(expected2
.get(i
), res
.get(i
)));
2906 public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException
{
2907 byte[] row1
= Bytes
.toBytes("row1");
2908 byte[] qf1
= Bytes
.toBytes("qualifier1");
2909 byte[] qf2
= Bytes
.toBytes("qualifier2");
2910 byte[] fam1
= Bytes
.toBytes("fam1");
2911 byte[][] families
= { fam1
};
2913 long ts1
= System
.currentTimeMillis();
2917 // Setting up region
2918 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2919 // Putting data in Region
2921 KeyValue kv13
= new KeyValue(row1
, fam1
, qf1
, ts3
, KeyValue
.Type
.Put
, null);
2922 KeyValue kv12
= new KeyValue(row1
, fam1
, qf1
, ts2
, KeyValue
.Type
.Put
, null);
2923 KeyValue kv11
= new KeyValue(row1
, fam1
, qf1
, ts1
, KeyValue
.Type
.Put
, null);
2925 KeyValue kv23
= new KeyValue(row1
, fam1
, qf2
, ts3
, KeyValue
.Type
.Put
, null);
2926 KeyValue kv22
= new KeyValue(row1
, fam1
, qf2
, ts2
, KeyValue
.Type
.Put
, null);
2927 KeyValue kv21
= new KeyValue(row1
, fam1
, qf2
, ts1
, KeyValue
.Type
.Put
, null);
2929 put
= new Put(row1
);
2939 List
<Cell
> expected
= new ArrayList
<>();
2943 Scan scan
= new Scan(row1
);
2944 scan
.addColumn(fam1
, qf1
);
2945 scan
.setMaxVersions(MAX_VERSIONS
);
2946 List
<Cell
> actual
= new ArrayList
<>();
2947 InternalScanner scanner
= region
.getScanner(scan
);
2949 boolean hasNext
= scanner
.next(actual
);
2950 assertEquals(false, hasNext
);
2953 for (int i
= 0; i
< expected
.size(); i
++) {
2954 assertEquals(expected
.get(i
), actual
.get(i
));
2959 public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException
{
2960 byte[] row1
= Bytes
.toBytes("row1");
2961 byte[] qf1
= Bytes
.toBytes("qualifier1");
2962 byte[] qf2
= Bytes
.toBytes("qualifier2");
2963 byte[] fam1
= Bytes
.toBytes("fam1");
2964 byte[][] families
= { fam1
};
2966 long ts1
= 1; // System.currentTimeMillis();
2970 // Setting up region
2971 this.region
= initHRegion(tableName
, method
, CONF
, families
);
2972 // Putting data in Region
2974 KeyValue kv13
= new KeyValue(row1
, fam1
, qf1
, ts3
, KeyValue
.Type
.Put
, null);
2975 KeyValue kv12
= new KeyValue(row1
, fam1
, qf1
, ts2
, KeyValue
.Type
.Put
, null);
2976 KeyValue kv11
= new KeyValue(row1
, fam1
, qf1
, ts1
, KeyValue
.Type
.Put
, null);
2978 KeyValue kv23
= new KeyValue(row1
, fam1
, qf2
, ts3
, KeyValue
.Type
.Put
, null);
2979 KeyValue kv22
= new KeyValue(row1
, fam1
, qf2
, ts2
, KeyValue
.Type
.Put
, null);
2980 KeyValue kv21
= new KeyValue(row1
, fam1
, qf2
, ts1
, KeyValue
.Type
.Put
, null);
2982 put
= new Put(row1
);
2993 List
<Cell
> expected
= new ArrayList
<>();
2999 Scan scan
= new Scan(row1
);
3000 scan
.addColumn(fam1
, qf1
);
3001 scan
.addColumn(fam1
, qf2
);
3002 scan
.setMaxVersions(MAX_VERSIONS
);
3003 List
<Cell
> actual
= new ArrayList
<>();
3004 InternalScanner scanner
= region
.getScanner(scan
);
3006 boolean hasNext
= scanner
.next(actual
);
3007 assertEquals(false, hasNext
);
3010 for (int i
= 0; i
< expected
.size(); i
++) {
3011 assertTrue(PrivateCellUtil
.equalsIgnoreMvccVersion(expected
.get(i
), actual
.get(i
)));
3016 public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
3018 byte[] row1
= Bytes
.toBytes("row1");
3019 byte[] fam1
= Bytes
.toBytes("fam1");
3020 byte[][] families
= { fam1
};
3021 byte[] qf1
= Bytes
.toBytes("qualifier1");
3022 byte[] qf2
= Bytes
.toBytes("qualifier2");
3029 // Setting up region
3030 this.region
= initHRegion(tableName
, method
, CONF
, families
);
3031 // Putting data in Region
3032 KeyValue kv14
= new KeyValue(row1
, fam1
, qf1
, ts4
, KeyValue
.Type
.Put
, null);
3033 KeyValue kv13
= new KeyValue(row1
, fam1
, qf1
, ts3
, KeyValue
.Type
.Put
, null);
3034 KeyValue kv12
= new KeyValue(row1
, fam1
, qf1
, ts2
, KeyValue
.Type
.Put
, null);
3035 KeyValue kv11
= new KeyValue(row1
, fam1
, qf1
, ts1
, KeyValue
.Type
.Put
, null);
3037 KeyValue kv24
= new KeyValue(row1
, fam1
, qf2
, ts4
, KeyValue
.Type
.Put
, null);
3038 KeyValue kv23
= new KeyValue(row1
, fam1
, qf2
, ts3
, KeyValue
.Type
.Put
, null);
3039 KeyValue kv22
= new KeyValue(row1
, fam1
, qf2
, ts2
, KeyValue
.Type
.Put
, null);
3040 KeyValue kv21
= new KeyValue(row1
, fam1
, qf2
, ts1
, KeyValue
.Type
.Put
, null);
3043 put
= new Put(row1
);
3049 put
= new Put(row1
);
3055 put
= new Put(row1
);
3061 put
= new Put(row1
);
3067 List
<Cell
> expected
= new ArrayList
<>();
3075 Scan scan
= new Scan(row1
);
3076 scan
.addColumn(fam1
, qf1
);
3077 scan
.addColumn(fam1
, qf2
);
3079 scan
.setMaxVersions(versions
);
3080 List
<Cell
> actual
= new ArrayList
<>();
3081 InternalScanner scanner
= region
.getScanner(scan
);
3083 boolean hasNext
= scanner
.next(actual
);
3084 assertEquals(false, hasNext
);
3087 for (int i
= 0; i
< expected
.size(); i
++) {
3088 assertTrue(PrivateCellUtil
.equalsIgnoreMvccVersion(expected
.get(i
), actual
.get(i
)));
3093 public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException
{
3094 byte[] row1
= Bytes
.toBytes("row1");
3095 byte[] qf1
= Bytes
.toBytes("qualifier1");
3096 byte[] qf2
= Bytes
.toBytes("qualifier2");
3097 byte[] fam1
= Bytes
.toBytes("fam1");
3098 byte[][] families
= { fam1
};
3100 long ts1
= System
.currentTimeMillis();
3104 // Setting up region
3105 this.region
= initHRegion(tableName
, method
, CONF
, families
);
3106 // Putting data in Region
3108 KeyValue kv13
= new KeyValue(row1
, fam1
, qf1
, ts3
, KeyValue
.Type
.Put
, null);
3109 KeyValue kv12
= new KeyValue(row1
, fam1
, qf1
, ts2
, KeyValue
.Type
.Put
, null);
3110 KeyValue kv11
= new KeyValue(row1
, fam1
, qf1
, ts1
, KeyValue
.Type
.Put
, null);
3112 KeyValue kv23
= new KeyValue(row1
, fam1
, qf2
, ts3
, KeyValue
.Type
.Put
, null);
3113 KeyValue kv22
= new KeyValue(row1
, fam1
, qf2
, ts2
, KeyValue
.Type
.Put
, null);
3114 KeyValue kv21
= new KeyValue(row1
, fam1
, qf2
, ts1
, KeyValue
.Type
.Put
, null);
3116 put
= new Put(row1
);
3126 List
<Cell
> expected
= new ArrayList
<>();
3132 Scan scan
= new Scan(row1
);
3133 scan
.addFamily(fam1
);
3134 scan
.setMaxVersions(MAX_VERSIONS
);
3135 List
<Cell
> actual
= new ArrayList
<>();
3136 InternalScanner scanner
= region
.getScanner(scan
);
3138 boolean hasNext
= scanner
.next(actual
);
3139 assertEquals(false, hasNext
);
3142 for (int i
= 0; i
< expected
.size(); i
++) {
3143 assertEquals(expected
.get(i
), actual
.get(i
));
3148 public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException
{
3149 byte[] row1
= Bytes
.toBytes("row1");
3150 byte[] qf1
= Bytes
.toBytes("qualifier1");
3151 byte[] qf2
= Bytes
.toBytes("qualifier2");
3152 byte[] fam1
= Bytes
.toBytes("fam1");
3154 long ts1
= 1; // System.currentTimeMillis();
3158 // Setting up region
3159 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
3160 // Putting data in Region
3162 KeyValue kv13
= new KeyValue(row1
, fam1
, qf1
, ts3
, KeyValue
.Type
.Put
, null);
3163 KeyValue kv12
= new KeyValue(row1
, fam1
, qf1
, ts2
, KeyValue
.Type
.Put
, null);
3164 KeyValue kv11
= new KeyValue(row1
, fam1
, qf1
, ts1
, KeyValue
.Type
.Put
, null);
3166 KeyValue kv23
= new KeyValue(row1
, fam1
, qf2
, ts3
, KeyValue
.Type
.Put
, null);
3167 KeyValue kv22
= new KeyValue(row1
, fam1
, qf2
, ts2
, KeyValue
.Type
.Put
, null);
3168 KeyValue kv21
= new KeyValue(row1
, fam1
, qf2
, ts1
, KeyValue
.Type
.Put
, null);
3170 put
= new Put(row1
);
3181 List
<Cell
> expected
= new ArrayList
<>();
3187 Scan scan
= new Scan(row1
);
3188 scan
.addFamily(fam1
);
3189 scan
.setMaxVersions(MAX_VERSIONS
);
3190 List
<Cell
> actual
= new ArrayList
<>();
3191 InternalScanner scanner
= region
.getScanner(scan
);
3193 boolean hasNext
= scanner
.next(actual
);
3194 assertEquals(false, hasNext
);
3197 for (int i
= 0; i
< expected
.size(); i
++) {
3198 assertTrue(PrivateCellUtil
.equalsIgnoreMvccVersion(expected
.get(i
), actual
.get(i
)));
3203 public void testScanner_StopRow1542() throws IOException
{
3204 byte[] family
= Bytes
.toBytes("testFamily");
3205 this.region
= initHRegion(tableName
, method
, CONF
, family
);
3206 byte[] row1
= Bytes
.toBytes("row111");
3207 byte[] row2
= Bytes
.toBytes("row222");
3208 byte[] row3
= Bytes
.toBytes("row333");
3209 byte[] row4
= Bytes
.toBytes("row444");
3210 byte[] row5
= Bytes
.toBytes("row555");
3212 byte[] col1
= Bytes
.toBytes("Pub111");
3213 byte[] col2
= Bytes
.toBytes("Pub222");
3215 Put put
= new Put(row1
);
3216 put
.addColumn(family
, col1
, Bytes
.toBytes(10L));
3219 put
= new Put(row2
);
3220 put
.addColumn(family
, col1
, Bytes
.toBytes(15L));
3223 put
= new Put(row3
);
3224 put
.addColumn(family
, col2
, Bytes
.toBytes(20L));
3227 put
= new Put(row4
);
3228 put
.addColumn(family
, col2
, Bytes
.toBytes(30L));
3231 put
= new Put(row5
);
3232 put
.addColumn(family
, col1
, Bytes
.toBytes(40L));
3235 Scan scan
= new Scan(row3
, row4
);
3236 scan
.setMaxVersions();
3237 scan
.addColumn(family
, col1
);
3238 InternalScanner s
= region
.getScanner(scan
);
3240 List
<Cell
> results
= new ArrayList
<>();
3241 assertEquals(false, s
.next(results
));
3242 assertEquals(0, results
.size());
3246 public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException
{
3247 byte[] row1
= Bytes
.toBytes("row1");
3248 byte[] fam1
= Bytes
.toBytes("fam1");
3249 byte[] qf1
= Bytes
.toBytes("qualifier1");
3250 byte[] qf2
= Bytes
.toBytes("quateslifier2");
3257 // Setting up region
3258 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
3259 // Putting data in Region
3260 KeyValue kv14
= new KeyValue(row1
, fam1
, qf1
, ts4
, KeyValue
.Type
.Put
, null);
3261 KeyValue kv13
= new KeyValue(row1
, fam1
, qf1
, ts3
, KeyValue
.Type
.Put
, null);
3262 KeyValue kv12
= new KeyValue(row1
, fam1
, qf1
, ts2
, KeyValue
.Type
.Put
, null);
3263 KeyValue kv11
= new KeyValue(row1
, fam1
, qf1
, ts1
, KeyValue
.Type
.Put
, null);
3265 KeyValue kv24
= new KeyValue(row1
, fam1
, qf2
, ts4
, KeyValue
.Type
.Put
, null);
3266 KeyValue kv23
= new KeyValue(row1
, fam1
, qf2
, ts3
, KeyValue
.Type
.Put
, null);
3267 KeyValue kv22
= new KeyValue(row1
, fam1
, qf2
, ts2
, KeyValue
.Type
.Put
, null);
3268 KeyValue kv21
= new KeyValue(row1
, fam1
, qf2
, ts1
, KeyValue
.Type
.Put
, null);
3271 put
= new Put(row1
);
3277 put
= new Put(row1
);
3283 put
= new Put(row1
);
3289 put
= new Put(row1
);
3295 List
<KeyValue
> expected
= new ArrayList
<>();
3303 Scan scan
= new Scan(row1
);
3305 scan
.setMaxVersions(versions
);
3306 List
<Cell
> actual
= new ArrayList
<>();
3307 InternalScanner scanner
= region
.getScanner(scan
);
3309 boolean hasNext
= scanner
.next(actual
);
3310 assertEquals(false, hasNext
);
3313 for (int i
= 0; i
< expected
.size(); i
++) {
3314 assertTrue(PrivateCellUtil
.equalsIgnoreMvccVersion(expected
.get(i
), actual
.get(i
)));
3319 * Added for HBASE-5416
3321 * Here we test scan optimization when only a subset of CFs is used in filter
3325 public void testScanner_JoinedScanners() throws IOException
{
3326 byte[] cf_essential
= Bytes
.toBytes("essential");
3327 byte[] cf_joined
= Bytes
.toBytes("joined");
3328 byte[] cf_alpha
= Bytes
.toBytes("alpha");
3329 this.region
= initHRegion(tableName
, method
, CONF
, cf_essential
, cf_joined
, cf_alpha
);
3330 byte[] row1
= Bytes
.toBytes("row1");
3331 byte[] row2
= Bytes
.toBytes("row2");
3332 byte[] row3
= Bytes
.toBytes("row3");
3334 byte[] col_normal
= Bytes
.toBytes("d");
3335 byte[] col_alpha
= Bytes
.toBytes("a");
3337 byte[] filtered_val
= Bytes
.toBytes(3);
3339 Put put
= new Put(row1
);
3340 put
.addColumn(cf_essential
, col_normal
, Bytes
.toBytes(1));
3341 put
.addColumn(cf_joined
, col_alpha
, Bytes
.toBytes(1));
3344 put
= new Put(row2
);
3345 put
.addColumn(cf_essential
, col_alpha
, Bytes
.toBytes(2));
3346 put
.addColumn(cf_joined
, col_normal
, Bytes
.toBytes(2));
3347 put
.addColumn(cf_alpha
, col_alpha
, Bytes
.toBytes(2));
3350 put
= new Put(row3
);
3351 put
.addColumn(cf_essential
, col_normal
, filtered_val
);
3352 put
.addColumn(cf_joined
, col_normal
, filtered_val
);
3355 // Check two things:
3356 // 1. result list contains expected values
3357 // 2. result list is sorted properly
3359 Scan scan
= new Scan();
3360 Filter filter
= new SingleColumnValueExcludeFilter(cf_essential
, col_normal
,
3361 CompareOperator
.NOT_EQUAL
, filtered_val
);
3362 scan
.setFilter(filter
);
3363 scan
.setLoadColumnFamiliesOnDemand(true);
3364 InternalScanner s
= region
.getScanner(scan
);
3366 List
<Cell
> results
= new ArrayList
<>();
3367 assertTrue(s
.next(results
));
3368 assertEquals(1, results
.size());
3371 assertTrue(s
.next(results
));
3372 assertEquals(3, results
.size());
3373 assertTrue("orderCheck", CellUtil
.matchingFamily(results
.get(0), cf_alpha
));
3374 assertTrue("orderCheck", CellUtil
.matchingFamily(results
.get(1), cf_essential
));
3375 assertTrue("orderCheck", CellUtil
.matchingFamily(results
.get(2), cf_joined
));
3378 assertFalse(s
.next(results
));
3379 assertEquals(0, results
.size());
3385 * Test case when scan limits amount of KVs returned on each next() call.
3388 public void testScanner_JoinedScannersWithLimits() throws IOException
{
3389 final byte[] cf_first
= Bytes
.toBytes("first");
3390 final byte[] cf_second
= Bytes
.toBytes("second");
3392 this.region
= initHRegion(tableName
, method
, CONF
, cf_first
, cf_second
);
3393 final byte[] col_a
= Bytes
.toBytes("a");
3394 final byte[] col_b
= Bytes
.toBytes("b");
3398 for (int i
= 0; i
< 10; i
++) {
3399 put
= new Put(Bytes
.toBytes("r" + Integer
.toString(i
)));
3400 put
.addColumn(cf_first
, col_a
, Bytes
.toBytes(i
));
3402 put
.addColumn(cf_first
, col_b
, Bytes
.toBytes(i
));
3403 put
.addColumn(cf_second
, col_a
, Bytes
.toBytes(i
));
3404 put
.addColumn(cf_second
, col_b
, Bytes
.toBytes(i
));
3409 Scan scan
= new Scan();
3410 scan
.setLoadColumnFamiliesOnDemand(true);
3411 Filter bogusFilter
= new FilterBase() {
3413 public ReturnCode
filterCell(final Cell ignored
) throws IOException
{
3414 return ReturnCode
.INCLUDE
;
3417 public boolean isFamilyEssential(byte[] name
) {
3418 return Bytes
.equals(name
, cf_first
);
3422 scan
.setFilter(bogusFilter
);
3423 InternalScanner s
= region
.getScanner(scan
);
3425 // Our data looks like this:
3426 // r0: first:a, first:b, second:a, second:b
3427 // r1: first:a, first:b, second:a, second:b
3428 // r2: first:a, first:b, second:a, second:b
3429 // r3: first:a, first:b, second:a, second:b
3430 // r4: first:a, first:b, second:a, second:b
3437 // But due to next's limit set to 3, we should get this:
3438 // r0: first:a, first:b, second:a
3440 // r1: first:a, first:b, second:a
3442 // r2: first:a, first:b, second:a
3444 // r3: first:a, first:b, second:a
3446 // r4: first:a, first:b, second:a
3454 List
<Cell
> results
= new ArrayList
<>();
3456 ScannerContext scannerContext
= ScannerContext
.newBuilder().setBatchLimit(3).build();
3458 boolean more
= s
.next(results
, scannerContext
);
3459 if ((index
>> 1) < 5) {
3460 if (index
% 2 == 0) {
3461 assertEquals(3, results
.size());
3463 assertEquals(1, results
.size());
3466 assertEquals(1, results
.size());
3477 * Write an HFile block full of Cells whose qualifiers are identical between
3478 * 0 and Short.MAX_VALUE. See HBASE-13329.
3482 public void testLongQualifier() throws Exception
{
3483 byte[] family
= Bytes
.toBytes("family");
3484 this.region
= initHRegion(tableName
, method
, CONF
, family
);
3485 byte[] q
= new byte[Short
.MAX_VALUE
+2];
3486 Arrays
.fill(q
, 0, q
.length
-1, (byte)42);
3487 for (byte i
=0; i
<10; i
++) {
3488 Put p
= new Put(Bytes
.toBytes("row"));
3489 // qualifiers that differ past Short.MAX_VALUE
3491 p
.addColumn(family
, q
, q
);
3494 region
.flush(false);
3498 * Flushes the cache in a thread while scanning. The tests verify that the
3499 * scan is coherent - e.g. the returned results are always of the same or
3500 * later update as the previous results.
3502 * @throws IOException
3504 * @throws InterruptedException
3508 public void testFlushCacheWhileScanning() throws IOException
, InterruptedException
{
3509 byte[] family
= Bytes
.toBytes("family");
3511 int flushAndScanInterval
= 10;
3512 int compactInterval
= 10 * flushAndScanInterval
;
3514 this.region
= initHRegion(tableName
, method
, CONF
, family
);
3515 FlushThread flushThread
= new FlushThread();
3517 flushThread
.start();
3519 Scan scan
= new Scan();
3520 scan
.addFamily(family
);
3521 scan
.setFilter(new SingleColumnValueFilter(family
, qual1
, CompareOperator
.EQUAL
,
3522 new BinaryComparator(Bytes
.toBytes(5L))));
3524 int expectedCount
= 0;
3525 List
<Cell
> res
= new ArrayList
<>();
3527 boolean toggle
= true;
3528 for (long i
= 0; i
< numRows
; i
++) {
3529 Put put
= new Put(Bytes
.toBytes(i
));
3530 put
.setDurability(Durability
.SKIP_WAL
);
3531 put
.addColumn(family
, qual1
, Bytes
.toBytes(i
% 10));
3534 if (i
!= 0 && i
% compactInterval
== 0) {
3535 LOG
.debug("iteration = " + i
+ " ts="+System
.currentTimeMillis());
3536 region
.compact(true);
3543 if (i
!= 0 && i
% flushAndScanInterval
== 0) {
3545 InternalScanner scanner
= region
.getScanner(scan
);
3547 flushThread
.flush();
3549 while (scanner
.next(res
))
3552 flushThread
.flush();
3554 assertEquals("toggle="+toggle
+"i=" + i
+ " ts="+System
.currentTimeMillis(),
3555 expectedCount
, res
.size());
3564 flushThread
.checkNoError();
3565 } catch (InterruptedException ie
) {
3566 LOG
.warn("Caught exception when joining with flushThread", ie
);
3568 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
3573 protected class FlushThread
extends Thread
{
3574 private volatile boolean done
;
3575 private Throwable error
= null;
3578 super("FlushThread");
3581 public void done() {
3583 synchronized (this) {
3588 public void checkNoError() {
3589 if (error
!= null) {
3598 synchronized (this) {
3601 } catch (InterruptedException ignored
) {
3609 } catch (IOException e
) {
3611 LOG
.error("Error while flushing cache", e
);
3615 } catch (Throwable t
) {
3616 LOG
.error("Uncaught exception", t
);
3622 public void flush() {
3623 synchronized (this) {
3630 * Writes very wide records and scans for the latest every time. Flushes and
3631 * compacts the region every now and then to keep things realistic.
3633 * @throws IOException
3634 * by flush / scan / compaction
3635 * @throws InterruptedException
3636 * when joining threads
3639 public void testWritesWhileScanning() throws IOException
, InterruptedException
{
3640 int testCount
= 100;
3642 int numFamilies
= 10;
3643 int numQualifiers
= 100;
3644 int flushInterval
= 7;
3645 int compactInterval
= 5 * flushInterval
;
3646 byte[][] families
= new byte[numFamilies
][];
3647 for (int i
= 0; i
< numFamilies
; i
++) {
3648 families
[i
] = Bytes
.toBytes("family" + i
);
3650 byte[][] qualifiers
= new byte[numQualifiers
][];
3651 for (int i
= 0; i
< numQualifiers
; i
++) {
3652 qualifiers
[i
] = Bytes
.toBytes("qual" + i
);
3655 this.region
= initHRegion(tableName
, method
, CONF
, families
);
3656 FlushThread flushThread
= new FlushThread();
3657 PutThread putThread
= new PutThread(numRows
, families
, qualifiers
);
3660 putThread
.waitForFirstPut();
3662 flushThread
.start();
3664 Scan scan
= new Scan(Bytes
.toBytes("row0"), Bytes
.toBytes("row1"));
3666 int expectedCount
= numFamilies
* numQualifiers
;
3667 List
<Cell
> res
= new ArrayList
<>();
3669 long prevTimestamp
= 0L;
3670 for (int i
= 0; i
< testCount
; i
++) {
3672 if (i
!= 0 && i
% compactInterval
== 0) {
3673 region
.compact(true);
3674 for (HStore store
: region
.getStores()) {
3675 store
.closeAndArchiveCompactedFiles();
3679 if (i
!= 0 && i
% flushInterval
== 0) {
3680 flushThread
.flush();
3683 boolean previousEmpty
= res
.isEmpty();
3685 try (InternalScanner scanner
= region
.getScanner(scan
)) {
3688 moreRows
= scanner
.next(res
);
3691 if (!res
.isEmpty() || !previousEmpty
|| i
> compactInterval
) {
3692 assertEquals("i=" + i
, expectedCount
, res
.size());
3693 long timestamp
= res
.get(0).getTimestamp();
3694 assertTrue("Timestamps were broke: " + timestamp
+ " prev: " + prevTimestamp
,
3695 timestamp
>= prevTimestamp
);
3696 prevTimestamp
= timestamp
;
3708 flushThread
.checkNoError();
3711 putThread
.checkNoError();
3712 } catch (InterruptedException ie
) {
3713 LOG
.warn("Caught exception when joining with flushThread", ie
);
3717 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
3718 } catch (DroppedSnapshotException dse
) {
3719 // We could get this on way out because we interrupt the background flusher and it could
3720 // fail anywhere causing a DSE over in the background flusher... only it is not properly
3721 // dealt with so could still be memory hanging out when we get to here -- memory we can't
3722 // flush because the accounting is 'off' since original DSE.
3728 protected class PutThread
extends Thread
{
3729 private volatile boolean done
;
3730 private volatile int numPutsFinished
= 0;
3732 private Throwable error
= null;
3733 private int numRows
;
3734 private byte[][] families
;
3735 private byte[][] qualifiers
;
3737 private PutThread(int numRows
, byte[][] families
, byte[][] qualifiers
) {
3739 this.numRows
= numRows
;
3740 this.families
= families
;
3741 this.qualifiers
= qualifiers
;
3745 * Block calling thread until this instance of PutThread has put at least one row.
3747 public void waitForFirstPut() throws InterruptedException
{
3748 // wait until put thread actually puts some data
3749 while (isAlive() && numPutsFinished
== 0) {
3755 public void done() {
3757 synchronized (this) {
3762 public void checkNoError() {
3763 if (error
!= null) {
3773 for (int r
= 0; r
< numRows
; r
++) {
3774 byte[] row
= Bytes
.toBytes("row" + r
);
3775 Put put
= new Put(row
);
3776 put
.setDurability(Durability
.SKIP_WAL
);
3777 byte[] value
= Bytes
.toBytes(String
.valueOf(numPutsFinished
));
3778 for (byte[] family
: families
) {
3779 for (byte[] qualifier
: qualifiers
) {
3780 put
.addColumn(family
, qualifier
, numPutsFinished
, value
);
3785 if (numPutsFinished
> 0 && numPutsFinished
% 47 == 0) {
3786 LOG
.debug("put iteration = {}", numPutsFinished
);
3787 Delete delete
= new Delete(row
, (long) numPutsFinished
- 30);
3788 region
.delete(delete
);
3792 } catch (InterruptedIOException e
) {
3793 // This is fine. It means we are done, or didn't get the lock on time
3794 LOG
.info("Interrupted", e
);
3795 } catch (IOException e
) {
3796 LOG
.error("Error while putting records", e
);
3807 * Writes very wide records and gets the latest row every time. Flushes and
3808 * compacts the region aggressively to catch issues.
3810 * @throws IOException
3811 * by flush / scan / compaction
3812 * @throws InterruptedException
3813 * when joining threads
3816 public void testWritesWhileGetting() throws Exception
{
3819 int numFamilies
= 10;
3820 int numQualifiers
= 100;
3821 int compactInterval
= 100;
3822 byte[][] families
= new byte[numFamilies
][];
3823 for (int i
= 0; i
< numFamilies
; i
++) {
3824 families
[i
] = Bytes
.toBytes("family" + i
);
3826 byte[][] qualifiers
= new byte[numQualifiers
][];
3827 for (int i
= 0; i
< numQualifiers
; i
++) {
3828 qualifiers
[i
] = Bytes
.toBytes("qual" + i
);
3832 // This test flushes constantly and can cause many files to be created,
3834 // extending over the ulimit. Make sure compactions are aggressive in
3836 // reducing the number of HFiles created.
3837 Configuration conf
= HBaseConfiguration
.create(CONF
);
3838 conf
.setInt("hbase.hstore.compaction.min", 1);
3839 conf
.setInt("hbase.hstore.compaction.max", 1000);
3840 this.region
= initHRegion(tableName
, method
, conf
, families
);
3841 PutThread putThread
= null;
3842 MultithreadedTestUtil
.TestContext ctx
= new MultithreadedTestUtil
.TestContext(conf
);
3844 putThread
= new PutThread(numRows
, families
, qualifiers
);
3846 putThread
.waitForFirstPut();
3848 // Add a thread that flushes as fast as possible
3849 ctx
.addThread(new RepeatingTestThread(ctx
) {
3852 public void doAnAction() throws Exception
{
3854 // Compact regularly to avoid creating too many files and exceeding the ulimit.
3856 region
.compact(false);
3857 for (HStore store
: region
.getStores()) {
3858 store
.closeAndArchiveCompactedFiles();
3864 Get get
= new Get(Bytes
.toBytes("row0"));
3865 Result result
= null;
3867 int expectedCount
= numFamilies
* numQualifiers
;
3869 long prevTimestamp
= 0L;
3870 for (int i
= 0; i
< testCount
; i
++) {
3871 LOG
.info("testWritesWhileGetting verify turn " + i
);
3872 boolean previousEmpty
= result
== null || result
.isEmpty();
3873 result
= region
.get(get
);
3874 if (!result
.isEmpty() || !previousEmpty
|| i
> compactInterval
) {
3875 assertEquals("i=" + i
, expectedCount
, result
.size());
3876 // TODO this was removed, now what dangit?!
3877 // search looking for the qualifier in question?
3879 for (Cell kv
: result
.rawCells()) {
3880 if (CellUtil
.matchingFamily(kv
, families
[0])
3881 && CellUtil
.matchingQualifier(kv
, qualifiers
[0])) {
3882 timestamp
= kv
.getTimestamp();
3885 assertTrue(timestamp
>= prevTimestamp
);
3886 prevTimestamp
= timestamp
;
3887 Cell previousKV
= null;
3889 for (Cell kv
: result
.rawCells()) {
3890 byte[] thisValue
= CellUtil
.cloneValue(kv
);
3891 if (previousKV
!= null) {
3892 if (Bytes
.compareTo(CellUtil
.cloneValue(previousKV
), thisValue
) != 0) {
3893 LOG
.warn("These two KV should have the same value." + " Previous KV:" + previousKV
3894 + "(memStoreTS:" + previousKV
.getSequenceId() + ")" + ", New KV: " + kv
3895 + "(memStoreTS:" + kv
.getSequenceId() + ")");
3896 assertEquals(0, Bytes
.compareTo(CellUtil
.cloneValue(previousKV
), thisValue
));
3904 if (putThread
!= null)
3909 if (putThread
!= null) {
3911 putThread
.checkNoError();
3915 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
3921 public void testHolesInMeta() throws Exception
{
3922 byte[] family
= Bytes
.toBytes("family");
3923 this.region
= initHRegion(tableName
, Bytes
.toBytes("x"), Bytes
.toBytes("z"), method
, CONF
,
3925 byte[] rowNotServed
= Bytes
.toBytes("a");
3926 Get g
= new Get(rowNotServed
);
3930 } catch (WrongRegionException x
) {
3933 byte[] row
= Bytes
.toBytes("y");
3939 public void testIndexesScanWithOneDeletedRow() throws IOException
{
3940 byte[] family
= Bytes
.toBytes("family");
3942 // Setting up region
3943 this.region
= initHRegion(tableName
, method
, CONF
, family
);
3944 Put put
= new Put(Bytes
.toBytes(1L));
3945 put
.addColumn(family
, qual1
, 1L, Bytes
.toBytes(1L));
3950 Delete delete
= new Delete(Bytes
.toBytes(1L), 1L);
3951 region
.delete(delete
);
3953 put
= new Put(Bytes
.toBytes(2L));
3954 put
.addColumn(family
, qual1
, 2L, Bytes
.toBytes(2L));
3957 Scan idxScan
= new Scan();
3958 idxScan
.addFamily(family
);
3959 idxScan
.setFilter(new FilterList(FilterList
.Operator
.MUST_PASS_ALL
, Arrays
.<Filter
> asList(
3960 new SingleColumnValueFilter(family
, qual1
, CompareOperator
.GREATER_OR_EQUAL
,
3961 new BinaryComparator(Bytes
.toBytes(0L))), new SingleColumnValueFilter(family
, qual1
,
3962 CompareOperator
.LESS_OR_EQUAL
, new BinaryComparator(Bytes
.toBytes(3L))))));
3963 InternalScanner scanner
= region
.getScanner(idxScan
);
3964 List
<Cell
> res
= new ArrayList
<>();
3966 while (scanner
.next(res
)) {
3967 // Ignore res value.
3969 assertEquals(1L, res
.size());
3972 // ////////////////////////////////////////////////////////////////////////////
3973 // Bloom filter test
3974 // ////////////////////////////////////////////////////////////////////////////
3976 public void testBloomFilterSize() throws IOException
{
3977 byte[] fam1
= Bytes
.toBytes("fam1");
3978 byte[] qf1
= Bytes
.toBytes("col");
3979 byte[] val1
= Bytes
.toBytes("value1");
3981 HColumnDescriptor hcd
= new HColumnDescriptor(fam1
).setMaxVersions(Integer
.MAX_VALUE
)
3982 .setBloomFilterType(BloomType
.ROWCOL
);
3984 HTableDescriptor htd
= new HTableDescriptor(tableName
);
3986 HRegionInfo info
= new HRegionInfo(htd
.getTableName(), null, null, false);
3987 this.region
= TEST_UTIL
.createLocalHRegion(info
, htd
);
3988 int num_unique_rows
= 10;
3989 int duplicate_multiplier
= 2;
3990 int num_storefiles
= 4;
3993 for (int f
= 0; f
< num_storefiles
; f
++) {
3994 for (int i
= 0; i
< duplicate_multiplier
; i
++) {
3995 for (int j
= 0; j
< num_unique_rows
; j
++) {
3996 Put put
= new Put(Bytes
.toBytes("row" + j
));
3997 put
.setDurability(Durability
.SKIP_WAL
);
3998 long ts
= version
++;
3999 put
.addColumn(fam1
, qf1
, ts
, val1
);
4005 // before compaction
4006 HStore store
= region
.getStore(fam1
);
4007 Collection
<HStoreFile
> storeFiles
= store
.getStorefiles();
4008 for (HStoreFile storefile
: storeFiles
) {
4009 StoreFileReader reader
= storefile
.getReader();
4010 reader
.loadFileInfo();
4011 reader
.loadBloomfilter();
4012 assertEquals(num_unique_rows
* duplicate_multiplier
, reader
.getEntries());
4013 assertEquals(num_unique_rows
, reader
.getFilterEntries());
4016 region
.compact(true);
4019 storeFiles
= store
.getStorefiles();
4020 for (HStoreFile storefile
: storeFiles
) {
4021 StoreFileReader reader
= storefile
.getReader();
4022 reader
.loadFileInfo();
4023 reader
.loadBloomfilter();
4024 assertEquals(num_unique_rows
* duplicate_multiplier
* num_storefiles
, reader
.getEntries());
4025 assertEquals(num_unique_rows
, reader
.getFilterEntries());
4030 public void testAllColumnsWithBloomFilter() throws IOException
{
4031 byte[] TABLE
= Bytes
.toBytes(name
.getMethodName());
4032 byte[] FAMILY
= Bytes
.toBytes("family");
4035 HColumnDescriptor hcd
= new HColumnDescriptor(FAMILY
).setMaxVersions(Integer
.MAX_VALUE
)
4036 .setBloomFilterType(BloomType
.ROWCOL
);
4037 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf(TABLE
));
4039 HRegionInfo info
= new HRegionInfo(htd
.getTableName(), null, null, false);
4040 this.region
= TEST_UTIL
.createLocalHRegion(info
, htd
);
4041 // For row:0, col:0: insert versions 1 through 5.
4042 byte[] row
= Bytes
.toBytes("row:" + 0);
4043 byte[] column
= Bytes
.toBytes("column:" + 0);
4044 Put put
= new Put(row
);
4045 put
.setDurability(Durability
.SKIP_WAL
);
4046 for (long idx
= 1; idx
<= 4; idx
++) {
4047 put
.addColumn(FAMILY
, column
, idx
, Bytes
.toBytes("value-version-" + idx
));
4055 Get get
= new Get(row
);
4056 get
.readAllVersions();
4057 Cell
[] kvs
= region
.get(get
).rawCells();
4059 // Check if rows are correct
4060 assertEquals(4, kvs
.length
);
4061 checkOneCell(kvs
[0], FAMILY
, 0, 0, 4);
4062 checkOneCell(kvs
[1], FAMILY
, 0, 0, 3);
4063 checkOneCell(kvs
[2], FAMILY
, 0, 0, 2);
4064 checkOneCell(kvs
[3], FAMILY
, 0, 0, 1);
4068 * Testcase to cover bug-fix for HBASE-2823. Ensures correct delete when
4069 * issuing delete row on columns with bloom filter set to row+col
4070 * (BloomType.ROWCOL)
4073 public void testDeleteRowWithBloomFilter() throws IOException
{
4074 byte[] familyName
= Bytes
.toBytes("familyName");
4077 HColumnDescriptor hcd
= new HColumnDescriptor(familyName
).setMaxVersions(Integer
.MAX_VALUE
)
4078 .setBloomFilterType(BloomType
.ROWCOL
);
4080 HTableDescriptor htd
= new HTableDescriptor(tableName
);
4082 HRegionInfo info
= new HRegionInfo(htd
.getTableName(), null, null, false);
4083 this.region
= TEST_UTIL
.createLocalHRegion(info
, htd
);
4085 byte[] row
= Bytes
.toBytes("row1");
4086 byte[] col
= Bytes
.toBytes("col1");
4088 Put put
= new Put(row
);
4089 put
.addColumn(familyName
, col
, 1, Bytes
.toBytes("SomeRandomValue"));
4093 Delete del
= new Delete(row
);
4097 // Get remaining rows (should have none)
4098 Get get
= new Get(row
);
4099 get
.addColumn(familyName
, col
);
4101 Cell
[] keyValues
= region
.get(get
).rawCells();
4102 assertEquals(0, keyValues
.length
);
4106 public void testgetHDFSBlocksDistribution() throws Exception
{
4107 HBaseTestingUtility htu
= new HBaseTestingUtility();
4108 // Why do we set the block size in this test? If we set it smaller than the kvs, then we'll
4109 // break up the file into more pieces that can be distributed across the three nodes and we
4110 // won't be able to have the condition this test asserts; that at least one node has
4111 // a copy of all replicas -- if small block size, then blocks are spread evenly across
4112 // the three nodes. hfilev3 with tags seems to put us over the block size. St.Ack.
4113 // final int DEFAULT_BLOCK_SIZE = 1024;
4114 // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4115 htu
.getConfiguration().setInt("dfs.replication", 2);
4117 // set up a cluster with 3 nodes
4118 MiniHBaseCluster cluster
= null;
4119 String dataNodeHosts
[] = new String
[] { "host1", "host2", "host3" };
4120 int regionServersCount
= 3;
4123 StartMiniClusterOption option
= StartMiniClusterOption
.builder()
4124 .numRegionServers(regionServersCount
).dataNodeHosts(dataNodeHosts
).build();
4125 cluster
= htu
.startMiniCluster(option
);
4126 byte[][] families
= { fam1
, fam2
};
4127 Table ht
= htu
.createTable(tableName
, families
);
4129 // Setting up region
4130 byte row
[] = Bytes
.toBytes("row1");
4131 byte col
[] = Bytes
.toBytes("col1");
4133 Put put
= new Put(row
);
4134 put
.addColumn(fam1
, col
, 1, Bytes
.toBytes("test1"));
4135 put
.addColumn(fam2
, col
, 1, Bytes
.toBytes("test2"));
4138 HRegion firstRegion
= htu
.getHBaseCluster().getRegions(tableName
).get(0);
4139 firstRegion
.flush(true);
4140 HDFSBlocksDistribution blocksDistribution1
= firstRegion
.getHDFSBlocksDistribution();
4142 // Given the default replication factor is 2 and we have 2 HFiles,
4143 // we will have a total of 4 block replicas on 3 datanodes; thus there
4144 // must be at least one host that has a replica of both HFiles. That host's
4145 // weight will be equal to the unique block weight.
4146 long uniqueBlocksWeight1
= blocksDistribution1
.getUniqueBlocksTotalWeight();
4147 StringBuilder sb
= new StringBuilder();
4148 for (String host
: blocksDistribution1
.getTopHosts()) {
4149 if (sb
.length() > 0) sb
.append(", ");
4152 sb
.append(blocksDistribution1
.getWeight(host
));
4155 String topHost
= blocksDistribution1
.getTopHosts().get(0);
4156 long topHostWeight
= blocksDistribution1
.getWeight(topHost
);
4157 String msg
= "uniqueBlocksWeight=" + uniqueBlocksWeight1
+ ", topHostWeight=" +
4158 topHostWeight
+ ", topHost=" + topHost
+ "; " + sb
.toString();
4160 assertTrue(msg
, uniqueBlocksWeight1
== topHostWeight
);
4162 // use the static method to compute the value, it should be the same.
4163 // static method is used by load balancer or other components
4164 HDFSBlocksDistribution blocksDistribution2
= HRegion
.computeHDFSBlocksDistribution(
4165 htu
.getConfiguration(), firstRegion
.getTableDescriptor(), firstRegion
.getRegionInfo());
4166 long uniqueBlocksWeight2
= blocksDistribution2
.getUniqueBlocksTotalWeight();
4168 assertTrue(uniqueBlocksWeight1
== uniqueBlocksWeight2
);
4172 if (cluster
!= null) {
4173 htu
.shutdownMiniCluster();
4179 * Testcase to check that the state of the region initialization task is set to ABORTED
4180 * if any exception occurs during initialization
4185 public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception
{
4188 FileSystem fs
= Mockito
.mock(FileSystem
.class);
4189 Mockito
.when(fs
.exists((Path
) Mockito
.anyObject())).thenThrow(new IOException());
4190 HTableDescriptor htd
= new HTableDescriptor(tableName
);
4191 htd
.addFamily(new HColumnDescriptor("cf"));
4192 info
= new HRegionInfo(htd
.getTableName(), HConstants
.EMPTY_BYTE_ARRAY
,
4193 HConstants
.EMPTY_BYTE_ARRAY
, false);
4194 Path path
= new Path(dir
+ "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4195 region
= HRegion
.newHRegion(path
, null, fs
, CONF
, info
, htd
, null);
4196 // region initialization throws IOException and set task state to ABORTED.
4197 region
.initialize();
4198 fail("Region initialization should fail due to IOException");
4199 } catch (IOException io
) {
4200 List
<MonitoredTask
> tasks
= TaskMonitor
.get().getTasks();
4201 for (MonitoredTask monitoredTask
: tasks
) {
4202 if (!(monitoredTask
instanceof MonitoredRPCHandler
)
4203 && monitoredTask
.getDescription().contains(region
.toString())) {
4204 assertTrue("Region state should be ABORTED.",
4205 monitoredTask
.getState().equals(MonitoredTask
.State
.ABORTED
));
4213 * Verifies that the .regioninfo file is written on region creation and that
4214 * it is recreated if missing during region opening.
4217 public void testRegionInfoFileCreation() throws IOException
{
4218 Path rootDir
= new Path(dir
+ "testRegionInfoFileCreation");
4220 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf(name
.getMethodName()));
4221 htd
.addFamily(new HColumnDescriptor("cf"));
4223 HRegionInfo hri
= new HRegionInfo(htd
.getTableName());
4225 // Create a region and skip the initialization (like CreateTableHandler)
4226 region
= HBaseTestingUtility
.createRegionAndWAL(hri
, rootDir
, CONF
, htd
, false);
4227 Path regionDir
= region
.getRegionFileSystem().getRegionDir();
4228 FileSystem fs
= region
.getRegionFileSystem().getFileSystem();
4229 HBaseTestingUtility
.closeRegionAndWAL(region
);
4231 Path regionInfoFile
= new Path(regionDir
, HRegionFileSystem
.REGION_INFO_FILE
);
4233 // Verify that the .regioninfo file is present
4234 assertTrue(HRegionFileSystem
.REGION_INFO_FILE
+ " should be present in the region dir",
4235 fs
.exists(regionInfoFile
));
4237 // Try to open the region
4238 region
= HRegion
.openHRegion(rootDir
, hri
, htd
, null, CONF
);
4239 assertEquals(regionDir
, region
.getRegionFileSystem().getRegionDir());
4240 HBaseTestingUtility
.closeRegionAndWAL(region
);
4242 // Verify that the .regioninfo file is still there
4243 assertTrue(HRegionFileSystem
.REGION_INFO_FILE
+ " should be present in the region dir",
4244 fs
.exists(regionInfoFile
));
4246 // Remove the .regioninfo file and verify it is recreated on region open
4247 fs
.delete(regionInfoFile
, true);
4248 assertFalse(HRegionFileSystem
.REGION_INFO_FILE
+ " should be removed from the region dir",
4249 fs
.exists(regionInfoFile
));
4251 region
= HRegion
.openHRegion(rootDir
, hri
, htd
, null, CONF
);
4252 // region = TEST_UTIL.openHRegion(hri, htd);
4253 assertEquals(regionDir
, region
.getRegionFileSystem().getRegionDir());
4254 HBaseTestingUtility
.closeRegionAndWAL(region
);
4256 // Verify that the .regioninfo file is still there
4257 assertTrue(HRegionFileSystem
.REGION_INFO_FILE
+ " should be present in the region dir",
4258 fs
.exists(new Path(regionDir
, HRegionFileSystem
.REGION_INFO_FILE
)));
4264 * TestCase for increment
4266 private static class Incrementer
implements Runnable
{
4267 private HRegion region
;
4268 private final static byte[] incRow
= Bytes
.toBytes("incRow");
4269 private final static byte[] family
= Bytes
.toBytes("family");
4270 private final static byte[] qualifier
= Bytes
.toBytes("qualifier");
4271 private final static long ONE
= 1L;
4272 private int incCounter
;
4274 public Incrementer(HRegion region
, int incCounter
) {
4275 this.region
= region
;
4276 this.incCounter
= incCounter
;
4282 while (count
< incCounter
) {
4283 Increment inc
= new Increment(incRow
);
4284 inc
.addColumn(family
, qualifier
, ONE
);
4287 region
.increment(inc
);
4288 } catch (IOException e
) {
4289 LOG
.info("Count=" + count
+ ", " + e
);
4297 * Test case to check increment function with memstore flushing
4301 public void testParallelIncrementWithMemStoreFlush() throws Exception
{
4302 byte[] family
= Incrementer
.family
;
4303 this.region
= initHRegion(tableName
, method
, CONF
, family
);
4304 final HRegion region
= this.region
;
4305 final AtomicBoolean incrementDone
= new AtomicBoolean(false);
4306 Runnable flusher
= new Runnable() {
4309 while (!incrementDone
.get()) {
4312 } catch (Exception e
) {
4313 e
.printStackTrace();
4319 // after all increment finished, the row will increment to 20*100 = 2000
4321 int incCounter
= 100;
4322 long expected
= (long) threadNum
* incCounter
;
4323 Thread
[] incrementers
= new Thread
[threadNum
];
4324 Thread flushThread
= new Thread(flusher
);
4325 for (int i
= 0; i
< threadNum
; i
++) {
4326 incrementers
[i
] = new Thread(new Incrementer(this.region
, incCounter
));
4327 incrementers
[i
].start();
4329 flushThread
.start();
4330 for (int i
= 0; i
< threadNum
; i
++) {
4331 incrementers
[i
].join();
4334 incrementDone
.set(true);
4337 Get get
= new Get(Incrementer
.incRow
);
4338 get
.addColumn(Incrementer
.family
, Incrementer
.qualifier
);
4339 get
.readVersions(1);
4340 Result res
= this.region
.get(get
);
4341 List
<Cell
> kvs
= res
.getColumnCells(Incrementer
.family
, Incrementer
.qualifier
);
4343 // we just got the latest version
4344 assertEquals(1, kvs
.size());
4345 Cell kv
= kvs
.get(0);
4346 assertEquals(expected
, Bytes
.toLong(kv
.getValueArray(), kv
.getValueOffset()));
4350 * TestCase for append
4352 private static class Appender
implements Runnable
{
4353 private HRegion region
;
4354 private final static byte[] appendRow
= Bytes
.toBytes("appendRow");
4355 private final static byte[] family
= Bytes
.toBytes("family");
4356 private final static byte[] qualifier
= Bytes
.toBytes("qualifier");
4357 private final static byte[] CHAR
= Bytes
.toBytes("a");
4358 private int appendCounter
;
4360 public Appender(HRegion region
, int appendCounter
) {
4361 this.region
= region
;
4362 this.appendCounter
= appendCounter
;
4368 while (count
< appendCounter
) {
4369 Append app
= new Append(appendRow
);
4370 app
.addColumn(family
, qualifier
, CHAR
);
4374 } catch (IOException e
) {
4375 LOG
.info("Count=" + count
+ ", max=" + appendCounter
+ ", " + e
);
4383 * Test case to check append function with memstore flushing
4387 public void testParallelAppendWithMemStoreFlush() throws Exception
{
4388 byte[] family
= Appender
.family
;
4389 this.region
= initHRegion(tableName
, method
, CONF
, family
);
4390 final HRegion region
= this.region
;
4391 final AtomicBoolean appendDone
= new AtomicBoolean(false);
4392 Runnable flusher
= new Runnable() {
4395 while (!appendDone
.get()) {
4398 } catch (Exception e
) {
4399 e
.printStackTrace();
4405 // After all appends have finished, the value will consist of threadNum *
4406 // appendCounter copies of Appender.CHAR
4408 int appendCounter
= 100;
4409 byte[] expected
= new byte[threadNum
* appendCounter
];
4410 for (int i
= 0; i
< threadNum
* appendCounter
; i
++) {
4411 System
.arraycopy(Appender
.CHAR
, 0, expected
, i
, 1);
4413 Thread
[] appenders
= new Thread
[threadNum
];
4414 Thread flushThread
= new Thread(flusher
);
4415 for (int i
= 0; i
< threadNum
; i
++) {
4416 appenders
[i
] = new Thread(new Appender(this.region
, appendCounter
));
4417 appenders
[i
].start();
4419 flushThread
.start();
4420 for (int i
= 0; i
< threadNum
; i
++) {
4421 appenders
[i
].join();
4424 appendDone
.set(true);
4427 Get get
= new Get(Appender
.appendRow
);
4428 get
.addColumn(Appender
.family
, Appender
.qualifier
);
4429 get
.readVersions(1);
4430 Result res
= this.region
.get(get
);
4431 List
<Cell
> kvs
= res
.getColumnCells(Appender
.family
, Appender
.qualifier
);
4433 // we just got the latest version
4434 assertEquals(1, kvs
.size());
4435 Cell kv
= kvs
.get(0);
4436 byte[] appendResult
= new byte[kv
.getValueLength()];
4437 System
.arraycopy(kv
.getValueArray(), kv
.getValueOffset(), appendResult
, 0, kv
.getValueLength());
4438 assertArrayEquals(expected
, appendResult
);
4442 * Test case to check put function with memstore flushing for same row, same ts
4446 public void testPutWithMemStoreFlush() throws Exception
{
4447 byte[] family
= Bytes
.toBytes("family");
4448 byte[] qualifier
= Bytes
.toBytes("qualifier");
4449 byte[] row
= Bytes
.toBytes("putRow");
4450 byte[] value
= null;
4451 this.region
= initHRegion(tableName
, method
, CONF
, family
);
4454 List
<Cell
> kvs
= null;
4458 value
= Bytes
.toBytes("value0");
4459 put
.addColumn(family
, qualifier
, 1234567L, value
);
4462 get
.addColumn(family
, qualifier
);
4463 get
.readAllVersions();
4464 res
= this.region
.get(get
);
4465 kvs
= res
.getColumnCells(family
, qualifier
);
4466 assertEquals(1, kvs
.size());
4467 assertArrayEquals(Bytes
.toBytes("value0"), CellUtil
.cloneValue(kvs
.get(0)));
4471 get
.addColumn(family
, qualifier
);
4472 get
.readAllVersions();
4473 res
= this.region
.get(get
);
4474 kvs
= res
.getColumnCells(family
, qualifier
);
4475 assertEquals(1, kvs
.size());
4476 assertArrayEquals(Bytes
.toBytes("value0"), CellUtil
.cloneValue(kvs
.get(0)));
4479 value
= Bytes
.toBytes("value1");
4480 put
.addColumn(family
, qualifier
, 1234567L, value
);
4483 get
.addColumn(family
, qualifier
);
4484 get
.readAllVersions();
4485 res
= this.region
.get(get
);
4486 kvs
= res
.getColumnCells(family
, qualifier
);
4487 assertEquals(1, kvs
.size());
4488 assertArrayEquals(Bytes
.toBytes("value1"), CellUtil
.cloneValue(kvs
.get(0)));
4492 get
.addColumn(family
, qualifier
);
4493 get
.readAllVersions();
4494 res
= this.region
.get(get
);
4495 kvs
= res
.getColumnCells(family
, qualifier
);
4496 assertEquals(1, kvs
.size());
4497 assertArrayEquals(Bytes
.toBytes("value1"), CellUtil
.cloneValue(kvs
.get(0)));
4501 public void testDurability() throws Exception
{
4502 // there are 5 x 5 cases:
4503 // table durability(SYNC,FSYNC,ASYNC,SKIP,USE_DEFAULT) x mutation
4504 // durability(SYNC,FSYNC,ASYNC,SKIP,USE_DEFAULT)
4506 // expected cases for append and sync wal
4507 durabilityTest(method
, Durability
.SYNC_WAL
, Durability
.SYNC_WAL
, 0, true, true, false);
4508 durabilityTest(method
, Durability
.SYNC_WAL
, Durability
.FSYNC_WAL
, 0, true, true, false);
4509 durabilityTest(method
, Durability
.SYNC_WAL
, Durability
.USE_DEFAULT
, 0, true, true, false);
4511 durabilityTest(method
, Durability
.FSYNC_WAL
, Durability
.SYNC_WAL
, 0, true, true, false);
4512 durabilityTest(method
, Durability
.FSYNC_WAL
, Durability
.FSYNC_WAL
, 0, true, true, false);
4513 durabilityTest(method
, Durability
.FSYNC_WAL
, Durability
.USE_DEFAULT
, 0, true, true, false);
4515 durabilityTest(method
, Durability
.ASYNC_WAL
, Durability
.SYNC_WAL
, 0, true, true, false);
4516 durabilityTest(method
, Durability
.ASYNC_WAL
, Durability
.FSYNC_WAL
, 0, true, true, false);
4518 durabilityTest(method
, Durability
.SKIP_WAL
, Durability
.SYNC_WAL
, 0, true, true, false);
4519 durabilityTest(method
, Durability
.SKIP_WAL
, Durability
.FSYNC_WAL
, 0, true, true, false);
4521 durabilityTest(method
, Durability
.USE_DEFAULT
, Durability
.SYNC_WAL
, 0, true, true, false);
4522 durabilityTest(method
, Durability
.USE_DEFAULT
, Durability
.FSYNC_WAL
, 0, true, true, false);
4523 durabilityTest(method
, Durability
.USE_DEFAULT
, Durability
.USE_DEFAULT
, 0, true, true, false);
4525 // expected cases for async wal
4526 durabilityTest(method
, Durability
.SYNC_WAL
, Durability
.ASYNC_WAL
, 0, true, false, false);
4527 durabilityTest(method
, Durability
.FSYNC_WAL
, Durability
.ASYNC_WAL
, 0, true, false, false);
4528 durabilityTest(method
, Durability
.ASYNC_WAL
, Durability
.ASYNC_WAL
, 0, true, false, false);
4529 durabilityTest(method
, Durability
.SKIP_WAL
, Durability
.ASYNC_WAL
, 0, true, false, false);
4530 durabilityTest(method
, Durability
.USE_DEFAULT
, Durability
.ASYNC_WAL
, 0, true, false, false);
4531 durabilityTest(method
, Durability
.ASYNC_WAL
, Durability
.USE_DEFAULT
, 0, true, false, false);
4533 durabilityTest(method
, Durability
.SYNC_WAL
, Durability
.ASYNC_WAL
, 5000, true, false, true);
4534 durabilityTest(method
, Durability
.FSYNC_WAL
, Durability
.ASYNC_WAL
, 5000, true, false, true);
4535 durabilityTest(method
, Durability
.ASYNC_WAL
, Durability
.ASYNC_WAL
, 5000, true, false, true);
4536 durabilityTest(method
, Durability
.SKIP_WAL
, Durability
.ASYNC_WAL
, 5000, true, false, true);
4537 durabilityTest(method
, Durability
.USE_DEFAULT
, Durability
.ASYNC_WAL
, 5000, true, false, true);
4538 durabilityTest(method
, Durability
.ASYNC_WAL
, Durability
.USE_DEFAULT
, 5000, true, false, true);
4540 // expect skip wal cases
4541 durabilityTest(method
, Durability
.SYNC_WAL
, Durability
.SKIP_WAL
, 0, false, false, false);
4542 durabilityTest(method
, Durability
.FSYNC_WAL
, Durability
.SKIP_WAL
, 0, false, false, false);
4543 durabilityTest(method
, Durability
.ASYNC_WAL
, Durability
.SKIP_WAL
, 0, false, false, false);
4544 durabilityTest(method
, Durability
.SKIP_WAL
, Durability
.SKIP_WAL
, 0, false, false, false);
4545 durabilityTest(method
, Durability
.USE_DEFAULT
, Durability
.SKIP_WAL
, 0, false, false, false);
4546 durabilityTest(method
, Durability
.SKIP_WAL
, Durability
.USE_DEFAULT
, 0, false, false, false);
4550 private void durabilityTest(String method
, Durability tableDurability
,
4551 Durability mutationDurability
, long timeout
, boolean expectAppend
, final boolean expectSync
,
4552 final boolean expectSyncFromLogSyncer
) throws Exception
{
4553 Configuration conf
= HBaseConfiguration
.create(CONF
);
4554 method
= method
+ "_" + tableDurability
.name() + "_" + mutationDurability
.name();
4555 byte[] family
= Bytes
.toBytes("family");
4556 Path logDir
= new Path(new Path(dir
+ method
), "log");
4557 final Configuration walConf
= new Configuration(conf
);
4558 FSUtils
.setRootDir(walConf
, logDir
);
4559 // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not
4560 // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4561 walConf
.set(WALFactory
.WAL_PROVIDER
, "filesystem");
4562 final WALFactory wals
= new WALFactory(walConf
, TEST_UTIL
.getRandomUUID().toString());
4563 final WAL wal
= spy(wals
.getWAL(RegionInfoBuilder
.newBuilder(tableName
).build()));
4564 this.region
= initHRegion(tableName
, HConstants
.EMPTY_START_ROW
,
4565 HConstants
.EMPTY_END_ROW
, false, tableDurability
, wal
,
4566 new byte[][] { family
});
4568 Put put
= new Put(Bytes
.toBytes("r1"));
4569 put
.addColumn(family
, Bytes
.toBytes("q1"), Bytes
.toBytes("v1"));
4570 put
.setDurability(mutationDurability
);
4573 // verify append called or not
4574 verify(wal
, expectAppend ?
times(1) : never()).appendData((HRegionInfo
) any(),
4575 (WALKeyImpl
) any(), (WALEdit
) any());
4577 // verify sync called or not
4578 if (expectSync
|| expectSyncFromLogSyncer
) {
4579 TEST_UTIL
.waitFor(timeout
, new Waiter
.Predicate
<Exception
>() {
4581 public boolean evaluate() throws Exception
{
4584 verify(wal
, times(1)).sync(anyLong()); // Hregion calls this one
4585 } else if (expectSyncFromLogSyncer
) {
4586 verify(wal
, times(1)).sync(); // wal syncer calls this one
4588 } catch (Throwable ignore
) {
4594 //verify(wal, never()).sync(anyLong());
4595 verify(wal
, never()).sync();
4598 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
4604 public void testRegionReplicaSecondary() throws IOException
{
4605 // create a primary region, load some data and flush
4606 // create a secondary region, and do a get against that
4607 Path rootDir
= new Path(dir
+ name
.getMethodName());
4608 FSUtils
.setRootDir(TEST_UTIL
.getConfiguration(), rootDir
);
4610 byte[][] families
= new byte[][] {
4611 Bytes
.toBytes("cf1"), Bytes
.toBytes("cf2"), Bytes
.toBytes("cf3")
4613 byte[] cq
= Bytes
.toBytes("cq");
4614 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf(name
.getMethodName()));
4615 for (byte[] family
: families
) {
4616 htd
.addFamily(new HColumnDescriptor(family
));
4619 long time
= System
.currentTimeMillis();
4620 HRegionInfo primaryHri
= new HRegionInfo(htd
.getTableName(),
4621 HConstants
.EMPTY_START_ROW
, HConstants
.EMPTY_END_ROW
,
4623 HRegionInfo secondaryHri
= new HRegionInfo(htd
.getTableName(),
4624 HConstants
.EMPTY_START_ROW
, HConstants
.EMPTY_END_ROW
,
4627 HRegion primaryRegion
= null, secondaryRegion
= null;
4630 primaryRegion
= HBaseTestingUtility
.createRegionAndWAL(primaryHri
,
4631 rootDir
, TEST_UTIL
.getConfiguration(), htd
);
4634 putData(primaryRegion
, 0, 1000, cq
, families
);
4637 primaryRegion
.flush(true);
4639 // open secondary region
4640 secondaryRegion
= HRegion
.openHRegion(rootDir
, secondaryHri
, htd
, null, CONF
);
4642 verifyData(secondaryRegion
, 0, 1000, cq
, families
);
4644 if (primaryRegion
!= null) {
4645 HBaseTestingUtility
.closeRegionAndWAL(primaryRegion
);
4647 if (secondaryRegion
!= null) {
4648 HBaseTestingUtility
.closeRegionAndWAL(secondaryRegion
);
4654 public void testRegionReplicaSecondaryIsReadOnly() throws IOException
{
4655 // create a primary region, load some data and flush
4656 // create a secondary region, and do a put against that
4657 Path rootDir
= new Path(dir
+ name
.getMethodName());
4658 FSUtils
.setRootDir(TEST_UTIL
.getConfiguration(), rootDir
);
4660 byte[][] families
= new byte[][] {
4661 Bytes
.toBytes("cf1"), Bytes
.toBytes("cf2"), Bytes
.toBytes("cf3")
4663 byte[] cq
= Bytes
.toBytes("cq");
4664 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf(name
.getMethodName()));
4665 for (byte[] family
: families
) {
4666 htd
.addFamily(new HColumnDescriptor(family
));
4669 long time
= System
.currentTimeMillis();
4670 HRegionInfo primaryHri
= new HRegionInfo(htd
.getTableName(),
4671 HConstants
.EMPTY_START_ROW
, HConstants
.EMPTY_END_ROW
,
4673 HRegionInfo secondaryHri
= new HRegionInfo(htd
.getTableName(),
4674 HConstants
.EMPTY_START_ROW
, HConstants
.EMPTY_END_ROW
,
4677 HRegion primaryRegion
= null, secondaryRegion
= null;
4680 primaryRegion
= HBaseTestingUtility
.createRegionAndWAL(primaryHri
,
4681 rootDir
, TEST_UTIL
.getConfiguration(), htd
);
4684 putData(primaryRegion
, 0, 1000, cq
, families
);
4687 primaryRegion
.flush(true);
4689 // open secondary region
4690 secondaryRegion
= HRegion
.openHRegion(rootDir
, secondaryHri
, htd
, null, CONF
);
4693 putData(secondaryRegion
, 0, 1000, cq
, families
);
4694 fail("Should have thrown exception");
4695 } catch (IOException ex
) {
4699 if (primaryRegion
!= null) {
4700 HBaseTestingUtility
.closeRegionAndWAL(primaryRegion
);
4702 if (secondaryRegion
!= null) {
4703 HBaseTestingUtility
.closeRegionAndWAL(secondaryRegion
);
4708 static WALFactory
createWALFactory(Configuration conf
, Path rootDir
) throws IOException
{
4709 Configuration confForWAL
= new Configuration(conf
);
4710 confForWAL
.set(HConstants
.HBASE_DIR
, rootDir
.toString());
4711 return new WALFactory(confForWAL
, "hregion-" + RandomStringUtils
.randomNumeric(8));
4715 public void testCompactionFromPrimary() throws IOException
{
4716 Path rootDir
= new Path(dir
+ name
.getMethodName());
4717 FSUtils
.setRootDir(TEST_UTIL
.getConfiguration(), rootDir
);
4719 byte[][] families
= new byte[][] {
4720 Bytes
.toBytes("cf1"), Bytes
.toBytes("cf2"), Bytes
.toBytes("cf3")
4722 byte[] cq
= Bytes
.toBytes("cq");
4723 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf(name
.getMethodName()));
4724 for (byte[] family
: families
) {
4725 htd
.addFamily(new HColumnDescriptor(family
));
4728 long time
= System
.currentTimeMillis();
4729 HRegionInfo primaryHri
= new HRegionInfo(htd
.getTableName(),
4730 HConstants
.EMPTY_START_ROW
, HConstants
.EMPTY_END_ROW
,
4732 HRegionInfo secondaryHri
= new HRegionInfo(htd
.getTableName(),
4733 HConstants
.EMPTY_START_ROW
, HConstants
.EMPTY_END_ROW
,
4736 HRegion primaryRegion
= null, secondaryRegion
= null;
4739 primaryRegion
= HBaseTestingUtility
.createRegionAndWAL(primaryHri
,
4740 rootDir
, TEST_UTIL
.getConfiguration(), htd
);
4743 putData(primaryRegion
, 0, 1000, cq
, families
);
4746 primaryRegion
.flush(true);
4748 // open secondary region
4749 secondaryRegion
= HRegion
.openHRegion(rootDir
, secondaryHri
, htd
, null, CONF
);
4751 // move the file of the primary region to the archive, simulating a compaction
4752 Collection
<HStoreFile
> storeFiles
= primaryRegion
.getStore(families
[0]).getStorefiles();
4753 primaryRegion
.getRegionFileSystem().removeStoreFiles(Bytes
.toString(families
[0]), storeFiles
);
4754 Collection
<StoreFileInfo
> storeFileInfos
= primaryRegion
.getRegionFileSystem()
4755 .getStoreFiles(families
[0]);
4756 Assert
.assertTrue(storeFileInfos
== null || storeFileInfos
.isEmpty());
4758 verifyData(secondaryRegion
, 0, 1000, cq
, families
);
4760 if (primaryRegion
!= null) {
4761 HBaseTestingUtility
.closeRegionAndWAL(primaryRegion
);
4763 if (secondaryRegion
!= null) {
4764 HBaseTestingUtility
.closeRegionAndWAL(secondaryRegion
);
4769 private void putData(int startRow
, int numRows
, byte[] qf
, byte[]... families
) throws
4771 putData(this.region
, startRow
, numRows
, qf
, families
);
4774 private void putData(HRegion region
,
4775 int startRow
, int numRows
, byte[] qf
, byte[]... families
) throws IOException
{
4776 putData(region
, Durability
.SKIP_WAL
, startRow
, numRows
, qf
, families
);
/**
 * For every row index in {@code [startRow, startRow + numRows)}, builds a {@code Put} whose row
 * key is the decimal string form of the index, applies {@code durability} to it, and adds one
 * cell per entry of {@code families} under qualifier {@code qf} with a {@code null} value. Each
 * {@code Put} is then logged via {@code LOG.info}.
 *
 * NOTE(review): the embedded numbering jumps from 4785 to 4788, so this view is missing lines
 * here -- presumably the closing brace of the family loop and the call that applies the
 * {@code Put} to {@code region} (which is otherwise unused). Confirm against the full file.
 */
4779 static void putData(HRegion region
, Durability durability
,
4780 int startRow
, int numRows
, byte[] qf
, byte[]... families
) throws IOException
{
4781 for (int i
= startRow
; i
< startRow
+ numRows
; i
++) {
4782 Put put
= new Put(Bytes
.toBytes("" + i
));
4783 put
.setDurability(durability
);
4784 for (byte[] family
: families
) {
4785 put
.addColumn(family
, qf
, null);
// Logs the string form of the assembled Put.
4788 LOG
.info(put
.toString());
4792 static void verifyData(HRegion newReg
, int startRow
, int numRows
, byte[] qf
, byte[]... families
)
4793 throws IOException
{
4794 for (int i
= startRow
; i
< startRow
+ numRows
; i
++) {
4795 byte[] row
= Bytes
.toBytes("" + i
);
4796 Get get
= new Get(row
);
4797 for (byte[] family
: families
) {
4798 get
.addColumn(family
, qf
);
4800 Result result
= newReg
.get(get
);
4801 Cell
[] raw
= result
.rawCells();
4802 assertEquals(families
.length
, result
.size());
4803 for (int j
= 0; j
< families
.length
; j
++) {
4804 assertTrue(CellUtil
.matchingRows(raw
[j
], row
));
4805 assertTrue(CellUtil
.matchingFamily(raw
[j
], families
[j
]));
4806 assertTrue(CellUtil
.matchingQualifier(raw
[j
], qf
));
4811 static void assertGet(final HRegion r
, final byte[] family
, final byte[] k
) throws IOException
{
4812 // Now I have k, get values out and assert they are as expected.
4813 Get get
= new Get(k
).addFamily(family
).readAllVersions();
4814 Cell
[] results
= r
.get(get
).rawCells();
4815 for (int j
= 0; j
< results
.length
; j
++) {
4816 byte[] tmp
= CellUtil
.cloneValue(results
[j
]);
4817 // Row should be equal to value every time.
4818 assertTrue(Bytes
.equals(k
, tmp
));
4823 * Assert first value in the passed region is <code>firstValue</code>.
4831 * @throws IOException
/**
 * Opens a scanner on {@code r} restricted to the single family {@code fs} and walks the
 * returned cells, comparing a cell value against {@code firstValue} with
 * {@code Bytes.compareTo(...) == 0}.
 *
 * NOTE(review): this view is missing several lines (the embedded numbering skips 4840,
 * 4847-4848, 4850 and everything after 4851). The {@code first} flag and the
 * {@code OUTER_LOOP} label suggest only the first value is asserted before breaking out, and
 * the scanner is presumably closed in a dropped block -- confirm against the full file.
 */
4833 protected void assertScan(final HRegion r
, final byte[] fs
, final byte[] firstValue
)
4834 throws IOException
{
// Wrap the single family so the loop below can register it on the scan.
4835 byte[][] families
= { fs
};
4836 Scan scan
= new Scan();
4837 for (int i
= 0; i
< families
.length
; i
++)
4838 scan
.addFamily(families
[i
]);
4839 InternalScanner s
= r
.getScanner(scan
);
4841 List
<Cell
> curVals
= new ArrayList
<>();
4842 boolean first
= true;
4843 OUTER_LOOP
: while (s
.next(curVals
)) {
4844 for (Cell kv
: curVals
) {
4845 byte[] val
= CellUtil
.cloneValue(kv
);
4846 byte[] curval
= val
;
4849 assertTrue(Bytes
.compareTo(curval
, firstValue
) == 0);
4851 // Not asserting anything. Might as well break.
4862 * Test that we get the expected flush results back
/**
 * Checks the flags on the {@code HRegion.FlushResult} returned by {@code region.flush(true)}:
 * the very first flush must report neither success nor compaction-needed; the first loop then
 * expects two successful flushes without compaction-needed, and the second loop expects two
 * successful flushes with compaction-needed set.
 *
 * NOTE(review): the embedded numbering skips 4878 and 4887, right after each {@code Put} is
 * built -- presumably the lines that apply the {@code Put} to the region, plus the closing
 * braces of both loops. Confirm against the full file.
 */
4865 public void testFlushResult() throws IOException
{
4866 byte[] family
= Bytes
.toBytes("family");
4868 this.region
= initHRegion(tableName
, method
, family
);
4870 // empty memstore, flush doesn't run
4871 HRegion
.FlushResult fr
= region
.flush(true);
4872 assertFalse(fr
.isFlushSucceeded());
4873 assertFalse(fr
.isCompactionNeeded());
4875 // Flush enough files to get up to the threshold, doesn't need compactions
4876 for (int i
= 0; i
< 2; i
++) {
// The table name bytes serve as both row key and value; the family bytes double as qualifier.
4877 Put put
= new Put(tableName
.toBytes()).addColumn(family
, family
, tableName
.toBytes());
4879 fr
= region
.flush(true);
4880 assertTrue(fr
.isFlushSucceeded());
4881 assertFalse(fr
.isCompactionNeeded());
4884 // Two flushes after the threshold, compactions are needed
4885 for (int i
= 0; i
< 2; i
++) {
4886 Put put
= new Put(tableName
.toBytes()).addColumn(family
, family
, tableName
.toBytes());
4888 fr
= region
.flush(true);
4889 assertTrue(fr
.isFlushSucceeded());
4890 assertTrue(fr
.isCompactionNeeded());
4894 protected Configuration
initSplit() {
4895 // Always compact if there is more than one store file.
4896 CONF
.setInt("hbase.hstore.compactionThreshold", 2);
4898 CONF
.setInt(HConstants
.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
, 10 * 1000);
4900 // Increase the amount of time between client retries
4901 CONF
.setLong("hbase.client.pause", 15 * 1000);
4903 // This size should make it so we always split using the addContent
4904 // below. After adding all data, the first region is 1.3M
4905 CONF
.setLong(HConstants
.HREGION_MAX_FILESIZE
, 1024 * 128);
4910 * @return A region on which you must call
4911 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4913 protected HRegion
initHRegion(TableName tableName
, String callingMethod
, Configuration conf
,
4914 byte[]... families
) throws IOException
{
4915 return initHRegion(tableName
, callingMethod
, conf
, false, families
);
4919 * @return A region on which you must call
4920 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4922 protected HRegion
initHRegion(TableName tableName
, String callingMethod
, Configuration conf
,
4923 boolean isReadOnly
, byte[]... families
) throws IOException
{
4924 return initHRegion(tableName
, null, null, callingMethod
, conf
, isReadOnly
, families
);
4927 protected HRegion
initHRegion(TableName tableName
, byte[] startKey
, byte[] stopKey
,
4928 String callingMethod
, Configuration conf
, boolean isReadOnly
, byte[]... families
)
4929 throws IOException
{
4930 Path logDir
= TEST_UTIL
.getDataTestDirOnTestFS(callingMethod
+ ".log");
4931 HRegionInfo hri
= new HRegionInfo(tableName
, startKey
, stopKey
);
4932 final WAL wal
= HBaseTestingUtility
.createWal(conf
, logDir
, hri
);
4933 return initHRegion(tableName
, startKey
, stopKey
, isReadOnly
,
4934 Durability
.SYNC_WAL
, wal
, families
);
4938 * @return A region on which you must call
4939 * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4941 public HRegion
initHRegion(TableName tableName
, byte[] startKey
, byte[] stopKey
,
4942 boolean isReadOnly
, Durability durability
, WAL wal
, byte[]... families
) throws IOException
{
4943 ChunkCreator
.initialize(MemStoreLABImpl
.CHUNK_SIZE_DEFAULT
, false, 0, 0, 0, null);
4944 return TEST_UTIL
.createLocalHRegion(tableName
, startKey
, stopKey
,
4945 isReadOnly
, durability
, wal
, families
);
4949 * Assert that the passed in Cell has expected contents for the specified row,
4950 * column & timestamp.
4952 private void checkOneCell(Cell kv
, byte[] cf
, int rowIdx
, int colIdx
, long ts
) {
4953 String ctx
= "rowIdx=" + rowIdx
+ "; colIdx=" + colIdx
+ "; ts=" + ts
;
4954 assertEquals("Row mismatch which checking: " + ctx
, "row:" + rowIdx
,
4955 Bytes
.toString(CellUtil
.cloneRow(kv
)));
4956 assertEquals("ColumnFamily mismatch while checking: " + ctx
, Bytes
.toString(cf
),
4957 Bytes
.toString(CellUtil
.cloneFamily(kv
)));
4958 assertEquals("Column qualifier mismatch while checking: " + ctx
, "column:" + colIdx
,
4959 Bytes
.toString(CellUtil
.cloneQualifier(kv
)));
4960 assertEquals("Timestamp mismatch while checking: " + ctx
, ts
, kv
.getTimestamp());
4961 assertEquals("Value mismatch while checking: " + ctx
, "value-version-" + ts
,
4962 Bytes
.toString(CellUtil
.cloneValue(kv
)));
4966 public void testReverseScanner_FromMemStore_SingleCF_Normal()
4967 throws IOException
{
4968 byte[] rowC
= Bytes
.toBytes("rowC");
4969 byte[] rowA
= Bytes
.toBytes("rowA");
4970 byte[] rowB
= Bytes
.toBytes("rowB");
4971 byte[] cf
= Bytes
.toBytes("CF");
4972 byte[][] families
= { cf
};
4973 byte[] col
= Bytes
.toBytes("C");
4975 this.region
= initHRegion(tableName
, method
, families
);
4976 KeyValue kv1
= new KeyValue(rowC
, cf
, col
, ts
, KeyValue
.Type
.Put
, null);
4977 KeyValue kv11
= new KeyValue(rowC
, cf
, col
, ts
+ 1, KeyValue
.Type
.Put
,
4979 KeyValue kv2
= new KeyValue(rowA
, cf
, col
, ts
, KeyValue
.Type
.Put
, null);
4980 KeyValue kv3
= new KeyValue(rowB
, cf
, col
, ts
, KeyValue
.Type
.Put
, null);
4982 put
= new Put(rowC
);
4986 put
= new Put(rowA
);
4989 put
= new Put(rowB
);
4993 Scan scan
= new Scan(rowC
);
4994 scan
.setMaxVersions(5);
4995 scan
.setReversed(true);
4996 InternalScanner scanner
= region
.getScanner(scan
);
4997 List
<Cell
> currRow
= new ArrayList
<>();
4998 boolean hasNext
= scanner
.next(currRow
);
4999 assertEquals(2, currRow
.size());
5000 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5001 .get(0).getRowLength(), rowC
, 0, rowC
.length
));
5002 assertTrue(hasNext
);
5004 hasNext
= scanner
.next(currRow
);
5005 assertEquals(1, currRow
.size());
5006 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5007 .get(0).getRowLength(), rowB
, 0, rowB
.length
));
5008 assertTrue(hasNext
);
5010 hasNext
= scanner
.next(currRow
);
5011 assertEquals(1, currRow
.size());
5012 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5013 .get(0).getRowLength(), rowA
, 0, rowA
.length
));
5014 assertFalse(hasNext
);
5019 public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
5020 throws IOException
{
5021 byte[] rowC
= Bytes
.toBytes("rowC");
5022 byte[] rowA
= Bytes
.toBytes("rowA");
5023 byte[] rowB
= Bytes
.toBytes("rowB");
5024 byte[] rowD
= Bytes
.toBytes("rowD");
5025 byte[] cf
= Bytes
.toBytes("CF");
5026 byte[][] families
= { cf
};
5027 byte[] col
= Bytes
.toBytes("C");
5029 this.region
= initHRegion(tableName
, method
, families
);
5030 KeyValue kv1
= new KeyValue(rowC
, cf
, col
, ts
, KeyValue
.Type
.Put
, null);
5031 KeyValue kv11
= new KeyValue(rowC
, cf
, col
, ts
+ 1, KeyValue
.Type
.Put
,
5033 KeyValue kv2
= new KeyValue(rowA
, cf
, col
, ts
, KeyValue
.Type
.Put
, null);
5034 KeyValue kv3
= new KeyValue(rowB
, cf
, col
, ts
, KeyValue
.Type
.Put
, null);
5036 put
= new Put(rowC
);
5040 put
= new Put(rowA
);
5043 put
= new Put(rowB
);
5047 Scan scan
= new Scan(rowD
);
5048 List
<Cell
> currRow
= new ArrayList
<>();
5049 scan
.setReversed(true);
5050 scan
.setMaxVersions(5);
5051 InternalScanner scanner
= region
.getScanner(scan
);
5052 boolean hasNext
= scanner
.next(currRow
);
5053 assertEquals(2, currRow
.size());
5054 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5055 .get(0).getRowLength(), rowC
, 0, rowC
.length
));
5056 assertTrue(hasNext
);
5058 hasNext
= scanner
.next(currRow
);
5059 assertEquals(1, currRow
.size());
5060 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5061 .get(0).getRowLength(), rowB
, 0, rowB
.length
));
5062 assertTrue(hasNext
);
5064 hasNext
= scanner
.next(currRow
);
5065 assertEquals(1, currRow
.size());
5066 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5067 .get(0).getRowLength(), rowA
, 0, rowA
.length
));
5068 assertFalse(hasNext
);
5073 public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5074 throws IOException
{
5075 byte[] rowC
= Bytes
.toBytes("rowC");
5076 byte[] rowA
= Bytes
.toBytes("rowA");
5077 byte[] rowB
= Bytes
.toBytes("rowB");
5078 byte[] cf
= Bytes
.toBytes("CF");
5079 byte[][] families
= { cf
};
5080 byte[] col
= Bytes
.toBytes("C");
5082 this.region
= initHRegion(tableName
, method
, families
);
5083 KeyValue kv1
= new KeyValue(rowC
, cf
, col
, ts
, KeyValue
.Type
.Put
, null);
5084 KeyValue kv11
= new KeyValue(rowC
, cf
, col
, ts
+ 1, KeyValue
.Type
.Put
,
5086 KeyValue kv2
= new KeyValue(rowA
, cf
, col
, ts
, KeyValue
.Type
.Put
, null);
5087 KeyValue kv3
= new KeyValue(rowB
, cf
, col
, ts
, KeyValue
.Type
.Put
, null);
5089 put
= new Put(rowC
);
5093 put
= new Put(rowA
);
5096 put
= new Put(rowB
);
5099 Scan scan
= new Scan();
5100 List
<Cell
> currRow
= new ArrayList
<>();
5101 scan
.setReversed(true);
5102 InternalScanner scanner
= region
.getScanner(scan
);
5103 boolean hasNext
= scanner
.next(currRow
);
5104 assertEquals(1, currRow
.size());
5105 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5106 .get(0).getRowLength(), rowC
, 0, rowC
.length
));
5107 assertTrue(hasNext
);
5109 hasNext
= scanner
.next(currRow
);
5110 assertEquals(1, currRow
.size());
5111 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5112 .get(0).getRowLength(), rowB
, 0, rowB
.length
));
5113 assertTrue(hasNext
);
5115 hasNext
= scanner
.next(currRow
);
5116 assertEquals(1, currRow
.size());
5117 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5118 .get(0).getRowLength(), rowA
, 0, rowA
.length
));
5119 assertFalse(hasNext
);
5124 public void testReverseScanner_moreRowsMayExistAfter() throws IOException
{
5125 // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5126 byte[] rowA
= Bytes
.toBytes("rowA");
5127 byte[] rowB
= Bytes
.toBytes("rowB");
5128 byte[] rowC
= Bytes
.toBytes("rowC");
5129 byte[] rowD
= Bytes
.toBytes("rowD");
5130 byte[] rowE
= Bytes
.toBytes("rowE");
5131 byte[] cf
= Bytes
.toBytes("CF");
5132 byte[][] families
= { cf
};
5133 byte[] col1
= Bytes
.toBytes("col1");
5134 byte[] col2
= Bytes
.toBytes("col2");
5136 this.region
= initHRegion(tableName
, method
, families
);
5137 KeyValue kv1
= new KeyValue(rowA
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5138 KeyValue kv2
= new KeyValue(rowB
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5139 KeyValue kv3
= new KeyValue(rowC
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5140 KeyValue kv4_1
= new KeyValue(rowD
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5141 KeyValue kv4_2
= new KeyValue(rowD
, cf
, col2
, ts
, KeyValue
.Type
.Put
, null);
5142 KeyValue kv5
= new KeyValue(rowE
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5144 put
= new Put(rowA
);
5147 put
= new Put(rowB
);
5150 put
= new Put(rowC
);
5153 put
= new Put(rowD
);
5156 put
= new Put(rowD
);
5159 put
= new Put(rowE
);
5163 Scan scan
= new Scan(rowD
, rowA
);
5164 scan
.addColumn(families
[0], col1
);
5165 scan
.setReversed(true);
5166 List
<Cell
> currRow
= new ArrayList
<>();
5167 InternalScanner scanner
= region
.getScanner(scan
);
5168 boolean hasNext
= scanner
.next(currRow
);
5169 assertEquals(1, currRow
.size());
5170 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5171 .get(0).getRowLength(), rowD
, 0, rowD
.length
));
5172 assertTrue(hasNext
);
5174 hasNext
= scanner
.next(currRow
);
5175 assertEquals(1, currRow
.size());
5176 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5177 .get(0).getRowLength(), rowC
, 0, rowC
.length
));
5178 assertTrue(hasNext
);
5180 hasNext
= scanner
.next(currRow
);
5181 assertEquals(1, currRow
.size());
5182 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5183 .get(0).getRowLength(), rowB
, 0, rowB
.length
));
5184 assertFalse(hasNext
);
5187 scan
= new Scan(rowD
, rowA
);
5188 scan
.addColumn(families
[0], col2
);
5189 scan
.setReversed(true);
5191 scanner
= region
.getScanner(scan
);
5192 hasNext
= scanner
.next(currRow
);
5193 assertEquals(1, currRow
.size());
5194 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5195 .get(0).getRowLength(), rowD
, 0, rowD
.length
));
5200 public void testReverseScanner_smaller_blocksize() throws IOException
{
5201 // case to ensure no conflict with HFile index optimization
5202 byte[] rowA
= Bytes
.toBytes("rowA");
5203 byte[] rowB
= Bytes
.toBytes("rowB");
5204 byte[] rowC
= Bytes
.toBytes("rowC");
5205 byte[] rowD
= Bytes
.toBytes("rowD");
5206 byte[] rowE
= Bytes
.toBytes("rowE");
5207 byte[] cf
= Bytes
.toBytes("CF");
5208 byte[][] families
= { cf
};
5209 byte[] col1
= Bytes
.toBytes("col1");
5210 byte[] col2
= Bytes
.toBytes("col2");
5212 HBaseConfiguration config
= new HBaseConfiguration();
5213 config
.setInt("test.block.size", 1);
5214 this.region
= initHRegion(tableName
, method
, config
, families
);
5215 KeyValue kv1
= new KeyValue(rowA
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5216 KeyValue kv2
= new KeyValue(rowB
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5217 KeyValue kv3
= new KeyValue(rowC
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5218 KeyValue kv4_1
= new KeyValue(rowD
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5219 KeyValue kv4_2
= new KeyValue(rowD
, cf
, col2
, ts
, KeyValue
.Type
.Put
, null);
5220 KeyValue kv5
= new KeyValue(rowE
, cf
, col1
, ts
, KeyValue
.Type
.Put
, null);
5222 put
= new Put(rowA
);
5225 put
= new Put(rowB
);
5228 put
= new Put(rowC
);
5231 put
= new Put(rowD
);
5234 put
= new Put(rowD
);
5237 put
= new Put(rowE
);
5241 Scan scan
= new Scan(rowD
, rowA
);
5242 scan
.addColumn(families
[0], col1
);
5243 scan
.setReversed(true);
5244 List
<Cell
> currRow
= new ArrayList
<>();
5245 InternalScanner scanner
= region
.getScanner(scan
);
5246 boolean hasNext
= scanner
.next(currRow
);
5247 assertEquals(1, currRow
.size());
5248 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5249 .get(0).getRowLength(), rowD
, 0, rowD
.length
));
5250 assertTrue(hasNext
);
5252 hasNext
= scanner
.next(currRow
);
5253 assertEquals(1, currRow
.size());
5254 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5255 .get(0).getRowLength(), rowC
, 0, rowC
.length
));
5256 assertTrue(hasNext
);
5258 hasNext
= scanner
.next(currRow
);
5259 assertEquals(1, currRow
.size());
5260 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5261 .get(0).getRowLength(), rowB
, 0, rowB
.length
));
5262 assertFalse(hasNext
);
5265 scan
= new Scan(rowD
, rowA
);
5266 scan
.addColumn(families
[0], col2
);
5267 scan
.setReversed(true);
5269 scanner
= region
.getScanner(scan
);
5270 hasNext
= scanner
.next(currRow
);
5271 assertEquals(1, currRow
.size());
5272 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5273 .get(0).getRowLength(), rowD
, 0, rowD
.length
));
5278 public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5279 throws IOException
{
5280 byte[] row0
= Bytes
.toBytes("row0"); // 1 kv
5281 byte[] row1
= Bytes
.toBytes("row1"); // 2 kv
5282 byte[] row2
= Bytes
.toBytes("row2"); // 4 kv
5283 byte[] row3
= Bytes
.toBytes("row3"); // 2 kv
5284 byte[] row4
= Bytes
.toBytes("row4"); // 5 kv
5285 byte[] row5
= Bytes
.toBytes("row5"); // 2 kv
5286 byte[] cf1
= Bytes
.toBytes("CF1");
5287 byte[] cf2
= Bytes
.toBytes("CF2");
5288 byte[] cf3
= Bytes
.toBytes("CF3");
5289 byte[][] families
= { cf1
, cf2
, cf3
};
5290 byte[] col
= Bytes
.toBytes("C");
5292 HBaseConfiguration conf
= new HBaseConfiguration();
5293 // disable compactions in this test.
5294 conf
.setInt("hbase.hstore.compactionThreshold", 10000);
5295 this.region
= initHRegion(tableName
, method
, conf
, families
);
5296 // kv naming style: kv(row number) totalKvCountInThisRow seq no
5297 KeyValue kv0_1_1
= new KeyValue(row0
, cf1
, col
, ts
, KeyValue
.Type
.Put
,
5299 KeyValue kv1_2_1
= new KeyValue(row1
, cf2
, col
, ts
, KeyValue
.Type
.Put
,
5301 KeyValue kv1_2_2
= new KeyValue(row1
, cf1
, col
, ts
+ 1,
5302 KeyValue
.Type
.Put
, null);
5303 KeyValue kv2_4_1
= new KeyValue(row2
, cf2
, col
, ts
, KeyValue
.Type
.Put
,
5305 KeyValue kv2_4_2
= new KeyValue(row2
, cf1
, col
, ts
, KeyValue
.Type
.Put
,
5307 KeyValue kv2_4_3
= new KeyValue(row2
, cf3
, col
, ts
, KeyValue
.Type
.Put
,
5309 KeyValue kv2_4_4
= new KeyValue(row2
, cf1
, col
, ts
+ 4,
5310 KeyValue
.Type
.Put
, null);
5311 KeyValue kv3_2_1
= new KeyValue(row3
, cf2
, col
, ts
, KeyValue
.Type
.Put
,
5313 KeyValue kv3_2_2
= new KeyValue(row3
, cf1
, col
, ts
+ 4,
5314 KeyValue
.Type
.Put
, null);
5315 KeyValue kv4_5_1
= new KeyValue(row4
, cf1
, col
, ts
, KeyValue
.Type
.Put
,
5317 KeyValue kv4_5_2
= new KeyValue(row4
, cf3
, col
, ts
, KeyValue
.Type
.Put
,
5319 KeyValue kv4_5_3
= new KeyValue(row4
, cf3
, col
, ts
+ 5,
5320 KeyValue
.Type
.Put
, null);
5321 KeyValue kv4_5_4
= new KeyValue(row4
, cf2
, col
, ts
, KeyValue
.Type
.Put
,
5323 KeyValue kv4_5_5
= new KeyValue(row4
, cf1
, col
, ts
+ 3,
5324 KeyValue
.Type
.Put
, null);
5325 KeyValue kv5_2_1
= new KeyValue(row5
, cf2
, col
, ts
, KeyValue
.Type
.Put
,
5327 KeyValue kv5_2_2
= new KeyValue(row5
, cf3
, col
, ts
, KeyValue
.Type
.Put
,
5329 // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5331 put
= new Put(row1
);
5334 put
= new Put(row2
);
5337 put
= new Put(row4
);
5342 // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5343 put
= new Put(row4
);
5347 put
= new Put(row1
);
5350 put
= new Put(row2
);
5354 // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5355 put
= new Put(row4
);
5358 put
= new Put(row2
);
5362 put
= new Put(row3
);
5366 // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
5368 put
= new Put(row0
);
5371 put
= new Put(row3
);
5374 put
= new Put(row5
);
5378 // scan range = ["row4", min), skip the max "row5"
5379 Scan scan
= new Scan(row4
);
5380 scan
.setMaxVersions(5);
5382 scan
.setReversed(true);
5383 InternalScanner scanner
= region
.getScanner(scan
);
5384 List
<Cell
> currRow
= new ArrayList
<>();
5385 boolean hasNext
= false;
5386 // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5387 // included in scan range
5388 // "row4" takes 2 next() calls since batch=3
5389 hasNext
= scanner
.next(currRow
);
5390 assertEquals(3, currRow
.size());
5391 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5392 .get(0).getRowLength(), row4
, 0, row4
.length
));
5393 assertTrue(hasNext
);
5395 hasNext
= scanner
.next(currRow
);
5396 assertEquals(2, currRow
.size());
5397 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(),
5398 currRow
.get(0).getRowLength(), row4
, 0,
5400 assertTrue(hasNext
);
5401 // 2. scan out "row3" (2 kv)
5403 hasNext
= scanner
.next(currRow
);
5404 assertEquals(2, currRow
.size());
5405 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5406 .get(0).getRowLength(), row3
, 0, row3
.length
));
5407 assertTrue(hasNext
);
5408 // 3. scan out "row2" (4 kvs)
5409 // "row2" takes 2 next() calls since batch=3
5411 hasNext
= scanner
.next(currRow
);
5412 assertEquals(3, currRow
.size());
5413 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5414 .get(0).getRowLength(), row2
, 0, row2
.length
));
5415 assertTrue(hasNext
);
5417 hasNext
= scanner
.next(currRow
);
5418 assertEquals(1, currRow
.size());
5419 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5420 .get(0).getRowLength(), row2
, 0, row2
.length
));
5421 assertTrue(hasNext
);
5422 // 4. scan out "row1" (2 kv)
5424 hasNext
= scanner
.next(currRow
);
5425 assertEquals(2, currRow
.size());
5426 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5427 .get(0).getRowLength(), row1
, 0, row1
.length
));
5428 assertTrue(hasNext
);
5429 // 5. scan out "row0" (1 kv)
5431 hasNext
= scanner
.next(currRow
);
5432 assertEquals(1, currRow
.size());
5433 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5434 .get(0).getRowLength(), row0
, 0, row0
.length
));
5435 assertFalse(hasNext
);
5441 public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5442 throws IOException
{
5443 byte[] row1
= Bytes
.toBytes("row1");
5444 byte[] row2
= Bytes
.toBytes("row2");
5445 byte[] row3
= Bytes
.toBytes("row3");
5446 byte[] row4
= Bytes
.toBytes("row4");
5447 byte[] cf1
= Bytes
.toBytes("CF1");
5448 byte[] cf2
= Bytes
.toBytes("CF2");
5449 byte[] cf3
= Bytes
.toBytes("CF3");
5450 byte[] cf4
= Bytes
.toBytes("CF4");
5451 byte[][] families
= { cf1
, cf2
, cf3
, cf4
};
5452 byte[] col
= Bytes
.toBytes("C");
5454 HBaseConfiguration conf
= new HBaseConfiguration();
5455 // disable compactions in this test.
5456 conf
.setInt("hbase.hstore.compactionThreshold", 10000);
5457 this.region
= initHRegion(tableName
, method
, conf
, families
);
5458 KeyValue kv1
= new KeyValue(row1
, cf1
, col
, ts
, KeyValue
.Type
.Put
, null);
5459 KeyValue kv2
= new KeyValue(row2
, cf2
, col
, ts
, KeyValue
.Type
.Put
, null);
5460 KeyValue kv3
= new KeyValue(row3
, cf3
, col
, ts
, KeyValue
.Type
.Put
, null);
5461 KeyValue kv4
= new KeyValue(row4
, cf4
, col
, ts
, KeyValue
.Type
.Put
, null);
5463 Put put
= new Put(row1
);
5468 put
= new Put(row2
);
5473 put
= new Put(row3
);
5478 put
= new Put(row4
);
5481 // scan range = ["row4", min)
5482 Scan scan
= new Scan(row4
);
5483 scan
.setReversed(true);
5485 InternalScanner scanner
= region
.getScanner(scan
);
5486 List
<Cell
> currRow
= new ArrayList
<>();
5487 boolean hasNext
= scanner
.next(currRow
);
5488 assertEquals(1, currRow
.size());
5489 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5490 .get(0).getRowLength(), row4
, 0, row4
.length
));
5491 assertTrue(hasNext
);
5493 hasNext
= scanner
.next(currRow
);
5494 assertEquals(1, currRow
.size());
5495 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5496 .get(0).getRowLength(), row3
, 0, row3
.length
));
5497 assertTrue(hasNext
);
5499 hasNext
= scanner
.next(currRow
);
5500 assertEquals(1, currRow
.size());
5501 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5502 .get(0).getRowLength(), row2
, 0, row2
.length
));
5503 assertTrue(hasNext
);
5505 hasNext
= scanner
.next(currRow
);
5506 assertEquals(1, currRow
.size());
5507 assertTrue(Bytes
.equals(currRow
.get(0).getRowArray(), currRow
.get(0).getRowOffset(), currRow
5508 .get(0).getRowLength(), row1
, 0, row1
.length
));
5509 assertFalse(hasNext
);
5513 * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
// Opens a reversed scan at row "19998" and only afterwards writes rows "10000".."19999"
// twice (flushing after the first batch), so the already-open scanner has many newer cells
// to skip in both a store file and the memstore. Expects exactly the two cells for rows
// "19998" and "19997".
// NOTE(review): this listing appears to omit several original lines (closing braces, the
// calls that apply each Put, the hasNext declaration) - confirm against the real file.
5516 public void testReverseScanner_StackOverflow() throws IOException
{
5517 byte[] cf1
= Bytes
.toBytes("CF1");
5518 byte[][] families
= {cf1
};
5519 byte[] col
= Bytes
.toBytes("C");
5520 HBaseConfiguration conf
= new HBaseConfiguration();
5521 this.region
= initHRegion(tableName
, method
, conf
, families
);
5522 // set up one storefile and one memstore, then create the scanner so it gets an earlier readPt
5523 Put put
= new Put(Bytes
.toBytes("19998"));
5524 put
.addColumn(cf1
, col
, Bytes
.toBytes("val"));
5526 region
.flushcache(true, true, FlushLifeCycleTracker
.DUMMY
);
5527 Put put2
= new Put(Bytes
.toBytes("19997"));
5528 put2
.addColumn(cf1
, col
, Bytes
.toBytes("val"));
// The scanner is created here, before the bulk writes below.
5531 Scan scan
= new Scan(Bytes
.toBytes("19998"));
5532 scan
.setReversed(true);
5533 InternalScanner scanner
= region
.getScanner(scan
);
5535 // create one storefile containing many rows that will be skipped,
5536 // to check StoreFileScanner.seekToPreviousRow
5537 for (int i
= 10000; i
< 20000; i
++) {
5538 Put p
= new Put(Bytes
.toBytes(""+i
));
5539 p
.addColumn(cf1
, col
, Bytes
.toBytes("" + i
));
5542 region
.flushcache(true, true, FlushLifeCycleTracker
.DUMMY
);
5544 // create one memstore containing many rows that will be skipped,
5545 // to check MemStoreScanner.seekToPreviousRow
5546 for (int i
= 10000; i
< 20000; i
++) {
5547 Put p
= new Put(Bytes
.toBytes(""+i
));
5548 p
.addColumn(cf1
, col
, Bytes
.toBytes("" + i
));
5552 List
<Cell
> currRow
= new ArrayList
<>();
5555 hasNext
= scanner
.next(currRow
);
// Only the two rows written before the scanner was opened are expected.
5557 assertEquals(2, currRow
.size());
5558 assertEquals("19998", Bytes
.toString(currRow
.get(0).getRowArray(),
5559 currRow
.get(0).getRowOffset(), currRow
.get(0).getRowLength()));
5560 assertEquals("19997", Bytes
.toString(currRow
.get(1).getRowArray(),
5561 currRow
.get(1).getRowOffset(), currRow
.get(1).getRowLength()));
// Opens a reversed scan at row "19996", flushes, then writes rows "10000".."19999".
// Checks the scanners of the current StoreScanner (the assertion message expects only the
// store file scanner) and that the scan returns the two cells for rows "19996" and "19995".
// NOTE(review): this listing appears to omit several original lines (closing braces, the
// calls that apply each Put, the tail of the scanner-count assertion) - confirm against
// the real file.
5565 public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception
{
5566 byte[] cf1
= Bytes
.toBytes("CF1");
5567 byte[][] families
= { cf1
};
5568 byte[] col
= Bytes
.toBytes("C");
5569 HBaseConfiguration conf
= new HBaseConfiguration();
5570 this.region
= initHRegion(tableName
, method
, conf
, families
);
5571 // set up one storefile and one memstore, then create the scanner so it gets an earlier readPt
5572 Put put
= new Put(Bytes
.toBytes("19996"));
5573 put
.addColumn(cf1
, col
, Bytes
.toBytes("val"));
5575 Put put2
= new Put(Bytes
.toBytes("19995"));
5576 put2
.addColumn(cf1
, col
, Bytes
.toBytes("val"));
5578 // create a reverse scan
5579 Scan scan
= new Scan(Bytes
.toBytes("19996"));
5580 scan
.setReversed(true);
5581 RegionScannerImpl scanner
= region
.getScanner(scan
);
5583 // flush the cache. This will reset the store scanner
5584 region
.flushcache(true, true, FlushLifeCycleTracker
.DUMMY
);
5586 // create one memstore containing many rows that will be skipped,
5587 // to check MemStoreScanner.seekToPreviousRow
5588 for (int i
= 10000; i
< 20000; i
++) {
5589 Put p
= new Put(Bytes
.toBytes("" + i
));
5590 p
.addColumn(cf1
, col
, Bytes
.toBytes("" + i
));
5593 List
<Cell
> currRow
= new ArrayList
<>();
5595 boolean assertDone
= false;
5597 hasNext
= scanner
.next(currRow
);
5598 // With HBASE-15871, after the scanner is reset the memstore scanner should not be
// (the rest of the comment above is not present in this listing)
// Reach into the region scanner's store heap to inspect the scanners actually in use.
5601 StoreScanner current
=
5602 (StoreScanner
) (scanner
.storeHeap
).getCurrentForTesting();
5603 List
<KeyValueScanner
> scanners
= current
.getAllScannersForTesting();
5604 assertEquals("There should be only one scanner the store file scanner", 1,
5609 assertEquals(2, currRow
.size());
5610 assertEquals("19996", Bytes
.toString(currRow
.get(0).getRowArray(),
5611 currRow
.get(0).getRowOffset(), currRow
.get(0).getRowLength()));
5612 assertEquals("19995", Bytes
.toString(currRow
.get(1).getRowArray(),
5613 currRow
.get(1).getRowOffset(), currRow
.get(1).getRowLength()));
// Opens a reversed scan at row "199996" and then writes rows "100000".."199999", which
// (per the inline comment) carry sequence ids greater than the scan's readPt. Expects the
// scan to return exactly the two cells for rows "199996" and "199995".
// NOTE(review): this listing appears to omit several original lines (closing braces, the
// calls that apply each Put, the hasNext declaration) - confirm against the real file.
5617 public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception
{
5618 byte[] cf1
= Bytes
.toBytes("CF1");
5619 byte[][] families
= { cf1
};
5620 byte[] col
= Bytes
.toBytes("C");
5622 HBaseConfiguration conf
= new HBaseConfiguration();
5623 this.region
= initHRegion(tableName
, method
, conf
, families
);
5625 Put put
= new Put(Bytes
.toBytes("199996"));
5626 put
.addColumn(cf1
, col
, Bytes
.toBytes("val"));
5628 Put put2
= new Put(Bytes
.toBytes("199995"));
5629 put2
.addColumn(cf1
, col
, Bytes
.toBytes("val"));
5632 // Create a reverse scan
5633 Scan scan
= new Scan(Bytes
.toBytes("199996"));
5634 scan
.setReversed(true);
5635 RegionScannerImpl scanner
= region
.getScanner(scan
);
5637 // Put a lot of cells that have sequenceIDs greater than the readPt of the reverse scan
5638 for (int i
= 100000; i
< 200000; i
++) {
5639 Put p
= new Put(Bytes
.toBytes("" + i
));
5640 p
.addColumn(cf1
, col
, Bytes
.toBytes("" + i
));
5643 List
<Cell
> currRow
= new ArrayList
<>();
5646 hasNext
= scanner
.next(currRow
);
// Only the two rows that existed when the scanner was opened are expected.
5649 assertEquals(2, currRow
.size());
5650 assertEquals("199996", Bytes
.toString(currRow
.get(0).getRowArray(),
5651 currRow
.get(0).getRowOffset(), currRow
.get(0).getRowLength()));
5652 assertEquals("199995", Bytes
.toString(currRow
.get(1).getRowArray(),
5653 currRow
.get(1).getRowOffset(), currRow
.get(1).getRowLength()));
// Checks HRegion#getWriteRequestsCount(): it is asserted to be 0 on a fresh region and again
// after merely building a Put, then 1, 2 and 3, and finally 4 after a Delete is applied.
// NOTE(review): the mutations between the 0/1/2/3 assertions are not visible in this
// listing (lines appear to be missing) - confirm against the real file.
5657 public void testWriteRequestsCounter() throws IOException
{
5658 byte[] fam
= Bytes
.toBytes("info");
5659 byte[][] families
= { fam
};
5660 this.region
= initHRegion(tableName
, method
, CONF
, families
);
5662 Assert
.assertEquals(0L, region
.getWriteRequestsCount());
// Constructing the Put alone must not change the counter.
5664 Put put
= new Put(row
);
5665 put
.addColumn(fam
, fam
, fam
);
5667 Assert
.assertEquals(0L, region
.getWriteRequestsCount());
5669 Assert
.assertEquals(1L, region
.getWriteRequestsCount());
5671 Assert
.assertEquals(2L, region
.getWriteRequestsCount());
5673 Assert
.assertEquals(3L, region
.getWriteRequestsCount());
// A delete is counted as a write request as well.
5675 region
.delete(new Delete(row
));
5676 Assert
.assertEquals(4L, region
.getWriteRequestsCount());
// Verifies that opening a region writes a REGION_OPEN marker to the WAL.
// Steps visible here: create a region with families fam1 and fam2, put one cell into fam1,
// close it, then reopen it via HRegion.openHRegion with a mocked WAL and a spied
// RegionServerServices. Exactly one appendMarker call is expected; its WALEdit must hold a
// single cell that decodes to a RegionEventDescriptor with the expected event type, table
// name, encoded region name, server name, a positive sequence number and two stores
// (fam1 with one relative-path store file, fam2 with none).
// NOTE(review): this listing appears to omit some original lines (closing braces, the
// assignment target of createRegionAndWAL, any flush call) - confirm against the real file.
5680 public void testOpenRegionWrittenToWAL() throws Exception
{
5681 final ServerName serverName
= ServerName
.valueOf(name
.getMethodName(), 100, 42);
5682 final RegionServerServices rss
= spy(TEST_UTIL
.createMockRegionServerService(serverName
));
5684 TableDescriptor htd
= TableDescriptorBuilder
.newBuilder(TableName
.valueOf(name
.getMethodName()))
5685 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(fam1
))
5686 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(fam2
)).build();
5687 RegionInfo hri
= RegionInfoBuilder
.newBuilder(htd
.getTableName()).build();
5689 // open the region w/o rss and wal and flush some files
5691 HBaseTestingUtility
.createRegionAndWAL(hri
, TEST_UTIL
.getDataTestDir(), TEST_UTIL
5692 .getConfiguration(), htd
);
5693 assertNotNull(region
);
5695 // create a file in fam1 for the region before opening in OpenRegionHandler
5696 region
.put(new Put(Bytes
.toBytes("a")).addColumn(fam1
, fam1
, fam1
));
5698 HBaseTestingUtility
.closeRegionAndWAL(region
);
5700 ArgumentCaptor
<WALEdit
> editCaptor
= ArgumentCaptor
.forClass(WALEdit
.class);
5702 // capture append() calls
5703 WAL wal
= mockWAL();
5704 when(rss
.getWAL(any(RegionInfo
.class))).thenReturn(wal
);
// Reopen with the mocked WAL so the open marker can be captured.
5706 region
= HRegion
.openHRegion(hri
, htd
, rss
.getWAL(hri
),
5707 TEST_UTIL
.getConfiguration(), rss
, null);
5709 verify(wal
, times(1)).appendMarker(any(RegionInfo
.class), any(WALKeyImpl
.class),
5710 editCaptor
.capture());
5712 WALEdit edit
= editCaptor
.getValue();
5713 assertNotNull(edit
);
5714 assertNotNull(edit
.getCells());
5715 assertEquals(1, edit
.getCells().size());
5716 RegionEventDescriptor desc
= WALEdit
.getRegionEventDescriptor(edit
.getCells().get(0));
5717 assertNotNull(desc
);
5719 LOG
.info("RegionEventDescriptor from WAL: " + desc
);
5721 assertEquals(RegionEventDescriptor
.EventType
.REGION_OPEN
, desc
.getEventType());
5722 assertTrue(Bytes
.equals(desc
.getTableName().toByteArray(), htd
.getTableName().toBytes()));
5723 assertTrue(Bytes
.equals(desc
.getEncodedRegionName().toByteArray(),
5724 hri
.getEncodedNameAsBytes()));
5725 assertTrue(desc
.getLogSequenceNumber() > 0);
5726 assertEquals(serverName
, ProtobufUtil
.toServerName(desc
.getServer()));
5727 assertEquals(2, desc
.getStoresCount());
// First store descriptor: fam1, which received the single put above.
5729 StoreDescriptor store
= desc
.getStores(0);
5730 assertTrue(Bytes
.equals(store
.getFamilyName().toByteArray(), fam1
));
5731 assertEquals(store
.getStoreHomeDir(), Bytes
.toString(fam1
));
5732 assertEquals(1, store
.getStoreFileCount()); // 1 store file
5733 assertFalse(store
.getStoreFile(0).contains("/")); // ensure path is relative
// Second store descriptor: fam2, which was never written to.
5735 store
= desc
.getStores(1);
5736 assertTrue(Bytes
.equals(store
.getFamilyName().toByteArray(), fam2
));
5737 assertEquals(store
.getStoreHomeDir(), Bytes
.toString(fam2
));
5738 assertEquals(0, store
.getStoreFileCount()); // no store files
5741 // Helper for test testOpenRegionWrittenToWALForLogReplay
// HRegion subclass used as a test helper. The constructor forwards all arguments to the
// HRegion constructor, and getNextSequenceId(WAL) is overridden.
// NOTE(review): the body of getNextSequenceId is not present in this listing, so the
// sequence id it returns cannot be stated here - confirm against the real file.
5742 static class HRegionWithSeqId
extends HRegion
{
5743 public HRegionWithSeqId(final Path tableDir
, final WAL wal
, final FileSystem fs
,
5744 final Configuration confParam
, final RegionInfo regionInfo
,
5745 final TableDescriptor htd
, final RegionServerServices rsServices
) {
5746 super(tableDir
, wal
, fs
, confParam
, regionInfo
, htd
, rsServices
);
5749 protected long getNextSequenceId(WAL wal
) throws IOException
{
// Creates a region with family fam1, builds a Put for row "a-b-0-0", and then asserts that
// no store file of fam1 reports isIncludesTags() in its HFile context.
// NOTE(review): the calls that apply the Put and flush the region are not visible in this
// listing (lines appear to be missing) - confirm against the real file.
5755 public void testFlushedFileWithNoTags() throws Exception
{
5756 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
5757 HTableDescriptor htd
= new HTableDescriptor(tableName
);
5758 htd
.addFamily(new HColumnDescriptor(fam1
));
5759 HRegionInfo info
= new HRegionInfo(tableName
, null, null, false);
5760 Path path
= TEST_UTIL
.getDataTestDir(getClass().getSimpleName());
5761 region
= HBaseTestingUtility
.createRegionAndWAL(info
, path
, TEST_UTIL
.getConfiguration(), htd
);
5762 Put put
= new Put(Bytes
.toBytes("a-b-0-0"));
5763 put
.addColumn(fam1
, qual1
, Bytes
.toBytes("c1-value"));
// Inspect every store file of fam1 for the tags flag.
5766 HStore store
= region
.getStore(fam1
);
5767 Collection
<HStoreFile
> storefiles
= store
.getStorefiles();
5768 for (HStoreFile sf
: storefiles
) {
5769 assertFalse("Tags should not be present "
5770 ,sf
.getReader().getHFileReader().getFileContext().isIncludesTags());
5775 * Utility method to setup a WAL mock.
5777 * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
5778 * @return a mock WAL
// Builds a Mockito mock of WAL whose appendData and appendMarker calls are answered by
// taking the WALKeyImpl argument (index 1), beginning an MVCC write entry on it and
// attaching that entry to the key.
// NOTE(review): the value returned by each Answer and the method's return statement are
// not present in this listing - confirm against the real file.
5780 private WAL
mockWAL() throws IOException
{
5781 WAL wal
= mock(WAL
.class);
// Stub for data appends.
5782 when(wal
.appendData(any(RegionInfo
.class), any(WALKeyImpl
.class), any(WALEdit
.class)))
5783 .thenAnswer(new Answer
<Long
>() {
5785 public Long
answer(InvocationOnMock invocation
) throws Throwable
{
5786 WALKeyImpl key
= invocation
.getArgument(1);
5787 MultiVersionConcurrencyControl
.WriteEntry we
= key
.getMvcc().begin();
5788 key
.setWriteEntry(we
);
// Stub for marker appends; same handling of the key as above.
5792 when(wal
.appendMarker(any(RegionInfo
.class), any(WALKeyImpl
.class), any(WALEdit
.class))).
5793 thenAnswer(new Answer
<Long
>() {
5795 public Long
answer(InvocationOnMock invocation
) throws Throwable
{
5796 WALKeyImpl key
= invocation
.getArgument(1);
5797 MultiVersionConcurrencyControl
.WriteEntry we
= key
.getMvcc().begin();
5798 key
.setWriteEntry(we
);
// Verifies that closing a region writes a REGION_CLOSE marker to the WAL.
// Steps visible here: create and open a region (families fam1 and fam2) against a mocked
// WAL, close it, and expect two appendMarker calls in total. The second captured WALEdit
// must hold a single cell that decodes to a RegionEventDescriptor with the expected event
// type, table name, encoded region name, server name, a positive sequence number and two
// stores, both without store files.
// NOTE(review): this listing appears to omit some original lines (closing braces and
// possibly other statements) - confirm against the real file.
5806 public void testCloseRegionWrittenToWAL() throws Exception
{
5807 Path rootDir
= new Path(dir
+ name
.getMethodName());
5808 FSUtils
.setRootDir(TEST_UTIL
.getConfiguration(), rootDir
);
5810 final ServerName serverName
= ServerName
.valueOf("testCloseRegionWrittenToWAL", 100, 42);
5811 final RegionServerServices rss
= spy(TEST_UTIL
.createMockRegionServerService(serverName
));
5813 TableDescriptor htd
= TableDescriptorBuilder
.newBuilder(TableName
.valueOf(name
.getMethodName()))
5814 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(fam1
))
5815 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(fam2
)).build();
5816 RegionInfo hri
= RegionInfoBuilder
.newBuilder(htd
.getTableName()).build();
5818 ArgumentCaptor
<WALEdit
> editCaptor
= ArgumentCaptor
.forClass(WALEdit
.class);
5820 // capture append() calls
5821 WAL wal
= mockWAL();
5822 when(rss
.getWAL(any(RegionInfo
.class))).thenReturn(wal
);
5825 // create and then open a region first so that it can be closed later
5826 region
= HRegion
.createHRegion(hri
, rootDir
, TEST_UTIL
.getConfiguration(), htd
, rss
.getWAL(hri
));
5827 region
= HRegion
.openHRegion(hri
, htd
, rss
.getWAL(hri
),
5828 TEST_UTIL
.getConfiguration(), rss
, null);
5831 region
.close(false);
5833 // 2 times, one for region open, the other close region
5834 verify(wal
, times(2)).appendMarker(any(RegionInfo
.class),
5835 (WALKeyImpl
) any(WALKeyImpl
.class), editCaptor
.capture());
// Index 1 is the second captured edit, i.e. the close marker.
5837 WALEdit edit
= editCaptor
.getAllValues().get(1);
5838 assertNotNull(edit
);
5839 assertNotNull(edit
.getCells());
5840 assertEquals(1, edit
.getCells().size());
5841 RegionEventDescriptor desc
= WALEdit
.getRegionEventDescriptor(edit
.getCells().get(0));
5842 assertNotNull(desc
);
5844 LOG
.info("RegionEventDescriptor from WAL: " + desc
);
5846 assertEquals(RegionEventDescriptor
.EventType
.REGION_CLOSE
, desc
.getEventType());
5847 assertTrue(Bytes
.equals(desc
.getTableName().toByteArray(), htd
.getTableName().toBytes()));
5848 assertTrue(Bytes
.equals(desc
.getEncodedRegionName().toByteArray(),
5849 hri
.getEncodedNameAsBytes()));
5850 assertTrue(desc
.getLogSequenceNumber() > 0);
5851 assertEquals(serverName
, ProtobufUtil
.toServerName(desc
.getServer()));
5852 assertEquals(2, desc
.getStoresCount());
// First store descriptor: fam1.
5854 StoreDescriptor store
= desc
.getStores(0);
5855 assertTrue(Bytes
.equals(store
.getFamilyName().toByteArray(), fam1
));
5856 assertEquals(store
.getStoreHomeDir(), Bytes
.toString(fam1
));
5857 assertEquals(0, store
.getStoreFileCount()); // no store files
// Second store descriptor: fam2.
5859 store
= desc
.getStores(1);
5860 assertTrue(Bytes
.equals(store
.getFamilyName().toByteArray(), fam2
));
5861 assertEquals(store
.getStoreHomeDir(), Bytes
.toString(fam2
));
5862 assertEquals(0, store
.getStoreFileCount()); // no store files
5866 * Test RegionTooBusyException thrown when region is busy
// Exercises the RegionTooBusyException path: "hbase.busy.wait.duration" is lowered to 1000,
// a background thread takes the region's write lock, and the main thread is expected to hit
// RegionTooBusyException (fail("Should throw RegionTooBusyException") otherwise). The
// original busy-wait duration is written back to CONF at the end.
// NOTE(review): large parts of this method are missing from the listing (the Runnable body,
// the try/finally structure, the call issued with `get`), so the exact hand-off through
// the `stopped` flag cannot be described from here - confirm against the real file.
5869 public void testRegionTooBusy() throws IOException
{
5870 byte[] family
= Bytes
.toBytes("family");
// Remember the configured value so it can be restored below.
5871 long defaultBusyWaitDuration
= CONF
.getLong("hbase.busy.wait.duration",
5872 HRegion
.DEFAULT_BUSY_WAIT_DURATION
);
5873 CONF
.setLong("hbase.busy.wait.duration", 1000);
5874 region
= initHRegion(tableName
, method
, CONF
, family
);
5875 final AtomicBoolean stopped
= new AtomicBoolean(true);
5876 Thread t
= new Thread(new Runnable() {
// Background thread: holds the region write lock while it loops on `stopped`.
5880 region
.lock
.writeLock().lock();
5882 while (!stopped
.get()) {
5885 } catch (InterruptedException ie
) {
5887 region
.lock
.writeLock().unlock();
5892 Get get
= new Get(row
);
5894 while (stopped
.get()) {
5898 fail("Should throw RegionTooBusyException");
5899 } catch (InterruptedException ie
) {
5900 fail("test interrupted");
5901 } catch (RegionTooBusyException e
) {
5907 } catch (Throwable e
) {
5910 HBaseTestingUtility
.closeRegionAndWAL(region
);
// Restore the shared configuration for other tests.
5912 CONF
.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration
);
// Tests cell-level TTL tags together with the column-family TTL, using an injected
// IncrementingEnvironmentEdge as the clock.
// Four cells are written to one row: q1 and q3 carry a 5000 ms TTL tag, q2 and q4 rely on
// the family TTL of 10 seconds; q3 and q4 are stamped `now + 10000 - 1`. The clock is then
// advanced step by step and each Get asserts which qualifiers are still visible. The last
// part writes 1L to q1, increments it to 2 and checks the value after further clock steps.
// NOTE(review): this listing appears to omit some original lines (closing braces, the
// flush call, adding hcd to htd, any TTL set on the Increment, null checks on `val`) -
// confirm against the real file.
5917 public void testCellTTLs() throws IOException
{
5918 IncrementingEnvironmentEdge edge
= new IncrementingEnvironmentEdge();
5919 EnvironmentEdgeManager
.injectEdge(edge
);
5921 final byte[] row
= Bytes
.toBytes("testRow");
5922 final byte[] q1
= Bytes
.toBytes("q1");
5923 final byte[] q2
= Bytes
.toBytes("q2");
5924 final byte[] q3
= Bytes
.toBytes("q3");
5925 final byte[] q4
= Bytes
.toBytes("q4");
5927 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf(name
.getMethodName()));
5928 HColumnDescriptor hcd
= new HColumnDescriptor(fam1
);
5929 hcd
.setTimeToLive(10); // 10 seconds
// Use an HFile format version that supports tags, since the cell TTLs are stored as tags.
5932 Configuration conf
= new Configuration(TEST_UTIL
.getConfiguration());
5933 conf
.setInt(HFile
.FORMAT_VERSION_KEY
, HFile
.MIN_FORMAT_VERSION_WITH_TAGS
);
5935 region
= HBaseTestingUtility
.createRegionAndWAL(new HRegionInfo(htd
.getTableName(),
5936 HConstants
.EMPTY_BYTE_ARRAY
, HConstants
.EMPTY_BYTE_ARRAY
),
5937 TEST_UTIL
.getDataTestDir(), conf
, htd
);
5938 assertNotNull(region
);
5939 long now
= EnvironmentEdgeManager
.currentTime();
5940 // Add a cell that will expire in 5 seconds via cell TTL
5941 region
.put(new Put(row
).add(new KeyValue(row
, fam1
, q1
, now
,
5942 HConstants
.EMPTY_BYTE_ARRAY
, new ArrayBackedTag
[] {
5943 // TTL tags specify ts in milliseconds
5944 new ArrayBackedTag(TagType
.TTL_TAG_TYPE
, Bytes
.toBytes(5000L)) })));
5945 // Add a cell that will expire after 10 seconds via family setting
5946 region
.put(new Put(row
).addColumn(fam1
, q2
, now
, HConstants
.EMPTY_BYTE_ARRAY
));
5947 // Add a cell that will expire in 15 seconds via cell TTL
5948 region
.put(new Put(row
).add(new KeyValue(row
, fam1
, q3
, now
+ 10000 - 1,
5949 HConstants
.EMPTY_BYTE_ARRAY
, new ArrayBackedTag
[] {
5950 // TTL tags specify ts in milliseconds
5951 new ArrayBackedTag(TagType
.TTL_TAG_TYPE
, Bytes
.toBytes(5000L)) })));
5952 // Add a cell that will expire in 20 seconds via family setting
5953 region
.put(new Put(row
).addColumn(fam1
, q4
, now
+ 10000 - 1, HConstants
.EMPTY_BYTE_ARRAY
));
5955 // Flush so we are sure store scanning gets this right
5958 // A query at time T+0 should return all cells
5959 Result r
= region
.get(new Get(row
));
5960 assertNotNull(r
.getValue(fam1
, q1
));
5961 assertNotNull(r
.getValue(fam1
, q2
));
5962 assertNotNull(r
.getValue(fam1
, q3
));
5963 assertNotNull(r
.getValue(fam1
, q4
));
5965 // Increment time to T+5 seconds
5966 edge
.incrementTime(5000);
5968 r
= region
.get(new Get(row
));
5969 assertNull(r
.getValue(fam1
, q1
));
5970 assertNotNull(r
.getValue(fam1
, q2
));
5971 assertNotNull(r
.getValue(fam1
, q3
));
5972 assertNotNull(r
.getValue(fam1
, q4
));
5974 // Increment time to T+10 seconds
5975 edge
.incrementTime(5000);
5977 r
= region
.get(new Get(row
));
5978 assertNull(r
.getValue(fam1
, q1
));
5979 assertNull(r
.getValue(fam1
, q2
));
5980 assertNotNull(r
.getValue(fam1
, q3
));
5981 assertNotNull(r
.getValue(fam1
, q4
));
5983 // Increment time to T+15 seconds
5984 edge
.incrementTime(5000);
5986 r
= region
.get(new Get(row
));
5987 assertNull(r
.getValue(fam1
, q1
));
5988 assertNull(r
.getValue(fam1
, q2
));
5989 assertNull(r
.getValue(fam1
, q3
));
5990 assertNotNull(r
.getValue(fam1
, q4
));
5992 // Increment time to T+20 seconds
// NOTE(review): this step is 10000 ms after three 5000 ms steps, so the explicit
// increments add up to T+25 here; the "T+..." labels from this point on look 5 seconds
// behind the summed increments - confirm the intended timeline.
5993 edge
.incrementTime(10000);
5995 r
= region
.get(new Get(row
));
5996 assertNull(r
.getValue(fam1
, q1
));
5997 assertNull(r
.getValue(fam1
, q2
));
5998 assertNull(r
.getValue(fam1
, q3
));
5999 assertNull(r
.getValue(fam1
, q4
));
6001 // Fun with disappearing increments
6004 region
.put(new Put(row
).addColumn(fam1
, q1
, Bytes
.toBytes(1L)));
6005 r
= region
.get(new Get(row
));
6006 byte[] val
= r
.getValue(fam1
, q1
);
6008 assertEquals(1L, Bytes
.toLong(val
));
6010 // Increment with a TTL of 5 seconds
6011 Increment incr
= new Increment(row
).addColumn(fam1
, q1
, 1L);
6013 region
.increment(incr
); // 2
6015 // New value should be 2
6016 r
= region
.get(new Get(row
));
6017 val
= r
.getValue(fam1
, q1
);
6019 assertEquals(2L, Bytes
.toLong(val
));
6021 // Increment time to T+25 seconds
6022 edge
.incrementTime(5000);
6024 // Value should be back to 1
6025 r
= region
.get(new Get(row
));
6026 val
= r
.getValue(fam1
, q1
);
6028 assertEquals(1L, Bytes
.toLong(val
));
6030 // Increment time to T+30 seconds
6031 edge
.incrementTime(5000);
6033 // Original value written at T+20 should be gone now via family TTL
6034 r
= region
.get(new Get(row
));
6035 assertNull(r
.getValue(fam1
, q1
));
// Checks that Increment results keep increasing timestamps when the clock moves backwards:
// with a ManualEnvironmentEdge injected, the first increment's cell is expected at
// timestamp 10; after the edge is set back to 1, a second increment must produce
// timestamp 11 and the value 2.
// NOTE(review): the statement that sets the initial clock value is not visible in this
// listing (the first assertion implies 10) - confirm against the real file.
6039 public void testIncrementTimestampsAreMonotonic() throws IOException
{
6040 region
= initHRegion(tableName
, method
, CONF
, fam1
);
6041 ManualEnvironmentEdge edge
= new ManualEnvironmentEdge();
6042 EnvironmentEdgeManager
.injectEdge(edge
);
6045 Increment inc
= new Increment(row
);
6046 inc
.setDurability(Durability
.SKIP_WAL
);
6047 inc
.addColumn(fam1
, qual1
, 1L);
6048 region
.increment(inc
);
6050 Result result
= region
.get(new Get(row
));
6051 Cell c
= result
.getColumnLatestCell(fam1
, qual1
);
6053 assertEquals(10L, c
.getTimestamp());
6055 edge
.setValue(1); // clock goes back
6056 region
.increment(inc
);
6057 result
= region
.get(new Get(row
));
6058 c
= result
.getColumnLatestCell(fam1
, qual1
);
// Timestamp is previous + 1 rather than the (earlier) clock value.
6059 assertEquals(11L, c
.getTimestamp());
6060 assertEquals(2L, Bytes
.toLong(c
.getValueArray(), c
.getValueOffset(), c
.getValueLength()));
// Checks that Append results keep increasing timestamps when the clock moves backwards:
// the first read expects the cell at timestamp 10; after the edge is set back to 1 the
// latest cell must be at timestamp 11 and hold qual1 concatenated with itself.
// NOTE(review): the statements that set the initial clock value and that apply the Append
// are not visible in this listing - confirm against the real file.
6064 public void testAppendTimestampsAreMonotonic() throws IOException
{
6065 region
= initHRegion(tableName
, method
, CONF
, fam1
);
6066 ManualEnvironmentEdge edge
= new ManualEnvironmentEdge();
6067 EnvironmentEdgeManager
.injectEdge(edge
);
6070 Append a
= new Append(row
);
6071 a
.setDurability(Durability
.SKIP_WAL
);
6072 a
.addColumn(fam1
, qual1
, qual1
);
6075 Result result
= region
.get(new Get(row
));
6076 Cell c
= result
.getColumnLatestCell(fam1
, qual1
);
6078 assertEquals(10L, c
.getTimestamp());
6080 edge
.setValue(1); // clock goes back
6082 result
= region
.get(new Get(row
));
6083 c
= result
.getColumnLatestCell(fam1
, qual1
);
6084 assertEquals(11L, c
.getTimestamp());
// Expected value: the bytes of qual1 twice in a row.
6086 byte[] expected
= new byte[qual1
.length
*2];
6087 System
.arraycopy(qual1
, 0, expected
, 0, qual1
.length
);
6088 System
.arraycopy(qual1
, 0, expected
, qual1
.length
, qual1
.length
);
6090 assertTrue(Bytes
.equals(c
.getValueArray(), c
.getValueOffset(), c
.getValueLength(),
6091 expected
, 0, expected
.length
));
// Checks checkAndMutate when the clock moves backwards: the first read expects the cell at
// timestamp 10; after the edge is set back to 1, a checkAndMutate (EQUAL against qual1)
// writes qual2, and the latest cell is asserted to still be at timestamp 10 with value
// qual2.
// NOTE(review): the statements that set the initial clock value, apply the first Put and
// re-create `p` before the second addColumn are not visible in this listing - confirm
// against the real file.
6095 public void testCheckAndMutateTimestampsAreMonotonic() throws IOException
{
6096 region
= initHRegion(tableName
, method
, CONF
, fam1
);
6097 ManualEnvironmentEdge edge
= new ManualEnvironmentEdge();
6098 EnvironmentEdgeManager
.injectEdge(edge
);
6101 Put p
= new Put(row
);
6102 p
.setDurability(Durability
.SKIP_WAL
);
6103 p
.addColumn(fam1
, qual1
, qual1
);
6106 Result result
= region
.get(new Get(row
));
6107 Cell c
= result
.getColumnLatestCell(fam1
, qual1
);
6109 assertEquals(10L, c
.getTimestamp());
6111 edge
.setValue(1); // clock goes back
6113 p
.setDurability(Durability
.SKIP_WAL
);
6114 p
.addColumn(fam1
, qual1
, qual2
);
6115 region
.checkAndMutate(row
, fam1
, qual1
, CompareOperator
.EQUAL
, new BinaryComparator(qual1
), p
);
6116 result
= region
.get(new Get(row
));
6117 c
= result
.getColumnLatestCell(fam1
, qual1
);
6118 assertEquals(10L, c
.getTimestamp());
// The stored value must be the one written by the conditional mutation.
6120 assertTrue(Bytes
.equals(c
.getValueArray(), c
.getValueOffset(), c
.getValueLength(),
6121 qual2
, 0, qual2
.length
));
// Tests HRegion#batchMutate on a region bounded by rows "a" (start) and "c" (end, marked
// "exclusive" below), with "hbase.rowlock.wait.duration" lowered to 1000.
// Part 1: a batch of three mutations where the second targets row "c" is expected to give
// SUCCESS, SANITY_CHECK_FAILURE, SUCCESS.
// Part 2: one task holds the row lock on "b" for about 5 seconds while a second task
// batch-mutates rows "a" and "b"; both statuses are expected to be SUCCESS.
// The previous lock timeout is written back to CONF at the end.
// NOTE(review): many original lines are missing from this listing (parts of each cell
// builder chain, the first array element's `new Put(...)`, lock release, future handling,
// closing braces) - confirm against the real file.
6125 public void testBatchMutateWithWrongRegionException() throws Exception
{
6126 final byte[] a
= Bytes
.toBytes("a");
6127 final byte[] b
= Bytes
.toBytes("b");
6128 final byte[] c
= Bytes
.toBytes("c"); // exclusive
// Remember the configured value so it can be restored below.
6130 int prevLockTimeout
= CONF
.getInt("hbase.rowlock.wait.duration", 30000);
6131 CONF
.setInt("hbase.rowlock.wait.duration", 1000);
6132 region
= initHRegion(tableName
, a
, c
, method
, CONF
, false, fam1
);
6134 Mutation
[] mutations
= new Mutation
[] {
6136 .add(CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
)
6139 .setTimestamp(HConstants
.LATEST_TIMESTAMP
)
6140 .setType(Cell
.Type
.Put
)
6142 // this is outside the region boundary
6143 new Put(c
).add(CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
)
6146 .setTimestamp(HConstants
.LATEST_TIMESTAMP
)
6149 new Put(b
).add(CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
)
6152 .setTimestamp(HConstants
.LATEST_TIMESTAMP
)
6153 .setType(Cell
.Type
.Put
)
// Only the out-of-range mutation (index 1) is expected to be rejected.
6157 OperationStatus
[] status
= region
.batchMutate(mutations
);
6158 assertEquals(OperationStatusCode
.SUCCESS
, status
[0].getOperationStatusCode());
6159 assertEquals(OperationStatusCode
.SANITY_CHECK_FAILURE
, status
[1].getOperationStatusCode());
6160 assertEquals(OperationStatusCode
.SUCCESS
, status
[2].getOperationStatusCode());
6163 // test with a row lock held for a long time
6164 final CountDownLatch obtainedRowLock
= new CountDownLatch(1);
6165 ExecutorService exec
= Executors
.newFixedThreadPool(2);
6166 Future
<Void
> f1
= exec
.submit(new Callable
<Void
>() {
6168 public Void
call() throws Exception
{
6169 LOG
.info("Acquiring row lock");
6170 RowLock rl
= region
.getRowLock(b
);
6171 obtainedRowLock
.countDown();
6172 LOG
.info("Waiting for 5 seconds before releasing lock");
6173 Threads
.sleep(5000);
6174 LOG
.info("Releasing row lock");
// Wait until the first task signals that it holds the row lock.
6179 obtainedRowLock
.await(30, TimeUnit
.SECONDS
);
6181 Future
<Void
> f2
= exec
.submit(new Callable
<Void
>() {
6183 public Void
call() throws Exception
{
6184 Mutation
[] mutations
= new Mutation
[] {
6185 new Put(a
).add(CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
)
6188 .setTimestamp(HConstants
.LATEST_TIMESTAMP
)
6189 .setType(Cell
.Type
.Put
)
6191 new Put(b
).add(CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
)
6194 .setTimestamp(HConstants
.LATEST_TIMESTAMP
)
6195 .setType(Cell
.Type
.Put
)
6199 // this will wait for the row lock, and it will eventually succeed
6200 OperationStatus
[] status
= region
.batchMutate(mutations
);
6201 assertEquals(OperationStatusCode
.SUCCESS
, status
[0].getOperationStatusCode());
6202 assertEquals(OperationStatusCode
.SUCCESS
, status
[1].getOperationStatusCode());
// Restore the shared configuration for other tests.
6210 CONF
.setInt("hbase.rowlock.wait.duration", prevLockTimeout
);
// Checks checkAndRowMutate when the clock moves backwards: the first read expects the cell
// at timestamp 10; after the edge is set back to 1, checkAndRowMutate (EQUAL against
// qual1) must return true, and the latest cell is asserted to still be at timestamp 10
// with value qual2.
// NOTE(review): the statements that set the initial clock value, apply the first Put,
// re-create `p` and add it to `rm` are not visible in this listing - confirm against the
// real file.
6214 public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException
{
6215 region
= initHRegion(tableName
, method
, CONF
, fam1
);
6216 ManualEnvironmentEdge edge
= new ManualEnvironmentEdge();
6217 EnvironmentEdgeManager
.injectEdge(edge
);
6220 Put p
= new Put(row
);
6221 p
.setDurability(Durability
.SKIP_WAL
);
6222 p
.addColumn(fam1
, qual1
, qual1
);
6225 Result result
= region
.get(new Get(row
));
6226 Cell c
= result
.getColumnLatestCell(fam1
, qual1
);
6228 assertEquals(10L, c
.getTimestamp());
6230 edge
.setValue(1); // clock goes back
6232 p
.setDurability(Durability
.SKIP_WAL
);
6233 p
.addColumn(fam1
, qual1
, qual2
);
6234 RowMutations rm
= new RowMutations(row
);
6236 assertTrue(region
.checkAndRowMutate(row
, fam1
, qual1
, CompareOperator
.EQUAL
,
6237 new BinaryComparator(qual1
), rm
));
6238 result
= region
.get(new Get(row
));
6239 c
= result
.getColumnLatestCell(fam1
, qual1
);
6240 assertEquals(10L, c
.getTimestamp());
6241 LOG
.info("c value " +
6242 Bytes
.toStringBinary(c
.getValueArray(), c
.getValueOffset(), c
.getValueLength()));
// The stored value must be the one written by the conditional row mutation.
6244 assertTrue(Bytes
.equals(c
.getValueArray(), c
.getValueOffset(), c
.getValueLength(),
6245 qual2
, 0, qual2
.length
));
// Convenience overload: delegates to another initHRegion overload, passing a fresh
// HBaseConfiguration.create() as the configuration.
// NOTE(review): the remaining arguments of the delegated call are not present in this
// listing - confirm against the real file.
6248 HRegion
initHRegion(TableName tableName
, String callingMethod
,
6249 byte[]... families
) throws IOException
{
6250 return initHRegion(tableName
, callingMethod
, HBaseConfiguration
.create(),
6255 * HBASE-16429: make sure writes do not get stuck when the WAL writer is rolled while the ring buffer is filled with appends
6256 * @throws IOException if IO error occurred during test
// Stress test for concurrent writes while the WAL writer is rolled: the WAL disruptor
// event count is set to 2, one thread per row builds a SYNC_WAL Put covering every
// family/qualifier pair, and the main thread calls rollWriter() on the region's WAL
// `testCount` times. Afterwards the region is closed and the event count is set to
// 16 * 1024.
// NOTE(review): several original lines are missing from this listing (the declarations of
// testCount and numRows, the Runnable's run/try structure, the call that applies the Put,
// thread start/join, closing braces) - confirm against the real file.
6259 public void testWritesWhileRollWriter() throws IOException
{
6262 int numFamilies
= 2;
6263 int numQualifiers
= 2;
6264 final byte[][] families
= new byte[numFamilies
][];
6265 for (int i
= 0; i
< numFamilies
; i
++) {
6266 families
[i
] = Bytes
.toBytes("family" + i
);
6268 final byte[][] qualifiers
= new byte[numQualifiers
][];
6269 for (int i
= 0; i
< numQualifiers
; i
++) {
6270 qualifiers
[i
] = Bytes
.toBytes("qual" + i
);
// Shrink the WAL ring buffer before the region (and its WAL) is created.
6273 CONF
.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6274 this.region
= initHRegion(tableName
, method
, CONF
, families
);
6276 List
<Thread
> threads
= new ArrayList
<>();
6277 for (int i
= 0; i
< numRows
; i
++) {
6278 final int count
= i
;
6279 Thread t
= new Thread(new Runnable() {
6283 byte[] row
= Bytes
.toBytes("row" + count
);
6284 Put put
= new Put(row
);
6285 put
.setDurability(Durability
.SYNC_WAL
);
6286 byte[] value
= Bytes
.toBytes(String
.valueOf(count
));
6287 for (byte[] family
: families
) {
6288 for (byte[] qualifier
: qualifiers
) {
6289 put
.addColumn(family
, qualifier
, count
, value
);
6294 } catch (IOException e
) {
6295 throw new RuntimeException(e
);
6301 for (Thread t
: threads
) {
6305 for (int i
= 0; i
< testCount
; i
++) {
6306 region
.getWAL().rollWriter();
6311 HBaseTestingUtility
.closeRegionAndWAL(this.region
);
6312 CONF
.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6313 } catch (DroppedSnapshotException dse
) {
6314 // We could get this on way out because we interrupt the background flusher and it could
6315 // fail anywhere causing a DSE over in the background flusher... only it is not properly
6316 // dealt with so could still be memory hanging out when we get to here -- memory we can't
6317 // flush because the accounting is 'off' since original DSE.
// Checks that HRegion#mutateRow raises the region's writeRequestsCount by the number of
// mutations contained in the RowMutations that was applied.
// NOTE(review): the statement that adds `put` to `rm` is not visible in this listing -
// confirm against the real file.
6324 public void testMutateRow_WriteRequestCount() throws Exception
{
6325 byte[] row1
= Bytes
.toBytes("row1");
6326 byte[] fam1
= Bytes
.toBytes("fam1");
6327 byte[] qf1
= Bytes
.toBytes("qualifier");
6328 byte[] val1
= Bytes
.toBytes("value1");
6330 RowMutations rm
= new RowMutations(row1
);
6331 Put put
= new Put(row1
);
6332 put
.addColumn(fam1
, qf1
, val1
);
6335 this.region
= initHRegion(tableName
, method
, CONF
, fam1
);
// Sample the counter immediately before and after the mutation.
6336 long wrcBeforeMutate
= this.region
.writeRequestsCount
.longValue();
6337 this.region
.mutateRow(rm
);
6338 long wrcAfterMutate
= this.region
.writeRequestsCount
.longValue();
6339 Assert
.assertEquals(wrcBeforeMutate
+ rm
.getMutations().size(), wrcAfterMutate
);
// With HConstants.REPLICATION_BULKLOAD_ENABLE_KEY set to true in the test configuration,
// opens a region and asserts that (1) the flag is true in the region's conf, (2) the
// region coprocessor configuration key lists ReplicationObserver's canonical class name,
// and (3) the region's coprocessor host reports a coprocessor named after
// ReplicationObserver's simple name.
// NOTE(review): the trailing arguments of the openHRegion call are not present in this
// listing - confirm against the real file.
6343 public void testBulkLoadReplicationEnabled() throws IOException
{
6344 TEST_UTIL
.getConfiguration().setBoolean(HConstants
.REPLICATION_BULKLOAD_ENABLE_KEY
, true);
6345 final ServerName serverName
= ServerName
.valueOf(name
.getMethodName(), 100, 42);
6346 final RegionServerServices rss
= spy(TEST_UTIL
.createMockRegionServerService(serverName
));
6348 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf(name
.getMethodName()));
6349 htd
.addFamily(new HColumnDescriptor(fam1
));
6350 HRegionInfo hri
= new HRegionInfo(htd
.getTableName(),
6351 HConstants
.EMPTY_BYTE_ARRAY
, HConstants
.EMPTY_BYTE_ARRAY
);
6352 region
= HRegion
.openHRegion(hri
, htd
, rss
.getWAL(hri
), TEST_UTIL
.getConfiguration(),
6355 assertTrue(region
.conf
.getBoolean(HConstants
.REPLICATION_BULKLOAD_ENABLE_KEY
, false));
// The coprocessor must be both configured and actually loaded.
6356 String plugins
= region
.conf
.get(CoprocessorHost
.REGION_COPROCESSOR_CONF_KEY
, "");
6357 String replicationCoprocessorClass
= ReplicationObserver
.class.getCanonicalName();
6358 assertTrue(plugins
.contains(replicationCoprocessorClass
));
6359 assertTrue(region
.getCoprocessorHost().
6360 getCoprocessors().contains(ReplicationObserver
.class.getSimpleName()));
6364 * The same as HRegion class, the only difference is that instantiateHStore will
6365 * create a different HStore - HStoreForTesting. [HBASE-8518]
// HRegion subclass for tests whose instantiateHStore returns an HMobStore for MOB-enabled
// families (after checking the configured HFile format version supports tags) and an
// HStoreForTesting otherwise.
// The seven-argument constructor wraps its file-system arguments in an HRegionFileSystem
// and delegates to the five-argument one, which calls the HRegion constructor.
// NOTE(review): this listing appears to omit some original lines (closing braces,
// annotations, the end of the exception message) - confirm against the real file.
6367 public static class HRegionForTesting
extends HRegion
{
6369 public HRegionForTesting(final Path tableDir
, final WAL wal
, final FileSystem fs
,
6370 final Configuration confParam
, final RegionInfo regionInfo
,
6371 final TableDescriptor htd
, final RegionServerServices rsServices
) {
6372 this(new HRegionFileSystem(confParam
, fs
, tableDir
, regionInfo
),
6373 wal
, confParam
, htd
, rsServices
);
6376 public HRegionForTesting(HRegionFileSystem fs
, WAL wal
,
6377 Configuration confParam
, TableDescriptor htd
,
6378 RegionServerServices rsServices
) {
6379 super(fs
, wal
, confParam
, htd
, rsServices
);
6383 * Create HStore instance.
6384 * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6387 protected HStore
instantiateHStore(final ColumnFamilyDescriptor family
, boolean warmup
)
6388 throws IOException
{
6389 if (family
.isMobEnabled()) {
// MOB needs an HFile format version that supports tags; fail early otherwise.
6390 if (HFile
.getFormatVersion(this.conf
) < HFile
.MIN_FORMAT_VERSION_WITH_TAGS
) {
6391 throw new IOException("A minimum HFile version of " + HFile
.MIN_FORMAT_VERSION_WITH_TAGS
+
6392 " is required for MOB feature. Consider setting " + HFile
.FORMAT_VERSION_KEY
+
6395 return new HMobStore(this, family
, this.conf
, warmup
);
6397 return new HStoreForTesting(this, family
, this.conf
, warmup
);
6402 * HStoreForTesting is the same as HStore except for its doCompaction method, which
6403 * contains a checkpoint on "hbase.hstore.compaction.complete" that can keep an
6404 * hstore compaction from completing. In earlier versions this config was read in the
6405 * compact method of the HStore class itself, although it is only useful for testing and
6406 * serves no other purpose. HBASE-8518 removes all uses of "hbase.hstore.compaction.complete"
6407 * (except in testing code).
// HStore subclass for tests. doCompaction checks the boolean setting
// "hbase.hstore.compaction.complete" (default true): when it is false, a warning is logged
// and each newly written file is wrapped in an HStoreFile with a reader and closed again;
// otherwise the call is passed straight to HStore#doCompaction.
// NOTE(review): this listing appears to omit some original lines (closing braces,
// annotations, what is done with `sfs` and what the incomplete branch returns) - confirm
// against the real file.
6409 public static class HStoreForTesting
extends HStore
{
6411 protected HStoreForTesting(final HRegion region
,
6412 final ColumnFamilyDescriptor family
,
6413 final Configuration confParam
, boolean warmup
) throws IOException
{
6414 super(region
, family
, confParam
, warmup
);
6418 protected List
<HStoreFile
> doCompaction(CompactionRequestImpl cr
,
6419 Collection
<HStoreFile
> filesToCompact
, User user
, long compactionStartTime
,
6420 List
<Path
> newFiles
) throws IOException
{
6421 // leave the compaction incomplete.
6422 if (!this.conf
.getBoolean("hbase.hstore.compaction.complete", true)) {
6423 LOG
.warn("hbase.hstore.compaction.complete is set to false");
6424 List
<HStoreFile
> sfs
= new ArrayList
<>(newFiles
.size());
// Evict on close unless a cache configuration says otherwise.
6425 final boolean evictOnClose
=
6426 cacheConf
!= null? cacheConf
.shouldEvictOnClose(): true;
6427 for (Path newFile
: newFiles
) {
6428 // Create storefile around what we wrote with a reader on it.
6429 HStoreFile sf
= createStoreFileAndReader(newFile
);
6430 sf
.closeStoreFile(evictOnClose
);
6435 return super.doCompaction(cr
, filesToCompact
, user
, compactionStartTime
, newFiles
);