HBASE-22768 Revert to MPIR 2.9 (#433)
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / master / SplitLogManager.java
blob4d977d3427549401436a6355d414cbdfc2fe5c3d
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.master;
20 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentMap;
37 import java.util.concurrent.atomic.AtomicInteger;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.fs.FileStatus;
41 import org.apache.hadoop.fs.FileSystem;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.fs.PathFilter;
44 import org.apache.hadoop.hbase.ChoreService;
45 import org.apache.hadoop.hbase.ScheduledChore;
46 import org.apache.hadoop.hbase.ServerName;
47 import org.apache.hadoop.hbase.SplitLogCounters;
48 import org.apache.hadoop.hbase.Stoppable;
49 import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
50 import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
51 import org.apache.hadoop.hbase.log.HBaseMarkers;
52 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
53 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
54 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55 import org.apache.hadoop.hbase.util.FSUtils;
56 import org.apache.hadoop.hbase.util.HasThread;
57 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
58 import org.apache.yetus.audience.InterfaceAudience;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
63 /**
64 * Distributes the task of log splitting to the available region servers.
65 * Coordination happens via coordination engine. For every log file that has to be split a
66 * task is created. SplitLogWorkers race to grab a task.
68 * <p>SplitLogManager monitors the tasks that it creates using the
69 * timeoutMonitor thread. If a task's progress is slow then
70 * {@link SplitLogManagerCoordination#checkTasks} will take away the
71 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
72 * and the task will be up for grabs again. When the task is done then it is
73 * deleted by SplitLogManager.
75 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
76 * log files. The caller thread waits in this method until all the log files
77 * have been split.
79 * <p>All the coordination calls made by this class are asynchronous. This is mainly
80 * to help reduce response time seen by the callers.
82 * <p>There is race in this design between the SplitLogManager and the
83 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
84 * already been completed by a SplitLogWorker. We rely on the idempotency of
85 * the log splitting task for correctness.
87 * <p>It is also assumed that every log splitting task is unique and once
88 * completed (either with success or with error) it will be not be submitted
89 * again. If a task is resubmitted then there is a risk that old "delete task"
90 * can delete the re-submission.
92 @InterfaceAudience.Private
93 public class SplitLogManager {
94 private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);
96 private final MasterServices server;
98 private final Configuration conf;
99 private final ChoreService choreService;
101 public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
103 private long unassignedTimeout;
104 private long lastTaskCreateTime = Long.MAX_VALUE;
106 @VisibleForTesting
107 final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
108 private TimeoutMonitor timeoutMonitor;
110 private volatile Set<ServerName> deadWorkers = null;
111 private final Object deadWorkersLock = new Object();
114 * Its OK to construct this object even when region-servers are not online. It does lookup the
115 * orphan tasks in coordination engine but it doesn't block waiting for them to be done.
116 * @param master the master services
117 * @param conf the HBase configuration
118 * @throws IOException
120 public SplitLogManager(MasterServices master, Configuration conf)
121 throws IOException {
122 this.server = master;
123 this.conf = conf;
124 // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread.
125 // For example, in tests.
126 String name = master instanceof HasThread? ((HasThread)master).getName():
127 master.getServerName().toShortString();
128 this.choreService =
129 new ChoreService(name + ".splitLogManager.");
130 if (server.getCoordinatedStateManager() != null) {
131 SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
132 Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
133 SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
134 coordination.setDetails(details);
135 coordination.init();
137 this.unassignedTimeout =
138 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
139 this.timeoutMonitor =
140 new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
141 master);
142 choreService.scheduleChore(timeoutMonitor);
145 private SplitLogManagerCoordination getSplitLogManagerCoordination() {
146 return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
149 private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
150 return getFileList(conf, logDirs, filter);
154 * Get a list of paths that need to be split given a set of server-specific directories and
155 * optionally a filter.
157 * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory
158 * layout.
160 * Should be package-private, but is needed by
161 * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
162 * Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests.
164 @VisibleForTesting
165 public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
166 final PathFilter filter)
167 throws IOException {
168 List<FileStatus> fileStatus = new ArrayList<>();
169 for (Path logDir : logDirs) {
170 final FileSystem fs = logDir.getFileSystem(conf);
171 if (!fs.exists(logDir)) {
172 LOG.warn(logDir + " doesn't exist. Nothing to do!");
173 continue;
175 FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
176 if (logfiles == null || logfiles.length == 0) {
177 LOG.info("{} dir is empty, no logs to split.", logDir);
178 } else {
179 Collections.addAll(fileStatus, logfiles);
182 FileStatus[] a = new FileStatus[fileStatus.size()];
183 return fileStatus.toArray(a);
187 * @param logDir one region sever wal dir path in .logs
188 * @throws IOException if there was an error while splitting any log file
189 * @return cumulative size of the logfiles split
190 * @throws IOException
192 public long splitLogDistributed(final Path logDir) throws IOException {
193 List<Path> logDirs = new ArrayList<>();
194 logDirs.add(logDir);
195 return splitLogDistributed(logDirs);
199 * The caller will block until all the log files of the given region server have been processed -
200 * successfully split or an error is encountered - by an available worker region server. This
201 * method must only be called after the region servers have been brought online.
202 * @param logDirs List of log dirs to split
203 * @throws IOException If there was an error while splitting any log file
204 * @return cumulative size of the logfiles split
206 public long splitLogDistributed(final List<Path> logDirs) throws IOException {
207 if (logDirs.isEmpty()) {
208 return 0;
210 Set<ServerName> serverNames = new HashSet<>();
211 for (Path logDir : logDirs) {
212 try {
213 ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
214 if (serverName != null) {
215 serverNames.add(serverName);
217 } catch (IllegalArgumentException e) {
218 // ignore invalid format error.
219 LOG.warn("Cannot parse server name from " + logDir);
222 return splitLogDistributed(serverNames, logDirs, null);
226 * The caller will block until all the hbase:meta log files of the given region server have been
227 * processed - successfully split or an error is encountered - by an available worker region
228 * server. This method must only be called after the region servers have been brought online.
229 * @param logDirs List of log dirs to split
230 * @param filter the Path filter to select specific files for considering
231 * @throws IOException If there was an error while splitting any log file
232 * @return cumulative size of the logfiles split
234 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
235 PathFilter filter) throws IOException {
236 MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
237 logDirs + " for serverName=" + serverNames);
238 long totalSize = 0;
239 TaskBatch batch = null;
240 long startTime = 0;
241 FileStatus[] logfiles = getFileList(logDirs, filter);
242 if (logfiles.length != 0) {
243 status.setStatus("Checking directory contents...");
244 SplitLogCounters.tot_mgr_log_split_batch_start.increment();
245 LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
246 " for " + serverNames);
247 startTime = EnvironmentEdgeManager.currentTime();
248 batch = new TaskBatch();
249 for (FileStatus lf : logfiles) {
250 // TODO If the log file is still being written to - which is most likely
251 // the case for the last log file - then its length will show up here
252 // as zero. The size of such a file can only be retrieved after
253 // recover-lease is done. totalSize will be under in most cases and the
254 // metrics that it drives will also be under-reported.
255 totalSize += lf.getLen();
256 String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
257 if (!enqueueSplitTask(pathToLog, batch)) {
258 throw new IOException("duplicate log split scheduled for " + lf.getPath());
261 waitForSplittingCompletion(batch, status);
264 if (batch != null && batch.done != batch.installed) {
265 batch.isDead = true;
266 SplitLogCounters.tot_mgr_log_split_batch_err.increment();
267 LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
268 + " but only " + batch.done + " done");
269 String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
270 status.abort(msg);
271 throw new IOException(msg);
273 for (Path logDir : logDirs) {
274 status.setStatus("Cleaning up log directory...");
275 final FileSystem fs = logDir.getFileSystem(conf);
276 try {
277 if (fs.exists(logDir) && !fs.delete(logDir, false)) {
278 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
280 } catch (IOException ioe) {
281 FileStatus[] files = fs.listStatus(logDir);
282 if (files != null && files.length > 0) {
283 LOG.warn("Returning success without actually splitting and "
284 + "deleting all the log files in path " + logDir + ": "
285 + Arrays.toString(files), ioe);
286 } else {
287 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
290 SplitLogCounters.tot_mgr_log_split_batch_success.increment();
292 String msg = "Finished splitting (more than or equal to) " + totalSize +
293 " bytes in " + ((batch == null)? 0: batch.installed) +
294 " log files in " + logDirs + " in " +
295 ((startTime == 0)? startTime: (EnvironmentEdgeManager.currentTime() - startTime)) + "ms";
296 status.markComplete(msg);
297 LOG.info(msg);
298 return totalSize;
302 * Add a task entry to coordination if it is not already there.
303 * @param taskname the path of the log to be split
304 * @param batch the batch this task belongs to
305 * @return true if a new entry is created, false if it is already there.
307 boolean enqueueSplitTask(String taskname, TaskBatch batch) {
308 lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
309 String task = getSplitLogManagerCoordination().prepareTask(taskname);
310 Task oldtask = createTaskIfAbsent(task, batch);
311 if (oldtask == null) {
312 // publish the task in the coordination engine
313 getSplitLogManagerCoordination().submitTask(task);
314 return true;
316 return false;
319 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
320 synchronized (batch) {
321 while ((batch.done + batch.error) != batch.installed) {
322 try {
323 status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
324 + batch.installed + " done=" + batch.done + " error=" + batch.error);
325 int remaining = batch.installed - (batch.done + batch.error);
326 int actual = activeTasks(batch);
327 if (remaining != actual) {
328 LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
330 int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
331 if (remainingTasks >= 0 && actual > remainingTasks) {
332 LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are "
333 + remainingTasks);
335 if (remainingTasks == 0 || actual == 0) {
336 LOG.warn("No more task remaining, splitting "
337 + "should have completed. Remaining tasks is " + remainingTasks
338 + ", active tasks in map " + actual);
339 if (remainingTasks == 0 && actual == 0) {
340 return;
343 batch.wait(100);
344 if (server.isStopped()) {
345 LOG.warn("Stopped while waiting for log splits to be completed");
346 return;
348 } catch (InterruptedException e) {
349 LOG.warn("Interrupted while waiting for log splits to be completed");
350 Thread.currentThread().interrupt();
351 return;
357 @VisibleForTesting
358 ConcurrentMap<String, Task> getTasks() {
359 return tasks;
362 private int activeTasks(final TaskBatch batch) {
363 int count = 0;
364 for (Task t : tasks.values()) {
365 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
366 count++;
369 return count;
374 * @param path
375 * @param batch
376 * @return null on success, existing task on error
378 private Task createTaskIfAbsent(String path, TaskBatch batch) {
379 Task oldtask;
380 // batch.installed is only changed via this function and
381 // a single thread touches batch.installed.
382 Task newtask = new Task();
383 newtask.batch = batch;
384 oldtask = tasks.putIfAbsent(path, newtask);
385 if (oldtask == null) {
386 batch.installed++;
387 return null;
389 // new task was not used.
390 synchronized (oldtask) {
391 if (oldtask.isOrphan()) {
392 if (oldtask.status == SUCCESS) {
393 // The task is already done. Do not install the batch for this
394 // task because it might be too late for setDone() to update
395 // batch.done. There is no need for the batch creator to wait for
396 // this task to complete.
397 return (null);
399 if (oldtask.status == IN_PROGRESS) {
400 oldtask.batch = batch;
401 batch.installed++;
402 LOG.debug("Previously orphan task " + path + " is now being waited upon");
403 return null;
405 while (oldtask.status == FAILURE) {
406 LOG.debug("wait for status of task " + path + " to change to DELETED");
407 SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
408 try {
409 oldtask.wait();
410 } catch (InterruptedException e) {
411 Thread.currentThread().interrupt();
412 LOG.warn("Interrupted when waiting for znode delete callback");
413 // fall through to return failure
414 break;
417 if (oldtask.status != DELETED) {
418 LOG.warn("Failure because previously failed task"
419 + " state still present. Waiting for znode delete callback" + " path=" + path);
420 return oldtask;
422 // reinsert the newTask and it must succeed this time
423 Task t = tasks.putIfAbsent(path, newtask);
424 if (t == null) {
425 batch.installed++;
426 return null;
428 LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map");
429 assert false : "Deleted task still present in tasks map";
430 return t;
432 LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
433 return oldtask;
437 public void stop() {
438 if (choreService != null) {
439 choreService.shutdown();
441 if (timeoutMonitor != null) {
442 timeoutMonitor.cancel(true);
446 void handleDeadWorker(ServerName workerName) {
447 // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
448 // to reason about concurrency. Makes it easier to retry.
449 synchronized (deadWorkersLock) {
450 if (deadWorkers == null) {
451 deadWorkers = new HashSet<>(100);
453 deadWorkers.add(workerName);
455 LOG.info("Dead splitlog worker {}", workerName);
458 void handleDeadWorkers(Set<ServerName> serverNames) {
459 synchronized (deadWorkersLock) {
460 if (deadWorkers == null) {
461 deadWorkers = new HashSet<>(100);
463 deadWorkers.addAll(serverNames);
465 LOG.info("dead splitlog workers " + serverNames);
469 * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
470 * Clients threads use this object to wait for all their tasks to be done.
471 * <p>
472 * All access is synchronized.
474 @InterfaceAudience.Private
475 public static class TaskBatch {
476 public int installed = 0;
477 public int done = 0;
478 public int error = 0;
479 public volatile boolean isDead = false;
481 @Override
482 public String toString() {
483 return ("installed = " + installed + " done = " + done + " error = " + error);
488 * in memory state of an active task.
490 @InterfaceAudience.Private
491 public static class Task {
492 public volatile long last_update;
493 public volatile int last_version;
494 public volatile ServerName cur_worker_name;
495 public volatile TaskBatch batch;
496 public volatile TerminationStatus status;
497 public volatile AtomicInteger incarnation = new AtomicInteger(0);
498 public final AtomicInteger unforcedResubmits = new AtomicInteger();
499 public volatile boolean resubmitThresholdReached;
501 @Override
502 public String toString() {
503 return ("last_update = " + last_update + " last_version = " + last_version
504 + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
505 + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
508 public Task() {
509 last_version = -1;
510 status = IN_PROGRESS;
511 setUnassigned();
514 public boolean isOrphan() {
515 return (batch == null || batch.isDead);
518 public boolean isUnassigned() {
519 return (cur_worker_name == null);
522 public void heartbeatNoDetails(long time) {
523 last_update = time;
526 public void heartbeat(long time, int version, ServerName worker) {
527 last_version = version;
528 last_update = time;
529 cur_worker_name = worker;
532 public void setUnassigned() {
533 cur_worker_name = null;
534 last_update = -1;
539 * Periodically checks all active tasks and resubmits the ones that have timed out
541 private class TimeoutMonitor extends ScheduledChore {
542 private long lastLog = 0;
544 public TimeoutMonitor(final int period, Stoppable stopper) {
545 super("SplitLogManager Timeout Monitor", stopper, period);
548 @Override
549 protected void chore() {
550 if (server.getCoordinatedStateManager() == null) return;
552 int resubmitted = 0;
553 int unassigned = 0;
554 int tot = 0;
555 boolean found_assigned_task = false;
556 Set<ServerName> localDeadWorkers;
558 synchronized (deadWorkersLock) {
559 localDeadWorkers = deadWorkers;
560 deadWorkers = null;
563 for (Map.Entry<String, Task> e : tasks.entrySet()) {
564 String path = e.getKey();
565 Task task = e.getValue();
566 ServerName cur_worker = task.cur_worker_name;
567 tot++;
568 // don't easily resubmit a task which hasn't been picked up yet. It
569 // might be a long while before a SplitLogWorker is free to pick up a
570 // task. This is because a SplitLogWorker picks up a task one at a
571 // time. If we want progress when there are no region servers then we
572 // will have to run a SplitLogWorker thread in the Master.
573 if (task.isUnassigned()) {
574 unassigned++;
575 continue;
577 found_assigned_task = true;
578 if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
579 SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
580 if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
581 resubmitted++;
582 } else {
583 handleDeadWorker(cur_worker);
584 LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
585 + ", will retry.");
587 } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
588 resubmitted++;
591 if (tot > 0) {
592 long now = EnvironmentEdgeManager.currentTime();
593 if (now > lastLog + 5000) {
594 lastLog = now;
595 LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
598 if (resubmitted > 0) {
599 LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
601 // If there are pending tasks and all of them have been unassigned for
602 // some time then put up a RESCAN node to ping the workers.
603 // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
604 // because a. it is very unlikely that every worker had a
605 // transient error when trying to grab the task b. if there are no
606 // workers then all tasks wills stay unassigned indefinitely and the
607 // manager will be indefinitely creating RESCAN nodes. TODO may be the
608 // master should spawn both a manager and a worker thread to guarantee
609 // that there is always one worker in the system
610 if (tot > 0
611 && !found_assigned_task
612 && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
613 for (Map.Entry<String, Task> e : tasks.entrySet()) {
614 String key = e.getKey();
615 Task task = e.getValue();
616 // we have to do task.isUnassigned() check again because tasks might
617 // have been asynchronously assigned. There is no locking required
618 // for these checks ... it is OK even if tryGetDataSetWatch() is
619 // called unnecessarily for a taskpath
620 if (task.isUnassigned() && (task.status != FAILURE)) {
621 // We just touch the znode to make sure its still there
622 getSplitLogManagerCoordination().checkTaskStillAvailable(key);
625 getSplitLogManagerCoordination().checkTasks();
626 SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
627 LOG.debug("resubmitting unassigned task(s) after timeout");
629 Set<String> failedDeletions =
630 getSplitLogManagerCoordination().getDetails().getFailedDeletions();
631 // Retry previously failed deletes
632 if (failedDeletions.size() > 0) {
633 List<String> tmpPaths = new ArrayList<>(failedDeletions);
634 for (String tmpPath : tmpPaths) {
635 // deleteNode is an async call
636 getSplitLogManagerCoordination().deleteTask(tmpPath);
638 failedDeletions.removeAll(tmpPaths);
643 public enum ResubmitDirective {
644 CHECK(), FORCE()
647 public enum TerminationStatus {
648 IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");
650 final String statusMsg;
652 TerminationStatus(String msg) {
653 statusMsg = msg;
656 @Override
657 public String toString() {
658 return statusMsg;