/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import java
.io
.IOException
;
21 import java
.math
.BigInteger
;
22 import java
.security
.PrivilegedAction
;
23 import java
.security
.SecureRandom
;
24 import java
.util
.ArrayList
;
25 import java
.util
.HashMap
;
26 import java
.util
.List
;
28 import java
.util
.concurrent
.ConcurrentHashMap
;
29 import java
.util
.function
.Consumer
;
30 import org
.apache
.commons
.lang3
.mutable
.MutableInt
;
31 import org
.apache
.hadoop
.conf
.Configuration
;
32 import org
.apache
.hadoop
.fs
.FileStatus
;
33 import org
.apache
.hadoop
.fs
.FileSystem
;
34 import org
.apache
.hadoop
.fs
.FileUtil
;
35 import org
.apache
.hadoop
.fs
.Path
;
36 import org
.apache
.hadoop
.fs
.permission
.FsPermission
;
37 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
38 import org
.apache
.hadoop
.hbase
.HConstants
;
39 import org
.apache
.hadoop
.hbase
.TableName
;
40 import org
.apache
.hadoop
.hbase
.client
.AsyncConnection
;
41 import org
.apache
.hadoop
.hbase
.ipc
.RpcServer
;
42 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
.BulkLoadListener
;
43 import org
.apache
.hadoop
.hbase
.security
.User
;
44 import org
.apache
.hadoop
.hbase
.security
.UserProvider
;
45 import org
.apache
.hadoop
.hbase
.security
.token
.AuthenticationTokenIdentifier
;
46 import org
.apache
.hadoop
.hbase
.security
.token
.ClientTokenUtil
;
47 import org
.apache
.hadoop
.hbase
.security
.token
.FsDelegationToken
;
48 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
49 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
50 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
51 import org
.apache
.hadoop
.hbase
.util
.Methods
;
52 import org
.apache
.hadoop
.hbase
.util
.Pair
;
53 import org
.apache
.hadoop
.io
.Text
;
54 import org
.apache
.hadoop
.security
.UserGroupInformation
;
55 import org
.apache
.hadoop
.security
.token
.Token
;
56 import org
.apache
.yetus
.audience
.InterfaceAudience
;
57 import org
.slf4j
.Logger
;
58 import org
.slf4j
.LoggerFactory
;
60 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
61 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.BulkLoadHFileRequest
;
62 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.CleanupBulkLoadRequest
;
63 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.PrepareBulkLoadRequest
;
/**
 * Bulk loads in secure mode.
 *
 * This service addresses two issues:
 * <ol>
 * <li>Moving files in a secure filesystem wherein the HBase Client
 * and HBase Server are different filesystem users.</li>
 * <li>Moving files in a secure manner, assuming that the filesystem
 * is POSIX compliant.</li>
 * </ol>
 *
 * The algorithm is as follows:
 * <ol>
 * <li>Create an hbase owned staging directory which is
 * world traversable (711): {@code /hbase/staging}</li>
 * <li>A user writes out data to their secure output directory: {@code /user/foo/data}</li>
 * <li>A call is made to hbase to create a secret staging directory
 * which is globally rwx (777): {@code /hbase/staging/averylongandrandomdirectoryname}</li>
 * <li>The user moves the data into the random staging directory,
 * then calls bulkLoadHFiles()</li>
 * </ol>
 *
 * Like delegation tokens, the strength of the security lies in the length
 * and randomness of the secret directory.
 */
91 @InterfaceAudience.Private
92 public class SecureBulkLoadManager
{
// Version number of this secure bulk load implementation; it is not read in the visible code.
94 public static final long VERSION
= 0L;
// Staging directory names end in RANDOM_WIDTH random bits rendered in radix RANDOM_RADIX
// (see createStagingDir).
96 //320/5 = 64 characters
97 private static final int RANDOM_WIDTH
= 320;
98 private static final int RANDOM_RADIX
= 32;
100 private static final Logger LOG
= LoggerFactory
.getLogger(SecureBulkLoadManager
.class);
// 777: applied to per-request staging directories, per-family staging directories and
// staged HFiles.
102 private final static FsPermission PERM_ALL_ACCESS
= FsPermission
.valueOf("-rwxrwxrwx");
// 711: used when creating the base staging directory. The owner has full access; others may
// traverse it but not list it, which keeps the random per-request directory names secret.
103 private final static FsPermission PERM_HIDDEN
= FsPermission
.valueOf("-rwx--x--x");
// Source of randomness for staging directory names; initialized in start().
104 private SecureRandom random
;
// Region server (destination) filesystem; initialized in start().
105 private FileSystem fs
;
106 private Configuration conf
;
108 //two levels so it doesn't get deleted accidentally
109 //no sticky bit in Hadoop 1.0
// <hbase root dir>/HConstants.BULKLOAD_STAGING_DIR_NAME; initialized in start().
110 private Path baseStagingDir
;
// Initialized in start().
112 private UserProvider userProvider
;
// Number of in-flight secureBulkLoadHFiles calls per UGI. FileSystem instances cached for a
// (non-login) UGI are closed only once that UGI no longer has an entry here.
113 private ConcurrentHashMap
<UserGroupInformation
, MutableInt
> ugiReferenceCounter
;
// Used to obtain an HBase authentication token for the requesting user.
114 private AsyncConnection conn
;
/**
 * Creates a manager for the given configuration and connection. The filesystem, staging
 * directory, random source and user provider are initialized later by {@link #start()}.
 * NOTE(review): the constructor body is not visible in this excerpt; presumably it stores
 * {@code conf} and {@code conn} in the fields of the same name — confirm.
 */
116 SecureBulkLoadManager(Configuration conf
, AsyncConnection conn
) {
/**
 * Initializes runtime state and the base staging directory.
 *
 * <p>Creates the SecureRandom used for staging directory names, the UserProvider, the per-UGI
 * reference counter and the FileSystem obtained from {@code conf}. The base staging directory is
 * always {@code <hbase root dir>/HConstants.BULKLOAD_STAGING_DIR_NAME}. If it does not exist it
 * is created with PERM_HIDDEN. A warning is logged when the deprecated
 * {@code hbase.bulkload.staging.dir} property is set, because the staging location is derived
 * from the root dir regardless of that property.
 *
 * @throws IOException if the filesystem cannot be obtained or accessed
 */
121 public void start() throws IOException
{
122 random
= new SecureRandom();
123 userProvider
= UserProvider
.instantiate(conf
);
124 ugiReferenceCounter
= new ConcurrentHashMap
<>();
125 fs
= FileSystem
.get(conf
);
126 baseStagingDir
= new Path(CommonFSUtils
.getRootDir(conf
), HConstants
.BULKLOAD_STAGING_DIR_NAME
);
128 if (conf
.get("hbase.bulkload.staging.dir") != null) {
129 LOG
.warn("hbase.bulkload.staging.dir " + " is deprecated. Bulkload staging directory is "
132 if (!fs
.exists(baseStagingDir
)) {
// NOTE(review): the permission passed to mkdirs is usually filtered by the filesystem umask;
// confirm whether an explicit setPermission follows (not visible in this excerpt).
133 fs
.mkdirs(baseStagingDir
, PERM_HIDDEN
);
/**
 * Stops the manager.
 * NOTE(review): the method body is not visible in this excerpt.
 *
 * @throws IOException as declared; the conditions are not visible here
 */
137 public void stop() throws IOException
{
/**
 * Prepares a secure bulk load for the region.
 *
 * <p>Resolves the active (RPC request) user and runs the coprocessor {@code prePrepareBulkLoad}
 * hook. It then creates a new, randomly named staging directory under baseStagingDir for the
 * region's table. The returned String is presumably that directory's path, which callers later
 * pass back as the bulk token — confirm against the lines not visible here.
 * NOTE(review): unlike secureBulkLoadHFiles, getCoprocessorHost() is not null-checked here.
 *
 * @param region region that will receive the bulk load
 * @param request prepare request (not read in the visible lines)
 */
140 public String
prepareBulkLoad(final HRegion region
, final PrepareBulkLoadRequest request
)
142 User user
= getActiveUser();
143 region
.getCoprocessorHost().prePrepareBulkLoad(user
);
146 createStagingDir(baseStagingDir
, user
, region
.getTableDescriptor().getTableName())
/**
 * Removes the staging directory named by the request's bulk token.
 *
 * <p>Runs the coprocessor {@code preCleanupBulkLoad} hook with the active user, then recursively
 * deletes the path. An IOException is thrown only when the delete reports failure and the path
 * still exists, so cleaning up an already-removed directory succeeds.
 * NOTE(review): the bulk token comes from the client request and, in the visible code, is not
 * checked to lie under baseStagingDir before the recursive delete — confirm it is validated
 * elsewhere.
 *
 * @param region region whose coprocessor host is notified
 * @param request cleanup request carrying the bulk token (staging path)
 */
152 public void cleanupBulkLoad(final HRegion region
, final CleanupBulkLoadRequest request
)
154 region
.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
156 Path path
= new Path(request
.getBulkToken());
157 if (!fs
.delete(path
, true)) {
158 if (fs
.exists(path
)) {
159 throw new IOException("Failed to clean up " + path
);
162 LOG
.trace("Cleaned up {} successfully.", path
);
// Optional callback invoked with the region right after the requesting user's FileSystem is
// created inside secureBulkLoadHFiles; null means no callback.
165 private Consumer
<HRegion
> fsCreatedListener
;
/**
 * Installs the callback fired after the per-request FileSystem is created.
 * Package-private; presumably intended for tests.
 *
 * @param fsCreatedListener callback receiving the region, or null to disable
 */
167 void setFsCreatedListener(Consumer
<HRegion
> fsCreatedListener
) {
168 this.fsCreatedListener
= fsCreatedListener
;
/**
 * Records one more in-flight bulk load for the given UGI. The update runs inside
 * ConcurrentHashMap.compute, so it is atomic with respect to other updates of the same key.
 * A new counter starts at 1; the branch that increments an existing counter is not visible
 * in this excerpt.
 *
 * @param ugi the requesting user's UGI
 */
171 private void incrementUgiReference(UserGroupInformation ugi
) {
172 // if we haven't seen this ugi before, make a new counter
173 ugiReferenceCounter
.compute(ugi
, (key
, value
) -> {
175 value
= new MutableInt(1);
/**
 * Releases one in-flight bulk load reference for the given UGI. When the count would drop
 * below one, the entry is removed (computeIfPresent returning null), after which
 * isUserReferenced reports false. Nothing happens if the UGI has no entry.
 *
 * @param ugi the requesting user's UGI
 */
183 private void decrementUgiReference(UserGroupInformation ugi
) {
184 // if the count drops below 1 we remove the entry by returning null
185 ugiReferenceCounter
.computeIfPresent(ugi
, (key
, value
) -> {
186 if (value
.intValue() > 1) {
/**
 * Reports whether any bulk load for the UGI is still in flight.
 *
 * @param ugi the UGI to check
 * @return true if the UGI has an entry in ugiReferenceCounter
 */
195 private boolean isUserReferenced(UserGroupInformation ugi
) {
196 // if the ugi is in the map, based on invariants above
197 // the count must be above zero
198 return ugiReferenceCounter
.containsKey(ugi
);
/**
 * Bulk loads the request's HFiles into the region without replication cluster ids.
 * Equivalent to {@code secureBulkLoadHFiles(region, request, null)}.
 *
 * @param region region receiving the HFiles
 * @param request bulk load request
 * @return the result of the three-argument overload
 * @throws IOException propagated from the three-argument overload
 */
201 public Map
<byte[], List
<Path
>> secureBulkLoadHFiles(final HRegion region
,
202 final BulkLoadHFileRequest request
) throws IOException
{
203 return secureBulkLoadHFiles(region
, request
, null);
/**
 * Performs a secure bulk load of the HFiles described by the request into the region.
 *
 * <p>Steps visible in this method:
 * <ol>
 * <li>Collect (family, path) pairs from the request.</li>
 * <li>With Hadoop security enabled, build the client's filesystem token from the request.
 * Then try to add an HBase authentication token to the caller's UGI; failures there are only
 * logged.</li>
 * <li>Run the {@code preBulkLoadHFile} coprocessor hook if a coprocessor host exists.</li>
 * <li>Count the UGI as in use and, with security enabled, add a delegation token for the region
 * server filesystem unless it has the same service as the client token.</li>
 * <li>As the requesting user (ugi.doAs), create world-accessible per-family staging directories
 * under the bulk token path, fire fsCreatedListener, and call HRegion.bulkLoadHFiles with a
 * SecureBulkLoadListener.</li>
 * <li>Release the UGI reference and, if the UGI is not the login user and is no longer
 * referenced, close its cached FileSystems. Then run the {@code postBulkLoadHFile} hook.</li>
 * </ol>
 *
 * @param region region receiving the HFiles
 * @param request bulk load request; its bulk token is used as the staging directory path
 * @param clusterIds passed through to HRegion.bulkLoadHFiles (null from the 2-arg overload)
 * @return presumably {@code map}, the result of HRegion.bulkLoadHFiles (the return statement is
 *     not visible in this excerpt)
 * @throws IOException on user lookup, coprocessor or filesystem failures
 */
206 public Map
<byte[], List
<Path
>> secureBulkLoadHFiles(final HRegion region
,
207 final BulkLoadHFileRequest request
, List
<String
> clusterIds
) throws IOException
{
208 final List
<Pair
<byte[], String
>> familyPaths
= new ArrayList
<>(request
.getFamilyPathCount());
209 for(ClientProtos
.BulkLoadHFileRequest
.FamilyPath el
: request
.getFamilyPathList()) {
210 familyPaths
.add(new Pair
<>(el
.getFamily().toByteArray(), el
.getPath()));
213 Token
<AuthenticationTokenIdentifier
> userToken
= null;
214 if (userProvider
.isHadoopSecurityEnabled()) {
215 userToken
= new Token
<>(request
.getFsToken().getIdentifier().toByteArray(),
216 request
.getFsToken().getPassword().toByteArray(), new Text(request
.getFsToken().getKind()),
217 new Text(request
.getFsToken().getService()));
219 final String bulkToken
= request
.getBulkToken();
220 User user
= getActiveUser();
221 final UserGroupInformation ugi
= user
.getUGI();
222 if (userProvider
.isHadoopSecurityEnabled()) {
224 Token
<AuthenticationTokenIdentifier
> tok
= ClientTokenUtil
.obtainToken(conn
).get();
226 boolean b
= ugi
.addToken(tok
);
227 LOG
.debug("token added " + tok
+ " for user " + ugi
+ " return=" + b
);
// NOTE(review): if get() is a Future#get, catching Exception here also swallows
// InterruptedException without restoring the thread's interrupt status; consider handling it
// separately.
229 } catch (Exception ioe
) {
230 LOG
.warn("unable to add token", ioe
);
// NOTE(review): userToken is assigned whenever Hadoop security is enabled (it is built from
// request.getFsToken() above), so the else-if branch below appears unreachable. A request
// without an fs token is therefore not rejected here; confirm, e.g. by checking
// request.hasFsToken() instead.
233 if (userToken
!= null) {
234 ugi
.addToken(userToken
);
235 } else if (userProvider
.isHadoopSecurityEnabled()) {
236 //we allow this to pass through in "simple" security mode
237 //for mini cluster testing
238 throw new DoNotRetryIOException("User token cannot be null");
241 if (region
.getCoprocessorHost() != null) {
242 region
.getCoprocessorHost().preBulkLoadHFile(familyPaths
);
244 Map
<byte[], List
<Path
>> map
= null;
247 incrementUgiReference(ugi
);
248 // Get the target fs (HBase region server fs) delegation token
249 // Since we have checked the permission via 'preBulkLoadHFile', now let's give
250 // the 'request user' necessary token to operate on the target fs.
251 // After this point the 'doAs' user will hold two tokens, one for the source fs
252 // ('request user'), another for the target fs (HBase region server principal).
253 if (userProvider
.isHadoopSecurityEnabled()) {
254 FsDelegationToken targetfsDelegationToken
= new FsDelegationToken(userProvider
,"renewer");
255 targetfsDelegationToken
.acquireDelegationToken(fs
);
257 Token
<?
> targetFsToken
= targetfsDelegationToken
.getUserToken();
258 if (targetFsToken
!= null
259 && (userToken
== null || !targetFsToken
.getService().equals(userToken
.getService()))){
260 ugi
.addToken(targetFsToken
);
264 map
= ugi
.doAs(new PrivilegedAction
<Map
<byte[], List
<Path
>>>() {
266 public Map
<byte[], List
<Path
>> run() {
267 FileSystem fs
= null;
270 * This is creating and caching a new FileSystem instance. Other code called
271 * "beneath" this method will rely on this FileSystem instance being in the
272 * cache. This is important as those methods make _no_ attempt to close this
273 * FileSystem instance. It is critical that here, in SecureBulkLoadManager,
274 * we are tracking the lifecycle and closing the FS when safe to do so.
276 fs
= FileSystem
.get(conf
);
277 for(Pair
<byte[], String
> el
: familyPaths
) {
278 Path stageFamily
= new Path(bulkToken
, Bytes
.toString(el
.getFirst()));
279 if(!fs
.exists(stageFamily
)) {
280 fs
.mkdirs(stageFamily
);
281 fs
.setPermission(stageFamily
, PERM_ALL_ACCESS
);
284 if (fsCreatedListener
!= null) {
285 fsCreatedListener
.accept(region
);
287 //We call bulkLoadHFiles as requesting user
288 //To enable access prior to staging
289 return region
.bulkLoadHFiles(familyPaths
, true,
290 new SecureBulkLoadListener(fs
, bulkToken
, conf
), request
.getCopyFile(),
291 clusterIds
, request
.getReplicate());
292 } catch (Exception e
) {
293 LOG
.error("Failed to complete bulk load", e
);
299 decrementUgiReference(ugi
);
// NOTE(review): the decrement above and the isUserReferenced check below are separate steps.
// A concurrent bulk load for the same UGI could take a reference in between and then have its
// cached FileSystem closed; confirm this window is acceptable.
301 if (!UserGroupInformation
.getLoginUser().equals(ugi
) && !isUserReferenced(ugi
)) {
302 FileSystem
.closeAllForUGI(ugi
);
304 } catch (IOException e
) {
305 LOG
.error("Failed to close FileSystem for: {}", ugi
, e
);
307 if (region
.getCoprocessorHost() != null) {
308 region
.getCoprocessorHost().postBulkLoadHFile(familyPaths
, map
);
/**
 * Creates a new randomly named staging directory for a table under {@code baseDir}.
 *
 * <p>The directory name has the form
 * {@code <user short name>__<table name with ':' replaced by '_'>__<random>}. Here
 * {@code <random>} is RANDOM_WIDTH random bits from the SecureRandom rendered in radix
 * RANDOM_RADIX. The ':' replacement is presumably because ':' is not allowed in path names.
 * Creation is delegated to the overload taking the directory name.
 *
 * @param baseDir parent directory (the base staging directory)
 * @param tableName table the staging directory is for
 * @return the created directory, as returned by the delegated overload
 * @throws IOException if the directory cannot be created
 */
314 private Path
createStagingDir(Path baseDir
,
316 TableName tableName
) throws IOException
{
317 String tblName
= tableName
.getNameAsString().replace(":", "_");
318 String randomDir
= user
.getShortName()+"__"+ tblName
+"__"+
319 (new BigInteger(RANDOM_WIDTH
, random
).toString(RANDOM_RADIX
));
320 return createStagingDir(baseDir
, user
, randomDir
);
/**
 * Creates {@code baseDir/randomDir} on the region server filesystem with PERM_ALL_ACCESS.
 * The permission is set again after mkdirs, presumably because the permission argument to
 * mkdirs is filtered by the filesystem umask.
 *
 * @param baseDir parent directory
 * @param randomDir name of the directory to create
 * @throws IOException if the directory cannot be created or its permission set
 */
323 private Path
createStagingDir(Path baseDir
,
325 String randomDir
) throws IOException
{
326 Path p
= new Path(baseDir
, randomDir
);
327 fs
.mkdirs(p
, PERM_ALL_ACCESS
);
328 fs
.setPermission(p
, PERM_ALL_ACCESS
);
/**
 * Returns the user on whose behalf the current operation runs.
 *
 * <p>This is the RPC request user when called while handling an RPC; otherwise it is
 * {@code userProvider.getCurrent()}. When Hadoop security is enabled but the value of
 * {@code User.HBASE_SECURITY_CONF_KEY} is "simple", a test user with the same short name is
 * returned instead (testing only).
 *
 * @throws IOException if the current user cannot be determined
 */
332 private User
getActiveUser() throws IOException
{
333 // for non-rpc handling, fallback to system user
334 User user
= RpcServer
.getRequestUser().orElse(userProvider
.getCurrent());
335 // this is for testing
336 if (userProvider
.isHadoopSecurityEnabled() &&
337 "simple".equalsIgnoreCase(conf
.get(User
.HBASE_SECURITY_CONF_KEY
))) {
338 return User
.createUserForTesting(conf
, user
.getShortName(), new String
[] {});
/**
 * BulkLoadListener that stages each source HFile at {@code stagingDir/<family>/<file name>}
 * and, on failure, moves it back to its source location.
 */
344 private static class SecureBulkLoadListener
implements BulkLoadListener
{
// Destination (region server) filesystem that receives staged files.
346 private final FileSystem fs
;
// Per-request staging directory (the bulk token path).
347 private final String stagingDir
;
348 private final Configuration conf
;
// Source filesystem, opened with FileSystem.newInstance while staging or rolling back a file.
350 private FileSystem srcFs
= null;
// Original permission of each renamed source file, keyed by source path, so that
// failedBulkLoad can restore it.
351 private Map
<String
, FsPermission
> origPermissions
= null;
/**
 * Creates a listener that stages files under {@code stagingDir} on the destination filesystem.
 * NOTE(review): only the stagingDir and origPermissions assignments are visible in this
 * excerpt; the final fields fs and conf must also be assigned here — confirm.
 *
 * @param fs destination (region server) filesystem
 * @param stagingDir per-request staging directory
 * @param conf configuration used to open the source filesystem
 */
353 public SecureBulkLoadListener(FileSystem fs
, String stagingDir
, Configuration conf
) {
355 this.stagingDir
= stagingDir
;
357 this.origPermissions
= new HashMap
<>();
/**
 * Moves or copies one source HFile into the staging directory and returns the staged path.
 *
 * <p>The staged location is {@code stagingDir/<family>/<source file name>}.
 * <ul>
 * <li>If the source already is that path (the replication case), it is returned as-is.</li>
 * <li>Otherwise an IOException is thrown if the source does not reference a file (presumably
 * checked with isFile, which is documented as rejecting symlinks).</li>
 * <li>When the source and destination are different HDFS instances, or {@code copyFile} is
 * true, the file is copied and the source is kept.</li>
 * <li>Otherwise it is renamed, and its original permission is recorded in origPermissions for
 * failedBulkLoad.</li>
 * </ul>
 * The staged file is then given PERM_ALL_ACCESS.
 *
 * @param family column family name bytes, used as the staging sub-directory name
 * @param srcPath source HFile path
 * @param copyFile copy instead of rename when source and destination share a filesystem
 * @return the staged path as a String
 * @throws IOException if the source is not a file, or the copy or rename fails
 */
361 public String
prepareBulkLoad(final byte[] family
, final String srcPath
, boolean copyFile
)
363 Path p
= new Path(srcPath
);
364 Path stageP
= new Path(stagingDir
, new Path(Bytes
.toString(family
), p
.getName()));
366 // In case of Replication for bulk load files, hfiles are already copied in staging directory
367 if (p
.equals(stageP
)) {
368 LOG
.debug(p
.getName()
369 + " is already available in staging directory. Skipping copy or rename.");
370 return stageP
.toString();
374 srcFs
= FileSystem
.newInstance(p
.toUri(), conf
);
378 throw new IOException("Path does not reference a file: " + p
);
381 // Check to see if the source and target filesystems are the same
382 if (!FSUtils
.isSameHdfs(conf
, srcFs
, fs
)) {
383 LOG
.debug("Bulk-load file " + srcPath
+ " is on different filesystem than " +
384 "the destination filesystem. Copying file over to destination staging dir.");
385 FileUtil
.copy(srcFs
, p
, fs
, stageP
, false, conf
);
386 } else if (copyFile
) {
387 LOG
.debug("Bulk-load file " + srcPath
+ " is copied to destination staging dir.");
388 FileUtil
.copy(srcFs
, p
, fs
, stageP
, false, conf
);
390 LOG
.debug("Moving " + p
+ " to " + stageP
);
391 FileStatus origFileStatus
= fs
.getFileStatus(p
);
392 origPermissions
.put(srcPath
, origFileStatus
.getPermission());
393 if(!fs
.rename(p
, stageP
)) {
394 throw new IOException("Failed to move HFile: " + p
+ " to " + stageP
);
397 fs
.setPermission(stageP
, PERM_ALL_ACCESS
);
398 return stageP
.toString();
/**
 * Called after the HFile from {@code srcPath} was loaded; logs the completion at debug level.
 * NOTE(review): the rest of the body is not visible in this excerpt; presumably it releases
 * srcFs.
 *
 * @param family column family name bytes
 * @param srcPath source HFile path
 */
402 public void doneBulkLoad(byte[] family
, String srcPath
) throws IOException
{
403 LOG
.debug("Bulk Load done for: " + srcPath
);
/**
 * Presumably closes the source FileSystem (srcFs) opened via FileSystem.newInstance.
 * NOTE(review): the method body is not visible in this excerpt — confirm.
 */
407 private void closeSrcFs() throws IOException
{
/**
 * Undoes prepareBulkLoad for one HFile after a failed load.
 *
 * <ul>
 * <li>If the source and destination are different HDFS instances, nothing is moved back,
 * because the file was copied.</li>
 * <li>If the source path already is the staged path (the replication case), nothing is
 * renamed.</li>
 * <li>Otherwise the staged file is renamed back to {@code srcPath} and its recorded original
 * permission is restored. A missing record is only logged.</li>
 * </ul>
 *
 * <p>NOTE(review): srcFs is reassigned with FileSystem.newInstance here. If an instance from
 * prepareBulkLoad is still open it would leak, unless a guard precedes this line (not visible).
 *
 * <p>NOTE(review): on a shared filesystem with copyFile=true, prepareBulkLoad copied rather than
 * moved, so srcPath may still exist and the rename back may fail — confirm callers handle this.
 *
 * @param family column family name bytes
 * @param srcPath original source HFile path
 * @throws IOException if the staged file cannot be renamed back
 */
415 public void failedBulkLoad(final byte[] family
, final String srcPath
) throws IOException
{
417 Path p
= new Path(srcPath
);
419 srcFs
= FileSystem
.newInstance(p
.toUri(), conf
);
421 if (!FSUtils
.isSameHdfs(conf
, srcFs
, fs
)) {
422 // files are copied so no need to move them back
425 Path stageP
= new Path(stagingDir
, new Path(Bytes
.toString(family
), p
.getName()));
427 // In case of Replication for bulk load files, hfiles are not renamed by end point during
428 // prepare stage, so no need of rename here again
429 if (p
.equals(stageP
)) {
430 LOG
.debug(p
.getName() + " is already available in source directory. Skipping rename.");
434 LOG
.debug("Moving " + stageP
+ " back to " + p
);
435 if (!fs
.rename(stageP
, p
)) {
436 throw new IOException("Failed to move HFile: " + stageP
+ " to " + p
);
439 // restore original permission
440 if (origPermissions
.containsKey(srcPath
)) {
441 fs
.setPermission(p
, origPermissions
.get(srcPath
));
443 LOG
.warn("Can't find previous permission for path=" + srcPath
);
451 * Check if the path is referencing a file.
452 * This is mainly needed to avoid symlinks.
454 * @return true if the p is a file
455 * @throws IOException
457 private boolean isFile(Path p
) throws IOException
{
458 FileStatus status
= srcFs
.getFileStatus(p
);
459 boolean isFile
= !status
.isDirectory();
461 isFile
= isFile
&& !(Boolean
)Methods
.call(FileStatus
.class, status
, "isSymlink", null, null);
462 } catch (Exception e
) {