2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.master
;
20 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.resetCounters
;
21 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_heartbeat
;
22 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_node_create_queued
;
23 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_orphan_task_acquired
;
24 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_rescan
;
25 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_rescan_deleted
;
26 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit
;
27 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_dead_server_task
;
28 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_failed
;
29 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_force
;
30 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_threshold_reached
;
31 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_resubmit_unassigned
;
32 import static org
.apache
.hadoop
.hbase
.SplitLogCounters
.tot_mgr_task_deleted
;
33 import static org
.junit
.Assert
.assertEquals
;
34 import static org
.junit
.Assert
.assertFalse
;
35 import static org
.junit
.Assert
.assertTrue
;
37 import java
.io
.IOException
;
39 import java
.util
.concurrent
.atomic
.LongAdder
;
40 import org
.apache
.hadoop
.conf
.Configuration
;
41 import org
.apache
.hadoop
.fs
.FileSystem
;
42 import org
.apache
.hadoop
.fs
.Path
;
43 import org
.apache
.hadoop
.hbase
.CoordinatedStateManager
;
44 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
45 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
46 import org
.apache
.hadoop
.hbase
.HConstants
;
47 import org
.apache
.hadoop
.hbase
.ServerName
;
48 import org
.apache
.hadoop
.hbase
.SplitLogCounters
;
49 import org
.apache
.hadoop
.hbase
.SplitLogTask
;
50 import org
.apache
.hadoop
.hbase
.Waiter
;
51 import org
.apache
.hadoop
.hbase
.coordination
.ZKSplitLogManagerCoordination
;
52 import org
.apache
.hadoop
.hbase
.coordination
.ZkCoordinatedStateManager
;
53 import org
.apache
.hadoop
.hbase
.master
.SplitLogManager
.Task
;
54 import org
.apache
.hadoop
.hbase
.master
.SplitLogManager
.TaskBatch
;
55 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
56 import org
.apache
.hadoop
.hbase
.testclassification
.MasterTests
;
57 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
58 import org
.apache
.hadoop
.hbase
.zookeeper
.TestMasterAddressTracker
.NodeCreationListener
;
59 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKSplitLog
;
60 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKUtil
;
61 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
62 import org
.apache
.zookeeper
.CreateMode
;
63 import org
.apache
.zookeeper
.KeeperException
;
64 import org
.apache
.zookeeper
.ZooDefs
.Ids
;
65 import org
.junit
.After
;
66 import org
.junit
.Assert
;
67 import org
.junit
.Before
;
68 import org
.junit
.ClassRule
;
69 import org
.junit
.Test
;
70 import org
.junit
.experimental
.categories
.Category
;
71 import org
.mockito
.Mockito
;
72 import org
.slf4j
.Logger
;
73 import org
.slf4j
.LoggerFactory
;
75 @Category({MasterTests
.class, LargeTests
.class})
76 public class TestSplitLogManager
{
79 public static final HBaseClassTestRule CLASS_RULE
=
80 HBaseClassTestRule
.forClass(TestSplitLogManager
.class);
82 private static final Logger LOG
= LoggerFactory
.getLogger(TestSplitLogManager
.class);
84 private final ServerManager sm
= Mockito
.mock(ServerManager
.class);
86 private ZKWatcher zkw
;
87 private DummyMasterServices master
;
88 private SplitLogManager slm
;
89 private Configuration conf
;
92 private static HBaseTestingUtil TEST_UTIL
;
94 class DummyMasterServices
extends MockNoopMasterServices
{
95 private ZKWatcher zkw
;
96 private CoordinatedStateManager cm
;
98 public DummyMasterServices(ZKWatcher zkw
, Configuration conf
) {
101 cm
= new ZkCoordinatedStateManager(this);
105 public ZKWatcher
getZooKeeper() {
110 public CoordinatedStateManager
getCoordinatedStateManager() {
115 public ServerManager
getServerManager() {
121 public void setup() throws Exception
{
122 TEST_UTIL
= new HBaseTestingUtil();
123 TEST_UTIL
.startMiniZKCluster();
124 conf
= TEST_UTIL
.getConfiguration();
125 // Use a different ZK wrapper instance for each tests.
127 new ZKWatcher(conf
, "split-log-manager-tests" + TEST_UTIL
.getRandomUUID().toString(), null);
128 master
= new DummyMasterServices(zkw
, conf
);
130 ZKUtil
.deleteChildrenRecursively(zkw
, zkw
.getZNodePaths().baseZNode
);
131 ZKUtil
.createAndFailSilent(zkw
, zkw
.getZNodePaths().baseZNode
);
132 assertTrue(ZKUtil
.checkExists(zkw
, zkw
.getZNodePaths().baseZNode
) != -1);
133 LOG
.debug(zkw
.getZNodePaths().baseZNode
+ " created");
134 ZKUtil
.createAndFailSilent(zkw
, zkw
.getZNodePaths().splitLogZNode
);
135 assertTrue(ZKUtil
.checkExists(zkw
, zkw
.getZNodePaths().splitLogZNode
) != -1);
136 LOG
.debug(zkw
.getZNodePaths().splitLogZNode
+ " created");
140 // By default, we let the test manage the error as before, so the server
141 // does not appear as dead from the master point of view, only from the split log pov.
142 Mockito
.when(sm
.isServerOnline(Mockito
.any())).thenReturn(true);
145 conf
.setInt(HConstants
.HBASE_SPLITLOG_MANAGER_TIMEOUT
, to
);
146 conf
.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to
);
148 conf
.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
153 public void teardown() throws IOException
, KeeperException
{
158 TEST_UTIL
.shutdownMiniZKCluster();
162 public void testBatchWaitMillis() {
163 assertEquals(100, SplitLogManager
.getBatchWaitTimeMillis(0));
164 assertEquals(100, SplitLogManager
.getBatchWaitTimeMillis(1));
165 assertEquals(1000, SplitLogManager
.getBatchWaitTimeMillis(10));
166 assertEquals(60_000
, SplitLogManager
.getBatchWaitTimeMillis(101));
167 assertEquals(60_000
, SplitLogManager
.getBatchWaitTimeMillis(1011));
170 private interface Expr
{
174 private void waitForCounter(final LongAdder ctr
, long oldval
, long newval
, long timems
)
176 Expr e
= new Expr() {
182 waitForCounter(e
, oldval
, newval
, timems
);
186 private void waitForCounter(final Expr e
, final long oldval
, long newval
, long timems
)
189 TEST_UTIL
.waitFor(timems
, 10, new Waiter
.Predicate
<Exception
>() {
191 public boolean evaluate() throws Exception
{
192 return (e
.eval() != oldval
);
196 assertEquals(newval
, e
.eval());
199 private Task
findOrCreateOrphanTask(String path
) {
200 return slm
.tasks
.computeIfAbsent(path
, k
-> {
201 LOG
.info("creating orphan task " + k
);
202 SplitLogCounters
.tot_mgr_orphan_task_acquired
.increment();
207 private String
submitTaskAndWait(TaskBatch batch
, String name
) throws KeeperException
,
208 InterruptedException
{
209 String tasknode
= ZKSplitLog
.getEncodedNodeName(zkw
, name
);
210 NodeCreationListener listener
= new NodeCreationListener(zkw
, tasknode
);
211 zkw
.registerListener(listener
);
212 ZKUtil
.watchAndCheckExists(zkw
, tasknode
);
214 slm
.enqueueSplitTask(name
, batch
);
215 assertEquals(1, batch
.installed
);
216 assertTrue(findOrCreateOrphanTask(tasknode
).batch
== batch
);
217 assertEquals(1L, tot_mgr_node_create_queued
.sum());
219 LOG
.debug("waiting for task node creation");
220 listener
.waitForCreation();
221 LOG
.debug("task created");
226 * Test whether the splitlog correctly creates a task in zookeeper
229 public void testTaskCreation() throws Exception
{
231 LOG
.info("TestTaskCreation - test the creation of a task in zk");
232 slm
= new SplitLogManager(master
, conf
);
233 TaskBatch batch
= new TaskBatch();
235 String tasknode
= submitTaskAndWait(batch
, "foo/1");
237 byte[] data
= ZKUtil
.getData(zkw
, tasknode
);
238 SplitLogTask slt
= SplitLogTask
.parseFrom(data
);
239 LOG
.info("Task node created " + slt
.toString());
240 assertTrue(slt
.isUnassigned(master
.getServerName()));
244 public void testOrphanTaskAcquisition() throws Exception
{
245 LOG
.info("TestOrphanTaskAcquisition");
247 String tasknode
= ZKSplitLog
.getEncodedNodeName(zkw
, "orphan/test/slash");
248 SplitLogTask slt
= new SplitLogTask
.Owned(master
.getServerName());
249 zkw
.getRecoverableZooKeeper().create(tasknode
, slt
.toByteArray(), Ids
.OPEN_ACL_UNSAFE
,
250 CreateMode
.PERSISTENT
);
252 slm
= new SplitLogManager(master
, conf
);
253 waitForCounter(tot_mgr_orphan_task_acquired
, 0, 1, to
/2);
254 Task task
= findOrCreateOrphanTask(tasknode
);
255 assertTrue(task
.isOrphan());
256 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
257 assertFalse(task
.isUnassigned());
258 long curt
= EnvironmentEdgeManager
.currentTime();
259 assertTrue((task
.last_update
<= curt
) &&
260 (task
.last_update
> (curt
- 1000)));
261 LOG
.info("waiting for manager to resubmit the orphan task");
262 waitForCounter(tot_mgr_resubmit
, 0, 1, to
+ to
/2);
263 assertTrue(task
.isUnassigned());
264 waitForCounter(tot_mgr_rescan
, 0, 1, to
+ to
/2);
268 public void testUnassignedOrphan() throws Exception
{
269 LOG
.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
271 String tasknode
= ZKSplitLog
.getEncodedNodeName(zkw
, "orphan/test/slash");
272 //create an unassigned orphan task
273 SplitLogTask slt
= new SplitLogTask
.Unassigned(master
.getServerName());
274 zkw
.getRecoverableZooKeeper().create(tasknode
, slt
.toByteArray(), Ids
.OPEN_ACL_UNSAFE
,
275 CreateMode
.PERSISTENT
);
276 int version
= ZKUtil
.checkExists(zkw
, tasknode
);
278 slm
= new SplitLogManager(master
, conf
);
279 waitForCounter(tot_mgr_orphan_task_acquired
, 0, 1, to
/2);
280 Task task
= findOrCreateOrphanTask(tasknode
);
281 assertTrue(task
.isOrphan());
282 assertTrue(task
.isUnassigned());
283 // wait for RESCAN node to be created
284 waitForCounter(tot_mgr_rescan
, 0, 1, to
/ 2);
285 Task task2
= findOrCreateOrphanTask(tasknode
);
286 assertTrue(task
== task2
);
287 LOG
.debug("task = " + task
);
288 assertEquals(1L, tot_mgr_resubmit
.sum());
289 assertEquals(1, task
.incarnation
.get());
290 assertEquals(0, task
.unforcedResubmits
.get());
291 assertTrue(task
.isOrphan());
292 assertTrue(task
.isUnassigned());
293 assertTrue(ZKUtil
.checkExists(zkw
, tasknode
) > version
);
297 public void testMultipleResubmits() throws Exception
{
298 LOG
.info("TestMultipleResbmits - no indefinite resubmissions");
299 conf
.setInt("hbase.splitlog.max.resubmit", 2);
300 slm
= new SplitLogManager(master
, conf
);
301 TaskBatch batch
= new TaskBatch();
303 String tasknode
= submitTaskAndWait(batch
, "foo/1");
304 int version
= ZKUtil
.checkExists(zkw
, tasknode
);
305 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
306 final ServerName worker2
= ServerName
.valueOf("worker2,1,1");
307 final ServerName worker3
= ServerName
.valueOf("worker3,1,1");
308 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
309 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
310 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
311 waitForCounter(tot_mgr_resubmit
, 0, 1, to
+ to
/2);
312 int version1
= ZKUtil
.checkExists(zkw
, tasknode
);
313 assertTrue(version1
> version
);
314 slt
= new SplitLogTask
.Owned(worker2
);
315 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
316 waitForCounter(tot_mgr_heartbeat
, 1, 2, to
/2);
317 waitForCounter(tot_mgr_resubmit
, 1, 2, to
+ to
/2);
318 int version2
= ZKUtil
.checkExists(zkw
, tasknode
);
319 assertTrue(version2
> version1
);
320 slt
= new SplitLogTask
.Owned(worker3
);
321 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
322 waitForCounter(tot_mgr_heartbeat
, 2, 3, to
/2);
323 waitForCounter(tot_mgr_resubmit_threshold_reached
, 0, 1, to
+ to
/2);
324 Thread
.sleep(to
+ to
/2);
325 assertEquals(2L, tot_mgr_resubmit
.sum() - tot_mgr_resubmit_force
.sum());
329 public void testRescanCleanup() throws Exception
{
330 LOG
.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
332 slm
= new SplitLogManager(master
, conf
);
333 TaskBatch batch
= new TaskBatch();
335 String tasknode
= submitTaskAndWait(batch
, "foo/1");
336 int version
= ZKUtil
.checkExists(zkw
, tasknode
);
337 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
338 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
339 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
340 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
341 waitForCounter(new Expr() {
344 return (tot_mgr_resubmit
.sum() + tot_mgr_resubmit_failed
.sum());
346 }, 0, 1, 5*60000); // wait long enough
347 Assert
.assertEquals("Could not run test. Lost ZK connection?",
348 0, tot_mgr_resubmit_failed
.sum());
349 int version1
= ZKUtil
.checkExists(zkw
, tasknode
);
350 assertTrue(version1
> version
);
351 byte[] taskstate
= ZKUtil
.getData(zkw
, tasknode
);
352 slt
= SplitLogTask
.parseFrom(taskstate
);
353 assertTrue(slt
.isUnassigned(master
.getServerName()));
355 waitForCounter(tot_mgr_rescan_deleted
, 0, 1, to
/2);
359 public void testTaskDone() throws Exception
{
360 LOG
.info("TestTaskDone - cleanup task node once in DONE state");
362 slm
= new SplitLogManager(master
, conf
);
363 TaskBatch batch
= new TaskBatch();
364 String tasknode
= submitTaskAndWait(batch
, "foo/1");
365 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
366 SplitLogTask slt
= new SplitLogTask
.Done(worker1
);
367 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
368 synchronized (batch
) {
369 while (batch
.installed
!= batch
.done
) {
373 waitForCounter(tot_mgr_task_deleted
, 0, 1, to
/2);
374 assertTrue(ZKUtil
.checkExists(zkw
, tasknode
) == -1);
378 public void testTaskErr() throws Exception
{
379 LOG
.info("TestTaskErr - cleanup task node once in ERR state");
381 conf
.setInt("hbase.splitlog.max.resubmit", 0);
382 slm
= new SplitLogManager(master
, conf
);
383 TaskBatch batch
= new TaskBatch();
385 String tasknode
= submitTaskAndWait(batch
, "foo/1");
386 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
387 SplitLogTask slt
= new SplitLogTask
.Err(worker1
);
388 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
390 synchronized (batch
) {
391 while (batch
.installed
!= batch
.error
) {
395 waitForCounter(tot_mgr_task_deleted
, 0, 1, to
/2);
396 assertTrue(ZKUtil
.checkExists(zkw
, tasknode
) == -1);
397 conf
.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination
.DEFAULT_MAX_RESUBMIT
);
401 public void testTaskResigned() throws Exception
{
402 LOG
.info("TestTaskResigned - resubmit task node once in RESIGNED state");
403 assertEquals(0, tot_mgr_resubmit
.sum());
404 slm
= new SplitLogManager(master
, conf
);
405 assertEquals(0, tot_mgr_resubmit
.sum());
406 TaskBatch batch
= new TaskBatch();
407 String tasknode
= submitTaskAndWait(batch
, "foo/1");
408 assertEquals(0, tot_mgr_resubmit
.sum());
409 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
410 assertEquals(0, tot_mgr_resubmit
.sum());
411 SplitLogTask slt
= new SplitLogTask
.Resigned(worker1
);
412 assertEquals(0, tot_mgr_resubmit
.sum());
413 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
414 ZKUtil
.checkExists(zkw
, tasknode
);
415 // Could be small race here.
416 if (tot_mgr_resubmit
.sum() == 0) {
417 waitForCounter(tot_mgr_resubmit
, 0, 1, to
/2);
419 assertEquals(1, tot_mgr_resubmit
.sum());
421 byte[] taskstate
= ZKUtil
.getData(zkw
, tasknode
);
422 slt
= SplitLogTask
.parseFrom(taskstate
);
423 assertTrue(slt
.isUnassigned(master
.getServerName()));
427 public void testUnassignedTimeout() throws Exception
{
428 LOG
.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
431 // create an orphan task in OWNED state
432 String tasknode1
= ZKSplitLog
.getEncodedNodeName(zkw
, "orphan/1");
433 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
434 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
435 zkw
.getRecoverableZooKeeper().create(tasknode1
, slt
.toByteArray(), Ids
.OPEN_ACL_UNSAFE
,
436 CreateMode
.PERSISTENT
);
438 slm
= new SplitLogManager(master
, conf
);
439 waitForCounter(tot_mgr_orphan_task_acquired
, 0, 1, to
/2);
441 // submit another task which will stay in unassigned mode
442 TaskBatch batch
= new TaskBatch();
443 submitTaskAndWait(batch
, "foo/1");
445 // keep updating the orphan owned node every to/2 seconds
446 for (int i
= 0; i
< (3 * to
)/100; i
++) {
448 final ServerName worker2
= ServerName
.valueOf("worker1,1,1");
449 slt
= new SplitLogTask
.Owned(worker2
);
450 ZKUtil
.setData(zkw
, tasknode1
, slt
.toByteArray());
453 // since we have stopped heartbeating the owned node therefore it should
455 LOG
.info("waiting for manager to resubmit the orphan task");
456 waitForCounter(tot_mgr_resubmit
, 0, 1, to
+ to
/2);
458 // now all the nodes are unassigned. manager should post another rescan
459 waitForCounter(tot_mgr_resubmit_unassigned
, 0, 1, 2 * to
+ to
/2);
463 public void testDeadWorker() throws Exception
{
464 LOG
.info("testDeadWorker");
466 conf
.setLong("hbase.splitlog.max.resubmit", 0);
467 slm
= new SplitLogManager(master
, conf
);
468 TaskBatch batch
= new TaskBatch();
470 String tasknode
= submitTaskAndWait(batch
, "foo/1");
471 int version
= ZKUtil
.checkExists(zkw
, tasknode
);
472 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
473 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
474 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
475 if (tot_mgr_heartbeat
.sum() == 0) {
476 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
478 slm
.handleDeadWorker(worker1
);
479 if (tot_mgr_resubmit
.sum() == 0) {
480 waitForCounter(tot_mgr_resubmit
, 0, 1, to
+to
/2);
482 if (tot_mgr_resubmit_dead_server_task
.sum() == 0) {
483 waitForCounter(tot_mgr_resubmit_dead_server_task
, 0, 1, to
+ to
/2);
486 int version1
= ZKUtil
.checkExists(zkw
, tasknode
);
487 assertTrue(version1
> version
);
488 byte[] taskstate
= ZKUtil
.getData(zkw
, tasknode
);
489 slt
= SplitLogTask
.parseFrom(taskstate
);
490 assertTrue(slt
.isUnassigned(master
.getServerName()));
495 public void testWorkerCrash() throws Exception
{
496 slm
= new SplitLogManager(master
, conf
);
497 TaskBatch batch
= new TaskBatch();
499 String tasknode
= submitTaskAndWait(batch
, "foo/1");
500 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
502 SplitLogTask slt
= new SplitLogTask
.Owned(worker1
);
503 ZKUtil
.setData(zkw
, tasknode
, slt
.toByteArray());
504 if (tot_mgr_heartbeat
.sum() == 0) {
505 waitForCounter(tot_mgr_heartbeat
, 0, 1, to
/2);
508 // Not yet resubmitted.
509 Assert
.assertEquals(0, tot_mgr_resubmit
.sum());
511 // This server becomes dead
512 Mockito
.when(sm
.isServerOnline(worker1
)).thenReturn(false);
514 Thread
.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
516 // It has been resubmitted
517 Assert
.assertEquals(1, tot_mgr_resubmit
.sum());
521 public void testEmptyLogDir() throws Exception
{
522 LOG
.info("testEmptyLogDir");
523 slm
= new SplitLogManager(master
, conf
);
524 FileSystem fs
= TEST_UTIL
.getTestFileSystem();
525 Path emptyLogDirPath
= new Path(new Path(fs
.getWorkingDirectory(),
526 HConstants
.HREGION_LOGDIR_NAME
),
527 ServerName
.valueOf("emptyLogDir", 1, 1).toString());
528 fs
.mkdirs(emptyLogDirPath
);
529 slm
.splitLogDistributed(emptyLogDirPath
);
530 assertFalse(fs
.exists(emptyLogDirPath
));
534 public void testLogFilesAreArchived() throws Exception
{
535 LOG
.info("testLogFilesAreArchived");
536 slm
= new SplitLogManager(master
, conf
);
537 FileSystem fs
= TEST_UTIL
.getTestFileSystem();
538 Path dir
= TEST_UTIL
.getDataTestDirOnTestFS("testLogFilesAreArchived");
539 conf
.set(HConstants
.HBASE_DIR
, dir
.toString());
540 String serverName
= ServerName
.valueOf("foo", 1, 1).toString();
541 Path logDirPath
= new Path(new Path(dir
, HConstants
.HREGION_LOGDIR_NAME
), serverName
);
542 fs
.mkdirs(logDirPath
);
543 // create an empty log file
544 String logFile
= new Path(logDirPath
, TEST_UTIL
.getRandomUUID().toString()).toString();
545 fs
.create(new Path(logDirPath
, logFile
)).close();
547 // spin up a thread mocking split done.
551 boolean done
= false;
553 for (Map
.Entry
<String
, Task
> entry
: slm
.getTasks().entrySet()) {
554 final ServerName worker1
= ServerName
.valueOf("worker1,1,1");
555 SplitLogTask slt
= new SplitLogTask
.Done(worker1
);
556 boolean encounteredZKException
= false;
558 ZKUtil
.setData(zkw
, entry
.getKey(), slt
.toByteArray());
559 } catch (KeeperException e
) {
560 LOG
.warn(e
.toString(), e
);
561 encounteredZKException
= true;
563 if (!encounteredZKException
) {
571 slm
.splitLogDistributed(logDirPath
);
573 assertFalse(fs
.exists(logDirPath
));