3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 package org
.apache
.hadoop
.hbase
.util
;
22 import java
.io
.IOException
;
23 import java
.io
.InterruptedIOException
;
24 import java
.util
.ArrayList
;
25 import java
.util
.Collection
;
26 import java
.util
.List
;
27 import java
.util
.concurrent
.Callable
;
28 import java
.util
.concurrent
.CompletionService
;
29 import java
.util
.concurrent
.ExecutionException
;
30 import java
.util
.concurrent
.ExecutorCompletionService
;
31 import java
.util
.concurrent
.ThreadFactory
;
32 import java
.util
.concurrent
.ThreadPoolExecutor
;
33 import java
.util
.concurrent
.TimeUnit
;
35 import org
.apache
.hadoop
.conf
.Configuration
;
36 import org
.apache
.hadoop
.fs
.Path
;
37 import org
.apache
.hadoop
.hbase
.HConstants
;
38 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
39 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
40 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
41 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
42 import org
.apache
.yetus
.audience
.InterfaceAudience
;
43 import org
.slf4j
.Logger
;
44 import org
.slf4j
.LoggerFactory
;
47 * Utility methods for interacting with the regions.
49 @InterfaceAudience.Private
50 public abstract class ModifyRegionUtils
{
51 private static final Logger LOG
= LoggerFactory
.getLogger(ModifyRegionUtils
.class);
53 private ModifyRegionUtils() {
56 public interface RegionFillTask
{
57 void fillRegion(final HRegion region
) throws IOException
;
60 public interface RegionEditTask
{
61 void editRegion(final RegionInfo region
) throws IOException
;
64 public static RegionInfo
[] createRegionInfos(TableDescriptor tableDescriptor
,
66 long regionId
= System
.currentTimeMillis();
67 RegionInfo
[] hRegionInfos
= null;
68 if (splitKeys
== null || splitKeys
.length
== 0) {
69 hRegionInfos
= new RegionInfo
[]{
70 RegionInfoBuilder
.newBuilder(tableDescriptor
.getTableName())
74 .setRegionId(regionId
)
78 int numRegions
= splitKeys
.length
+ 1;
79 hRegionInfos
= new RegionInfo
[numRegions
];
80 byte[] startKey
= null;
82 for (int i
= 0; i
< numRegions
; i
++) {
83 endKey
= (i
== splitKeys
.length
) ?
null : splitKeys
[i
];
85 RegionInfoBuilder
.newBuilder(tableDescriptor
.getTableName())
86 .setStartKey(startKey
)
89 .setRegionId(regionId
)
98 * Create new set of regions on the specified file-system.
99 * NOTE: that you should add the regions to hbase:meta after this operation.
101 * @param conf {@link Configuration}
102 * @param rootDir Root directory for HBase instance
103 * @param tableDescriptor description of the table
104 * @param newRegions {@link RegionInfo} that describes the regions to create
105 * @param task {@link RegionFillTask} custom code to populate region after creation
106 * @throws IOException
108 public static List
<RegionInfo
> createRegions(final Configuration conf
, final Path rootDir
,
109 final TableDescriptor tableDescriptor
, final RegionInfo
[] newRegions
,
110 final RegionFillTask task
) throws IOException
{
111 if (newRegions
== null) return null;
112 int regionNumber
= newRegions
.length
;
113 ThreadPoolExecutor exec
= getRegionOpenAndInitThreadPool(conf
,
114 "RegionOpenAndInitThread-" + tableDescriptor
.getTableName(), regionNumber
);
116 return createRegions(exec
, conf
, rootDir
, tableDescriptor
, newRegions
, task
);
123 * Create new set of regions on the specified file-system.
124 * NOTE: that you should add the regions to hbase:meta after this operation.
126 * @param exec Thread Pool Executor
127 * @param conf {@link Configuration}
128 * @param rootDir Root directory for HBase instance
129 * @param tableDescriptor description of the table
130 * @param newRegions {@link RegionInfo} that describes the regions to create
131 * @param task {@link RegionFillTask} custom code to populate region after creation
132 * @throws IOException
134 public static List
<RegionInfo
> createRegions(final ThreadPoolExecutor exec
,
135 final Configuration conf
, final Path rootDir
,
136 final TableDescriptor tableDescriptor
, final RegionInfo
[] newRegions
,
137 final RegionFillTask task
) throws IOException
{
138 if (newRegions
== null) return null;
139 int regionNumber
= newRegions
.length
;
140 CompletionService
<RegionInfo
> completionService
= new ExecutorCompletionService
<>(exec
);
141 List
<RegionInfo
> regionInfos
= new ArrayList
<>();
142 for (final RegionInfo newRegion
: newRegions
) {
143 completionService
.submit(new Callable
<RegionInfo
>() {
145 public RegionInfo
call() throws IOException
{
146 return createRegion(conf
, rootDir
, tableDescriptor
, newRegion
, task
);
151 // wait for all regions to finish creation
152 for (int i
= 0; i
< regionNumber
; i
++) {
153 regionInfos
.add(completionService
.take().get());
155 } catch (InterruptedException e
) {
156 LOG
.error("Caught " + e
+ " during region creation");
157 throw new InterruptedIOException(e
.getMessage());
158 } catch (ExecutionException e
) {
159 throw new IOException(e
);
165 * Create new set of regions on the specified file-system.
166 * @param conf {@link Configuration}
167 * @param rootDir Root directory for HBase instance
168 * @param tableDescriptor description of the table
169 * @param newRegion {@link RegionInfo} that describes the region to create
170 * @param task {@link RegionFillTask} custom code to populate region after creation
171 * @throws IOException
173 public static RegionInfo
createRegion(final Configuration conf
, final Path rootDir
,
174 final TableDescriptor tableDescriptor
, final RegionInfo newRegion
,
175 final RegionFillTask task
) throws IOException
{
177 // The WAL subsystem will use the default rootDir rather than the passed in rootDir
178 // unless I pass along via the conf.
179 Configuration confForWAL
= new Configuration(conf
);
180 confForWAL
.set(HConstants
.HBASE_DIR
, rootDir
.toString());
181 HRegion region
= HRegion
.createHRegion(newRegion
, rootDir
, conf
, tableDescriptor
, null, false);
183 // 2. Custom user code to interact with the created region
185 task
.fillRegion(region
);
188 // 3. Close the new region to flush to disk. Close log file too.
191 return region
.getRegionInfo();
195 * Execute the task on the specified set of regions.
197 * @param exec Thread Pool Executor
198 * @param regions {@link RegionInfo} that describes the regions to edit
199 * @param task {@link RegionFillTask} custom code to edit the region
200 * @throws IOException
202 public static void editRegions(final ThreadPoolExecutor exec
,
203 final Collection
<RegionInfo
> regions
, final RegionEditTask task
) throws IOException
{
204 final ExecutorCompletionService
<Void
> completionService
= new ExecutorCompletionService
<>(exec
);
205 for (final RegionInfo hri
: regions
) {
206 completionService
.submit(new Callable
<Void
>() {
208 public Void
call() throws IOException
{
209 task
.editRegion(hri
);
216 for (RegionInfo hri
: regions
) {
217 completionService
.take().get();
219 } catch (InterruptedException e
) {
220 throw new InterruptedIOException(e
.getMessage());
221 } catch (ExecutionException e
) {
222 IOException ex
= new IOException();
223 ex
.initCause(e
.getCause());
229 * used by createRegions() to get the thread pool executor based on the
230 * "hbase.hregion.open.and.init.threads.max" property.
232 static ThreadPoolExecutor
getRegionOpenAndInitThreadPool(final Configuration conf
,
233 final String threadNamePrefix
, int regionNumber
) {
234 int maxThreads
= Math
.min(regionNumber
, conf
.getInt(
235 "hbase.hregion.open.and.init.threads.max", 16));
236 ThreadPoolExecutor regionOpenAndInitThreadPool
= Threads
237 .getBoundedCachedThreadPool(maxThreads
, 30L, TimeUnit
.SECONDS
,
238 new ThreadFactory() {
239 private int count
= 1;
242 public Thread
newThread(Runnable r
) {
243 return new Thread(r
, threadNamePrefix
+ "-" + count
++);
246 return regionOpenAndInitThreadPool
;