2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.util
;
20 import java
.io
.IOException
;
21 import java
.io
.PrintWriter
;
22 import java
.io
.StringWriter
;
23 import java
.security
.PrivilegedExceptionAction
;
24 import java
.util
.HashMap
;
27 import org
.apache
.hadoop
.conf
.Configuration
;
28 import org
.apache
.hadoop
.hbase
.TableName
;
29 import org
.apache
.hadoop
.hbase
.client
.Append
;
30 import org
.apache
.hadoop
.hbase
.client
.Delete
;
31 import org
.apache
.hadoop
.hbase
.client
.Get
;
32 import org
.apache
.hadoop
.hbase
.client
.Increment
;
33 import org
.apache
.hadoop
.hbase
.client
.Mutation
;
34 import org
.apache
.hadoop
.hbase
.client
.Put
;
35 import org
.apache
.hadoop
.hbase
.client
.Result
;
36 import org
.apache
.hadoop
.hbase
.client
.RetriesExhaustedWithDetailsException
;
37 import org
.apache
.hadoop
.hbase
.client
.Table
;
38 import org
.apache
.hadoop
.hbase
.security
.HBaseKerberosUtils
;
39 import org
.apache
.hadoop
.hbase
.security
.User
;
40 import org
.apache
.hadoop
.hbase
.util
.test
.LoadTestDataGenerator
;
41 import org
.apache
.hadoop
.security
.UserGroupInformation
;
42 import org
.apache
.hadoop
.util
.StringUtils
;
43 import org
.slf4j
.Logger
;
44 import org
.slf4j
.LoggerFactory
;
47 * A MultiThreadUpdater that helps to work with ACL
49 public class MultiThreadedUpdaterWithACL
extends MultiThreadedUpdater
{
50 private static final Logger LOG
= LoggerFactory
.getLogger(MultiThreadedUpdaterWithACL
.class);
51 private final static String COMMA
= ",";
52 private User userOwner
;
54 * Maps user with Table instance. Because the table instance has to be created
55 * per user inorder to work in that user's context
57 private Map
<String
, Table
> userVsTable
= new HashMap
<>();
58 private Map
<String
, User
> users
= new HashMap
<>();
59 private String
[] userNames
;
61 public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen
, Configuration conf
,
62 TableName tableName
, double updatePercent
, User userOwner
, String userNames
)
64 super(dataGen
, conf
, tableName
, updatePercent
);
65 this.userOwner
= userOwner
;
66 this.userNames
= userNames
.split(COMMA
);
70 protected void addUpdaterThreads(int numThreads
) throws IOException
{
71 for (int i
= 0; i
< numThreads
; ++i
) {
72 HBaseUpdaterThread updater
= new HBaseUpdaterThreadWithACL(i
);
73 updaters
.add(updater
);
77 public class HBaseUpdaterThreadWithACL
extends HBaseUpdaterThread
{
80 private MutateAccessAction mutateAction
= new MutateAccessAction();
82 public HBaseUpdaterThreadWithACL(int updaterId
) throws IOException
{
87 protected Table
createTable() throws IOException
{
92 protected void closeHTable() {
97 for (Table table
: userVsTable
.values()) {
100 } catch (Exception e
) {
101 LOG
.error("Error while closing the table " + table
.getName(), e
);
104 } catch (Exception e
) {
105 LOG
.error("Error while closing the HTable "+table
.getName(), e
);
110 protected Result
getRow(final Get get
, final long rowKeyBase
, final byte[] cf
) {
111 PrivilegedExceptionAction
<Object
> action
= new PrivilegedExceptionAction
<Object
>() {
114 public Object
run() throws Exception
{
116 Table localTable
= null;
118 int mod
= ((int) rowKeyBase
% userNames
.length
);
119 if (userVsTable
.get(userNames
[mod
]) == null) {
120 localTable
= connection
.getTable(tableName
);
121 userVsTable
.put(userNames
[mod
], localTable
);
122 res
= localTable
.get(get
);
124 localTable
= userVsTable
.get(userNames
[mod
]);
125 res
= localTable
.get(get
);
127 } catch (IOException ie
) {
128 LOG
.warn("Failed to get the row for key = [" + Bytes
.toString(get
.getRow()) +
129 "], column family = [" + Bytes
.toString(cf
) + "]", ie
);
135 if (userNames
!= null && userNames
.length
> 0) {
136 int mod
= ((int) rowKeyBase
% userNames
.length
);
138 UserGroupInformation realUserUgi
;
140 if (!users
.containsKey(userNames
[mod
])) {
141 if (User
.isHBaseSecurityEnabled(conf
)) {
142 realUserUgi
= HBaseKerberosUtils
.loginAndReturnUGI(conf
, userNames
[mod
]);
144 realUserUgi
= UserGroupInformation
.createRemoteUser(userNames
[mod
]);
146 user
= User
.create(realUserUgi
);
147 users
.put(userNames
[mod
], user
);
149 user
= users
.get(userNames
[mod
]);
151 Result result
= (Result
) user
.runAs(action
);
153 } catch (Exception ie
) {
154 LOG
.warn("Failed to get the row for key = [" + Bytes
.toString(get
.getRow()) +
155 "], column family = [" + Bytes
.toString(cf
) + "]", ie
);
158 // This means that no users were present
163 public void mutate(final Table table
, Mutation m
, final long keyBase
, final byte[] row
,
164 final byte[] cf
, final byte[] q
, final byte[] v
) {
165 final long start
= System
.currentTimeMillis();
167 m
= dataGenerator
.beforeMutate(keyBase
, m
);
168 mutateAction
.setMutation(m
);
169 mutateAction
.setCF(cf
);
170 mutateAction
.setRow(row
);
171 mutateAction
.setQualifier(q
);
172 mutateAction
.setValue(v
);
173 mutateAction
.setStartTime(start
);
174 mutateAction
.setKeyBase(keyBase
);
175 userOwner
.runAs(mutateAction
);
176 } catch (IOException e
) {
177 recordFailure(m
, keyBase
, start
, e
);
178 } catch (InterruptedException e
) {
179 failedKeySet
.add(keyBase
);
183 class MutateAccessAction
implements PrivilegedExceptionAction
<Object
> {
187 private long keyBase
;
193 public MutateAccessAction() {
197 public void setStartTime(final long start
) {
201 public void setMutation(final Mutation m
) {
205 public void setRow(final byte[] row
) {
209 public void setCF(final byte[] cf
) {
213 public void setQualifier(final byte[] q
) {
217 public void setValue(final byte[] v
) {
221 public void setKeyBase(final long keyBase
) {
222 this.keyBase
= keyBase
;
226 public Object
run() throws Exception
{
229 table
= connection
.getTable(tableName
);
231 if (m
instanceof Increment
) {
232 table
.increment((Increment
) m
);
233 } else if (m
instanceof Append
) {
234 table
.append((Append
) m
);
235 } else if (m
instanceof Put
) {
236 table
.checkAndMutate(row
, cf
).qualifier(q
).ifEquals(v
).thenPut((Put
) m
);
237 } else if (m
instanceof Delete
) {
238 table
.checkAndMutate(row
, cf
).qualifier(q
).ifEquals(v
).thenDelete((Delete
) m
);
240 throw new IllegalArgumentException("unsupported mutation "
241 + m
.getClass().getSimpleName());
243 totalOpTimeMs
.addAndGet(System
.currentTimeMillis() - start
);
244 } catch (IOException e
) {
245 recordFailure(m
, keyBase
, start
, e
);
251 private void recordFailure(final Mutation m
, final long keyBase
,
252 final long start
, IOException e
) {
253 failedKeySet
.add(keyBase
);
254 String exceptionInfo
;
255 if (e
instanceof RetriesExhaustedWithDetailsException
) {
256 RetriesExhaustedWithDetailsException aggEx
= (RetriesExhaustedWithDetailsException
) e
;
257 exceptionInfo
= aggEx
.getExhaustiveDescription();
259 StringWriter stackWriter
= new StringWriter();
260 PrintWriter pw
= new PrintWriter(stackWriter
);
261 e
.printStackTrace(pw
);
263 exceptionInfo
= StringUtils
.stringifyException(e
);
265 LOG
.error("Failed to mutate: " + keyBase
+ " after " + (System
.currentTimeMillis() - start
)
266 + "ms; region information: " + getRegionDebugInfoSafe(table
, m
.getRow()) + "; errors: "