HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-procedure / src / main / java / org / apache / hadoop / hbase / procedure2 / ProcedureUtil.java
blobc557c2021b4000b3ac22f03ad6800db014bec91a
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.procedure2;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.lang.reflect.Constructor;
23 import java.lang.reflect.Modifier;
24 import java.util.concurrent.TimeUnit;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.HConstants;
27 import org.apache.hadoop.hbase.util.NonceKey;
28 import org.apache.hadoop.hbase.util.RetryCounter;
29 import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
30 import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
31 import org.apache.yetus.audience.InterfaceAudience;
33 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
34 import org.apache.hbase.thirdparty.com.google.protobuf.Any;
35 import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
36 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
37 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
38 import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
39 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
41 import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
42 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
 * Helper to convert to/from ProcedureProtos
 */
47 @InterfaceAudience.Private
48 public final class ProcedureUtil {
// Static helpers only; the private constructor keeps the class from being instantiated.
private ProcedureUtil() {
}
51 // ==========================================================================
52 // Reflection helpers to create/validate a Procedure object
53 // ==========================================================================
54 private static Procedure<?> newProcedure(String className) throws BadProcedureException {
55 try {
56 Class<?> clazz = Class.forName(className);
57 if (!Modifier.isPublic(clazz.getModifiers())) {
58 throw new Exception("the " + clazz + " class is not public");
61 @SuppressWarnings("rawtypes")
62 Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
63 assert ctor != null : "no constructor found";
64 if (!Modifier.isPublic(ctor.getModifiers())) {
65 throw new Exception("the " + clazz + " constructor is not public");
67 return ctor.newInstance();
68 } catch (Exception e) {
69 throw new BadProcedureException(
70 "The procedure class " + className + " must be accessible and have an empty constructor",
71 e);
75 static void validateClass(Procedure<?> proc) throws BadProcedureException {
76 try {
77 Class<?> clazz = proc.getClass();
78 if (!Modifier.isPublic(clazz.getModifiers())) {
79 throw new Exception("the " + clazz + " class is not public");
82 Constructor<?> ctor = clazz.getConstructor();
83 assert ctor != null;
84 if (!Modifier.isPublic(ctor.getModifiers())) {
85 throw new Exception("the " + clazz + " constructor is not public");
87 } catch (Exception e) {
88 throw new BadProcedureException("The procedure class " + proc.getClass().getName() +
89 " must be accessible and have an empty constructor", e);
93 // ==========================================================================
94 // convert to and from Procedure object
95 // ==========================================================================
97 /**
98 * A serializer for our Procedures. Instead of the previous serializer, it
99 * uses the stateMessage list to store the internal state of the Procedures.
101 private static class StateSerializer implements ProcedureStateSerializer {
102 private final ProcedureProtos.Procedure.Builder builder;
103 private int deserializeIndex;
105 public StateSerializer(ProcedureProtos.Procedure.Builder builder) {
106 this.builder = builder;
109 @Override
110 public void serialize(Message message) throws IOException {
111 Any packedMessage = Any.pack(message);
112 builder.addStateMessage(packedMessage);
115 @Override
116 public <M extends Message> M deserialize(Class<M> clazz)
117 throws IOException {
118 if (deserializeIndex >= builder.getStateMessageCount()) {
119 throw new IOException("Invalid state message index: " + deserializeIndex);
122 try {
123 Any packedMessage = builder.getStateMessage(deserializeIndex++);
124 return packedMessage.unpack(clazz);
125 } catch (InvalidProtocolBufferException e) {
126 throw e.unwrapIOException();
132 * A serializer (deserializer) for those Procedures which were serialized
133 * before this patch. It deserializes the old, binary stateData field.
135 private static class CompatStateSerializer implements ProcedureStateSerializer {
136 private InputStream inputStream;
138 public CompatStateSerializer(InputStream inputStream) {
139 this.inputStream = inputStream;
142 @Override
143 public void serialize(Message message) throws IOException {
144 throw new UnsupportedOperationException();
147 @SuppressWarnings("unchecked")
148 @Override
149 public <M extends Message> M deserialize(Class<M> clazz)
150 throws IOException {
151 Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType();
152 try {
153 return parser.parseDelimitedFrom(inputStream);
154 } catch (InvalidProtocolBufferException e) {
155 throw e.unwrapIOException();
161 * Helper to convert the procedure to protobuf.
162 * <p/>
163 * Used by ProcedureStore implementations.
165 public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
166 throws IOException {
167 Preconditions.checkArgument(proc != null);
168 validateClass(proc);
170 final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
171 .setClassName(proc.getClass().getName())
172 .setProcId(proc.getProcId())
173 .setState(proc.getState())
174 .setSubmittedTime(proc.getSubmittedTime())
175 .setLastUpdate(proc.getLastUpdate());
177 if (proc.hasParent()) {
178 builder.setParentId(proc.getParentProcId());
181 if (proc.hasTimeout()) {
182 builder.setTimeout(proc.getTimeout());
185 if (proc.hasOwner()) {
186 builder.setOwner(proc.getOwner());
189 final int[] stackIds = proc.getStackIndexes();
190 if (stackIds != null) {
191 for (int i = 0; i < stackIds.length; ++i) {
192 builder.addStackId(stackIds[i]);
196 if (proc.hasException()) {
197 RemoteProcedureException exception = proc.getException();
198 builder.setException(
199 RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
202 final byte[] result = proc.getResult();
203 if (result != null) {
204 builder.setResult(UnsafeByteOperations.unsafeWrap(result));
207 ProcedureStateSerializer serializer = new StateSerializer(builder);
208 proc.serializeStateData(serializer);
210 if (proc.getNonceKey() != null) {
211 builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
212 builder.setNonce(proc.getNonceKey().getNonce());
215 if (proc.hasLock()) {
216 builder.setLocked(true);
219 if (proc.isBypass()) {
220 builder.setBypass(true);
222 return builder.build();
226 * Helper to convert the protobuf procedure.
227 * <p/>
228 * Used by ProcedureStore implementations.
229 * <p/>
230 * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
231 * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
232 * it by storing the data only on insert().
234 public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
235 throws IOException {
236 // Procedure from class name
237 Procedure<?> proc = newProcedure(proto.getClassName());
239 // set fields
240 proc.setProcId(proto.getProcId());
241 proc.setState(proto.getState());
242 proc.setSubmittedTime(proto.getSubmittedTime());
243 proc.setLastUpdate(proto.getLastUpdate());
245 if (proto.hasParentId()) {
246 proc.setParentProcId(proto.getParentId());
249 if (proto.hasOwner()) {
250 proc.setOwner(proto.getOwner());
253 if (proto.hasTimeout()) {
254 proc.setTimeout(proto.getTimeout());
257 if (proto.getStackIdCount() > 0) {
258 proc.setStackIndexes(proto.getStackIdList());
261 if (proto.hasException()) {
262 assert proc.getState() == ProcedureProtos.ProcedureState.FAILED ||
263 proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK :
264 "The procedure must be failed (waiting to rollback) or rolledback";
265 proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
268 if (proto.hasResult()) {
269 proc.setResult(proto.getResult().toByteArray());
272 if (proto.getNonce() != HConstants.NO_NONCE) {
273 proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
276 if (proto.getLocked()) {
277 proc.lockedWhenLoading();
280 if (proto.getBypass()) {
281 proc.bypass(null);
284 ProcedureStateSerializer serializer = null;
286 if (proto.getStateMessageCount() > 0) {
287 serializer = new StateSerializer(proto.toBuilder());
288 } else if (proto.hasStateData()) {
289 InputStream inputStream = proto.getStateData().newInput();
290 serializer = new CompatStateSerializer(inputStream);
293 if (serializer != null) {
294 proc.deserializeStateData(serializer);
297 return proc;
300 // ==========================================================================
301 // convert from LockedResource object
302 // ==========================================================================
304 public static LockServiceProtos.LockedResourceType convertToProtoResourceType(
305 LockedResourceType resourceType) {
306 return LockServiceProtos.LockedResourceType.valueOf(resourceType.name());
309 public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) {
310 return LockServiceProtos.LockType.valueOf(lockType.name());
313 public static LockServiceProtos.LockedResource convertToProtoLockedResource(
314 LockedResource lockedResource) throws IOException {
315 LockServiceProtos.LockedResource.Builder builder =
316 LockServiceProtos.LockedResource.newBuilder();
318 builder
319 .setResourceType(convertToProtoResourceType(lockedResource.getResourceType()))
320 .setResourceName(lockedResource.getResourceName())
321 .setLockType(convertToProtoLockType(lockedResource.getLockType()));
323 Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure();
325 if (exclusiveLockOwnerProcedure != null) {
326 ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto =
327 convertToProtoProcedure(exclusiveLockOwnerProcedure);
328 builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto);
331 builder.setSharedLockCount(lockedResource.getSharedLockCount());
333 for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) {
334 ProcedureProtos.Procedure waitingProcedureProto =
335 convertToProtoProcedure(waitingProcedure);
336 builder.addWaitingProcedures(waitingProcedureProto);
339 return builder.build();
342 public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS =
343 "hbase.procedure.retry.sleep.interval.ms";
345 // default to 1 second
346 public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000;
348 public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
349 "hbase.procedure.retry.max.sleep.time.ms";
351 // default to 10 minutes
352 public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
353 TimeUnit.MINUTES.toMillis(10);
356 * Get a retry counter for getting the backoff time. We will use the
357 * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time
358 * is 10 minutes by default.
359 * <p/>
360 * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
361 * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not
362 * timeout.
364 public static RetryCounter createRetryCounter(Configuration conf) {
365 long sleepIntervalMs =
366 conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS);
367 long maxSleepTimeMs =
368 conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS);
369 RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs)
370 .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
371 return new RetryCounter(retryConfig);
374 public static boolean isFinished(ProcedureProtos.Procedure proc) {
375 if (!proc.hasParentId()) {
376 switch (proc.getState()) {
377 case ROLLEDBACK:
378 case SUCCESS:
379 return true;
380 default:
381 break;
384 return false;