/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;
import java.io.Closeable;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.MunkresAssignment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
/**
 * A tool that is used for manipulating and viewing favored nodes information
 * for regions. Run with -h to get a list of the options.
 */
@InterfaceAudience.Private
// TODO: Remove? Unused. Partially implemented only.
public class RegionPlacementMaintainer implements Closeable {
  private static final Logger LOG =
    LoggerFactory.getLogger(RegionPlacementMaintainer.class);

  // The cost of a placement that should never be assigned.
  private static final float MAX_COST = Float.POSITIVE_INFINITY;

  // The cost of a placement that is undesirable but acceptable.
  private static final float AVOID_COST = 100000f;

  // The amount by which the cost of a placement is increased if it is the
  // last slot of the server. This is done to more evenly distribute the slop
  // amongst servers.
  private static final float LAST_SLOT_COST_PENALTY = 0.5f;

  // The amount by which the cost of a primary placement is penalized if it is
  // not the host currently serving the region. This is done to minimize moves.
  private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;

  private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;

  private Configuration conf;
  private final boolean enforceLocality;
  private final boolean enforceMinAssignmentMove;
  private RackManager rackManager;
  private Set<TableName> targetTableSet;
  private AsyncClusterConnection connection;
  public RegionPlacementMaintainer(Configuration conf) throws IOException {
    this(conf, true, true);
  }

  public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
      boolean enforceMinAssignmentMove) {
    this.conf = conf;
    this.enforceLocality = enforceLocality;
    this.enforceMinAssignmentMove = enforceMinAssignmentMove;
    this.targetTableSet = new HashSet<>();
    this.rackManager = new RackManager(conf);
  }
  private static void printHelp(Options opt) {
    new HelpFormatter().printHelp(
      "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
        "-diff>" +
        " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" +
        " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
  }
  private AsyncClusterConnection getConnection() throws IOException {
    if (connection == null) {
      connection =
        ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent());
    }
    return connection;
  }
  public void setTargetTableName(String[] tableNames) {
    if (tableNames != null) {
      for (String table : tableNames) {
        this.targetTableSet.add(TableName.valueOf(table));
      }
    }
  }
  /**
   * @return the new RegionAssignmentSnapshot
   */
  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
    SnapshotOfRegionAssignmentFromMeta currentAssignmentSnapshot =
      new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
    currentAssignmentSnapshot.initialize();
    return currentAssignmentSnapshot;
  }
  /**
   * Verify the region placement is consistent with the assignment plan.
   */
  public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
      throws IOException {
    System.out.println("Start to verify the region assignment and " +
      "generate the verification report");
    // Get the region assignment snapshot
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();

    // Get all the tables
    Set<TableName> tables = snapshot.getTableSet();

    // Get the region locality map
    Map<String, Map<String, Float>> regionLocalityMap = null;
    if (this.enforceLocality) {
      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
    }
    List<AssignmentVerificationReport> reports = new ArrayList<>();
    // Iterate all the tables to fill up the verification report
    for (TableName table : tables) {
      if (!this.targetTableSet.isEmpty() &&
          !this.targetTableSet.contains(table)) {
        continue;
      }
      AssignmentVerificationReport report = new AssignmentVerificationReport();
      report.fillUp(table, snapshot, regionLocalityMap);
      report.print(isDetailMode);
      reports.add(report);
    }
    return reports;
  }
  /**
   * Generate the assignment plan for the existing table.
   * @param tableName the table to generate the plan for
   * @param assignmentSnapshot the current assignment snapshot
   * @param regionLocalityMap the locality map for the regions of the table
   * @param plan the assignment plan to fill
   * @param munkresForSecondaryAndTertiary if true, the assignment plan
   *          for the secondary and tertiary will be generated with the Munkres
   *          algorithm, otherwise it will be generated using
   *          placeSecondaryAndTertiaryRS
   * @throws IOException
   */
  private void genAssignmentPlan(TableName tableName,
      SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
      Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
      boolean munkresForSecondaryAndTertiary) throws IOException {
    // Get all the regions for the current table
    List<RegionInfo> regions =
      assignmentSnapshot.getTableToRegionMap().get(tableName);
    int numRegions = regions.size();

    // Get the current assignment map
    Map<RegionInfo, ServerName> currentAssignmentMap =
      assignmentSnapshot.getRegionToRegionServerMap();

    // Get all the region servers
    List<ServerName> servers = new ArrayList<>();
    servers.addAll(
      FutureUtils.get(getConnection().getAdmin().getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)))
        .getLiveServerMetrics().keySet());

    LOG.info("Start to generate assignment plan for " + numRegions +
      " regions from table " + tableName + " with " +
      servers.size() + " region servers");

    int slotsPerServer = (int) Math.ceil((float) numRegions /
      servers.size());
    int regionSlots = slotsPerServer * servers.size();
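    // The cost matrices below have one column per slot rather than per server:
    // column index j * slotsPerServer + k is slot k on server j, so a chosen
    // slot can be mapped back to its server with slot / slotsPerServer.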
    // Compute the primary, secondary and tertiary costs for each region/server
    // pair. These costs are based only on node locality and rack locality, and
    // will be modified later.
    float[][] primaryCost = new float[numRegions][regionSlots];
    float[][] secondaryCost = new float[numRegions][regionSlots];
    float[][] tertiaryCost = new float[numRegions][regionSlots];

    if (this.enforceLocality && regionLocalityMap != null) {
      // Transform the locality mapping into a 2D array, assuming that any
      // unspecified locality value is 0.
      float[][] localityPerServer = new float[numRegions][regionSlots];
      for (int i = 0; i < numRegions; i++) {
        Map<String, Float> serverLocalityMap =
          regionLocalityMap.get(regions.get(i).getEncodedName());
        if (serverLocalityMap == null) {
          continue;
        }
        for (int j = 0; j < servers.size(); j++) {
          String serverName = servers.get(j).getHostname();
          if (serverName == null) {
            continue;
          }
          Float locality = serverLocalityMap.get(serverName);
          if (locality == null) {
            continue;
          }
          for (int k = 0; k < slotsPerServer; k++) {
            // If we can't find the locality of a region to a server, which occurs
            // because locality is only reported for servers which have some
            // blocks of a region local, then the locality for that pair is 0.
            localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
          }
        }
      }

      // Compute the total rack locality for each region in each rack. The total
      // rack locality is the sum of the localities of a region on all servers in
      // a rack.
      Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
      for (int i = 0; i < numRegions; i++) {
        RegionInfo region = regions.get(i);
        for (int j = 0; j < regionSlots; j += slotsPerServer) {
          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
          Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
          if (rackLocality == null) {
            rackLocality = new HashMap<>();
            rackRegionLocality.put(rack, rackLocality);
          }
          Float localityObj = rackLocality.get(region);
          float locality = localityObj == null ? 0 : localityObj.floatValue();
          locality += localityPerServer[i][j];
          rackLocality.put(region, locality);
        }
      }
      for (int i = 0; i < numRegions; i++) {
        for (int j = 0; j < regionSlots; j++) {
          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
          Float totalRackLocalityObj =
            rackRegionLocality.get(rack).get(regions.get(i));
          float totalRackLocality = totalRackLocalityObj == null ?
            0 : totalRackLocalityObj.floatValue();

          // Primary cost aims to favor servers with high node locality and low
          // rack locality, so that secondaries and tertiaries can be chosen for
          // nodes with high rack locality. This might give primaries with
          // slightly less locality at first compared to a cost which only
          // considers the node locality, but should be better in the long run.
          primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - totalRackLocality);

          // Secondary cost aims to favor servers with high node locality and high
          // rack locality since the tertiary will be chosen from the same rack as
          // the secondary. This could be negative, but that is okay.
          secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);

          // Tertiary cost is only concerned with the node locality. It will later
          // be restricted to only hosts on the same rack as the secondary.
          tertiaryCost[i][j] = 1 - localityPerServer[i][j];
        }
      }
    }
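    // Worked example for the three cost formulas above (illustrative numbers):
    // with node locality 0.9 and total rack locality 1.2 for a slot,
    //   primaryCost   = 1 - (2 * 0.9 - 1.2) = 0.4
    //   secondaryCost = 2 - (0.9 + 1.2)     = -0.1
    //   tertiaryCost  = 1 - 0.9             = 0.1
    // so better-colocated slots end up with lower (more attractive) costs.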
    if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
      // We want to minimize the number of regions which move as the result of a
      // new assignment. Therefore, slightly penalize any placement which is for
      // a host that is not currently serving the region.
      for (int i = 0; i < numRegions; i++) {
        for (int j = 0; j < servers.size(); j++) {
          ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
          if (currentAddress != null &&
              !currentAddress.equals(servers.get(j))) {
            for (int k = 0; k < slotsPerServer; k++) {
              primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
            }
          }
        }
      }
    }

    // Artificially increase cost of last slot of each server to evenly
    // distribute the slop, otherwise there will be a few servers with too few
    // regions and many servers with the max number of regions.
    for (int i = 0; i < numRegions; i++) {
      for (int j = 0; j < regionSlots; j += slotsPerServer) {
        primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
        secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
        tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
      }
    }
    RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions,
      regionSlots);
    primaryCost = randomizedMatrix.transform(primaryCost);
    int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
    primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
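    // After inverting the indices, primaryAssignment[i] is the slot chosen for
    // region i in terms of the original, unshuffled cost matrix.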
    // Modify the secondary and tertiary costs for each region/server pair to
    // prevent a region from being assigned to the same rack for both primary
    // and either one of secondary or tertiary.
    for (int i = 0; i < numRegions; i++) {
      int slot = primaryAssignment[i];
      String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
      for (int k = 0; k < servers.size(); k++) {
        if (!rackManager.getRack(servers.get(k)).equals(rack)) {
          continue;
        }
        if (k == slot / slotsPerServer) {
          // Same node, do not place secondary or tertiary here ever.
          for (int m = 0; m < slotsPerServer; m++) {
            secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
            tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
          }
        } else {
          // Same rack, do not place secondary or tertiary here if possible.
          for (int m = 0; m < slotsPerServer; m++) {
            secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
            tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
          }
        }
      }
    }
    if (munkresForSecondaryAndTertiary) {
      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
      secondaryCost = randomizedMatrix.transform(secondaryCost);
      int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
      secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);

      // Modify the tertiary costs for each region/server pair to ensure that a
      // region is assigned to a tertiary server on the same rack as its secondary
      // server, but not the same server in that rack.
      for (int i = 0; i < numRegions; i++) {
        int slot = secondaryAssignment[i];
        String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
        for (int k = 0; k < servers.size(); k++) {
          if (k == slot / slotsPerServer) {
            // Same node, do not place tertiary here ever.
            for (int m = 0; m < slotsPerServer; m++) {
              tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
            }
          } else {
            if (rackManager.getRack(servers.get(k)).equals(rack)) {
              continue;
            }
            // Different rack, do not place tertiary here if possible.
            for (int m = 0; m < slotsPerServer; m++) {
              tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
            }
          }
        }
      }

      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
      tertiaryCost = randomizedMatrix.transform(tertiaryCost);
      int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
      tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);

      for (int i = 0; i < numRegions; i++) {
        List<ServerName> favoredServers =
          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
        ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
          ServerName.NON_STARTCODE));

        s = servers.get(secondaryAssignment[i] / slotsPerServer);
        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
          ServerName.NON_STARTCODE));

        s = servers.get(tertiaryAssignment[i] / slotsPerServer);
        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
          ServerName.NON_STARTCODE));
        // Update the assignment plan
        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
      }
      LOG.info("Generated the assignment plan for " + numRegions +
        " regions from table " + tableName + " with " +
        servers.size() + " region servers");
      LOG.info("Assignment plan for secondary and tertiary generated " +
        "using MunkresAssignment");
    } else {
      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
      for (int i = 0; i < numRegions; i++) {
        primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
      }
      FavoredNodeAssignmentHelper favoredNodeHelper =
        new FavoredNodeAssignmentHelper(servers, conf);
      favoredNodeHelper.initialize();
      Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
        favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
      for (int i = 0; i < numRegions; i++) {
        List<ServerName> favoredServers =
          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
        RegionInfo currentRegion = regions.get(i);
        ServerName s = primaryRSMap.get(currentRegion);
        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
          ServerName.NON_STARTCODE));

        ServerName[] secondaryAndTertiary =
          secondaryAndTertiaryMap.get(currentRegion);
        s = secondaryAndTertiary[0];
        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
          ServerName.NON_STARTCODE));

        s = secondaryAndTertiary[1];
        favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
          ServerName.NON_STARTCODE));
        // Update the assignment plan
        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
      }
      LOG.info("Generated the assignment plan for " + numRegions +
        " regions from table " + tableName + " with " +
        servers.size() + " region servers");
      LOG.info("Assignment plan for secondary and tertiary generated " +
        "using placeSecondaryAndTertiaryWithRestrictions method");
    }
  }
  public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
    // Get the current region assignment snapshot by scanning from the META
    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot =
      this.getRegionAssignmentSnapshot();

    // Get the region locality map
    Map<String, Map<String, Float>> regionLocalityMap = null;
    if (this.enforceLocality) {
      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
    }
    // Initialize the assignment plan
    FavoredNodesPlan plan = new FavoredNodesPlan();

    // Get the table to region mapping
    Map<TableName, List<RegionInfo>> tableToRegionMap =
      assignmentSnapshot.getTableToRegionMap();
    LOG.info("Start to generate the new assignment plan for the " +
      tableToRegionMap.keySet().size() + " tables");
    for (TableName table : tableToRegionMap.keySet()) {
      try {
        if (!this.targetTableSet.isEmpty() &&
            !this.targetTableSet.contains(table)) {
          continue;
        }
        // TODO: maybe run the placement in parallel for each table
        genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
      } catch (Exception e) {
        LOG.error("Got some exceptions while placing the primary region server " +
          "for table " + table + " because " + e);
      }
    }
    LOG.info("Finished generating the new assignment plan for the " +
      tableToRegionMap.keySet().size() + " tables");
    return plan;
  }
  @Override
  public void close() throws IOException {
    Closeables.close(connection, true);
  }
  /**
   * Some algorithms for solving the assignment problem may traverse workers or
   * jobs in linear order which may result in skewing the assignments of the
   * first jobs in the matrix toward the last workers in the matrix if the
   * costs are uniform. To avoid this kind of clumping, we can randomize the
   * rows and columns of the cost matrix in a reversible way, such that the
   * solution to the assignment problem can be interpreted in terms of the
   * original untransformed cost matrix. Rows and columns are transformed
   * independently such that the elements contained in any row of the input
   * matrix are the same as the elements in the corresponding output matrix,
   * and each row has its elements transformed in the same way. Similarly for
   * columns.
   */
  protected static class RandomizedMatrix {
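    // Typical round trip, as used in genAssignmentPlan above: shuffle the cost
    // matrix, solve on the shuffled copy, then map the solution back, e.g.
    //   RandomizedMatrix rm = new RandomizedMatrix(rows, cols);
    //   int[] assignment = rm.invertIndices(new MunkresAssignment(rm.transform(cost)).solve());
    // so callers never observe the internal permutation.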
    private final int rows;
    private final int cols;
    private final int[] rowTransform;
    private final int[] rowInverse;
    private final int[] colTransform;
    private final int[] colInverse;
    /**
     * Create a randomization scheme for a matrix of a given size.
     * @param rows the number of rows in the matrix
     * @param cols the number of columns in the matrix
     */
    public RandomizedMatrix(int rows, int cols) {
      this.rows = rows;
      this.cols = cols;
      Random random = new Random();
      rowTransform = new int[rows];
      rowInverse = new int[rows];
      for (int i = 0; i < rows; i++) {
        rowTransform[i] = i;
      }
      // Shuffle the row indices (Fisher-Yates).
      for (int i = rows - 1; i >= 0; i--) {
        int r = random.nextInt(i + 1);
        int temp = rowTransform[r];
        rowTransform[r] = rowTransform[i];
        rowTransform[i] = temp;
      }
      // Generate the inverse row indices.
      for (int i = 0; i < rows; i++) {
        rowInverse[rowTransform[i]] = i;
      }

      colTransform = new int[cols];
      colInverse = new int[cols];
      for (int i = 0; i < cols; i++) {
        colTransform[i] = i;
      }
      // Shuffle the column indices (Fisher-Yates).
      for (int i = cols - 1; i >= 0; i--) {
        int r = random.nextInt(i + 1);
        int temp = colTransform[r];
        colTransform[r] = colTransform[i];
        colTransform[i] = temp;
      }
      // Generate the inverse column indices.
      for (int i = 0; i < cols; i++) {
        colInverse[colTransform[i]] = i;
      }
    }
    /**
     * Copy a given matrix into a new matrix, transforming each row index and
     * each column index according to the randomization scheme that was created
     * at construction time.
     * @param matrix the cost matrix to transform
     * @return a new matrix with row and column indices transformed
     */
    public float[][] transform(float[][] matrix) {
      float[][] result = new float[rows][cols];
      for (int i = 0; i < rows; i++) {
        for (int j = 0; j < cols; j++) {
          result[rowTransform[i]][colTransform[j]] = matrix[i][j];
        }
      }
      return result;
    }
    /**
     * Copy a given matrix into a new matrix, transforming each row index and
     * each column index according to the inverse of the randomization scheme
     * that was created at construction time.
     * @param matrix the cost matrix to be inverted
     * @return a new matrix with row and column indices inverted
     */
    public float[][] invert(float[][] matrix) {
      float[][] result = new float[rows][cols];
      for (int i = 0; i < rows; i++) {
        for (int j = 0; j < cols; j++) {
          result[rowInverse[i]][colInverse[j]] = matrix[i][j];
        }
      }
      return result;
    }
    /**
     * Given an array where each element {@code indices[i]} represents the
     * randomized column index corresponding to randomized row index {@code i},
     * create a new array with the corresponding inverted indices.
     * @param indices an array of transformed indices to be inverted
     * @return an array of inverted indices
     */
    public int[] invertIndices(int[] indices) {
      int[] result = new int[indices.length];
      for (int i = 0; i < indices.length; i++) {
        result[rowInverse[i]] = colInverse[indices[i]];
      }
      return result;
    }
  }
  /**
   * Print the assignment plan to the system output stream.
   * @param plan the assignment plan to print
   */
  public static void printAssignmentPlan(FavoredNodesPlan plan) {
    if (plan == null) {
      return;
    }
    LOG.info("========== Start to print the assignment plan ================");
    // sort the map based on region info
    Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());

    for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
      String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
      String regionName = entry.getKey();
      LOG.info("Region: " + regionName);
      LOG.info("Its favored nodes: " + serverList);
    }
    LOG.info("========== Finish to print the assignment plan ================");
  }
  /**
   * Update the assignment plan into hbase:meta
   * @param plan the assignments plan to be updated into hbase:meta
   * @throws IOException if cannot update assignment plan in hbase:meta
   */
  public void updateAssignmentPlanToMeta(FavoredNodesPlan plan)
      throws IOException {
    try {
      LOG.info("Start to update the hbase:meta with the new assignment plan");
      Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
      Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
      Map<String, RegionInfo> regionToRegionInfoMap =
        getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
      for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
        planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
      }

      FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
      LOG.info("Updated the hbase:meta with the new assignment plan");
    } catch (Exception e) {
      LOG.error("Failed to update hbase:meta with the new assignment " +
        "plan because " + e.getMessage());
    }
  }
  /**
   * Update the assignment plan to all the region servers.
   * @param plan the assignment plan to push to the region servers
   * @throws IOException
   */
  private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan)
      throws IOException {
    LOG.info("Start to update the region servers with the new assignment plan");
    // Get the region to region server map
    Map<ServerName, List<RegionInfo>> currentAssignment =
      this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();

    // track of the failed and succeeded updates
    int succeededNum = 0;
    Map<ServerName, Exception> failedUpdateMap = new HashMap<>();

    for (Map.Entry<ServerName, List<RegionInfo>> entry :
        currentAssignment.entrySet()) {
      List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
      try {
        // Keep track of the favored updates for the current region server
        FavoredNodesPlan singleServerPlan = null;
        // Find out all the updates for the current region server
        for (RegionInfo region : entry.getValue()) {
          List<ServerName> favoredServerList = plan.getFavoredNodes(region);
          if (favoredServerList != null &&
              favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
            // Create the single server plan if necessary
            if (singleServerPlan == null) {
              singleServerPlan = new FavoredNodesPlan();
            }
            // Update the single server update
            singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
            regionUpdateInfos.add(new Pair<>(region, favoredServerList));
          }
        }
        if (singleServerPlan != null) {
          // Update the current region server with its updated favored nodes
          AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey());
          UpdateFavoredNodesRequest request =
            RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
          UpdateFavoredNodesResponse updateFavoredNodesResponse =
            FutureUtils.get(rsAdmin.updateFavoredNodes(request));
          LOG.info("Region server " +
            FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest()))
              .getServerInfo() +
            " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
            singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan");
          succeededNum++;
        }
      } catch (Exception e) {
        failedUpdateMap.put(entry.getKey(), e);
      }
    }
    // log the succeeded updates
    LOG.info("Updated " + succeededNum + " region servers with " +
      "the new assignment plan");

    // log the failed updates
    int failedNum = failedUpdateMap.size();
    if (failedNum != 0) {
      LOG.error("Failed to update the following " + failedNum +
        " region servers with their corresponding favored nodes");
      for (Map.Entry<ServerName, Exception> entry :
          failedUpdateMap.entrySet()) {
        LOG.error("Failed to update " + entry.getKey().getAddress() +
          " because of " + entry.getValue().getMessage());
      }
    }
  }
  public void updateAssignmentPlan(FavoredNodesPlan plan)
      throws IOException {
    LOG.info("Start to update the new assignment plan for the hbase:meta table and" +
      " the region servers");
    // Update the new assignment plan to META
    updateAssignmentPlanToMeta(plan);
    // Update the new assignment plan to Region Servers
    updateAssignmentPlanToRegionServers(plan);
    LOG.info("Finished updating the new assignment plan for the hbase:meta table and" +
      " the region servers");
  }
  /**
   * Return how many regions will move per table since their primary RS will
   * change.
   * @param newPlan - new AssignmentPlan
   * @return how many primaries will move per table
   */
  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan)
      throws IOException {
    Map<TableName, Integer> movesPerTable = new HashMap<>();
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
    Map<TableName, List<RegionInfo>> tableToRegions = snapshot
      .getTableToRegionMap();
    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
    Set<TableName> tables = snapshot.getTableSet();
    for (TableName table : tables) {
      int movedPrimaries = 0;
      if (!this.targetTableSet.isEmpty()
          && !this.targetTableSet.contains(table)) {
        continue;
      }
      List<RegionInfo> regions = tableToRegions.get(table);
      for (RegionInfo region : regions) {
        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
        List<ServerName> newServers = newPlan.getFavoredNodes(region);
        if (oldServers != null && newServers != null) {
          ServerName oldPrimary = oldServers.get(0);
          ServerName newPrimary = newServers.get(0);
          if (oldPrimary.compareTo(newPrimary) != 0) {
            movedPrimaries++;
          }
        }
      }
      movesPerTable.put(table, movedPrimaries);
    }
    return movesPerTable;
  }
  /**
   * Compares two plans and checks whether the locality dropped or increased
   * (prints the information as a string); also prints the baseline locality.
   * @param movesPerTable - how many primary regions will move per table
   * @param regionLocalityMap - locality map from FS
   * @param newPlan - new assignment plan
   * @throws IOException
   */
  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
      Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
      throws IOException {
    // localities for primary, secondary and tertiary
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
    Set<TableName> tables = snapshot.getTableSet();
    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
    for (TableName table : tables) {
      float[] deltaLocality = new float[3];
      float[] locality = new float[3];
      if (!this.targetTableSet.isEmpty()
          && !this.targetTableSet.contains(table)) {
        continue;
      }
      List<RegionInfo> regions = tableToRegionsMap.get(table);
      System.out.println("==================================================");
      System.out.println("Assignment Plan Projection Report For Table: " + table);
      System.out.println("\t Total regions: " + regions.size());
      System.out.println("\t" + movesPerTable.get(table)
        + " primaries will move because their primary has changed");
      for (RegionInfo currentRegion : regions) {
        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
          .getEncodedName());
        if (regionLocality == null) {
          continue;
        }
        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
        if (newServers != null && oldServers != null) {
          int i = 0;
          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
            ServerName newServer = newServers.get(p.ordinal());
            ServerName oldServer = oldServers.get(p.ordinal());
            Float oldLocality = 0f;
            if (oldServers != null) {
              oldLocality = regionLocality.get(oldServer.getHostname());
              if (oldLocality == null) {
                oldLocality = 0f;
              }
              locality[i] += oldLocality;
            }
            Float newLocality = regionLocality.get(newServer.getHostname());
            if (newLocality == null) {
              newLocality = 0f;
            }
            deltaLocality[i] += newLocality - oldLocality;
            i++;
          }
        }
      }
      DecimalFormat df = new java.text.DecimalFormat("#.##");
      for (int i = 0; i < deltaLocality.length; i++) {
        System.out.print("\t\t Baseline locality for ");
        if (i == 0) {
          System.out.print("primary ");
        } else if (i == 1) {
          System.out.print("secondary ");
        } else if (i == 2) {
          System.out.print("tertiary ");
        }
        System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
        System.out.print("\t\t Locality will change with the new plan: ");
        System.out.println(df.format(100 * deltaLocality[i] / regions.size())
          + "%");
      }
      System.out.println("\t Baseline dispersion");
      printDispersionScores(table, snapshot, regions.size(), null, true);
      System.out.println("\t Projected dispersion");
      printDispersionScores(table, snapshot, regions.size(), newPlan, true);
    }
  }
  public void printDispersionScores(TableName table,
      SnapshotOfRegionAssignmentFromMeta snapshot, int numRegions, FavoredNodesPlan newPlan,
      boolean simplePrint) {
    if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
      return;
    }
    AssignmentVerificationReport report = new AssignmentVerificationReport();
    report.fillUpDispersion(table, snapshot, newPlan);
    List<Float> dispersion = report.getDispersionInformation();
    if (simplePrint) {
      DecimalFormat df = new java.text.DecimalFormat("#.##");
      System.out.println("\tAvg dispersion score: "
        + df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: "
        + df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: "
        + df.format(dispersion.get(2)) + " hosts;");
    } else {
      LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
        + " ; The average dispersion score is " + dispersion.get(0));
    }
  }
  public void printLocalityAndDispersionForCurrentPlan(
      Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
    FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
    Set<TableName> tables = snapshot.getTableSet();
    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot
      .getTableToRegionMap();
    for (TableName table : tables) {
      float[] locality = new float[3];
      if (!this.targetTableSet.isEmpty()
          && !this.targetTableSet.contains(table)) {
        continue;
      }
      List<RegionInfo> regions = tableToRegionsMap.get(table);
      for (RegionInfo currentRegion : regions) {
        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
          .getEncodedName());
        if (regionLocality == null) {
          continue;
        }
        List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
        if (servers != null) {
          int i = 0;
          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
            ServerName server = servers.get(p.ordinal());
            Float currentLocality = 0f;
            if (servers != null) {
              currentLocality = regionLocality.get(server.getHostname());
              if (currentLocality == null) {
                currentLocality = 0f;
              }
              locality[i] += currentLocality;
            }
            i++;
          }
        }
      }
      for (int i = 0; i < locality.length; i++) {
        String copy = null;
        if (i == 0) {
          copy = "primary";
        } else if (i == 1) {
          copy = "secondary";
        } else if (i == 2) {
          copy = "tertiary";
        }
        float avgLocality = 100 * locality[i] / regions.size();
        LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
          + " ; The average locality for " + copy + " is " + avgLocality + " %");
      }
      printDispersionScores(table, snapshot, regions.size(), null, false);
    }
  }
  /**
   * @param favoredNodesStr The String of favored nodes
   * @return the list of ServerName parsed from the favored nodes string.
   */
  public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
    String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
    if (favoredNodesArray == null) {
      return null;
    }

    List<ServerName> serverList = new ArrayList<>();
    for (String hostNameAndPort : favoredNodesArray) {
      serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
    }
    return serverList;
  }
  public static void main(String[] args) throws IOException {
    Options opt = new Options();
    opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
    opt.addOption("u", "update", false,
      "update the assignments to hbase:meta and RegionServers together");
    opt.addOption("n", "dry-run", false, "do not write assignments to META");
    opt.addOption("v", "verify", false, "verify current assignments against META");
    opt.addOption("p", "print", false, "print the current assignment plan in META");
    opt.addOption("h", "help", false, "print usage");
    opt.addOption("d", "verification-details", false,
      "print the details of verification report");

    opt.addOption("zk", true, "to set the zookeeper quorum");
    opt.addOption("fs", true, "to set HDFS");
    opt.addOption("hbase_root", true, "to set hbase_root directory");

    opt.addOption("overwrite", false,
      "overwrite the favored nodes for a single region, " +
        "for example: -update -r regionName -f server1:port,server2:port,server3:port");
    opt.addOption("r", true, "The region name that needs to be updated");
    opt.addOption("f", true, "The new favored nodes");

    opt.addOption("tables", true,
      "The list of table names split by ',' ; " +
        "for example: -tables: t1,t2,...,tn");
    opt.addOption("l", "locality", true, "enforce the maximum locality");
    opt.addOption("m", "min-move", true, "enforce minimum assignment move");
    opt.addOption("diff", false, "calculate difference between assignment plans");
    opt.addOption("munkres", false,
      "use munkres to place secondaries and tertiaries");
    opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion " +
      "information for current plan");
    try {
      CommandLine cmd = new GnuParser().parse(opt, args);
      Configuration conf = HBaseConfiguration.create();

      boolean enforceMinAssignmentMove = true;
      boolean enforceLocality = true;
      boolean verificationDetails = false;

      // Read all the options
      if ((cmd.hasOption("l") &&
          cmd.getOptionValue("l").equalsIgnoreCase("false")) ||
          (cmd.hasOption("locality") &&
          cmd.getOptionValue("locality").equalsIgnoreCase("false"))) {
        enforceLocality = false;
      }

      if ((cmd.hasOption("m") &&
          cmd.getOptionValue("m").equalsIgnoreCase("false")) ||
          (cmd.hasOption("min-move") &&
          cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) {
        enforceMinAssignmentMove = false;
      }

      if (cmd.hasOption("zk")) {
        conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
        LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
      }

      if (cmd.hasOption("fs")) {
        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
        LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
      }

      if (cmd.hasOption("hbase_root")) {
        conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
        LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
      }
      // Create the region placement obj
      try (RegionPlacementMaintainer rp =
        new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {

        if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
          verificationDetails = true;
        }

        if (cmd.hasOption("tables")) {
          String tableNameListStr = cmd.getOptionValue("tables");
          String[] tableNames = StringUtils.split(tableNameListStr, ",");
          rp.setTargetTableName(tableNames);
        }

        if (cmd.hasOption("munkres")) {
          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
        }

        // Read all the modes
        if (cmd.hasOption("v") || cmd.hasOption("verify")) {
          // Verify the region placement.
          rp.verifyRegionPlacement(verificationDetails);
        } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
          // Generate the assignment plan only without updating the hbase:meta and RS
          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
          printAssignmentPlan(plan);
        } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
          // Generate the new assignment plan
          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
          // Print the new assignment plan
          printAssignmentPlan(plan);
          // Write the new assignment plan to META
          rp.updateAssignmentPlanToMeta(plan);
        } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
          // Generate the new assignment plan
          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
          // Print the new assignment plan
          printAssignmentPlan(plan);
          // Update the assignment to hbase:meta and Region Servers
          rp.updateAssignmentPlan(plan);
        } else if (cmd.hasOption("diff")) {
          FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
          Map<String, Map<String, Float>> locality =
            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
          Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
          rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
          System.out.println("Do you want to update the assignment plan? [y/n]");
          Scanner s = new Scanner(System.in);
          String input = s.nextLine().trim();
          if (input.equals("y")) {
            System.out.println("Updating assignment plan...");
            rp.updateAssignmentPlan(newPlan);
          }
          s.close();
        } else if (cmd.hasOption("ld")) {
          Map<String, Map<String, Float>> locality =
            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
          rp.printLocalityAndDispersionForCurrentPlan(locality);
        } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
          FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
          printAssignmentPlan(plan);
        } else if (cmd.hasOption("overwrite")) {
          if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
            throw new IllegalArgumentException("Please specify: " +
              " -update -r regionName -f server1:port,server2:port,server3:port");
          }

          String regionName = cmd.getOptionValue("r");
          String favoredNodesStr = cmd.getOptionValue("f");
          LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
            favoredNodesStr);
          List<ServerName> favoredNodes = null;
          RegionInfo regionInfo =
            rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
          if (regionInfo == null) {
            LOG.error("Cannot find the region " + regionName + " from the META");
          } else {
            try {
              favoredNodes = getFavoredNodeList(favoredNodesStr);
            } catch (IllegalArgumentException e) {
              LOG.error("Cannot parse the invalid favored nodes because " + e);
            }
            FavoredNodesPlan newPlan = new FavoredNodesPlan();
            newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
            rp.updateAssignmentPlan(newPlan);
          }
        } else {
          printHelp(opt);
        }
      }
    } catch (ParseException e) {
      printHelp(opt);
    }
  }
}
) {