 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Distributes the task of log splitting to the available region servers.
 * Coordination happens via coordination engine. For every log file that has to be split a
 * task is created. SplitLogWorkers race to grab a task.
 *
 * <p>SplitLogManager monitors the tasks that it creates using the
 * timeoutMonitor thread. If a task's progress is slow then
 * {@link SplitLogManagerCoordination#checkTasks} will take away the
 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
 * and the task will be up for grabs again. When the task is done then it is
 * deleted by SplitLogManager.
 *
 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
 * log files. The caller thread waits in this method until all the log files
 * have been split.
 *
 * <p>All the coordination calls made by this class are asynchronous. This is mainly
 * to help reduce response time seen by the callers.
 *
 * <p>There is a race in this design between the SplitLogManager and the
 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
 * already been completed by a SplitLogWorker. We rely on the idempotency of
 * the log splitting task for correctness.
 *
 * <p>It is also assumed that every log splitting task is unique and once
 * completed (either with success or with error) it will not be submitted
 * again. If a task is resubmitted then there is a risk that old "delete task"
 * can delete the re-submission.
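 *
 * <p>A minimal usage sketch; the {@code masterServices}, {@code conf} and {@code walDir}
 * values are caller-supplied and only illustrative:
 * <pre>
 *   SplitLogManager slm = new SplitLogManager(masterServices, conf);
 *   // Blocks until every WAL under walDir has been split (or an error is raised).
 *   long bytesSplit = slm.splitLogDistributed(walDir);
 * </pre>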
 */
@InterfaceAudience.Private
public class SplitLogManager {
  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);

  private final MasterServices server;

  private final Configuration conf;
  private final ChoreService choreService;

  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min

  private long unassignedTimeout;
  private long lastTaskCreateTime = Long.MAX_VALUE;

  final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
  private TimeoutMonitor timeoutMonitor;

  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

  /**
   * It's OK to construct this object even when region-servers are not online. It does lookup the
   * orphan tasks in coordination engine but it doesn't block waiting for them to be done.
   * @param master the master services
   * @param conf the HBase configuration
   * @throws IOException
   */
  public SplitLogManager(MasterServices master, Configuration conf) throws IOException {
    this.server = master;
    this.conf = conf;
    // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread.
    // For example, in tests.
    String name = master instanceof HasThread ? ((HasThread) master).getName() :
        master.getServerName().toShortString();
    this.choreService = new ChoreService(name + ".splitLogManager.");
    if (server.getCoordinatedStateManager() != null) {
      SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
      SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
      coordination.setDetails(details);
      coordination.init();
    }
    this.unassignedTimeout =
        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
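    // The timeout monitor runs as a master-side chore; it resubmits tasks owned by dead or
    // stalled workers and retries coordination deletes that previously failed.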
    this.timeoutMonitor =
        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
            master);
    choreService.scheduleChore(timeoutMonitor);
  }

  private SplitLogManagerCoordination getSplitLogManagerCoordination() {
    return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
  }

  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    return getFileList(conf, logDirs, filter);
  }

  /**
   * Get a list of paths that need to be split given a set of server-specific directories and
   * optionally a filter.
   * <p>
   * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory
   * layout.
   * <p>
   * Should be package-private, but is needed by
   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
   *     Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests.
   */
  @VisibleForTesting
  public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
      final PathFilter filter) throws IOException {
    List<FileStatus> fileStatus = new ArrayList<>();
    for (Path logDir : logDirs) {
      final FileSystem fs = logDir.getFileSystem(conf);
      if (!fs.exists(logDir)) {
        LOG.warn(logDir + " doesn't exist. Nothing to do!");
        continue;
      }
      FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
      if (logfiles == null || logfiles.length == 0) {
        LOG.info("{} dir is empty, no logs to split.", logDir);
      } else {
        Collections.addAll(fileStatus, logfiles);
      }
    }
    FileStatus[] a = new FileStatus[fileStatus.size()];
    return fileStatus.toArray(a);
  }

  /**
   * @param logDir one region server wal dir path in .logs
   * @throws IOException if there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final Path logDir) throws IOException {
    List<Path> logDirs = new ArrayList<>();
    logDirs.add(logDir);
    return splitLogDistributed(logDirs);
  }

  /**
   * The caller will block until all the log files of the given region server have been processed -
   * successfully split or an error is encountered - by an available worker region server. This
   * method must only be called after the region servers have been brought online.
   * @param logDirs List of log dirs to split
   * @throws IOException If there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
    if (logDirs.isEmpty()) {
      return 0;
    }
    Set<ServerName> serverNames = new HashSet<>();
    for (Path logDir : logDirs) {
      try {
        ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
        if (serverName != null) {
          serverNames.add(serverName);
        }
      } catch (IllegalArgumentException e) {
        // ignore invalid format error.
        LOG.warn("Cannot parse server name from " + logDir);
      }
    }
    return splitLogDistributed(serverNames, logDirs, null);
  }

  /**
   * The caller will block until all the hbase:meta log files of the given region server have been
   * processed - successfully split or an error is encountered - by an available worker region
   * server. This method must only be called after the region servers have been brought online.
   * @param logDirs List of log dirs to split
   * @param filter the Path filter to select specific files for considering
   * @throws IOException If there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
        logDirs + " for serverName=" + serverNames);
    long totalSize = 0;
    TaskBatch batch = null;
    long startTime = 0;
    FileStatus[] logfiles = getFileList(logDirs, filter);
    if (logfiles.length != 0) {
      status.setStatus("Checking directory contents...");
      SplitLogCounters.tot_mgr_log_split_batch_start.increment();
      LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
          " for " + serverNames);
      startTime = EnvironmentEdgeManager.currentTime();
      batch = new TaskBatch();
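      // Create one split task per WAL file; SplitLogWorkers race to grab each task through the
      // coordination engine.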
      for (FileStatus lf : logfiles) {
        // TODO If the log file is still being written to - which is most likely
        // the case for the last log file - then its length will show up here
        // as zero. The size of such a file can only be retrieved after
        // recover-lease is done. totalSize will be under in most cases and the
        // metrics that it drives will also be under-reported.
        totalSize += lf.getLen();
        String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
        if (!enqueueSplitTask(pathToLog, batch)) {
          throw new IOException("duplicate log split scheduled for " + lf.getPath());
        }
      }
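      // Block until every task in this batch has reported either done or error.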
      waitForSplittingCompletion(batch, status);
    }

    if (batch != null && batch.done != batch.installed) {
      SplitLogCounters.tot_mgr_log_split_batch_err.increment();
      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
          + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn("Returning success without actually splitting and "
              + "deleting all the log files in path " + logDir + ": "
              + Arrays.toString(files), ioe);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      SplitLogCounters.tot_mgr_log_split_batch_success.increment();
    }
    String msg = "Finished splitting (more than or equal to) " + totalSize +
        " bytes in " + ((batch == null) ? 0 : batch.installed) +
        " log files in " + logDirs + " in " +
        ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) + "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }

  /**
   * Add a task entry to coordination if it is not already there.
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there.
   */
  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
    String task = getSplitLogManagerCoordination().prepareTask(taskname);
    Task oldtask = createTaskIfAbsent(task, batch);
    if (oldtask == null) {
      // publish the task in the coordination engine
      getSplitLogManagerCoordination().submitTask(task);
      return true;
    }
    return false;
  }

  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
    synchronized (batch) {
      while ((batch.done + batch.error) != batch.installed) {
        try {
          status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
              + batch.installed + " done=" + batch.done + " error=" + batch.error);
          int remaining = batch.installed - (batch.done + batch.error);
          int actual = activeTasks(batch);
          if (remaining != actual) {
            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
          }
          int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
          if (remainingTasks >= 0 && actual > remainingTasks) {
            LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
                + remainingTasks);
          }
          if (remainingTasks == 0 || actual == 0) {
            LOG.warn("No more task remaining, splitting "
                + "should have completed. Remaining tasks is " + remainingTasks
                + ", active tasks in map " + actual);
            if (remainingTasks == 0 && actual == 0) {
              return;
            }
          }
          // Wait for a completing task to notify us, or re-check after 100ms.
          batch.wait(100);
          if (server.isStopped()) {
            LOG.warn("Stopped while waiting for log splits to be completed");
            return;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for log splits to be completed");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  @VisibleForTesting
  ConcurrentMap<String, Task> getTasks() {
    return tasks;
  }

  private int activeTasks(final TaskBatch batch) {
    int count = 0;
    for (Task t : tasks.values()) {
      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
        count++;
      }
    }
    return count;
  }

  /**
   * @return null on success, existing task on error
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      batch.installed++;
      return null;
    }
    // new task was not used.
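    // Another submitter already created a task for this path; decide whether this batch can
    // adopt the existing task or must wait for its cleanup.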
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this
          // task because it might be too late for setDone() to update
          // batch.done. There is no need for the batch creator to wait for
          // this task to complete.
          return null;
        }
        if (oldtask.status == IN_PROGRESS) {
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // fall through to return failure
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task"
              + " state still present. Waiting for znode delete callback" + " path=" + path);
          return oldtask;
        }
        // reinsert the newTask and it must succeed this time
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }

  public void stop() {
    if (choreService != null) {
      choreService.shutdown();
    }
    if (timeoutMonitor != null) {
      timeoutMonitor.cancel(true);
    }
  }

  void handleDeadWorker(ServerName workerName) {
    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
    // to reason about concurrency. Makes it easier to retry.
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<>(100);
      }
      deadWorkers.add(workerName);
    }
    LOG.info("Dead splitlog worker {}", workerName);
  }

  void handleDeadWorkers(Set<ServerName> serverNames) {
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<>(100);
      }
      deadWorkers.addAll(serverNames);
    }
    LOG.info("dead splitlog workers " + serverNames);
  }

  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */
  @InterfaceAudience.Private
  public static class TaskBatch {
    public int installed = 0;
    public int done = 0;
    public int error = 0;
    public volatile boolean isDead = false;

    @Override
    public String toString() {
      return ("installed = " + installed + " done = " + done + " error = " + error);
    }
  }

  /**
   * In-memory state of an active task.
   */
  @InterfaceAudience.Private
  public static class Task {
    public volatile long last_update;
    public volatile int last_version;
    public volatile ServerName cur_worker_name;
    public volatile TaskBatch batch;
    public volatile TerminationStatus status;
    public volatile AtomicInteger incarnation = new AtomicInteger(0);
    public final AtomicInteger unforcedResubmits = new AtomicInteger();
    public volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update + " last_version = " + last_version
          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
    }

    public Task() {
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    public void heartbeat(long time, int version, ServerName worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }

  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out
   */
  private class TimeoutMonitor extends ScheduledChore {
    private long lastLog = 0;

    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      if (server.getCoordinatedStateManager() == null) return;

      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

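      // Snapshot and clear the shared dead-worker set; failed resubmits re-queue the worker via
      // handleDeadWorker().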
      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // don't easily resubmit a task which hasn't been picked up yet. It
        // might be a long while before a SplitLogWorker is free to pick up a
        // task. This is because a SplitLogWorker picks up a task one at a
        // time. If we want progress when there are no region servers then we
        // will have to run a SplitLogWorker thread in the Master.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
          if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
                + ", will retry.");
          }
        } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          resubmitted++;
        }
      }
      long now = EnvironmentEdgeManager.currentTime();
      if (now > lastLog + 5000) {
        lastLog = now;
        LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If there are pending tasks and all of them have been unassigned for
      // some time then put up a RESCAN node to ping the workers.
      // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
      // because a. it is very unlikely that every worker had a
      // transient error when trying to grab the task b. if there are no
      // workers then all tasks will stay unassigned indefinitely and the
      // manager will be indefinitely creating RESCAN nodes. TODO may be the
      // master should spawn both a manager and a worker thread to guarantee
      // that there is always one worker in the system
      if (tot > 0
          && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // we have to do task.isUnassigned() check again because tasks might
          // have been asynchronously assigned. There is no locking required
          // for these checks ... it is OK even if tryGetDataSetWatch() is
          // called unnecessarily for a taskpath
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // We just touch the znode to make sure it's still there
            getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
          getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // Retry previously failed deletes
      if (failedDeletions.size() > 0) {
        List<String> tmpPaths = new ArrayList<>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          // deleteNode is an async call
          getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        failedDeletions.removeAll(tmpPaths);
      }
    }
  }

  public enum ResubmitDirective {
    CHECK(), FORCE()
  }

  public enum TerminationStatus {
    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");

    final String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }
}