2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.regionserver
;
21 import org
.apache
.commons
.logging
.Log
;
22 import org
.apache
.commons
.logging
.LogFactory
;
23 import org
.apache
.hadoop
.conf
.Configuration
;
24 import org
.apache
.hadoop
.fs
.FileStatus
;
25 import org
.apache
.hadoop
.fs
.FileSystem
;
26 import org
.apache
.hadoop
.fs
.FileUtil
;
27 import org
.apache
.hadoop
.fs
.Path
;
28 import org
.apache
.hadoop
.fs
.permission
.FsPermission
;
29 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
30 import org
.apache
.hadoop
.hbase
.HConstants
;
31 import org
.apache
.hadoop
.hbase
.TableName
;
32 import org
.apache
.hadoop
.hbase
.classification
.InterfaceAudience
;
33 import org
.apache
.hadoop
.hbase
.client
.Connection
;
34 import org
.apache
.hadoop
.hbase
.coprocessor
.BulkLoadObserver
;
35 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
36 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
37 import org
.apache
.hadoop
.hbase
.ipc
.RpcServer
;
38 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
39 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.BulkLoadHFileRequest
;
40 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.CleanupBulkLoadRequest
;
41 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.PrepareBulkLoadRequest
;
42 import org
.apache
.hadoop
.hbase
.regionserver
.Region
.BulkLoadListener
;
43 import org
.apache
.hadoop
.hbase
.security
.User
;
44 import org
.apache
.hadoop
.hbase
.security
.UserProvider
;
45 import org
.apache
.hadoop
.hbase
.security
.token
.FsDelegationToken
;
46 import org
.apache
.hadoop
.hbase
.security
.token
.TokenUtil
;
47 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
48 import org
.apache
.hadoop
.hbase
.util
.FSHDFSUtils
;
49 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
50 import org
.apache
.hadoop
.hbase
.util
.Methods
;
51 import org
.apache
.hadoop
.hbase
.util
.Pair
;
52 import org
.apache
.hadoop
.io
.Text
;
53 import org
.apache
.hadoop
.security
.UserGroupInformation
;
54 import org
.apache
.hadoop
.security
.token
.Token
;
56 import java
.io
.IOException
;
57 import java
.math
.BigInteger
;
58 import java
.security
.PrivilegedAction
;
59 import java
.security
.SecureRandom
;
60 import java
.util
.ArrayList
;
61 import java
.util
.HashMap
;
62 import java
.util
.List
;
66 * Bulk loads in secure mode.
68 * This service addresses two issues:
70 * <li>Moving files in a secure filesystem wherein the HBase Client
71 * and HBase Server are different filesystem users.</li>
72 * <li>Moving files in a secure manner, assuming that the filesystem
73 * is POSIX compliant.</li>
76 * The algorithm is as follows:
78 * <li>Create an hbase owned staging directory which is
79 * world traversable (711): {@code /hbase/staging}</li>
80 * <li>A user writes out data to their secure output directory: {@code /user/foo/data}</li>
81 * <li>A call is made to hbase to create a secret staging directory
82 * which is globally rwx (777): {@code /hbase/staging/averylongandrandomdirectoryname}</li>
83 * <li>The user moves the data into the random staging directory,
84 * then calls bulkLoadHFiles()</li>
87 * Like delegation tokens the strength of the security lies in the length
88 * and randomness of the secret directory.
91 @InterfaceAudience.Private
92 public class SecureBulkLoadManager
{
94 public static final long VERSION
= 0L;
96 //320/5 = 64 characters
97 private static final int RANDOM_WIDTH
= 320;
98 private static final int RANDOM_RADIX
= 32;
100 private static final Log LOG
= LogFactory
.getLog(SecureBulkLoadManager
.class);
102 private final static FsPermission PERM_ALL_ACCESS
= FsPermission
.valueOf("-rwxrwxrwx");
103 private final static FsPermission PERM_HIDDEN
= FsPermission
.valueOf("-rwx--x--x");
104 private SecureRandom random
;
105 private FileSystem fs
;
106 private Configuration conf
;
108 //two levels so it doesn't get deleted accidentally
109 //no sticky bit in Hadoop 1.0
110 private Path baseStagingDir
;
112 private UserProvider userProvider
;
113 private Connection conn
;
/**
 * Binds this manager to a configuration and a cluster connection. No filesystem work happens
 * here; {@link #start()} must be called before serving bulk load requests.
 *
 * @param conf configuration used to locate the HBase root directory and filesystem
 * @param conn connection used to obtain HBase tokens for secure bulk loads
 */
SecureBulkLoadManager(Configuration conf, Connection conn) {
  this.conf = conf;
  this.conn = conn;
}
120 public void start() throws IOException
{
121 random
= new SecureRandom();
122 userProvider
= UserProvider
.instantiate(conf
);
123 fs
= FileSystem
.get(conf
);
124 baseStagingDir
= new Path(FSUtils
.getRootDir(conf
), HConstants
.BULKLOAD_STAGING_DIR_NAME
);
126 if (conf
.get("hbase.bulkload.staging.dir") != null) {
127 LOG
.warn("hbase.bulkload.staging.dir " + " is deprecated. Bulkload staging directory is "
130 if (!fs
.exists(baseStagingDir
)) {
131 fs
.mkdirs(baseStagingDir
, PERM_HIDDEN
);
135 public void stop() throws IOException
{
138 public String
prepareBulkLoad(final Region region
, final PrepareBulkLoadRequest request
)
140 List
<BulkLoadObserver
> bulkLoadObservers
= getBulkLoadObservers(region
);
142 if (bulkLoadObservers
!= null && bulkLoadObservers
.size() != 0) {
143 ObserverContext
<RegionCoprocessorEnvironment
> ctx
= new ObserverContext
<>(getActiveUser());
144 ctx
.prepare((RegionCoprocessorEnvironment
) region
.getCoprocessorHost()
145 .findCoprocessorEnvironment(BulkLoadObserver
.class).get(0));
147 for (BulkLoadObserver bulkLoadObserver
: bulkLoadObservers
) {
148 bulkLoadObserver
.prePrepareBulkLoad(ctx
, request
);
153 createStagingDir(baseStagingDir
, getActiveUser(), region
.getTableDesc().getTableName())
159 public void cleanupBulkLoad(final Region region
, final CleanupBulkLoadRequest request
)
161 List
<BulkLoadObserver
> bulkLoadObservers
= getBulkLoadObservers(region
);
163 if (bulkLoadObservers
!= null && bulkLoadObservers
.size() != 0) {
164 ObserverContext
<RegionCoprocessorEnvironment
> ctx
= new ObserverContext
<>(getActiveUser());
165 ctx
.prepare((RegionCoprocessorEnvironment
) region
.getCoprocessorHost()
166 .findCoprocessorEnvironment(BulkLoadObserver
.class).get(0));
168 for (BulkLoadObserver bulkLoadObserver
: bulkLoadObservers
) {
169 bulkLoadObserver
.preCleanupBulkLoad(ctx
, request
);
173 fs
.delete(new Path(request
.getBulkToken()), true);
/**
 * Loads the requested HFiles into {@code region} on behalf of the RPC caller.
 *
 * <p>Visible steps:
 * <ol>
 * <li>Converts the request's family/path entries into (family bytes, path) pairs.</li>
 * <li>When Hadoop security is enabled, attaches tokens to the caller's UGI:
 *   <ul>
 *   <li>the caller's filesystem token, rebuilt from {@code request.getFsToken()};</li>
 *   <li>an HBase token from {@code TokenUtil.obtainToken(conn)} (failure is only logged);</li>
 *   <li>a delegation token for the region server filesystem, unless its service matches the
 *       caller's token.</li>
 *   </ul></li>
 * <li>Runs the coprocessor {@code preBulkLoadHFile} hook (if a coprocessor host exists).</li>
 * <li>Inside {@code ugi.doAs}, creates {@code <bulkToken>/<family>} (777) for each family and
 *     calls {@code Region.bulkLoadHFiles} with a {@link SecureBulkLoadListener}.</li>
 * <li>Runs {@code postBulkLoadHFile} (if a coprocessor host exists).</li>
 * </ol>
 *
 * NOTE(review): the embedded original line numbers show gaps in this copy (for example 193,
 * 217-219, 240, 255-263 and 266-270). As a result, the try/catch/finally structure, how
 * {@code bypass} is used, where {@code loaded} is set, and the final return are not visible
 * here. Confirm against the original source.
 *
 * @param region target region
 * @param request family/path pairs, bulk token (the staging directory), caller filesystem token
 *     and copy-file flag
 * @return the map produced by {@code Region.bulkLoadHFiles}; presumably {@code null} when the
 *     load failed inside {@code doAs} (TODO confirm — the return statement is not visible)
 * @throws IOException if the caller cannot be resolved, a token/security check fails, or a
 *     coprocessor hook fails
 */
176 public Map
<byte[], List
<Path
>> secureBulkLoadHFiles(final Region region
,
177 final BulkLoadHFileRequest request
) throws IOException
{
178 final List
<Pair
<byte[], String
>> familyPaths
= new ArrayList
<>(request
.getFamilyPathCount());
179 for(ClientProtos
.BulkLoadHFileRequest
.FamilyPath el
: request
.getFamilyPathList()) {
180 familyPaths
.add(new Pair
<>(el
.getFamily().toByteArray(), el
.getPath()));
// The caller's source-filesystem token, rebuilt from the request only when Hadoop security
// is enabled; otherwise userToken stays null.
183 Token userToken
= null;
184 if (userProvider
.isHadoopSecurityEnabled()) {
185 userToken
= new Token(request
.getFsToken().getIdentifier().toByteArray(), request
.getFsToken()
186 .getPassword().toByteArray(), new Text(request
.getFsToken().getKind()), new Text(
187 request
.getFsToken().getService()));
189 final String bulkToken
= request
.getBulkToken();
190 User user
= getActiveUser();
191 final UserGroupInformation ugi
= user
.getUGI();
// Best-effort: give the caller an HBase token; an IOException here is only logged.
192 if (userProvider
.isHadoopSecurityEnabled()) {
194 Token tok
= TokenUtil
.obtainToken(conn
);
196 boolean b
= ugi
.addToken(tok
);
197 LOG
.debug("token added " + tok
+ " for user " + ugi
+ " return=" + b
);
199 } catch (IOException ioe
) {
200 LOG
.warn("unable to add token", ioe
);
// NOTE(review): userToken is assigned a new Token exactly when isHadoopSecurityEnabled() is
// true (see its assignment above), so the DoNotRetryIOException branch below looks unreachable.
203 if (userToken
!= null) {
204 ugi
.addToken(userToken
);
205 } else if (userProvider
.isHadoopSecurityEnabled()) {
206 //we allow this to pass through in "simple" security mode
207 //for mini cluster testing
208 throw new DoNotRetryIOException("User token cannot be null");
// Coprocessor pre-hook; its result is stored in bypass, which is not read in the visible lines.
211 boolean bypass
= false;
212 if (region
.getCoprocessorHost() != null) {
213 bypass
= region
.getCoprocessorHost().preBulkLoadHFile(familyPaths
);
215 boolean loaded
= false;
216 Map
<byte[], List
<Path
>> map
= null;
220 // Get the target fs (HBase region server fs) delegation token
221 // Since we have checked the permission via 'preBulkLoadHFile', now let's give
222 // the 'request user' necessary token to operate on the target fs.
223 // After this point the 'doAs' user will hold two tokens, one for the source fs
224 // ('request user'), another for the target fs (HBase region server principal).
225 if (userProvider
.isHadoopSecurityEnabled()) {
226 FsDelegationToken targetfsDelegationToken
= new FsDelegationToken(userProvider
,"renewer");
227 targetfsDelegationToken
.acquireDelegationToken(fs
);
229 Token
<?
> targetFsToken
= targetfsDelegationToken
.getUserToken();
// Only add the target-fs token when it differs (by service) from the caller's own token.
230 if (targetFsToken
!= null
231 && (userToken
== null || !targetFsToken
.getService().equals(userToken
.getService()))){
232 ugi
.addToken(targetFsToken
);
// Everything inside run() executes as the caller (ugi), not as the region server user.
236 map
= ugi
.doAs(new PrivilegedAction
<Map
<byte[], List
<Path
>>>() {
238 public Map
<byte[], List
<Path
>> run() {
// This local fs shadows the fs field; it is obtained while running as ugi.
// NOTE(review): FileSystem.get() caches instances per UGI, and nothing in the visible lines
// closes this one (e.g. via FileSystem.closeAllForUGI(ugi)). This may leak cache entries
// across requests — verify against the elided lines.
239 FileSystem fs
= null;
241 fs
= FileSystem
.get(conf
);
// Ensure <bulkToken>/<family> exists and is world-accessible (777) for every family.
242 for(Pair
<byte[], String
> el
: familyPaths
) {
243 Path stageFamily
= new Path(bulkToken
, Bytes
.toString(el
.getFirst()));
244 if(!fs
.exists(stageFamily
)) {
245 fs
.mkdirs(stageFamily
);
246 fs
.setPermission(stageFamily
, PERM_ALL_ACCESS
);
249 //We call bulkLoadHFiles as requesting user
250 //To enable access prior to staging
251 return region
.bulkLoadHFiles(familyPaths
, true,
252 new SecureBulkLoadListener(fs
, bulkToken
, conf
), request
.getCopyFile());
// In the visible lines a failure is only logged here, not rethrown out of doAs.
253 } catch (Exception e
) {
254 LOG
.error("Failed to complete bulk load", e
);
// Coprocessor post-hook, given the input pairs, the resulting map and the loaded flag.
264 if (region
.getCoprocessorHost() != null) {
265 region
.getCoprocessorHost().postBulkLoadHFile(familyPaths
, map
, loaded
);
271 private List
<BulkLoadObserver
> getBulkLoadObservers(Region region
) {
272 List
<BulkLoadObserver
> coprocessorList
=
273 region
.getCoprocessorHost().findCoprocessors(BulkLoadObserver
.class);
275 return coprocessorList
;
278 private Path
createStagingDir(Path baseDir
,
280 TableName tableName
) throws IOException
{
281 String tblName
= tableName
.getNameAsString().replace(":", "_");
282 String randomDir
= user
.getShortName()+"__"+ tblName
+"__"+
283 (new BigInteger(RANDOM_WIDTH
, random
).toString(RANDOM_RADIX
));
284 return createStagingDir(baseDir
, user
, randomDir
);
287 private Path
createStagingDir(Path baseDir
,
289 String randomDir
) throws IOException
{
290 Path p
= new Path(baseDir
, randomDir
);
291 fs
.mkdirs(p
, PERM_ALL_ACCESS
);
292 fs
.setPermission(p
, PERM_ALL_ACCESS
);
296 private User
getActiveUser() throws IOException
{
297 User user
= RpcServer
.getRequestUser();
299 // for non-rpc handling, fallback to system user
300 user
= userProvider
.getCurrent();
303 //this is for testing
304 if (userProvider
.isHadoopSecurityEnabled()
305 && "simple".equalsIgnoreCase(conf
.get(User
.HBASE_SECURITY_CONF_KEY
))) {
306 return User
.createUserForTesting(conf
, user
.getShortName(), new String
[]{});
312 private static class SecureBulkLoadListener
implements BulkLoadListener
{
314 private final FileSystem fs
;
315 private final String stagingDir
;
316 private final Configuration conf
;
318 private FileSystem srcFs
= null;
319 private Map
<String
, FsPermission
> origPermissions
= null;
/**
 * @param fs target (HBase) filesystem the files are staged on
 * @param stagingDir per-request staging directory, i.e. the bulk token
 * @param conf configuration used to resolve the source filesystem and for copies
 */
public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
  this.fs = fs;
  this.stagingDir = stagingDir;
  this.conf = conf;
  this.origPermissions = new HashMap<>();
}
/**
 * Stages one source HFile into {@code <stagingDir>/<family>/<file name>} and returns the staged
 * path as a string.
 *
 * <p>Visible behaviour:
 * <ul>
 * <li>If the source already is the staged path (replication), it is returned unchanged.</li>
 * <li>If the source and target filesystems differ, or {@code copyFile} is set, the file is
 *     copied.</li>
 * <li>Otherwise its permission is recorded in {@code origPermissions} and it is renamed into
 *     staging.</li>
 * </ul>
 * The staged file is then made 777.
 *
 * NOTE(review): this copy skips original lines 339-341, 343-345 and 357. Presumably they hold:
 * <ul>
 * <li>a null check around the {@code srcFs} assignment;</li>
 * <li>an {@code isFile(p)} guard for the "Path does not reference a file" throw;</li>
 * <li>the {@code else} that restricts the move branch to the case where no copy happened.</li>
 * </ul>
 * As shown, the throw and the move would run unconditionally. Confirm against the original.
 *
 * @param family column family; its string form names the staging subdirectory
 * @param srcPath source HFile path supplied by the client
 * @param copyFile on the same filesystem, copy instead of rename when true
 * @return the staged path
 * @throws IOException if the source is not a file, or copying, renaming or permission changes
 *     fail
 */
329 public String
prepareBulkLoad(final byte[] family
, final String srcPath
, boolean copyFile
)
331 Path p
= new Path(srcPath
);
332 Path stageP
= new Path(stagingDir
, new Path(Bytes
.toString(family
), p
.getName()));
334 // In case of Replication for bulk load files, hfiles are already copied in staging directory
335 if (p
.equals(stageP
)) {
336 LOG
.debug(p
.getName()
337 + " is already available in staging directory. Skipping copy or rename.");
338 return stageP
.toString();
// srcFs is a field shared by all files of this load; it is resolved from the current path.
342 srcFs
= FileSystem
.get(p
.toUri(), conf
);
346 throw new IOException("Path does not reference a file: " + p
);
349 // Check to see if the source and target filesystems are the same
350 if (!FSHDFSUtils
.isSameHdfs(conf
, srcFs
, fs
)) {
351 LOG
.debug("Bulk-load file " + srcPath
+ " is on different filesystem than " +
352 "the destination filesystem. Copying file over to destination staging dir.");
353 FileUtil
.copy(srcFs
, p
, fs
, stageP
, false, conf
);
354 } else if (copyFile
) {
355 LOG
.debug("Bulk-load file " + srcPath
+ " is copied to destination staging dir.");
356 FileUtil
.copy(srcFs
, p
, fs
, stageP
, false, conf
);
// Move branch: remember the original permission so failedBulkLoad() can restore it.
358 LOG
.debug("Moving " + p
+ " to " + stageP
);
359 FileStatus origFileStatus
= fs
.getFileStatus(p
);
360 origPermissions
.put(srcPath
, origFileStatus
.getPermission());
361 if(!fs
.rename(p
, stageP
)) {
362 throw new IOException("Failed to move HFile: " + p
+ " to " + stageP
);
// Whatever path was taken, the staged file is made accessible to everyone.
365 fs
.setPermission(stageP
, PERM_ALL_ACCESS
);
366 return stageP
.toString();
370 public void doneBulkLoad(byte[] family
, String srcPath
) throws IOException
{
371 LOG
.debug("Bulk Load done for: " + srcPath
);
375 public void failedBulkLoad(final byte[] family
, final String srcPath
) throws IOException
{
376 if (!FSHDFSUtils
.isSameHdfs(conf
, srcFs
, fs
)) {
377 // files are copied so no need to move them back
380 Path p
= new Path(srcPath
);
381 Path stageP
= new Path(stagingDir
,
382 new Path(Bytes
.toString(family
), p
.getName()));
384 // In case of Replication for bulk load files, hfiles are not renamed by end point during
385 // prepare stage, so no need of rename here again
386 if (p
.equals(stageP
)) {
387 LOG
.debug(p
.getName() + " is already available in source directory. Skipping rename.");
391 LOG
.debug("Moving " + stageP
+ " back to " + p
);
392 if(!fs
.rename(stageP
, p
))
393 throw new IOException("Failed to move HFile: " + stageP
+ " to " + p
);
395 // restore original permission
396 if (origPermissions
.containsKey(srcPath
)) {
397 fs
.setPermission(p
, origPermissions
.get(srcPath
));
399 LOG
.warn("Can't find previous permission for path=" + srcPath
);
404 * Check if the path is referencing a file.
405 * This is mainly needed to avoid symlinks.
407 * @return true if the p is a file
408 * @throws IOException
410 private boolean isFile(Path p
) throws IOException
{
411 FileStatus status
= srcFs
.getFileStatus(p
);
412 boolean isFile
= !status
.isDirectory();
414 isFile
= isFile
&& !(Boolean
)Methods
.call(FileStatus
.class, status
, "isSymlink", null, null);
415 } catch (Exception e
) {