/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.procedure
;
20 import java
.io
.Closeable
;
21 import java
.io
.IOException
;
22 import java
.util
.ArrayList
;
23 import java
.util
.List
;
24 import java
.util
.concurrent
.Callable
;
25 import java
.util
.concurrent
.ExecutionException
;
26 import java
.util
.concurrent
.ExecutorCompletionService
;
27 import java
.util
.concurrent
.ExecutorService
;
28 import java
.util
.concurrent
.Executors
;
29 import java
.util
.concurrent
.Future
;
30 import java
.util
.concurrent
.ThreadPoolExecutor
;
32 import org
.apache
.hadoop
.conf
.Configuration
;
33 import org
.apache
.hadoop
.hbase
.Abortable
;
34 import org
.apache
.hadoop
.hbase
.regionserver
.RegionServerServices
;
35 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
36 import org
.apache
.hadoop
.hbase
.util
.Threads
;
37 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
38 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignException
;
39 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignExceptionDispatcher
;
40 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
41 import org
.apache
.zookeeper
.KeeperException
;
42 import org
.slf4j
.Logger
;
43 import org
.slf4j
.LoggerFactory
;
45 public class SimpleRSProcedureManager
extends RegionServerProcedureManager
{
47 private static final Logger LOG
= LoggerFactory
.getLogger(SimpleRSProcedureManager
.class);
49 private RegionServerServices rss
;
50 private ProcedureMemberRpcs memberRpcs
;
51 private ProcedureMember member
;
54 public void initialize(RegionServerServices rss
) throws KeeperException
{
56 ZKWatcher zkw
= rss
.getZooKeeper();
57 this.memberRpcs
= new ZKProcedureMemberRpcs(zkw
, getProcedureSignature());
59 ThreadPoolExecutor pool
=
60 ProcedureMember
.defaultPool(rss
.getServerName().toString(), 1);
61 this.member
= new ProcedureMember(memberRpcs
, pool
, new SimleSubprocedureBuilder());
62 LOG
.info("Initialized: " + rss
.getServerName().toString());
67 this.memberRpcs
.start(rss
.getServerName().toString(), member
);
72 public void stop(boolean force
) throws IOException
{
73 LOG
.info("stop: " + force
);
77 this.memberRpcs
.close();
82 public String
getProcedureSignature() {
83 return SimpleMasterProcedureManager
.SIMPLE_SIGNATURE
;
87 * If in a running state, creates the specified subprocedure for handling a procedure.
88 * @return Subprocedure to submit to the ProcedureMember.
90 public Subprocedure
buildSubprocedure(String name
) {
92 // don't run a procedure if the parent is stop(ping)
93 if (rss
.isStopping() || rss
.isStopped()) {
94 throw new IllegalStateException("Can't start procedure on RS: " + rss
.getServerName()
95 + ", because stopping/stopped!");
98 LOG
.info("Attempting to run a procedure.");
99 ForeignExceptionDispatcher errorDispatcher
= new ForeignExceptionDispatcher();
100 Configuration conf
= rss
.getConfiguration();
102 SimpleSubprocedurePool taskManager
=
103 new SimpleSubprocedurePool(rss
.getServerName().toString(), conf
);
104 return new SimpleSubprocedure(rss
, member
, errorDispatcher
, taskManager
, name
);
108 * Build the actual procedure runner that will do all the 'hard' work
110 public class SimleSubprocedureBuilder
implements SubprocedureFactory
{
113 public Subprocedure
buildSubprocedure(String name
, byte[] data
) {
114 LOG
.info("Building procedure: " + name
);
115 return SimpleRSProcedureManager
.this.buildSubprocedure(name
);
119 public static class SimpleSubprocedurePool
implements Closeable
, Abortable
{
121 private final ExecutorCompletionService
<Void
> taskPool
;
122 private final ExecutorService executor
;
123 private volatile boolean aborted
;
124 private final List
<Future
<Void
>> futures
= new ArrayList
<>();
125 private final String name
;
127 public SimpleSubprocedurePool(String name
, Configuration conf
) {
129 executor
= Executors
.newSingleThreadExecutor(
130 new ThreadFactoryBuilder().setNameFormat("rs(" + name
+ ")-procedure-pool-%d")
131 .setDaemon(true).setUncaughtExceptionHandler(Threads
.LOGGING_EXCEPTION_HANDLER
).build());
132 taskPool
= new ExecutorCompletionService
<>(executor
);
136 * Submit a task to the pool.
138 public void submitTask(final Callable
<Void
> task
) {
139 Future
<Void
> f
= this.taskPool
.submit(task
);
144 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
146 * @return <tt>true</tt> on success, <tt>false</tt> otherwise
147 * @throws ForeignException
149 public boolean waitForOutstandingTasks() throws ForeignException
{
150 LOG
.debug("Waiting for procedure to finish.");
153 for (Future
<Void
> f
: futures
) {
157 } catch (InterruptedException e
) {
158 if (aborted
) throw new ForeignException(
159 "Interrupted and found to be aborted while waiting for tasks!", e
);
160 Thread
.currentThread().interrupt();
161 } catch (ExecutionException e
) {
162 if (e
.getCause() instanceof ForeignException
) {
163 throw (ForeignException
) e
.getCause();
165 throw new ForeignException(name
, e
.getCause());
167 // close off remaining tasks
168 for (Future
<Void
> f
: futures
) {
178 * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
182 public void close() {
187 public void abort(String why
, Throwable e
) {
188 if (this.aborted
) return;
191 LOG
.warn("Aborting because: " + why
, e
);
192 this.executor
.shutdownNow();
196 public boolean isAborted() {
201 public class SimpleSubprocedure
extends Subprocedure
{
202 private final RegionServerServices rss
;
203 private final SimpleSubprocedurePool taskManager
;
/**
 * Creates a subprocedure bound to the given member and task pool.
 *
 * @param rss services of the hosting region server
 * @param member procedure member passed to the superclass
 * @param errorListener dispatcher passed to the superclass
 * @param taskManager pool used to run and, on cleanup, abort this subprocedure's task
 * @param name procedure name passed to the superclass
 */
public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
    ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
  // NOTE(review): 500 and 60000 are presumably the wake frequency and timeout in
  // milliseconds expected by the Subprocedure constructor - confirm.
  super(member, name, errorListener, 500, 60000);
  LOG.info("Constructing a SimpleSubprocedure.");
  // Both final fields must be assigned; rss is read when the task logs the server name.
  this.rss = rss;
  this.taskManager = taskManager;
}
215 * TODO. We don't need a thread pool to execute roll log. This can be simplified
216 * with no use of subprocedurepool.
218 class RSSimpleTask
implements Callable
<Void
> {
222 public Void
call() throws Exception
{
223 LOG
.info("Execute subprocedure on " + rss
.getServerName().toString());
229 private void execute() throws ForeignException
{
231 monitor
.rethrowException();
233 // running a task (e.g., roll log, flush table) on region server
234 taskManager
.submitTask(new RSSimpleTask());
235 monitor
.rethrowException();
237 // wait for everything to complete.
238 taskManager
.waitForOutstandingTasks();
239 monitor
.rethrowException();
244 public void acquireBarrier() throws ForeignException
{
245 // do nothing, executing in inside barrier step.
252 public byte[] insideBarrier() throws ForeignException
{
254 return Bytes
.toBytes(SimpleMasterProcedureManager
.SIMPLE_DATA
);
258 * Cancel threads if they haven't finished.
261 public void cleanup(Exception e
) {
262 taskManager
.abort("Aborting simple subprocedure tasks due to error", e
);