/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
/**
 * Implementation for hdfs.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSHDFSUtils extends FSUtils {
  private static final Logger LOG = LoggerFactory.getLogger(FSHDFSUtils.class);
  private static Class<?> dfsUtilClazz;
  private static Method getNNAddressesMethod;
  /**
   * @param fs the filesystem whose namenode addresses are wanted
   * @param conf the configuration used to resolve the addresses
   * @return A set containing all namenode addresses of fs
   */
  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
      Configuration conf) {
    Set<InetSocketAddress> addresses = new HashSet<>();
    String serviceName = fs.getCanonicalServiceName();

    if (serviceName.startsWith("ha-hdfs")) {
      try {
        if (dfsUtilClazz == null) {
          dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
        }
        if (getNNAddressesMethod == null) {
          try {
            // getNNServiceRpcAddressesForCluster is available only in version
            // equal to or later than Hadoop 2.6
            getNNAddressesMethod =
                dfsUtilClazz.getMethod("getNNServiceRpcAddressesForCluster", Configuration.class);
          } catch (NoSuchMethodException e) {
            // If hadoop version is older than hadoop 2.6
            getNNAddressesMethod =
                dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
          }
        }

        Map<String, Map<String, InetSocketAddress>> addressMap =
            (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
                .invoke(null, conf);
        String nameService = serviceName.substring(serviceName.indexOf(":") + 1);
        if (addressMap.containsKey(nameService)) {
          Map<String, InetSocketAddress> nnMap = addressMap.get(nameService);
          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
            InetSocketAddress addr = e2.getValue();
            addresses.add(addr);
          }
        }
      } catch (Exception e) {
        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
      }
    } else {
      URI uri = fs.getUri();
      int port = uri.getPort();
      if (port < 0) {
        int idx = serviceName.indexOf(':');
        port = Integer.parseInt(serviceName.substring(idx + 1));
      }
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      addresses.add(addr);
    }

    return addresses;
  }
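  // Illustrative note (not part of the original source): for an HA cluster the canonical service
  // name looks like "ha-hdfs:mycluster", and the reflective DFSUtil call above resolves it to the
  // RPC addresses of every namenode in that nameservice; for a non-HA cluster the service name is
  // simply "host:port", so the address is rebuilt from the filesystem URI. The cluster name and
  // hosts below are hypothetical.
  //
  //   DistributedFileSystem haFs = ...;    // canonical name "ha-hdfs:mycluster"
  //   getNNAddresses(haFs, conf);          // e.g. {nn1.example.com:8020, nn2.example.com:8020}
  //   DistributedFileSystem plainFs = ...; // canonical name "nn1.example.com:8020"
  //   getNNAddresses(plainFs, conf);       // {nn1.example.com:8020}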
  /**
   * @param conf the Configuration of HBase
   * @param srcFs the source filesystem
   * @param desFs the destination filesystem
   * @return Whether srcFs and desFs are on same hdfs or not
   */
  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
    // By getCanonicalServiceName, we could make sure both srcFs and desFs
    // show a unified format which contains scheme, host and port.
    String srcServiceName = srcFs.getCanonicalServiceName();
    String desServiceName = desFs.getCanonicalServiceName();

    if (srcServiceName == null || desServiceName == null) {
      return false;
    }
    if (srcServiceName.equals(desServiceName)) {
      return true;
    }
    if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
      Collection<String> internalNameServices =
          conf.getTrimmedStringCollection("dfs.internal.nameservices");
      if (!internalNameServices.isEmpty()) {
        return internalNameServices.contains(srcServiceName.split(":")[1]);
      }
    }
    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
      // If one serviceName is an HA format while the other is a non-HA format,
      // maybe they refer to the same FileSystem.
      // For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
        return true;
      }
    }

    return false;
  }
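  // Usage sketch (illustrative only; the paths and URIs are hypothetical): callers typically use
  // isSameHdfs to decide whether data can be moved with a rename or must be copied across
  // clusters.
  //
  //   FileSystem walFs = new Path("hdfs://ha-cluster/hbase/WALs").getFileSystem(conf);
  //   FileSystem rootFs = new Path("hdfs://nn1.example.com:8020/hbase").getFileSystem(conf);
  //   if (FSHDFSUtils.isSameHdfs(conf, walFs, rootFs)) {
  //     // same underlying HDFS: a rename is enough
  //   } else {
  //     // different clusters: fall back to a copy
  //   }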
  /**
   * Recover the lease from HDFS, retrying multiple times.
   */
  @Override
  public void recoverFileLease(FileSystem fs, Path p, Configuration conf,
      CancelableProgressable reporter) throws IOException {
    if (fs instanceof FilterFileSystem) {
      fs = ((FilterFileSystem) fs).getRawFileSystem();
    }
    // lease recovery not needed for local file system case.
    if (!(fs instanceof DistributedFileSystem)) {
      return;
    }
    recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
  }
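  // Usage sketch (illustrative only; the WAL path is hypothetical): lease recovery is typically
  // run before re-reading a write-ahead log that another process may still hold open. The
  // CancelableProgressable lets the caller abort the wait; returning true means "keep going".
  //
  //   Path wal = new Path("/hbase/WALs/example-wal");
  //   new FSHDFSUtils().recoverFileLease(wal.getFileSystem(conf), wal, conf, () -> true);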
  /**
   * Run the dfs recover lease. recoverLease is asynchronous. It returns:
   * - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
   * - true when the lease recovery has succeeded or the file is closed.
   * But, we have to be careful. Each time we call recoverLease, it starts the recover lease
   * process over from the beginning. We could put ourselves in a situation where we are
   * doing nothing but starting a recovery, interrupting it to start again, and so on.
   * The findings over in HBASE-8354 have it that the namenode will try to recover the lease
   * on the file's primary node. If all is well, it should return near immediately. But,
   * as is common, it is the very primary node that has crashed and so the namenode will be
   * stuck waiting on a socket timeout before it will ask another datanode to start the
   * recovery. It does not help if we call recoverLease in the meantime and in particular,
   * subsequent to the socket timeout, a recoverLease invocation will cause us to start
   * over from square one (possibly waiting on socket timeout against primary node). So,
   * in the below, we do the following:
   * 1. Call recoverLease.
   * 2. If it returns true, break.
   * 3. If it returns false, wait a few seconds and then call it again.
   * 4. If it returns true, break.
   * 5. If it returns false, wait for what we think the datanode socket timeout is
   *    (configurable) and then try again.
   * 6. If it returns true, break.
   * 7. If it returns false, repeat starting at step 5. above.
   *
   * If HDFS-4525 is available, call it every second and we might be able to exit early.
   */
  boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
      final Configuration conf, final CancelableProgressable reporter)
      throws IOException {
    LOG.info("Recover lease on dfs file " + p);
    long startWaiting = EnvironmentEdgeManager.currentTime();
    // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
    // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
    // beyond that limit 'to be safe'.
    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
    // This setting should be a little bit above what the cluster dfs heartbeat is set to.
    long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000);
    // This should be set to how long it'll take for us to timeout against primary datanode if it
    // is dead. We set it to 64 seconds, 4 seconds more than the default READ_TIMEOUT in HDFS, the
    // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this
    // timeout, then further recovery will take linear backoff with this base, to avoid endless
    // preemptions when this value is not properly configured.
    long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 64 * 1000);

    Method isFileClosedMeth = null;
    // whether we need to look for isFileClosed method
    boolean findIsFileClosedMeth = true;
    boolean recovered = false;
    // We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
    for (int nbAttempt = 0; !recovered; nbAttempt++) {
      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
      if (recovered) break;
      checkIfCancelled(reporter);
      if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) break;
      try {
        // On the first time through wait the short 'firstPause'.
        if (nbAttempt == 0) {
          Thread.sleep(firstPause);
        } else {
          // Cycle here until (subsequentPauseBase * nbAttempt) elapses. While spinning, check
          // isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though).
          long localStartWaiting = EnvironmentEdgeManager.currentTime();
          while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) <
              subsequentPauseBase * nbAttempt) {
            Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
            if (findIsFileClosedMeth) {
              try {
                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
                    new Class[] { Path.class });
              } catch (NoSuchMethodException nsme) {
                LOG.debug("isFileClosed not available");
              } finally {
                findIsFileClosedMeth = false;
              }
            }
            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
              recovered = true;
              break;
            }
            checkIfCancelled(reporter);
          }
        }
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
    return recovered;
  }
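  // Configuration sketch (illustrative): the retry schedule above is driven entirely by these
  // keys; the values shown are the defaults read by recoverDFSFileLease, not tuning advice.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.lease.recovery.timeout", 900000);       // give up after 15 minutes
  //   conf.setInt("hbase.lease.recovery.first.pause", 4000);     // first retry after ~4 seconds
  //   conf.setLong("hbase.lease.recovery.dfs.timeout", 64000);   // base of the linear backoff
  //   conf.setInt("hbase.lease.recovery.pause", 1000);           // poll interval while waiting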
  boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
      final int nbAttempt, final Path p, final long startWaiting) {
    if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
      LOG.warn("Cannot recoverLease after trying for " +
          conf.getInt("hbase.lease.recovery.timeout", 900000) +
          "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
          getLogMessageDetail(nbAttempt, p, startWaiting));
      return true;
    }
    return false;
  }
  /**
   * Try to recover the lease.
   * @param dfs the filesystem holding the lease
   * @param nbAttempt the current attempt number, used only for logging
   * @param p the file whose lease should be recovered
   * @param startWaiting when the overall recovery started, used only for logging
   * @return True if dfs#recoverLease returned true.
   * @throws FileNotFoundException if the file does not exist
   */
  boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
      final long startWaiting)
      throws FileNotFoundException {
    boolean recovered = false;
    try {
      recovered = dfs.recoverLease(p);
      LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ") +
          getLogMessageDetail(nbAttempt, p, startWaiting));
    } catch (IOException e) {
      if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
        // This exception comes out instead of FNFE, fix it
        throw new FileNotFoundException("The given WAL wasn't found at " + p);
      } else if (e instanceof FileNotFoundException) {
        throw (FileNotFoundException) e;
      }
      LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
    }
    return recovered;
  }
  /**
   * @param nbAttempt the current attempt number
   * @param p the file being recovered
   * @param startWaiting when the overall recovery started
   * @return Detail to append to any log message around lease recovering.
   */
  private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) {
    return "attempt=" + nbAttempt + " on file=" + p + " after " +
        (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
  }
  /**
   * Call HDFS-4525 isFileClosed if it is available.
   * @param dfs the filesystem to query
   * @param m the reflected isFileClosed method
   * @param p the file to check
   * @return True if file is closed.
   */
  private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
    try {
      return (Boolean) m.invoke(dfs, p);
    } catch (SecurityException e) {
      LOG.warn("No access", e);
    } catch (Exception e) {
      LOG.warn("Failed invocation for " + p.toString(), e);
    }
    return false;
  }
  void checkIfCancelled(final CancelableProgressable reporter)
      throws InterruptedIOException {
    if (reporter == null) return;
    if (!reporter.progress()) throw new InterruptedIOException("Operation cancelled");
  }
}