HBASE-23949 refactor loadBalancer implements for rsgroup balance by table to achieve...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / master / TestSplitLogManager.java
blob00ead6501a14e4407c8ac81ce3cdc7d34658a93c
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.master;
20 import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
21 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
22 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
23 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
24 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
25 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
26 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
27 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
28 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
29 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
30 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
31 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
32 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
33 import static org.junit.Assert.assertEquals;
34 import static org.junit.Assert.assertFalse;
35 import static org.junit.Assert.assertTrue;
36 import java.io.IOException;
37 import java.util.Map;
38 import java.util.concurrent.atomic.LongAdder;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.CoordinatedStateManager;
43 import org.apache.hadoop.hbase.HBaseClassTestRule;
44 import org.apache.hadoop.hbase.HBaseTestingUtility;
45 import org.apache.hadoop.hbase.HConstants;
46 import org.apache.hadoop.hbase.ServerName;
47 import org.apache.hadoop.hbase.SplitLogCounters;
48 import org.apache.hadoop.hbase.SplitLogTask;
49 import org.apache.hadoop.hbase.Waiter;
50 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
51 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
52 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
53 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
54 import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
55 import org.apache.hadoop.hbase.testclassification.LargeTests;
56 import org.apache.hadoop.hbase.testclassification.MasterTests;
57 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
58 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
59 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
60 import org.apache.zookeeper.CreateMode;
61 import org.apache.zookeeper.KeeperException;
62 import org.apache.zookeeper.ZooDefs.Ids;
63 import org.junit.After;
64 import org.junit.Assert;
65 import org.junit.Before;
66 import org.junit.ClassRule;
67 import org.junit.Test;
68 import org.junit.experimental.categories.Category;
69 import org.mockito.Mockito;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
73 @Category({MasterTests.class, LargeTests.class})
74 public class TestSplitLogManager {
76 @ClassRule
77 public static final HBaseClassTestRule CLASS_RULE =
78 HBaseClassTestRule.forClass(TestSplitLogManager.class);
80 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class);
82 private final ServerManager sm = Mockito.mock(ServerManager.class);
84 private ZKWatcher zkw;
85 private DummyMasterServices master;
86 private SplitLogManager slm;
87 private Configuration conf;
88 private int to;
90 private static HBaseTestingUtility TEST_UTIL;
92 class DummyMasterServices extends MockNoopMasterServices {
93 private ZKWatcher zkw;
94 private CoordinatedStateManager cm;
96 public DummyMasterServices(ZKWatcher zkw, Configuration conf) {
97 super(conf);
98 this.zkw = zkw;
99 cm = new ZkCoordinatedStateManager(this);
102 @Override
103 public ZKWatcher getZooKeeper() {
104 return zkw;
107 @Override
108 public CoordinatedStateManager getCoordinatedStateManager() {
109 return cm;
112 @Override
113 public ServerManager getServerManager() {
114 return sm;
118 @Before
119 public void setup() throws Exception {
120 TEST_UTIL = new HBaseTestingUtility();
121 TEST_UTIL.startMiniZKCluster();
122 conf = TEST_UTIL.getConfiguration();
123 // Use a different ZK wrapper instance for each tests.
124 zkw =
125 new ZKWatcher(conf, "split-log-manager-tests" + TEST_UTIL.getRandomUUID().toString(), null);
126 master = new DummyMasterServices(zkw, conf);
128 ZKUtil.deleteChildrenRecursively(zkw, zkw.getZNodePaths().baseZNode);
129 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().baseZNode);
130 assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().baseZNode) != -1);
131 LOG.debug(zkw.getZNodePaths().baseZNode + " created");
132 ZKUtil.createAndFailSilent(zkw, zkw.getZNodePaths().splitLogZNode);
133 assertTrue(ZKUtil.checkExists(zkw, zkw.getZNodePaths().splitLogZNode) != -1);
134 LOG.debug(zkw.getZNodePaths().splitLogZNode + " created");
136 resetCounters();
138 // By default, we let the test manage the error as before, so the server
139 // does not appear as dead from the master point of view, only from the split log pov.
140 Mockito.when(sm.isServerOnline(Mockito.any())).thenReturn(true);
142 to = 12000;
143 conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
144 conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
146 conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
147 to = to + 16 * 100;
150 @After
151 public void teardown() throws IOException, KeeperException {
152 master.stop("");
153 if (slm != null) {
154 slm.stop();
156 TEST_UTIL.shutdownMiniZKCluster();
159 private interface Expr {
160 long eval();
163 private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
164 throws Exception {
165 Expr e = new Expr() {
166 @Override
167 public long eval() {
168 return ctr.sum();
171 waitForCounter(e, oldval, newval, timems);
172 return;
175 private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
176 throws Exception {
178 TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
179 @Override
180 public boolean evaluate() throws Exception {
181 return (e.eval() != oldval);
185 assertEquals(newval, e.eval());
188 private Task findOrCreateOrphanTask(String path) {
189 return slm.tasks.computeIfAbsent(path, k -> {
190 LOG.info("creating orphan task " + k);
191 SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
192 return new Task();
196 private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
197 InterruptedException {
198 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
199 NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
200 zkw.registerListener(listener);
201 ZKUtil.watchAndCheckExists(zkw, tasknode);
203 slm.enqueueSplitTask(name, batch);
204 assertEquals(1, batch.installed);
205 assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
206 assertEquals(1L, tot_mgr_node_create_queued.sum());
208 LOG.debug("waiting for task node creation");
209 listener.waitForCreation();
210 LOG.debug("task created");
211 return tasknode;
215 * Test whether the splitlog correctly creates a task in zookeeper
217 @Test
218 public void testTaskCreation() throws Exception {
220 LOG.info("TestTaskCreation - test the creation of a task in zk");
221 slm = new SplitLogManager(master, conf);
222 TaskBatch batch = new TaskBatch();
224 String tasknode = submitTaskAndWait(batch, "foo/1");
226 byte[] data = ZKUtil.getData(zkw, tasknode);
227 SplitLogTask slt = SplitLogTask.parseFrom(data);
228 LOG.info("Task node created " + slt.toString());
229 assertTrue(slt.isUnassigned(master.getServerName()));
232 @Test
233 public void testOrphanTaskAcquisition() throws Exception {
234 LOG.info("TestOrphanTaskAcquisition");
236 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
237 SplitLogTask slt = new SplitLogTask.Owned(master.getServerName());
238 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
239 CreateMode.PERSISTENT);
241 slm = new SplitLogManager(master, conf);
242 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
243 Task task = findOrCreateOrphanTask(tasknode);
244 assertTrue(task.isOrphan());
245 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
246 assertFalse(task.isUnassigned());
247 long curt = System.currentTimeMillis();
248 assertTrue((task.last_update <= curt) &&
249 (task.last_update > (curt - 1000)));
250 LOG.info("waiting for manager to resubmit the orphan task");
251 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
252 assertTrue(task.isUnassigned());
253 waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
256 @Test
257 public void testUnassignedOrphan() throws Exception {
258 LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
259 " startup");
260 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
261 //create an unassigned orphan task
262 SplitLogTask slt = new SplitLogTask.Unassigned(master.getServerName());
263 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
264 CreateMode.PERSISTENT);
265 int version = ZKUtil.checkExists(zkw, tasknode);
267 slm = new SplitLogManager(master, conf);
268 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
269 Task task = findOrCreateOrphanTask(tasknode);
270 assertTrue(task.isOrphan());
271 assertTrue(task.isUnassigned());
272 // wait for RESCAN node to be created
273 waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
274 Task task2 = findOrCreateOrphanTask(tasknode);
275 assertTrue(task == task2);
276 LOG.debug("task = " + task);
277 assertEquals(1L, tot_mgr_resubmit.sum());
278 assertEquals(1, task.incarnation.get());
279 assertEquals(0, task.unforcedResubmits.get());
280 assertTrue(task.isOrphan());
281 assertTrue(task.isUnassigned());
282 assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
285 @Test
286 public void testMultipleResubmits() throws Exception {
287 LOG.info("TestMultipleResbmits - no indefinite resubmissions");
288 conf.setInt("hbase.splitlog.max.resubmit", 2);
289 slm = new SplitLogManager(master, conf);
290 TaskBatch batch = new TaskBatch();
292 String tasknode = submitTaskAndWait(batch, "foo/1");
293 int version = ZKUtil.checkExists(zkw, tasknode);
294 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
295 final ServerName worker2 = ServerName.valueOf("worker2,1,1");
296 final ServerName worker3 = ServerName.valueOf("worker3,1,1");
297 SplitLogTask slt = new SplitLogTask.Owned(worker1);
298 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
299 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
300 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
301 int version1 = ZKUtil.checkExists(zkw, tasknode);
302 assertTrue(version1 > version);
303 slt = new SplitLogTask.Owned(worker2);
304 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
305 waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
306 waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
307 int version2 = ZKUtil.checkExists(zkw, tasknode);
308 assertTrue(version2 > version1);
309 slt = new SplitLogTask.Owned(worker3);
310 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
311 waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
312 waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
313 Thread.sleep(to + to/2);
314 assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
317 @Test
318 public void testRescanCleanup() throws Exception {
319 LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
321 slm = new SplitLogManager(master, conf);
322 TaskBatch batch = new TaskBatch();
324 String tasknode = submitTaskAndWait(batch, "foo/1");
325 int version = ZKUtil.checkExists(zkw, tasknode);
326 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
327 SplitLogTask slt = new SplitLogTask.Owned(worker1);
328 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
329 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
330 waitForCounter(new Expr() {
331 @Override
332 public long eval() {
333 return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
335 }, 0, 1, 5*60000); // wait long enough
336 Assert.assertEquals("Could not run test. Lost ZK connection?",
337 0, tot_mgr_resubmit_failed.sum());
338 int version1 = ZKUtil.checkExists(zkw, tasknode);
339 assertTrue(version1 > version);
340 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
341 slt = SplitLogTask.parseFrom(taskstate);
342 assertTrue(slt.isUnassigned(master.getServerName()));
344 waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
347 @Test
348 public void testTaskDone() throws Exception {
349 LOG.info("TestTaskDone - cleanup task node once in DONE state");
351 slm = new SplitLogManager(master, conf);
352 TaskBatch batch = new TaskBatch();
353 String tasknode = submitTaskAndWait(batch, "foo/1");
354 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
355 SplitLogTask slt = new SplitLogTask.Done(worker1);
356 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
357 synchronized (batch) {
358 while (batch.installed != batch.done) {
359 batch.wait();
362 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
363 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
366 @Test
367 public void testTaskErr() throws Exception {
368 LOG.info("TestTaskErr - cleanup task node once in ERR state");
370 conf.setInt("hbase.splitlog.max.resubmit", 0);
371 slm = new SplitLogManager(master, conf);
372 TaskBatch batch = new TaskBatch();
374 String tasknode = submitTaskAndWait(batch, "foo/1");
375 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
376 SplitLogTask slt = new SplitLogTask.Err(worker1);
377 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
379 synchronized (batch) {
380 while (batch.installed != batch.error) {
381 batch.wait();
384 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
385 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
386 conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
389 @Test
390 public void testTaskResigned() throws Exception {
391 LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
392 assertEquals(0, tot_mgr_resubmit.sum());
393 slm = new SplitLogManager(master, conf);
394 assertEquals(0, tot_mgr_resubmit.sum());
395 TaskBatch batch = new TaskBatch();
396 String tasknode = submitTaskAndWait(batch, "foo/1");
397 assertEquals(0, tot_mgr_resubmit.sum());
398 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
399 assertEquals(0, tot_mgr_resubmit.sum());
400 SplitLogTask slt = new SplitLogTask.Resigned(worker1);
401 assertEquals(0, tot_mgr_resubmit.sum());
402 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
403 ZKUtil.checkExists(zkw, tasknode);
404 // Could be small race here.
405 if (tot_mgr_resubmit.sum() == 0) {
406 waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
408 assertEquals(1, tot_mgr_resubmit.sum());
410 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
411 slt = SplitLogTask.parseFrom(taskstate);
412 assertTrue(slt.isUnassigned(master.getServerName()));
415 @Test
416 public void testUnassignedTimeout() throws Exception {
417 LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
418 " resubmit");
420 // create an orphan task in OWNED state
421 String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
422 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
423 SplitLogTask slt = new SplitLogTask.Owned(worker1);
424 zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
425 CreateMode.PERSISTENT);
427 slm = new SplitLogManager(master, conf);
428 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
430 // submit another task which will stay in unassigned mode
431 TaskBatch batch = new TaskBatch();
432 submitTaskAndWait(batch, "foo/1");
434 // keep updating the orphan owned node every to/2 seconds
435 for (int i = 0; i < (3 * to)/100; i++) {
436 Thread.sleep(100);
437 final ServerName worker2 = ServerName.valueOf("worker1,1,1");
438 slt = new SplitLogTask.Owned(worker2);
439 ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
442 // since we have stopped heartbeating the owned node therefore it should
443 // get resubmitted
444 LOG.info("waiting for manager to resubmit the orphan task");
445 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
447 // now all the nodes are unassigned. manager should post another rescan
448 waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
451 @Test
452 public void testDeadWorker() throws Exception {
453 LOG.info("testDeadWorker");
455 conf.setLong("hbase.splitlog.max.resubmit", 0);
456 slm = new SplitLogManager(master, conf);
457 TaskBatch batch = new TaskBatch();
459 String tasknode = submitTaskAndWait(batch, "foo/1");
460 int version = ZKUtil.checkExists(zkw, tasknode);
461 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
462 SplitLogTask slt = new SplitLogTask.Owned(worker1);
463 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
464 if (tot_mgr_heartbeat.sum() == 0) {
465 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
467 slm.handleDeadWorker(worker1);
468 if (tot_mgr_resubmit.sum() == 0) {
469 waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
471 if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
472 waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
475 int version1 = ZKUtil.checkExists(zkw, tasknode);
476 assertTrue(version1 > version);
477 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
478 slt = SplitLogTask.parseFrom(taskstate);
479 assertTrue(slt.isUnassigned(master.getServerName()));
480 return;
483 @Test
484 public void testWorkerCrash() throws Exception {
485 slm = new SplitLogManager(master, conf);
486 TaskBatch batch = new TaskBatch();
488 String tasknode = submitTaskAndWait(batch, "foo/1");
489 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
491 SplitLogTask slt = new SplitLogTask.Owned(worker1);
492 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
493 if (tot_mgr_heartbeat.sum() == 0) {
494 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
497 // Not yet resubmitted.
498 Assert.assertEquals(0, tot_mgr_resubmit.sum());
500 // This server becomes dead
501 Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
503 Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
505 // It has been resubmitted
506 Assert.assertEquals(1, tot_mgr_resubmit.sum());
509 @Test
510 public void testEmptyLogDir() throws Exception {
511 LOG.info("testEmptyLogDir");
512 slm = new SplitLogManager(master, conf);
513 FileSystem fs = TEST_UTIL.getTestFileSystem();
514 Path emptyLogDirPath = new Path(new Path(fs.getWorkingDirectory(),
515 HConstants.HREGION_LOGDIR_NAME),
516 ServerName.valueOf("emptyLogDir", 1, 1).toString());
517 fs.mkdirs(emptyLogDirPath);
518 slm.splitLogDistributed(emptyLogDirPath);
519 assertFalse(fs.exists(emptyLogDirPath));
522 @Test
523 public void testLogFilesAreArchived() throws Exception {
524 LOG.info("testLogFilesAreArchived");
525 slm = new SplitLogManager(master, conf);
526 FileSystem fs = TEST_UTIL.getTestFileSystem();
527 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
528 conf.set(HConstants.HBASE_DIR, dir.toString());
529 String serverName = ServerName.valueOf("foo", 1, 1).toString();
530 Path logDirPath = new Path(new Path(dir, HConstants.HREGION_LOGDIR_NAME), serverName);
531 fs.mkdirs(logDirPath);
532 // create an empty log file
533 String logFile = new Path(logDirPath, TEST_UTIL.getRandomUUID().toString()).toString();
534 fs.create(new Path(logDirPath, logFile)).close();
536 // spin up a thread mocking split done.
537 new Thread() {
538 @Override
539 public void run() {
540 boolean done = false;
541 while (!done) {
542 for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
543 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
544 SplitLogTask slt = new SplitLogTask.Done(worker1);
545 boolean encounteredZKException = false;
546 try {
547 ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
548 } catch (KeeperException e) {
549 LOG.warn(e.toString(), e);
550 encounteredZKException = true;
552 if (!encounteredZKException) {
553 done = true;
558 }.start();
560 slm.splitLogDistributed(logDirPath);
562 assertFalse(fs.exists(logDirPath));