HBASE-21843 RegionGroupingProvider breaks the meta wal file name pattern which may...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / util / FSHDFSUtils.java
blob a49ee0227a99c603eac5d234018fd7ef5d389fca
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * Implementation of {@link FSUtils} for HDFS-backed file systems.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSHDFSUtils extends FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSHDFSUtils.class);
  private static Class dfsUtilClazz;
  private static Method getNNAddressesMethod;

  /**
   * @param fs the DistributedFileSystem to look up namenode addresses for
   * @param conf the Configuration used to resolve the HA namenode addresses
   * @return A set containing all namenode addresses of fs
   */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
      Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        if (dfsUtilClazz == null) {
          dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
        }
        if (getNNAddressesMethod == null) {
          try {
            // getNNServiceRpcAddressesForCluster is available only in version
            // equal to or later than Hadoop 2.6
            getNNAddressesMethod =
                dfsUtilClazz.getMethod("getNNServiceRpcAddressesForCluster", Configuration.class);
          } catch (NoSuchMethodException e) {
            // If hadoop version is older than hadoop 2.6
            getNNAddressesMethod =
                dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
          }
        }

        Map<String, Map<String, InetSocketAddress>> addressMap =
            (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
                .invoke(null, conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
            InetSocketAddress addr = e2.getValue();
            addresses.add(addr);
          }
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }
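
  // Illustrative note (hypothetical values, not from the original source): for an HA cluster with
  // nameservice "mycluster", getCanonicalServiceName() returns something like "ha-hdfs:mycluster",
  // which the branch above resolves via DFSUtil to the RPC addresses of all configured namenodes,
  // e.g. {nn1.example.com:8020, nn2.example.com:8020}. For a non-HA cluster, the filesystem URI
  // (e.g. hdfs://namenode.example.com:8020) supplies the single address directly.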

  /**
   * @param conf the Configuration of HBase
   * @param srcFs the source FileSystem
   * @param desFs the destination FileSystem
   * @return Whether srcFs and desFs are on same hdfs or not
   */
  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // By getCanonicalServiceName, we can make sure both srcFs and desFs
    // show a unified format which contains scheme, host and port.
    String srcServiceName = srcFs.getCanonicalServiceName();
    String desServiceName = desFs.getCanonicalServiceName();

    if (srcServiceName == null || desServiceName == null) {
      return false;
    }
    if (srcServiceName.equals(desServiceName)) {
      return true;
    }
    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
      Collection<String> internalNameServices =
          conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        if (internalNameServices.contains(srcServiceName.split(":")[1])) {
          return true;
        } else {
          return false;
        }
      }
    }
    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
      // If one serviceName is an HA format while the other is a non-HA format,
      // maybe they refer to the same FileSystem.
      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
        return true;
      }
    }

    return false;
  }
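
  // Illustrative usage sketch (not part of the original class; the paths below are hypothetical):
  // a caller can use isSameHdfs to decide whether a simple rename suffices or data must be copied
  // between clusters.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   FileSystem rootFs = new Path("hdfs://ha-cluster/hbase").getFileSystem(conf);
  //   FileSystem otherFs = new Path("hdfs://nn1.example.com:8020/backup").getFileSystem(conf);
  //   if (FSHDFSUtils.isSameHdfs(conf, rootFs, otherFs)) {
  //     // same underlying HDFS instance: a rename/move will do
  //   } else {
  //     // different clusters: the data has to be copied across
  //   }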

  /**
   * Recover the lease from HDFS, retrying multiple times.
   */
  @Override
  public void recoverFileLease(FileSystem fs, Path p, Configuration conf,
      CancelableProgressable reporter) throws IOException {
    if (fs instanceof FilterFileSystem) {
      fs = ((FilterFileSystem) fs).getRawFileSystem();
    }
    // lease recovery not needed for local file system case.
    if (!(fs instanceof DistributedFileSystem)) {
      return;
    }
    recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
  }
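
  // Illustrative sketch (hypothetical caller, not part of this class): before reading a WAL file
  // left open by a crashed region server, the lease must be recovered so the file length is
  // finalized. A caller might look roughly like this:
  //
  //   Path wal = new Path("/hbase/WALs/rs1%2C16020%2C1-splitting/rs1%2C16020%2C1.1234"); // hypothetical path
  //   FSHDFSUtils utils = new FSHDFSUtils();
  //   utils.recoverFileLease(wal.getFileSystem(conf), wal, conf, null); // null reporter: no cancellation hook
  //   // the WAL can now be opened for reading with a reliable length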

  /**
   * Run the dfs recover lease. recoverLease is asynchronous. It returns:
   * - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
   * - true when the lease recovery has succeeded or the file is closed.
   * But, we have to be careful. Each time we call recoverLease, it starts the recover lease
   * process over from the beginning. We could put ourselves in a situation where we are
   * doing nothing but starting a recovery, interrupting it to start again, and so on.
   * The findings over in HBASE-8354 have it that the namenode will try to recover the lease
   * on the file's primary node. If all is well, it should return near immediately. But,
   * as is common, it is the very primary node that has crashed and so the namenode will be
   * stuck waiting on a socket timeout before it will ask another datanode to start the
   * recovery. It does not help if we call recoverLease in the meantime and in particular,
   * subsequent to the socket timeout, a recoverLease invocation will cause us to start
   * over from square one (possibly waiting on socket timeout against primary node). So,
   * in the below, we do the following:
   * 1. Call recoverLease.
   * 2. If it returns true, break.
   * 3. If it returns false, wait a few seconds and then call it again.
   * 4. If it returns true, break.
   * 5. If it returns false, wait for what we think the datanode socket timeout is
   *    (configurable) and then try again.
   * 6. If it returns true, break.
   * 7. If it returns false, repeat starting at step 5. above.
   *
   * If HDFS-4525 is available, call it every second and we might be able to exit early.
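   *
   * <p>For illustration, the knobs below control the schedule; the values shown are the defaults
   * read further down and are only an example of how a caller might tune them:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.setInt("hbase.lease.recovery.timeout", 900000);     // stop retrying (and warn) after 15 minutes
   * conf.setInt("hbase.lease.recovery.first.pause", 4000);   // pause before the second attempt
   * conf.setLong("hbase.lease.recovery.dfs.timeout", 64000); // base of the per-attempt linear backoff
   * conf.setInt("hbase.lease.recovery.pause", 1000);         // poll interval while backing off
   * }</pre>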
   */
  boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
      final Configuration conf, final CancelableProgressable reporter)
      throws IOException {
    LOG.info("Recover lease on dfs file " + p);
    long startWaiting = EnvironmentEdgeManager.currentTime();
    // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
    // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
    // beyond that limit 'to be safe'.
    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
    // This setting should be a little bit above what the cluster dfs heartbeat is set to.
    long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000);
    // This should be set to how long it'll take for us to timeout against primary datanode if it
    // is dead. We set it to 64 seconds, 4 seconds more than the default READ_TIMEOUT in HDFS, the
    // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this
    // timeout, then further recovery will use linear backoff with this base, to avoid endless
    // preemptions when this value is not properly configured.
    long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000);

    Method isFileClosedMeth = null;
    // whether we need to look for isFileClosed method
    boolean findIsFileClosedMeth = true;
    boolean recovered = false;
    // We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
    for (int nbAttempt = 0; !recovered; nbAttempt++) {
      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
      if (recovered) break;
      checkIfCancelled(reporter);
      if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) break;
      try {
        // On the first time through wait the short 'firstPause'.
        if (nbAttempt == 0) {
          Thread.sleep(firstPause);
        } else {
          // Cycle here until (subsequentPauseBase * nbAttempt) elapses. While spinning, check
          // isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though).
          long localStartWaiting = EnvironmentEdgeManager.currentTime();
          while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) <
              subsequentPauseBase * nbAttempt) {
            Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
            if (findIsFileClosedMeth) {
              try {
                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
                    new Class[]{ Path.class });
              } catch (NoSuchMethodException nsme) {
                LOG.debug("isFileClosed not available");
              } finally {
                findIsFileClosedMeth = false;
              }
            }
            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
              recovered = true;
              break;
            }
            checkIfCancelled(reporter);
          }
        }
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
    return recovered;
  }
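
  // Illustrative timeline (not from the original source), assuming the defaults used above:
  // attempt 0 calls recoverLease and, if it returns false, sleeps 4s (hbase.lease.recovery.first.pause);
  // attempt 1 then polls every 1s (hbase.lease.recovery.pause) for up to 64s (1 * 64s), checking
  // isFileClosed where available; attempt 2 polls for up to 128s (2 * 64s), and so on linearly,
  // until recovery succeeds, the reporter cancels, or the 900s overall timeout
  // (hbase.lease.recovery.timeout) elapses.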

  boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
      final int nbAttempt, final Path p, final long startWaiting) {
    if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
      LOG.warn("Cannot recoverLease after trying for " +
          conf.getInt("hbase.lease.recovery.timeout", 900000) +
          "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
          getLogMessageDetail(nbAttempt, p, startWaiting));
      return true;
    }
    return false;
  }

  /**
   * Try to recover the lease.
   * @param dfs the DistributedFileSystem on which to recover the lease
   * @param nbAttempt the number of this attempt, used in log messages
   * @param p path of the file whose lease is to be recovered
   * @param startWaiting time, in ms, at which recovery started, used in log messages
   * @return True if dfs#recoverLease returned true.
   * @throws FileNotFoundException if the file at p does not exist
   */
  boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
      final long startWaiting)
      throws FileNotFoundException {
    boolean recovered = false;
    try {
      recovered = dfs.recoverLease(p);
      LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ") +
          getLogMessageDetail(nbAttempt, p, startWaiting));
    } catch (IOException e) {
      if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
        // This exception comes out instead of FNFE, fix it
        throw new FileNotFoundException("The given WAL wasn't found at " + p);
      } else if (e instanceof FileNotFoundException) {
        throw (FileNotFoundException) e;
      }
      LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
    }
    return recovered;
  }

  /**
   * @param nbAttempt the number of this attempt, used in log messages
   * @param p path of the file under lease recovery
   * @param startWaiting time, in ms, at which recovery started
   * @return Detail to append to any log message around lease recovering.
   */
  private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) {
    return "attempt=" + nbAttempt + " on file=" + p + " after " +
        (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
  }

  /**
   * Call HDFS-4525 isFileClosed if it is available.
   * @param dfs the DistributedFileSystem to call isFileClosed on
   * @param m the reflected isFileClosed method, looked up by the caller
   * @param p path of the file to check
   * @return True if file is closed.
   */
  private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
    try {
      return (Boolean) m.invoke(dfs, p);
    } catch (SecurityException e) {
      LOG.warn("No access", e);
    } catch (Exception e) {
      LOG.warn("Failed invocation for " + p.toString(), e);
    }
    return false;
  }

  void checkIfCancelled(final CancelableProgressable reporter)
      throws InterruptedIOException {
    if (reporter == null) return;
    if (!reporter.progress()) throw new InterruptedIOException("Operation cancelled");
  }
}