2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.procedure2
;
20 import java
.io
.IOException
;
21 import java
.io
.InputStream
;
22 import java
.lang
.reflect
.Constructor
;
23 import java
.lang
.reflect
.Modifier
;
24 import java
.util
.concurrent
.TimeUnit
;
25 import org
.apache
.hadoop
.conf
.Configuration
;
26 import org
.apache
.hadoop
.hbase
.HConstants
;
27 import org
.apache
.hadoop
.hbase
.util
.NonceKey
;
28 import org
.apache
.hadoop
.hbase
.util
.RetryCounter
;
29 import org
.apache
.hadoop
.hbase
.util
.RetryCounter
.ExponentialBackoffPolicyWithLimit
;
30 import org
.apache
.hadoop
.hbase
.util
.RetryCounter
.RetryConfig
;
31 import org
.apache
.yetus
.audience
.InterfaceAudience
;
33 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
34 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Any
;
35 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Internal
;
36 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.InvalidProtocolBufferException
;
37 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
38 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Parser
;
39 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.UnsafeByteOperations
;
41 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.LockServiceProtos
;
42 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ProcedureProtos
;
45 * Helper to convert to/from ProcedureProtos
47 @InterfaceAudience.Private
48 public final class ProcedureUtil
{
49 private ProcedureUtil() { }
51 // ==========================================================================
52 // Reflection helpers to create/validate a Procedure object
53 // ==========================================================================
54 private static Procedure
<?
> newProcedure(String className
) throws BadProcedureException
{
56 Class
<?
> clazz
= Class
.forName(className
);
57 if (!Modifier
.isPublic(clazz
.getModifiers())) {
58 throw new Exception("the " + clazz
+ " class is not public");
61 @SuppressWarnings("rawtypes")
62 Constructor
<?
extends Procedure
> ctor
= clazz
.asSubclass(Procedure
.class).getConstructor();
63 assert ctor
!= null : "no constructor found";
64 if (!Modifier
.isPublic(ctor
.getModifiers())) {
65 throw new Exception("the " + clazz
+ " constructor is not public");
67 return ctor
.newInstance();
68 } catch (Exception e
) {
69 throw new BadProcedureException(
70 "The procedure class " + className
+ " must be accessible and have an empty constructor",
75 static void validateClass(Procedure
<?
> proc
) throws BadProcedureException
{
77 Class
<?
> clazz
= proc
.getClass();
78 if (!Modifier
.isPublic(clazz
.getModifiers())) {
79 throw new Exception("the " + clazz
+ " class is not public");
82 Constructor
<?
> ctor
= clazz
.getConstructor();
84 if (!Modifier
.isPublic(ctor
.getModifiers())) {
85 throw new Exception("the " + clazz
+ " constructor is not public");
87 } catch (Exception e
) {
88 throw new BadProcedureException("The procedure class " + proc
.getClass().getName() +
89 " must be accessible and have an empty constructor", e
);
93 // ==========================================================================
94 // convert to and from Procedure object
95 // ==========================================================================
98 * A serializer for our Procedures. Instead of the previous serializer, it
99 * uses the stateMessage list to store the internal state of the Procedures.
101 private static class StateSerializer
implements ProcedureStateSerializer
{
102 private final ProcedureProtos
.Procedure
.Builder builder
;
103 private int deserializeIndex
;
105 public StateSerializer(ProcedureProtos
.Procedure
.Builder builder
) {
106 this.builder
= builder
;
110 public void serialize(Message message
) throws IOException
{
111 Any packedMessage
= Any
.pack(message
);
112 builder
.addStateMessage(packedMessage
);
116 public <M
extends Message
> M
deserialize(Class
<M
> clazz
)
118 if (deserializeIndex
>= builder
.getStateMessageCount()) {
119 throw new IOException("Invalid state message index: " + deserializeIndex
);
123 Any packedMessage
= builder
.getStateMessage(deserializeIndex
++);
124 return packedMessage
.unpack(clazz
);
125 } catch (InvalidProtocolBufferException e
) {
126 throw e
.unwrapIOException();
132 * A serializer (deserializer) for those Procedures which were serialized
133 * before this patch. It deserializes the old, binary stateData field.
135 private static class CompatStateSerializer
implements ProcedureStateSerializer
{
136 private InputStream inputStream
;
138 public CompatStateSerializer(InputStream inputStream
) {
139 this.inputStream
= inputStream
;
143 public void serialize(Message message
) throws IOException
{
144 throw new UnsupportedOperationException();
147 @SuppressWarnings("unchecked")
149 public <M
extends Message
> M
deserialize(Class
<M
> clazz
)
151 Parser
<M
> parser
= (Parser
<M
>) Internal
.getDefaultInstance(clazz
).getParserForType();
153 return parser
.parseDelimitedFrom(inputStream
);
154 } catch (InvalidProtocolBufferException e
) {
155 throw e
.unwrapIOException();
161 * Helper to convert the procedure to protobuf.
163 * Used by ProcedureStore implementations.
165 public static ProcedureProtos
.Procedure
convertToProtoProcedure(Procedure
<?
> proc
)
167 Preconditions
.checkArgument(proc
!= null);
170 final ProcedureProtos
.Procedure
.Builder builder
= ProcedureProtos
.Procedure
.newBuilder()
171 .setClassName(proc
.getClass().getName())
172 .setProcId(proc
.getProcId())
173 .setState(proc
.getState())
174 .setSubmittedTime(proc
.getSubmittedTime())
175 .setLastUpdate(proc
.getLastUpdate());
177 if (proc
.hasParent()) {
178 builder
.setParentId(proc
.getParentProcId());
181 if (proc
.hasTimeout()) {
182 builder
.setTimeout(proc
.getTimeout());
185 if (proc
.hasOwner()) {
186 builder
.setOwner(proc
.getOwner());
189 final int[] stackIds
= proc
.getStackIndexes();
190 if (stackIds
!= null) {
191 for (int i
= 0; i
< stackIds
.length
; ++i
) {
192 builder
.addStackId(stackIds
[i
]);
196 if (proc
.hasException()) {
197 RemoteProcedureException exception
= proc
.getException();
198 builder
.setException(
199 RemoteProcedureException
.toProto(exception
.getSource(), exception
.getCause()));
202 final byte[] result
= proc
.getResult();
203 if (result
!= null) {
204 builder
.setResult(UnsafeByteOperations
.unsafeWrap(result
));
207 ProcedureStateSerializer serializer
= new StateSerializer(builder
);
208 proc
.serializeStateData(serializer
);
210 if (proc
.getNonceKey() != null) {
211 builder
.setNonceGroup(proc
.getNonceKey().getNonceGroup());
212 builder
.setNonce(proc
.getNonceKey().getNonce());
215 if (proc
.hasLock()) {
216 builder
.setLocked(true);
219 if (proc
.isBypass()) {
220 builder
.setBypass(true);
222 return builder
.build();
226 * Helper to convert the protobuf procedure.
228 * Used by ProcedureStore implementations.
230 * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
231 * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
232 * it by storing the data only on insert().
234 public static Procedure
<?
> convertToProcedure(ProcedureProtos
.Procedure proto
)
236 // Procedure from class name
237 Procedure
<?
> proc
= newProcedure(proto
.getClassName());
240 proc
.setProcId(proto
.getProcId());
241 proc
.setState(proto
.getState());
242 proc
.setSubmittedTime(proto
.getSubmittedTime());
243 proc
.setLastUpdate(proto
.getLastUpdate());
245 if (proto
.hasParentId()) {
246 proc
.setParentProcId(proto
.getParentId());
249 if (proto
.hasOwner()) {
250 proc
.setOwner(proto
.getOwner());
253 if (proto
.hasTimeout()) {
254 proc
.setTimeout(proto
.getTimeout());
257 if (proto
.getStackIdCount() > 0) {
258 proc
.setStackIndexes(proto
.getStackIdList());
261 if (proto
.hasException()) {
262 assert proc
.getState() == ProcedureProtos
.ProcedureState
.FAILED
||
263 proc
.getState() == ProcedureProtos
.ProcedureState
.ROLLEDBACK
:
264 "The procedure must be failed (waiting to rollback) or rolledback";
265 proc
.setFailure(RemoteProcedureException
.fromProto(proto
.getException()));
268 if (proto
.hasResult()) {
269 proc
.setResult(proto
.getResult().toByteArray());
272 if (proto
.getNonce() != HConstants
.NO_NONCE
) {
273 proc
.setNonceKey(new NonceKey(proto
.getNonceGroup(), proto
.getNonce()));
276 if (proto
.getLocked()) {
277 proc
.lockedWhenLoading();
280 if (proto
.getBypass()) {
284 ProcedureStateSerializer serializer
= null;
286 if (proto
.getStateMessageCount() > 0) {
287 serializer
= new StateSerializer(proto
.toBuilder());
288 } else if (proto
.hasStateData()) {
289 InputStream inputStream
= proto
.getStateData().newInput();
290 serializer
= new CompatStateSerializer(inputStream
);
293 if (serializer
!= null) {
294 proc
.deserializeStateData(serializer
);
300 // ==========================================================================
301 // convert from LockedResource object
302 // ==========================================================================
304 public static LockServiceProtos
.LockedResourceType
convertToProtoResourceType(
305 LockedResourceType resourceType
) {
306 return LockServiceProtos
.LockedResourceType
.valueOf(resourceType
.name());
309 public static LockServiceProtos
.LockType
convertToProtoLockType(LockType lockType
) {
310 return LockServiceProtos
.LockType
.valueOf(lockType
.name());
313 public static LockServiceProtos
.LockedResource
convertToProtoLockedResource(
314 LockedResource lockedResource
) throws IOException
{
315 LockServiceProtos
.LockedResource
.Builder builder
=
316 LockServiceProtos
.LockedResource
.newBuilder();
319 .setResourceType(convertToProtoResourceType(lockedResource
.getResourceType()))
320 .setResourceName(lockedResource
.getResourceName())
321 .setLockType(convertToProtoLockType(lockedResource
.getLockType()));
323 Procedure
<?
> exclusiveLockOwnerProcedure
= lockedResource
.getExclusiveLockOwnerProcedure();
325 if (exclusiveLockOwnerProcedure
!= null) {
326 ProcedureProtos
.Procedure exclusiveLockOwnerProcedureProto
=
327 convertToProtoProcedure(exclusiveLockOwnerProcedure
);
328 builder
.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto
);
331 builder
.setSharedLockCount(lockedResource
.getSharedLockCount());
333 for (Procedure
<?
> waitingProcedure
: lockedResource
.getWaitingProcedures()) {
334 ProcedureProtos
.Procedure waitingProcedureProto
=
335 convertToProtoProcedure(waitingProcedure
);
336 builder
.addWaitingProcedures(waitingProcedureProto
);
339 return builder
.build();
342 public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS
=
343 "hbase.procedure.retry.sleep.interval.ms";
345 // default to 1 second
346 public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS
= 1000;
348 public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS
=
349 "hbase.procedure.retry.max.sleep.time.ms";
351 // default to 10 minutes
352 public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS
=
353 TimeUnit
.MINUTES
.toMillis(10);
356 * Get a retry counter for getting the backoff time. We will use the
357 * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time
358 * is 10 minutes by default.
360 * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
361 * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not
364 public static RetryCounter
createRetryCounter(Configuration conf
) {
365 long sleepIntervalMs
=
366 conf
.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS
, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS
);
367 long maxSleepTimeMs
=
368 conf
.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS
, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS
);
369 RetryConfig retryConfig
= new RetryConfig().setSleepInterval(sleepIntervalMs
)
370 .setMaxSleepTime(maxSleepTimeMs
).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
371 return new RetryCounter(retryConfig
);
374 public static boolean isFinished(ProcedureProtos
.Procedure proc
) {
375 if (!proc
.hasParentId()) {
376 switch (proc
.getState()) {