/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

44 public class SimpleRSProcedureManager
extends RegionServerProcedureManager
{
46 private static final Logger LOG
= LoggerFactory
.getLogger(SimpleRSProcedureManager
.class);
48 private RegionServerServices rss
;
49 private ProcedureMemberRpcs memberRpcs
;
50 private ProcedureMember member
;
53 public void initialize(RegionServerServices rss
) throws KeeperException
{
55 ZKWatcher zkw
= rss
.getZooKeeper();
56 this.memberRpcs
= new ZKProcedureMemberRpcs(zkw
, getProcedureSignature());
58 ThreadPoolExecutor pool
=
59 ProcedureMember
.defaultPool(rss
.getServerName().toString(), 1);
60 this.member
= new ProcedureMember(memberRpcs
, pool
, new SimleSubprocedureBuilder());
61 LOG
.info("Initialized: " + rss
.getServerName().toString());
66 this.memberRpcs
.start(rss
.getServerName().toString(), member
);
71 public void stop(boolean force
) throws IOException
{
72 LOG
.info("stop: " + force
);
76 this.memberRpcs
.close();
81 public String
getProcedureSignature() {
82 return SimpleMasterProcedureManager
.SIMPLE_SIGNATURE
;
86 * If in a running state, creates the specified subprocedure for handling a procedure.
87 * @return Subprocedure to submit to the ProcedureMemeber.
89 public Subprocedure
buildSubprocedure(String name
) {
91 // don't run a procedure if the parent is stop(ping)
92 if (rss
.isStopping() || rss
.isStopped()) {
93 throw new IllegalStateException("Can't start procedure on RS: " + rss
.getServerName()
94 + ", because stopping/stopped!");
97 LOG
.info("Attempting to run a procedure.");
98 ForeignExceptionDispatcher errorDispatcher
= new ForeignExceptionDispatcher();
99 Configuration conf
= rss
.getConfiguration();
101 SimpleSubprocedurePool taskManager
=
102 new SimpleSubprocedurePool(rss
.getServerName().toString(), conf
);
103 return new SimpleSubprocedure(rss
, member
, errorDispatcher
, taskManager
, name
);
107 * Build the actual procedure runner that will do all the 'hard' work
109 public class SimleSubprocedureBuilder
implements SubprocedureFactory
{
112 public Subprocedure
buildSubprocedure(String name
, byte[] data
) {
113 LOG
.info("Building procedure: " + name
);
114 return SimpleRSProcedureManager
.this.buildSubprocedure(name
);
118 public class SimpleSubprocedurePool
implements Closeable
, Abortable
{
120 private final ExecutorCompletionService
<Void
> taskPool
;
121 private final ThreadPoolExecutor executor
;
122 private volatile boolean aborted
;
123 private final List
<Future
<Void
>> futures
= new ArrayList
<>();
124 private final String name
;
126 public SimpleSubprocedurePool(String name
, Configuration conf
) {
128 executor
= new ThreadPoolExecutor(1, 1, 500,
129 TimeUnit
.SECONDS
, new LinkedBlockingQueue
<>(),
130 Threads
.newDaemonThreadFactory("rs(" + name
+ ")-procedure"));
131 taskPool
= new ExecutorCompletionService
<>(executor
);
135 * Submit a task to the pool.
137 public void submitTask(final Callable
<Void
> task
) {
138 Future
<Void
> f
= this.taskPool
.submit(task
);
143 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
145 * @return <tt>true</tt> on success, <tt>false</tt> otherwise
146 * @throws ForeignException
148 public boolean waitForOutstandingTasks() throws ForeignException
{
149 LOG
.debug("Waiting for procedure to finish.");
152 for (Future
<Void
> f
: futures
) {
156 } catch (InterruptedException e
) {
157 if (aborted
) throw new ForeignException(
158 "Interrupted and found to be aborted while waiting for tasks!", e
);
159 Thread
.currentThread().interrupt();
160 } catch (ExecutionException e
) {
161 if (e
.getCause() instanceof ForeignException
) {
162 throw (ForeignException
) e
.getCause();
164 throw new ForeignException(name
, e
.getCause());
166 // close off remaining tasks
167 for (Future
<Void
> f
: futures
) {
177 * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
181 public void close() {
186 public void abort(String why
, Throwable e
) {
187 if (this.aborted
) return;
190 LOG
.warn("Aborting because: " + why
, e
);
191 this.executor
.shutdownNow();
195 public boolean isAborted() {
200 public class SimpleSubprocedure
extends Subprocedure
{
201 private final RegionServerServices rss
;
202 private final SimpleSubprocedurePool taskManager
;
204 public SimpleSubprocedure(RegionServerServices rss
, ProcedureMember member
,
205 ForeignExceptionDispatcher errorListener
, SimpleSubprocedurePool taskManager
, String name
) {
206 super(member
, name
, errorListener
, 500, 60000);
207 LOG
.info("Constructing a SimpleSubprocedure.");
209 this.taskManager
= taskManager
;
214 * TODO. We don't need a thread pool to execute roll log. This can be simplified
215 * with no use of subprocedurepool.
217 class RSSimpleTask
implements Callable
<Void
> {
221 public Void
call() throws Exception
{
222 LOG
.info("Execute subprocedure on " + rss
.getServerName().toString());
228 private void execute() throws ForeignException
{
230 monitor
.rethrowException();
232 // running a task (e.g., roll log, flush table) on region server
233 taskManager
.submitTask(new RSSimpleTask());
234 monitor
.rethrowException();
236 // wait for everything to complete.
237 taskManager
.waitForOutstandingTasks();
238 monitor
.rethrowException();
243 public void acquireBarrier() throws ForeignException
{
244 // do nothing, executing in inside barrier step.
251 public byte[] insideBarrier() throws ForeignException
{
253 return Bytes
.toBytes(SimpleMasterProcedureManager
.SIMPLE_DATA
);
257 * Cancel threads if they haven't finished.
260 public void cleanup(Exception e
) {
261 taskManager
.abort("Aborting simple subprocedure tasks due to error", e
);