HBASE-26151 Reimplement MasterAddressTracker to also cache backup master addresses...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / master / TestSplitLogManager.java
blobeaff93796583c0a8baef0469508e87458efd8edc
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.master;
20 import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
21 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
22 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
23 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
24 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
25 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
26 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
27 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
28 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
29 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
30 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
31 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
32 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
33 import static org.junit.Assert.assertEquals;
34 import static org.junit.Assert.assertFalse;
35 import static org.junit.Assert.assertTrue;
37 import java.io.IOException;
38 import java.util.Map;
39 import java.util.concurrent.atomic.LongAdder;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.FileSystem;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.hbase.CoordinatedStateManager;
44 import org.apache.hadoop.hbase.HBaseClassTestRule;
45 import org.apache.hadoop.hbase.HBaseTestingUtil;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.ServerName;
48 import org.apache.hadoop.hbase.SplitLogCounters;
49 import org.apache.hadoop.hbase.SplitLogTask;
50 import org.apache.hadoop.hbase.Waiter;
51 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
52 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
53 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
54 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
55 import org.apache.hadoop.hbase.testclassification.LargeTests;
56 import org.apache.hadoop.hbase.testclassification.MasterTests;
57 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58 import org.apache.hadoop.hbase.zookeeper.TestMasterAddressTracker.NodeCreationListener;
59 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
60 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
61 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
62 import org.apache.zookeeper.CreateMode;
63 import org.apache.zookeeper.KeeperException;
64 import org.apache.zookeeper.ZooDefs.Ids;
65 import org.junit.After;
66 import org.junit.Assert;
67 import org.junit.Before;
68 import org.junit.ClassRule;
69 import org.junit.Test;
70 import org.junit.experimental.categories.Category;
71 import org.mockito.Mockito;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
75 @Category({MasterTests.class, LargeTests.class})
76 public class TestSplitLogManager {
78 @ClassRule
79 public static final HBaseClassTestRule CLASS_RULE =
80 HBaseClassTestRule.forClass(TestSplitLogManager.class);
82 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
84 private final ServerManager sm = Mockito.mock(ServerManager.class);
86 private ZKWatcher zkw;
87 private DummyMasterServices master;
88 private SplitLogManager slm;
89 private Configuration conf;
90 private int to;
92 private static HBaseTestingUtil TEST_UTIL;
94 class DummyMasterServices extends MockNoopMasterServices {
95 private ZKWatcher zkw;
96 private CoordinatedStateManager cm;
98 public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
99 super(conf);
100 this.zkw = zkw;
101 cm = new ZkCoordinatedStateManager(this);
104 @Override
105 public ZKWatcher getZooKeeper() {
106 return zkw;
109 @Override
110 public CoordinatedStateManager getCoordinatedStateManager() {
111 return cm;
114 @Override
115 public ServerManager getServerManager() {
116 return sm;
120 @Before
121 public void setup() throws Exception {
122 TEST_UTIL = new HBaseTestingUtil();
123 TEST_UTIL.startMiniZKCluster();
124 conf = TEST_UTIL.getConfiguration();
125 // Use a different ZK wrapper instance for each tests.
126 zkw =
127 new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
128 master = new DummyMasterServices(zkw, conf);
130 ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
131 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
132 assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
133 LOG.debug(zkw.getZNodePaths().baseZNode + " created");
134 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
135 assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
136 LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
138 resetCounters();
140 // By default, we let the test manage the error as before, so the server
141 // does not appear as dead from the master point of view, only from the split log pov.
142 Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);
144 to = 12000;
145 conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
146 conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
148 conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
149 to = to + 16 * 100;
152 @After
153 public void teardown() throws IOException, KeeperException {
154 master.stop("");
155 if (slm != null) {
156 slm.stop();
158 TEST_UTIL.shutdownMiniZKCluster();
161 @Test
162 public void testBatchWaitMillis() {
163 assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(0));
164 assertEquals(100, SplitLogManager.getBatchWaitTimeMillis(1));
165 assertEquals(1000, SplitLogManager.getBatchWaitTimeMillis(10));
166 assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(101));
167 assertEquals(60_000, SplitLogManager.getBatchWaitTimeMillis(1011));
170 private interface Expr {
171 long eval();
174 private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
175 throws Exception {
176 Expr e = new Expr() {
177 @Override
178 public long eval() {
179 return ctr.sum();
182 waitForCounter(e, oldval, newval, timems);
183 return;
186 private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
187 throws Exception {
189 TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
190 @Override
191 public boolean evaluate() throws Exception {
192 return (e.eval() != oldval);
196 assertEquals(newval, e.eval());
199 private Task findOrCreateOrphanTask(String path) {
200 return slm.tasks.computeIfAbsent(path, k -> {
201 LOG.info("creating orphan task " + k);
202 SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
203 return new Task();
207 private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
208 InterruptedException {
209 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
210 NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
211 zkw.registerListener(listener);
212 ZKUtil.watchAndCheckExists(zkw, tasknode);
214 slm.enqueueSplitTask(name, batch);
215 assertEquals(1, batch.installed);
216 assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
217 assertEquals(1L, tot_mgr_node_create_queued.sum());
219 LOG.debug("waiting for task node creation");
220 listener.waitForCreation();
221 LOG.debug("task created");
222 return tasknode;
226 * Test whether the splitlog correctly creates a task in zookeeper
228 @Test
229 public void testTaskCreation() throws Exception {
231 LOG.info("TestTaskCreation - test the creation of a task in zk");
232 slm = new SplitLogManager(master, conf);
233 TaskBatch batch = new TaskBatch();
235 String tasknode = submitTaskAndWait(batch, "foo/1");
237 byte[] data = ZKUtil.getData(zkw, tasknode);
238 SplitLogTask slt = SplitLogTask.parseFrom(data);
239 LOG.info("Task node created " + slt.toString());
240 assertTrue(slt.isUnassigned(master.getServerName()));
243 @Test
244 public void testOrphanTaskAcquisition() throws Exception {
245 LOG.info("TestOrphanTaskAcquisition");
247 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
248 SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
249 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
250 CreateMode.PERSISTENT);
252 slm = new SplitLogManager(master, conf);
253 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
254 Task task = findOrCreateOrphanTask(tasknode);
255 assertTrue(task.isOrphan());
256 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
257 assertFalse(task.isUnassigned());
258 long curt = EnvironmentEdgeManager.currentTime();
259 assertTrue((task.last_update <= curt) &&
260 (task.last_update > (curt - 1000)));
261 LOG.info("waiting for manager to resubmit the orphan task");
262 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
263 assertTrue(task.isUnassigned());
264 waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
267 @Test
268 public void testUnassignedOrphan() throws Exception {
269 LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
270 " startup");
271 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
272 //create an unassigned orphan task
273 SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
274 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
275 CreateMode.PERSISTENT);
276 int version = ZKUtil.checkExists(zkw, tasknode);
278 slm = new SplitLogManager(master, conf);
279 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
280 Task task = findOrCreateOrphanTask(tasknode);
281 assertTrue(task.isOrphan());
282 assertTrue(task.isUnassigned());
283 // wait for RESCAN node to be created
284 waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
285 Task task2 = findOrCreateOrphanTask(tasknode);
286 assertTrue(task == task2);
287 LOG.debug("task = " + task);
288 assertEquals(1L, tot_mgr_resubmit.sum());
289 assertEquals(1, task.incarnation.get());
290 assertEquals(0, task.unforcedResubmits.get());
291 assertTrue(task.isOrphan());
292 assertTrue(task.isUnassigned());
293 assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
296 @Test
297 public void testMultipleResubmits() throws Exception {
298 LOG.info("TestMultipleResbmits - no indefinite resubmissions");
299 conf.setInt("hbase.splitlog.max.resubmit", 2);
300 slm = new SplitLogManager(master, conf);
301 TaskBatch batch = new TaskBatch();
303 String tasknode = submitTaskAndWait(batch, "foo/1");
304 int version = ZKUtil.checkExists(zkw, tasknode);
305 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
306 final ServerName worker2 = ServerName.valueOf("worker2,1,1");
307 final ServerName worker3 = ServerName.valueOf("worker3,1,1");
308 SplitLogTask slt = new SplitLogTask.Owned(worker1);
309 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
310 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
311 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
312 int version1 = ZKUtil.checkExists(zkw, tasknode);
313 assertTrue(version1 > version);
314 slt = new SplitLogTask.Owned(worker2);
315 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
316 waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
317 waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
318 int version2 = ZKUtil.checkExists(zkw, tasknode);
319 assertTrue(version2 > version1);
320 slt = new SplitLogTask.Owned(worker3);
321 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
322 waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
323 waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
324 Thread.sleep(to + to/2);
325 assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
328 @Test
329 public void testRescanCleanup() throws Exception {
330 LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
332 slm = new SplitLogManager(master, conf);
333 TaskBatch batch = new TaskBatch();
335 String tasknode = submitTaskAndWait(batch, "foo/1");
336 int version = ZKUtil.checkExists(zkw, tasknode);
337 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
338 SplitLogTask slt = new SplitLogTask.Owned(worker1);
339 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
340 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
341 waitForCounter(new Expr() {
342 @Override
343 public long eval() {
344 return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
346 }, 0, 1, 5*60000); // wait long enough
347 Assert.assertEquals("Could not run test. Lost ZK connection?",
348 0, tot_mgr_resubmit_failed.sum());
349 int version1 = ZKUtil.checkExists(zkw, tasknode);
350 assertTrue(version1 > version);
351 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
352 slt = SplitLogTask.parseFrom(taskstate);
353 assertTrue(slt.isUnassigned(master.getServerName()));
355 waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
358 @Test
359 public void testTaskDone() throws Exception {
360 LOG.info("TestTaskDone - cleanup task node once in DONE state");
362 slm = new SplitLogManager(master, conf);
363 TaskBatch batch = new TaskBatch();
364 String tasknode = submitTaskAndWait(batch, "foo/1");
365 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
366 SplitLogTask slt = new SplitLogTask.Done(worker1);
367 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
368 synchronized (batch) {
369 while (batch.installed != batch.done) {
370 batch.wait();
373 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
374 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
377 @Test
378 public void testTaskErr() throws Exception {
379 LOG.info("TestTaskErr - cleanup task node once in ERR state");
381 conf.setInt("hbase.splitlog.max.resubmit", 0);
382 slm = new SplitLogManager(master, conf);
383 TaskBatch batch = new TaskBatch();
385 String tasknode = submitTaskAndWait(batch, "foo/1");
386 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
387 SplitLogTask slt = new SplitLogTask.Err(worker1);
388 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
390 synchronized (batch) {
391 while (batch.installed != batch.error) {
392 batch.wait();
395 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
396 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
397 conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
400 @Test
401 public void testTaskResigned() throws Exception {
402 LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
403 assertEquals(0, tot_mgr_resubmit.sum());
404 slm = new SplitLogManager(master, conf);
405 assertEquals(0, tot_mgr_resubmit.sum());
406 TaskBatch batch = new TaskBatch();
407 String tasknode = submitTaskAndWait(batch, "foo/1");
408 assertEquals(0, tot_mgr_resubmit.sum());
409 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
410 assertEquals(0, tot_mgr_resubmit.sum());
411 SplitLogTask slt = new SplitLogTask.Resigned(worker1);
412 assertEquals(0, tot_mgr_resubmit.sum());
413 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
414 ZKUtil.checkExists(zkw, tasknode);
415 // Could be small race here.
416 if (tot_mgr_resubmit.sum() == 0) {
417 waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
419 assertEquals(1, tot_mgr_resubmit.sum());
421 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
422 slt = SplitLogTask.parseFrom(taskstate);
423 assertTrue(slt.isUnassigned(master.getServerName()));
426 @Test
427 public void testUnassignedTimeout() throws Exception {
428 LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
429 " resubmit");
431 // create an orphan task in OWNED state
432 String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
433 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
434 SplitLogTask slt = new SplitLogTask.Owned(worker1);
435 zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
436 CreateMode.PERSISTENT);
438 slm = new SplitLogManager(master, conf);
439 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
441 // submit another task which will stay in unassigned mode
442 TaskBatch batch = new TaskBatch();
443 submitTaskAndWait(batch, "foo/1");
445 // keep updating the orphan owned node every to/2 seconds
446 for (int i = 0; i < (3 * to)/100; i++) {
447 Thread.sleep(100);
448 final ServerName worker2 = ServerName.valueOf("worker1,1,1");
449 slt = new SplitLogTask.Owned(worker2);
450 ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
453 // since we have stopped heartbeating the owned node therefore it should
454 // get resubmitted
455 LOG.info("waiting for manager to resubmit the orphan task");
456 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
458 // now all the nodes are unassigned. manager should post another rescan
459 waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
462 @Test
463 public void testDeadWorker() throws Exception {
464 LOG.info("testDeadWorker");
466 conf.setLong("hbase.splitlog.max.resubmit", 0);
467 slm = new SplitLogManager(master, conf);
468 TaskBatch batch = new TaskBatch();
470 String tasknode = submitTaskAndWait(batch, "foo/1");
471 int version = ZKUtil.checkExists(zkw, tasknode);
472 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
473 SplitLogTask slt = new SplitLogTask.Owned(worker1);
474 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
475 if (tot_mgr_heartbeat.sum() == 0) {
476 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
478 slm.handleDeadWorker(worker1);
479 if (tot_mgr_resubmit.sum() == 0) {
480 waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
482 if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
483 waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
486 int version1 = ZKUtil.checkExists(zkw, tasknode);
487 assertTrue(version1 > version);
488 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
489 slt = SplitLogTask.parseFrom(taskstate);
490 assertTrue(slt.isUnassigned(master.getServerName()));
491 return;
494 @Test
495 public void testWorkerCrash() throws Exception {
496 slm = new SplitLogManager(master, conf);
497 TaskBatch batch = new TaskBatch();
499 String tasknode = submitTaskAndWait(batch, "foo/1");
500 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
502 SplitLogTask slt = new SplitLogTask.Owned(worker1);
503 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
504 if (tot_mgr_heartbeat.sum() == 0) {
505 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
508 // Not yet resubmitted.
509 Assert.assertEquals(0, tot_mgr_resubmit.sum());
511 // This server becomes dead
512 Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
514 Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
516 // It has been resubmitted
517 Assert.assertEquals(1, tot_mgr_resubmit.sum());
520 @Test
521 public void testEmptyLogDir() throws Exception {
522 LOG.info("testEmptyLogDir");
523 slm = new SplitLogManager(master, conf);
524 FileSystem fs = TEST_UTIL.getTestFileSystem();
525 Path emptyLogDirPath = new Path(new Path(fs.getWorkingDirectory(),
526 HConstants.HREGION_LOGDIR_NAME),
527 ServerName.valueOf("emptyLogDir", 1, 1).toString());
528 fs.mkdirs(emptyLogDirPath);
529 slm.splitLogDistributed(emptyLogDirPath);
530 assertFalse(fs.exists(emptyLogDirPath));
533 @Test
534 public void testLogFilesAreArchived() throws Exception {
535 LOG.info("testLogFilesAreArchived");
536 slm = new SplitLogManager(master, conf);
537 FileSystem fs = TEST_UTIL.getTestFileSystem();
538 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
539 conf.set(HConstants.HBASE_DIR, dir.toString());
540 String serverName = ServerName.valueOf("foo", 1, 1).toString();
541 Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
542 fs.mkdirs(logDirPath);
543 // create an empty log file
544 String logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString()).toString();
545 fs.create(new Path(logDirPath, logFile)).close();
547 // spin up a thread mocking split done.
548 new Thread() {
549 @Override
550 public void run() {
551 boolean done = false;
552 while (!done) {
553 for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
554 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
555 SplitLogTask slt = new SplitLogTask.Done(worker1);
556 boolean encounteredZKException = false;
557 try {
558 ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
559 } catch (KeeperException e) {
560 LOG.warn(e.toString(), e);
561 encounteredZKException = true;
563 if (!encounteredZKException) {
564 done = true;
569 }.start();
571 slm.splitLogDistributed(logDirPath);
573 assertFalse(fs.exists(logDirPath));