/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 package org
.apache
.hadoop
.hbase
;
22 import java
.util
.concurrent
.Callable
;
23 import java
.util
.concurrent
.Delayed
;
24 import java
.util
.concurrent
.ExecutionException
;
25 import java
.util
.concurrent
.RunnableScheduledFuture
;
26 import java
.util
.concurrent
.ScheduledThreadPoolExecutor
;
27 import java
.util
.concurrent
.ThreadFactory
;
28 import java
.util
.concurrent
.ThreadLocalRandom
;
29 import java
.util
.concurrent
.TimeUnit
;
30 import java
.util
.concurrent
.TimeoutException
;
32 import org
.apache
.yetus
.audience
.InterfaceAudience
;
35 * ScheduledThreadPoolExecutor that will add some jitter to the RunnableScheduledFuture.getDelay.
37 * This will spread out things on a distributed cluster.
39 @InterfaceAudience.Private
40 public class JitterScheduledThreadPoolExecutorImpl
extends ScheduledThreadPoolExecutor
{
41 private final double spread
;
45 * @param spread The percent up and down that RunnableScheduledFuture.getDelay should be jittered.
47 public JitterScheduledThreadPoolExecutorImpl(int corePoolSize
,
48 ThreadFactory threadFactory
,
50 super(corePoolSize
, threadFactory
);
55 protected <V
> java
.util
.concurrent
.RunnableScheduledFuture
<V
> decorateTask(
56 Runnable runnable
, java
.util
.concurrent
.RunnableScheduledFuture
<V
> task
) {
57 return new JitteredRunnableScheduledFuture
<>(task
);
61 protected <V
> java
.util
.concurrent
.RunnableScheduledFuture
<V
> decorateTask(
62 Callable
<V
> callable
, java
.util
.concurrent
.RunnableScheduledFuture
<V
> task
) {
63 return new JitteredRunnableScheduledFuture
<>(task
);
67 * Class that basically just defers to the wrapped future.
68 * The only exception is getDelay
70 protected class JitteredRunnableScheduledFuture
<V
> implements RunnableScheduledFuture
<V
> {
71 private final RunnableScheduledFuture
<V
> wrapped
;
72 JitteredRunnableScheduledFuture(RunnableScheduledFuture
<V
> wrapped
) {
73 this.wrapped
= wrapped
;
77 public boolean isPeriodic() {
78 return wrapped
.isPeriodic();
82 public long getDelay(TimeUnit unit
) {
83 long baseDelay
= wrapped
.getDelay(unit
);
84 long spreadTime
= (long) (baseDelay
* spread
);
85 long delay
= spreadTime
<= 0 ? baseDelay
86 : baseDelay
+ ThreadLocalRandom
.current().nextLong(-spreadTime
, spreadTime
);
87 // Ensure that we don't roll over for nanoseconds.
88 return (delay
< 0) ? baseDelay
: delay
;
92 public int compareTo(Delayed o
) {
93 return wrapped
.compareTo(o
);
97 public boolean equals(Object obj
) {
101 return obj
instanceof Delayed?
compareTo((Delayed
)obj
) == 0: false;
105 public int hashCode() {
106 return this.wrapped
.hashCode();
115 public boolean cancel(boolean mayInterruptIfRunning
) {
116 return wrapped
.cancel(mayInterruptIfRunning
);
120 public boolean isCancelled() {
121 return wrapped
.isCancelled();
125 public boolean isDone() {
126 return wrapped
.isDone();
130 public V
get() throws InterruptedException
, ExecutionException
{
131 return wrapped
.get();
135 public V
get(long timeout
,
136 TimeUnit unit
) throws InterruptedException
, ExecutionException
, TimeoutException
{
137 return wrapped
.get(timeout
, unit
);