2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with this
4 * work for additional information regarding copyright ownership. The ASF
5 * licenses this file to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations
17 package org
.apache
.hadoop
.hbase
.util
;
19 import java
.io
.IOException
;
20 import java
.security
.PrivilegedExceptionAction
;
21 import java
.util
.HashMap
;
24 import org
.apache
.commons
.logging
.Log
;
25 import org
.apache
.commons
.logging
.LogFactory
;
26 import org
.apache
.hadoop
.conf
.Configuration
;
27 import org
.apache
.hadoop
.hbase
.TableName
;
28 import org
.apache
.hadoop
.hbase
.client
.Get
;
29 import org
.apache
.hadoop
.hbase
.client
.Result
;
30 import org
.apache
.hadoop
.hbase
.client
.Table
;
31 import org
.apache
.hadoop
.hbase
.security
.User
;
32 import org
.apache
.hadoop
.hbase
.util
.test
.LoadTestDataGenerator
;
33 import org
.apache
.hadoop
.security
.UserGroupInformation
;
36 * A MultiThreadReader that helps to work with ACL
38 public class MultiThreadedReaderWithACL
extends MultiThreadedReader
{
39 private static final Log LOG
= LogFactory
.getLog(MultiThreadedReaderWithACL
.class);
41 private static final String COMMA
= ",";
43 * Maps user with Table instance. Because the table instance has to be created
44 * per user inorder to work in that user's context
46 private Map
<String
, Table
> userVsTable
= new HashMap
<>();
47 private Map
<String
, User
> users
= new HashMap
<>();
48 private String
[] userNames
;
50 public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen
, Configuration conf
,
51 TableName tableName
, double verifyPercent
, String userNames
) throws IOException
{
52 super(dataGen
, conf
, tableName
, verifyPercent
);
53 this.userNames
= userNames
.split(COMMA
);
57 protected void addReaderThreads(int numThreads
) throws IOException
{
58 for (int i
= 0; i
< numThreads
; ++i
) {
59 HBaseReaderThread reader
= new HBaseReaderThreadWithACL(i
);
64 public class HBaseReaderThreadWithACL
extends HBaseReaderThread
{
66 public HBaseReaderThreadWithACL(int readerId
) throws IOException
{
71 protected Table
createTable() throws IOException
{
76 protected void closeTable() {
77 for (Table table
: userVsTable
.values()) {
80 } catch (Exception e
) {
81 LOG
.error("Error while closing the table " + table
.getName(), e
);
87 public void queryKey(final Get get
, final boolean verify
, final long keyToRead
)
89 final String rowKey
= Bytes
.toString(get
.getRow());
92 final long start
= System
.nanoTime();
93 PrivilegedExceptionAction
<Object
> action
= new PrivilegedExceptionAction
<Object
>() {
95 public Object
run() throws Exception
{
96 Table localTable
= null;
99 int specialPermCellInsertionFactor
= Integer
.parseInt(dataGenerator
.getArgs()[2]);
100 int mod
= ((int) keyToRead
% userNames
.length
);
101 if (userVsTable
.get(userNames
[mod
]) == null) {
102 localTable
= connection
.getTable(tableName
);
103 userVsTable
.put(userNames
[mod
], localTable
);
104 result
= localTable
.get(get
);
106 localTable
= userVsTable
.get(userNames
[mod
]);
107 result
= localTable
.get(get
);
109 boolean isNullExpected
= ((((int) keyToRead
% specialPermCellInsertionFactor
)) == 0);
110 long end
= System
.nanoTime();
111 verifyResultsAndUpdateMetrics(verify
, get
, end
- start
, result
, localTable
, isNullExpected
);
112 } catch (IOException e
) {
113 recordFailure(keyToRead
);
118 if (userNames
!= null && userNames
.length
> 0) {
119 int mod
= ((int) keyToRead
% userNames
.length
);
121 UserGroupInformation realUserUgi
;
122 if(!users
.containsKey(userNames
[mod
])) {
123 if(User
.isHBaseSecurityEnabled(conf
)) {
124 realUserUgi
= LoadTestTool
.loginAndReturnUGI(conf
, userNames
[mod
]);
126 realUserUgi
= UserGroupInformation
.createRemoteUser(userNames
[mod
]);
128 user
= User
.create(realUserUgi
);
129 users
.put(userNames
[mod
], user
);
131 user
= users
.get(userNames
[mod
]);
135 } catch (Exception e
) {
136 recordFailure(keyToRead
);
141 private void recordFailure(final long keyToRead
) {
142 numReadFailures
.addAndGet(1);
143 LOG
.debug("[" + readerId
+ "] FAILED read, key = " + (keyToRead
+ "") + ", "
144 + "time from start: " + (System
.currentTimeMillis() - startTimeMs
) + " ms");