2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.master
;
20 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.resetCounters
;
21 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_heartbeat
;
22 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_node_create_queued
;
23 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_orphan_task_acquired
;
24 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_rescan
;
25 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_rescan_deleted
;
26 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit
;
27 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_dead_server_task
;
28 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_failed
;
29 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_force
;
30 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_threshold_reached
;
31 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_unassigned
;
32 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_task_deleted
;
33 import static org
.junit
.Assert
.assertEquals
;
34 import static org
.junit
.Assert
.assertFalse
;
35 import static org
.junit
.Assert
.assertTrue
;
36 import java
.io
.IOException
;
38 import java
.util
.concurrent
.atomic
.LongAdder
;
39 import org
.apache
.hadoop
.conf
.Configuration
;
40 import org
.apache
.hadoop
.fs
.FileSystem
;
41 import org
.apache
.hadoop
.fs
.Path
;
42 import org
.apache
.hadoop
.hbase
.CoordinatedStateManager
;
43 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
44 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
45 import org
.apache
.hadoop
.hbase
.HConstants
;
46 import org
.apache
.hadoop
.hbase
.ServerName
;
47 import org
.apache
.hadoop
.hbase
.SplitLogCounters
;
48 import org
.apache
.hadoop
.hbase
.SplitLogTask
;
49 import org
.apache
.hadoop
.hbase
.Waiter
;
50 import org
.apache
.hadoop
.hbase
.coordination
.ZKSplitLogManagerCoordination
;
51 import org
.apache
.hadoop
.hbase
.coordination
.ZkCoordinatedStateManager
;
52 import org
.apache
.hadoop
.hbase
.master
.SplitLogManager
.Task
;
53 import org
.apache
.hadoop
.hbase
.master
.SplitLogManager
.TaskBatch
;
54 import org
.apache
.hadoop
.hbase
.regionserver
.TestMasterAddressTracker
.NodeCreationListener
;
55 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
56 import org
.apache
.hadoop
.hbase
.testclassification
.MasterTests
;
57 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKSplitLog
;
58 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKUtil
;
59 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
60 import org
.apache
.zookeeper
.CreateMode
;
61 import org
.apache
.zookeeper
.KeeperException
;
62 import org
.apache
.zookeeper
.ZooDefs
.Ids
;
63 import org
.junit
.After
;
64 import org
.junit
.Assert
;
65 import org
.junit
.Before
;
66 import org
.junit
.ClassRule
;
67 import org
.junit
.Test
;
68 import org
.junit
.experimental
.categories
.Category
;
69 import org
.mockito
.Mockito
;
70 import org
.slf4j
.Logger
;
71 import org
.slf4j
.LoggerFactory
;
73 @Category({MasterTests
.class, LargeTests
.class})
74 public class TestSplitLogManager
{
77 public static final HBaseClassTestRule CLASS_RULE
=
78 HBaseClassTestRule
.forClass(TestSplitLogManager
.class);
80 private static final Logger LOG
= LoggerFactory
.getLogger(TestSplitLogManager
.class);
82 private final ServerManager sm
= Mockito
.mock(ServerManager
.class);
84 private ZKWatcher zkw
;
85 private DummyMasterServices master
;
86 private SplitLogManager slm
;
87 private Configuration conf
;
90 private static HBaseTestingUtility TEST_UTIL
;
92 class DummyMasterServices
extends MockNoopMasterServices
{
93 private ZKWatcher zkw
;
94 private CoordinatedStateManager cm
;
96 public DummyMasterServices(ZKWatcher zkw
, Configuration conf
) {
99 cm
= new ZkCoordinatedStateManager(this);
103 public ZKWatcher
getZooKeeper() {
108 public CoordinatedStateManager
getCoordinatedStateManager() {
113 public ServerManager
getServerManager() {
119 public void setup() throws Exception
{
120 TEST_UTIL
= new HBaseTestingUtility();
121 TEST_UTIL
.startMiniZKCluster();
122 conf
= TEST_UTIL
.getConfiguration();
123 // Use a different ZK wrapper instance for each tests.
125 new ZKWatcher(conf
, "split-log-manager-tests" + TEST_UTIL
.getRandomUUID().toString(), null);
126 master
= new DummyMasterServices(zkw
, conf
);
128 ZKUtil
.deleteChildrenRecursively(zkw
, zkw
.getZNodePaths().baseZNode
);
129 ZKUtil
.createAndFailSilent(zkw
, zkw
.getZNodePaths().baseZNode
);
130 assertTrue(ZKUtil
.checkExists(zkw
, zkw
.getZNodePaths().baseZNode
) != -1);
131 LOG
.debug(zkw
.getZNodePaths().baseZNode
+ " created");
132 ZKUtil
.createAndFailSilent(zkw
, zkw
.getZNodePaths().splitLogZNode
);
133 assertTrue(ZKUtil
.checkExists(zkw
, zkw
.getZNodePaths().splitLogZNode
) != -1);
134 LOG
.debug(zkw
.getZNodePaths().splitLogZNode
+ " created");
138 // By default, we let the test manage the error as before, so the server
139 // does not appear as dead from the master point of view, only from the split log pov.
140 Mockito
.when(sm
.isServerOnline(Mockito
.any())).thenReturn(true);
143 conf
.setInt(HConstants
.HBASE_SPLITLOG_MANAGER_TIMEOUT
, to
);
144 conf
.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to
);
146 conf
.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
151 public void teardown() throws IOException
, KeeperException
{
156 TEST_UTIL
.shutdownMiniZKCluster();
159 private interface Expr
{
163 private void waitForCounter(final LongAdder ctr
, long oldval
, long newval
, long timems
)
165 Expr e
= new Expr() {
171 waitForCounter(e
, oldval
, newval
, timems
);
175 private void waitForCounter(final Expr e
, final long oldval
, long newval
, long timems
)
178 TEST_UTIL
.waitFor(timems
, 10, new Waiter
.Predicate
<Exception
>() {
180 public boolean evaluate() throws Exception
{
181 return (e
.eval() != oldval
);
185 assertEquals(newval
, e
.eval());
188 private Task
findOrCreateOrphanTask(String path
) {
189 return slm
.tasks
.computeIfAbsent(path
, k
-> {
190 LOG
.info("creating orphan task " + k
);
191 SplitLogCounters
.tot_mgr_orphan_task_acquired
.increment();
196 private String
submitTaskAndWait(TaskBatch batch
, String name
) throws KeeperException
,
197 InterruptedException
{
198 String tasknode
= ZKSplitLog
.getEncodedNodeName(zkw
, name
);
199 NodeCreationListener listener
= new NodeCreationListener(zkw
, tasknode
);
200 zkw
.registerListener(listener
);
201 ZKUtil
.watchAndCheckExists(zkw
, tasknode
);
203 slm
.enqueueSplitTask(name
, batch
);
204 assertEquals(1, batch
.installed
);
205 assertTrue(findOrCreateOrphanTask(tasknode
).batch
== batch
);
206 assertEquals(1L, tot_mgr_node_create_queued
.sum());
208 LOG
.debug("waiting for task node creation");
209 listener
.waitForCreation();
210 LOG
.debug("task created");
215 * Test whether the splitlog correctly creates a task in zookeeper
218 public void testTaskCreation() throws Exception
{
220 LOG
.info("TestTaskCreation - test the creation of a task in zk");
221 slm
= new SplitLogManager(master
, conf
);
222 TaskBatch batch
= new TaskBatch();
224 String tasknode
= submitTaskAndWait(batch
, "foo/1");
226 byte[] data
= ZKUtil
.getData(zkw
, tasknode
);
227 SplitLogTask slt
= SplitLogTask
.parseFrom(data
);
228 LOG
.info("Task node created " + slt
.toString());
229 assertTrue(slt
.isUnassigned(master
.getServerName()));
233 public void testOrphanTaskAcquisition() throws Exception
{
234 LOG
.info("TestOrphanTaskAcquisition");
236 String tasknode
= ZKSplitLog
.getEncodedNodeName(zkw
, "orphan/test/slash");
237 SplitLogTask slt
= new SplitLogTask
.Owned(master
.getServerName());
238 zkw
.getRecoverableZooKeeper().create(tasknode
, slt
.toByteArray(), Ids
.OPEN_ACL_UNSAFE
,
239 CreateMode
.PERSISTENT
);
241 slm
= new SplitLogManager(master
, conf
);
242 waitForCounter(tot_mgr_orphan_task_acquired
, 0, 1, to
/2);
243 Task task
= findOrCreateOrphanTask(tasknode
);
244 assertTrue(task
.isOrphan());
245 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
246 assertFalse(task
.isUnassigned());
247 long curt
= System
.currentTimeMillis();
248 assertTrue((task
.last_update
<= curt
) &&
249 (task
.last_update
> (curt
- 1000)));
250 LOG
.info("waiting for manager to resubmit the orphan task");
251 waitForCounter(tot_mgr_resubmit
, 0, 1, to
+ to
/2);
252 assertTrue(task
.isUnassigned());
253 waitForCounter(tot_mgr_rescan
, 0, 1, to
+ to
/2);
257 public void testUnassignedOrphan() throws Exception
{
258 LOG
.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
260 String tasknode
= ZKSplitLog
.getEncodedNodeName(zkw
, "orphan/test/slash");
261 //create an unassigned orphan task
262 SplitLogTask slt
= new SplitLogTask
.Unassigned(master
.getServerName());
263 zkw
.getRecoverableZooKeeper().create(tasknode
, slt
.toByteArray(), Ids
.OPEN_ACL_UNSAFE
,
264 CreateMode
.PERSISTENT
);
265 int version
= ZKUtil
.checkExists(zkw
, tasknode
);
267 slm
= new SplitLogManager(master
, conf
);
268 waitForCounter(tot_mgr_orphan_task_acquired
, 0, 1, to
/2);
269 Task task
= findOrCreateOrphanTask(tasknode
);
270 assertTrue(task
.isOrphan());
271 assertTrue(task
.isUnassigned());
272 // wait for RESCAN node to be created
273 waitForCounter(tot_mgr_rescan
, 0, 1, to
/ 2);
274 Task task2
= findOrCreateOrphanTask(tasknode
);
275 assertTrue(task
== task2
);
276 LOG
.debug("task = " + task
);
277 assertEquals(1L, tot_mgr_resubmit
.sum());
278 assertEquals(1, task
.incarnation
.get());
279 assertEquals(0, task
.unforcedResubmits
.get());
280 assertTrue(task
.isOrphan());
281 assertTrue(task
.isUnassigned());
282 assertTrue(ZKUtil
.checkExists(zkw
, tasknode
) > version
);
286 public void testMultipleResubmits() throws Exception
{
287 LOG
.info("TestMultipleResbmits - no indefinite resubmissions");
288 conf
.setInt("hbase.splitlog.max.resubmit", 2);
289 slm
= new SplitLogManager(master
, conf
);
290 TaskBatch batch
= new TaskBatch();
292 String tasknode
= submitTaskAndWait(batch
, "foo/1");
293 int version
= ZKUtil
.checkExists(zkw
, tasknode
);
294 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
295 final ServerName worker2
= ServerName
.valueOf("worker2,1,1");
296 final ServerName worker3
= ServerName
.valueOf("worker3,1,1");
297 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
298 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
299 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
300 waitForCounter(tot_mgr_resubmit
, 0, 1, to
+ to
/2);
301 int version1
= ZKUtil
.checkExists(zkw
, tasknode
);
302 assertTrue(version1
> version
);
303 slt
= new SplitLogTask
.Owned(worker2
);
304 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
305 waitForCounter(tot_mgr_heartbeat
, 1, 2, to
/2);
306 waitForCounter(tot_mgr_resubmit
, 1, 2, to
+ to
/2);
307 int version2
= ZKUtil
.checkExists(zkw
, tasknode
);
308 assertTrue(version2
> version1
);
309 slt
= new SplitLogTask
.Owned(worker3
);
310 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
311 waitForCounter(tot_mgr_heartbeat
, 2, 3, to
/2);
312 waitForCounter(tot_mgr_resubmit_threshold_reached
, 0, 1, to
+ to
/2);
313 Thread
.sleep(to
+ to
/2);
314 assertEquals(2L, tot_mgr_resubmit
.sum() - tot_mgr_resubmit_force
.sum());
318 public void testRescanCleanup() throws Exception
{
319 LOG
.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
321 slm
= new SplitLogManager(master
, conf
);
322 TaskBatch batch
= new TaskBatch();
324 String tasknode
= submitTaskAndWait(batch
, "foo/1");
325 int version
= ZKUtil
.checkExists(zkw
, tasknode
);
326 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
327 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
328 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
329 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
330 waitForCounter(new Expr() {
333 return (tot_mgr_resubmit
.sum() + tot_mgr_resubmit_failed
.sum());
335 }, 0, 1, 5*60000); // wait long enough
336 Assert
.assertEquals("Could not run test. Lost ZK connection?",
337 0, tot_mgr_resubmit_failed
.sum());
338 int version1
= ZKUtil
.checkExists(zkw
, tasknode
);
339 assertTrue(version1
> version
);
340 byte[] taskstate
= ZKUtil
.getData(zkw
, tasknode
);
341 slt
= SplitLogTask
.parseFrom(taskstate
);
342 assertTrue(slt
.isUnassigned(master
.getServerName()));
344 waitForCounter(tot_mgr_rescan_deleted
, 0, 1, to
/2);
348 public void testTaskDone() throws Exception
{
349 LOG
.info("TestTaskDone - cleanup task node once in DONE state");
351 slm
= new SplitLogManager(master
, conf
);
352 TaskBatch batch
= new TaskBatch();
353 String tasknode
= submitTaskAndWait(batch
, "foo/1");
354 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
355 SplitLogTask slt
= new SplitLogTask
.Done(worker1
);
356 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
357 synchronized (batch
) {
358 while (batch
.installed
!= batch
.done
) {
362 waitForCounter(tot_mgr_task_deleted
, 0, 1, to
/2);
363 assertTrue(ZKUtil
.checkExists(zkw
, tasknode
) == -1);
367 public void testTaskErr() throws Exception
{
368 LOG
.info("TestTaskErr - cleanup task node once in ERR state");
370 conf
.setInt("hbase.splitlog.max.resubmit", 0);
371 slm
= new SplitLogManager(master
, conf
);
372 TaskBatch batch
= new TaskBatch();
374 String tasknode
= submitTaskAndWait(batch
, "foo/1");
375 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
376 SplitLogTask slt
= new SplitLogTask
.Err(worker1
);
377 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
379 synchronized (batch
) {
380 while (batch
.installed
!= batch
.error
) {
384 waitForCounter(tot_mgr_task_deleted
, 0, 1, to
/2);
385 assertTrue(ZKUtil
.checkExists(zkw
, tasknode
) == -1);
386 conf
.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination
.DEFAULT_MAX_RESUBMIT
);
390 public void testTaskResigned() throws Exception
{
391 LOG
.info("TestTaskResigned - resubmit task node once in RESIGNED state");
392 assertEquals(0, tot_mgr_resubmit
.sum());
393 slm
= new SplitLogManager(master
, conf
);
394 assertEquals(0, tot_mgr_resubmit
.sum());
395 TaskBatch batch
= new TaskBatch();
396 String tasknode
= submitTaskAndWait(batch
, "foo/1");
397 assertEquals(0, tot_mgr_resubmit
.sum());
398 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
399 assertEquals(0, tot_mgr_resubmit
.sum());
400 SplitLogTask slt
= new SplitLogTask
.Resigned(worker1
);
401 assertEquals(0, tot_mgr_resubmit
.sum());
402 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
403 ZKUtil
.checkExists(zkw
, tasknode
);
404 // Could be small race here.
405 if (tot_mgr_resubmit
.sum() == 0) {
406 waitForCounter(tot_mgr_resubmit
, 0, 1, to
/2);
408 assertEquals(1, tot_mgr_resubmit
.sum());
410 byte[] taskstate
= ZKUtil
.getData(zkw
, tasknode
);
411 slt
= SplitLogTask
.parseFrom(taskstate
);
412 assertTrue(slt
.isUnassigned(master
.getServerName()));
416 public void testUnassignedTimeout() throws Exception
{
417 LOG
.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
420 // create an orphan task in OWNED state
421 String tasknode1
= ZKSplitLog
.getEncodedNodeName(zkw
, "orphan/1");
422 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
423 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
424 zkw
.getRecoverableZooKeeper().create(tasknode1
, slt
.toByteArray(), Ids
.OPEN_ACL_UNSAFE
,
425 CreateMode
.PERSISTENT
);
427 slm
= new SplitLogManager(master
, conf
);
428 waitForCounter(tot_mgr_orphan_task_acquired
, 0, 1, to
/2);
430 // submit another task which will stay in unassigned mode
431 TaskBatch batch
= new TaskBatch();
432 submitTaskAndWait(batch
, "foo/1");
434 // keep updating the orphan owned node every to/2 seconds
435 for (int i
= 0; i
< (3 * to
)/100; i
++) {
437 final ServerName worker2
= ServerName
.valueOf("worker1,1,1");
438 slt
= new SplitLogTask
.Owned(worker2
);
439 ZKUtil
.setData(zkw
, tasknode1
, slt
.toByteArray());
442 // since we have stopped heartbeating the owned node therefore it should
444 LOG
.info("waiting for manager to resubmit the orphan task");
445 waitForCounter(tot_mgr_resubmit
, 0, 1, to
+ to
/2);
447 // now all the nodes are unassigned. manager should post another rescan
448 waitForCounter(tot_mgr_resubmit_unassigned
, 0, 1, 2 * to
+ to
/2);
452 public void testDeadWorker() throws Exception
{
453 LOG
.info("testDeadWorker");
455 conf
.setLong("hbase.splitlog.max.resubmit", 0);
456 slm
= new SplitLogManager(master
, conf
);
457 TaskBatch batch
= new TaskBatch();
459 String tasknode
= submitTaskAndWait(batch
, "foo/1");
460 int version
= ZKUtil
.checkExists(zkw
, tasknode
);
461 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
462 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
463 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
464 if (tot_mgr_heartbeat
.sum() == 0) {
465 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
467 slm
.handleDeadWorker(worker1
);
468 if (tot_mgr_resubmit
.sum() == 0) {
469 waitForCounter(tot_mgr_resubmit
, 0, 1, to
+to
/2);
471 if (tot_mgr_resubmit_dead_server_task
.sum() == 0) {
472 waitForCounter(tot_mgr_resubmit_dead_server_task
, 0, 1, to
+ to
/2);
475 int version1
= ZKUtil
.checkExists(zkw
, tasknode
);
476 assertTrue(version1
> version
);
477 byte[] taskstate
= ZKUtil
.getData(zkw
, tasknode
);
478 slt
= SplitLogTask
.parseFrom(taskstate
);
479 assertTrue(slt
.isUnassigned(master
.getServerName()));
484 public void testWorkerCrash() throws Exception
{
485 slm
= new SplitLogManager(master
, conf
);
486 TaskBatch batch
= new TaskBatch();
488 String tasknode
= submitTaskAndWait(batch
, "foo/1");
489 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
491 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
492 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
493 if (tot_mgr_heartbeat
.sum() == 0) {
494 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
497 // Not yet resubmitted.
498 Assert
.assertEquals(0, tot_mgr_resubmit
.sum());
500 // This server becomes dead
501 Mockito
.when(sm
.isServerOnline(worker1
)).thenReturn(false);
503 Thread
.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
505 // It has been resubmitted
506 Assert
.assertEquals(1, tot_mgr_resubmit
.sum());
510 public void testEmptyLogDir() throws Exception
{
511 LOG
.info("testEmptyLogDir");
512 slm
= new SplitLogManager(master
, conf
);
513 FileSystem fs
= TEST_UTIL
.getTestFileSystem();
514 Path emptyLogDirPath
= new Path(new Path(fs
.getWorkingDirectory(),
515 HConstants
.HREGION_LOGDIR_NAME
),
516 ServerName
.valueOf("emptyLogDir", 1, 1).toString());
517 fs
.mkdirs(emptyLogDirPath
);
518 slm
.splitLogDistributed(emptyLogDirPath
);
519 assertFalse(fs
.exists(emptyLogDirPath
));
523 public void testLogFilesAreArchived() throws Exception
{
524 LOG
.info("testLogFilesAreArchived");
525 slm
= new SplitLogManager(master
, conf
);
526 FileSystem fs
= TEST_UTIL
.getTestFileSystem();
527 Path dir
= TEST_UTIL
.getDataTestDirOnTestFS("testLogFilesAreArchived");
528 conf
.set(HConstants
.HBASE_DIR
, dir
.toString());
529 String serverName
= ServerName
.valueOf("foo", 1, 1).toString();
530 Path logDirPath
= new Path(new Path(dir
, HConstants
.HREGION_LOGDIR_NAME
), serverName
);
531 fs
.mkdirs(logDirPath
);
532 // create an empty log file
533 String logFile
= new Path(logDirPath
, TEST_UTIL
.getRandomUUID().toString()).toString();
534 fs
.create(new Path(logDirPath
, logFile
)).close();
536 // spin up a thread mocking split done.
540 boolean done
= false;
542 for (Map
.Entry
<String
, Task
> entry
: slm
.getTasks().entrySet()) {
543 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
544 SplitLogTask slt
= new SplitLogTask
.Done(worker1
);
545 boolean encounteredZKException
= false;
547 ZKUtil
.setData(zkw
, entry
.getKey(), slt
.toByteArray());
548 } catch (KeeperException e
) {
549 LOG
.warn(e
.toString(), e
);
550 encounteredZKException
= true;
552 if (!encounteredZKException
) {
560 slm
.splitLogDistributed(logDirPath
);
562 assertFalse(fs
.exists(logDirPath
));