1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.master;
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.text.DecimalFormat;
23 import java.util.ArrayList;
24 import java.util.EnumSet;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Random;
30 import java.util.Scanner;
31 import java.util.Set;
32 import java.util.TreeMap;
33 import org.apache.commons.lang3.StringUtils;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.hbase.ClusterMetrics.Option;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
42 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
43 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
44 import org.apache.hadoop.hbase.client.ConnectionFactory;
45 import org.apache.hadoop.hbase.client.RegionInfo;
46 import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
47 import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
48 import org.apache.hadoop.hbase.security.User;
49 import org.apache.hadoop.hbase.util.FSUtils;
50 import org.apache.hadoop.hbase.util.FutureUtils;
51 import org.apache.hadoop.hbase.util.MunkresAssignment;
52 import org.apache.hadoop.hbase.util.Pair;
53 import org.apache.yetus.audience.InterfaceAudience;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
58 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
59 import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
60 import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
61 import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
62 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
64 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
65 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
66 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
68 /**
69 * A tool that is used for manipulating and viewing favored nodes information
70 * for regions. Run with -h to get a list of the options
72 @InterfaceAudience.Private
73 // TODO: Remove? Unused. Partially implemented only.
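// Illustrative programmatic use of this class (an added sketch, not part of the original
// source); it mirrors what main() does for the -n/--dry-run mode:
//
//   Configuration conf = HBaseConfiguration.create();
//   try (RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf)) {
//     FavoredNodesPlan plan = rp.getNewAssignmentPlan();
//     printAssignmentPlan(plan);
//   }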
74 public class RegionPlacementMaintainer implements Closeable {
75 private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class
76 .getName());
77 // The cost of a placement that should never be assigned.
78 private static final float MAX_COST = Float.POSITIVE_INFINITY;
80 // The cost of a placement that is undesirable but acceptable.
81 private static final float AVOID_COST = 100000f;
83 // The amount by which the cost of a placement is increased if it is the
84 // last slot of the server. This is done to more evenly distribute the slop
85 // amongst servers.
86 private static final float LAST_SLOT_COST_PENALTY = 0.5f;
88 // The amount by which the cost of a primary placement is penalized if it is
89 // not the host currently serving the region. This is done to minimize moves.
90 private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;
92 private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;
94 private Configuration conf;
95 private final boolean enforceLocality;
96 private final boolean enforceMinAssignmentMove;
97 private RackManager rackManager;
98 private Set<TableName> targetTableSet;
99 private AsyncClusterConnection connection;
101 public RegionPlacementMaintainer(Configuration conf) throws IOException {
102 this(conf, true, true);
105 public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
106 boolean enforceMinAssignmentMove) {
107 this.conf = conf;
108 this.enforceLocality = enforceLocality;
109 this.enforceMinAssignmentMove = enforceMinAssignmentMove;
110 this.targetTableSet = new HashSet<>();
111 this.rackManager = new RackManager(conf);
114 private static void printHelp(Options opt) {
115 new HelpFormatter().printHelp(
116 "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes " +
117 "-diff>" +
118 " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]" +
119 " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
122 private AsyncClusterConnection getConnection() throws IOException {
123 if (connection == null) {
124 connection =
125 ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent());
127 return connection;
130 public void setTargetTableName(String[] tableNames) {
131 if (tableNames != null) {
132 for (String table : tableNames)
133 this.targetTableSet.add(TableName.valueOf(table));
138 * @return the new RegionAssignmentSnapshot
140 public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
141 SnapshotOfRegionAssignmentFromMeta currentAssignmentSnapshot =
142 new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
143 currentAssignmentSnapshot.initialize();
144 return currentAssignmentSnapshot;
148 * Verify the region placement is consistent with the assignment plan
150 public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
151 throws IOException {
152 System.out.println("Start to verify the region assignment and " +
153 "generate the verification report");
154 // Get the region assignment snapshot
155 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
157 // Get all the tables
158 Set<TableName> tables = snapshot.getTableSet();
160 // Get the region locality map
161 Map<String, Map<String, Float>> regionLocalityMap = null;
162 if (this.enforceLocality) {
163 regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
165 List<AssignmentVerificationReport> reports = new ArrayList<>();
166 // Iterate all the tables to fill up the verification report
167 for (TableName table : tables) {
168 if (!this.targetTableSet.isEmpty() &&
169 !this.targetTableSet.contains(table)) {
170 continue;
172 AssignmentVerificationReport report = new AssignmentVerificationReport();
173 report.fillUp(table, snapshot, regionLocalityMap);
174 report.print(isDetailMode);
175 reports.add(report);
177 return reports;
181 * Generate the assignment plan for the existing table
183 * @param tableName
184 * @param assignmentSnapshot
185 * @param regionLocalityMap
186 * @param plan
187 * @param munkresForSecondaryAndTertiary if true, the assignment plan
188 * for the secondary and tertiary will be generated with the Munkres algorithm;
189 * otherwise it will be generated using placeSecondaryAndTertiaryRS
190 * @throws IOException
192 private void genAssignmentPlan(TableName tableName,
193 SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
194 Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
195 boolean munkresForSecondaryAndTertiary) throws IOException {
196 // Get all the regions for the current table
197 List<RegionInfo> regions =
198 assignmentSnapshot.getTableToRegionMap().get(tableName);
199 int numRegions = regions.size();
201 // Get the current assignment map
202 Map<RegionInfo, ServerName> currentAssignmentMap =
203 assignmentSnapshot.getRegionToRegionServerMap();
205 // Get all the region servers
206 List<ServerName> servers = new ArrayList<>();
207 servers.addAll(
208 FutureUtils.get(getConnection().getAdmin().getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)))
209 .getLiveServerMetrics().keySet());
211 LOG.info("Start to generate assignment plan for " + numRegions +
212 " regions from table " + tableName + " with " +
213 servers.size() + " region servers");
215 int slotsPerServer = (int) Math.ceil((float) numRegions /
216 servers.size());
217 int regionSlots = slotsPerServer * servers.size();
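// For example (illustrative numbers, not from the original source): 10 regions on
// 4 region servers gives slotsPerServer = ceil(10 / 4.0) = 3 and regionSlots = 3 * 4 = 12,
// so two of the twelve slots are slack that the cost matrices below still account for.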
219 // Compute the primary, secondary and tertiary costs for each region/server
220 // pair. These costs are based only on node locality and rack locality, and
221 // will be modified later.
222 float[][] primaryCost = new float[numRegions][regionSlots];
223 float[][] secondaryCost = new float[numRegions][regionSlots];
224 float[][] tertiaryCost = new float[numRegions][regionSlots];
226 if (this.enforceLocality && regionLocalityMap != null) {
227 // Transform the locality mapping into a 2D array, assuming that any
228 // unspecified locality value is 0.
229 float[][] localityPerServer = new float[numRegions][regionSlots];
230 for (int i = 0; i < numRegions; i++) {
231 Map<String, Float> serverLocalityMap =
232 regionLocalityMap.get(regions.get(i).getEncodedName());
233 if (serverLocalityMap == null) {
234 continue;
236 for (int j = 0; j < servers.size(); j++) {
237 String serverName = servers.get(j).getHostname();
238 if (serverName == null) {
239 continue;
241 Float locality = serverLocalityMap.get(serverName);
242 if (locality == null) {
243 continue;
245 for (int k = 0; k < slotsPerServer; k++) {
246 // If we can't find the locality of a region to a server, which occurs
247 // because locality is only reported for servers which have some
248 // blocks of a region local, then the locality for that pair is 0.
249 localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
254 // Compute the total rack locality for each region in each rack. The total
255 // rack locality is the sum of the localities of a region on all servers in
256 // a rack.
257 Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
258 for (int i = 0; i < numRegions; i++) {
259 RegionInfo region = regions.get(i);
260 for (int j = 0; j < regionSlots; j += slotsPerServer) {
261 String rack = rackManager.getRack(servers.get(j / slotsPerServer));
262 Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
263 if (rackLocality == null) {
264 rackLocality = new HashMap<>();
265 rackRegionLocality.put(rack, rackLocality);
267 Float localityObj = rackLocality.get(region);
268 float locality = localityObj == null ? 0 : localityObj.floatValue();
269 locality += localityPerServer[i][j];
270 rackLocality.put(region, locality);
273 for (int i = 0; i < numRegions; i++) {
274 for (int j = 0; j < regionSlots; j++) {
275 String rack = rackManager.getRack(servers.get(j / slotsPerServer));
276 Float totalRackLocalityObj =
277 rackRegionLocality.get(rack).get(regions.get(i));
278 float totalRackLocality = totalRackLocalityObj == null ?
279 0 : totalRackLocalityObj.floatValue();
281 // Primary cost aims to favor servers with high node locality and low
282 // rack locality, so that secondaries and tertiaries can be chosen for
283 // nodes with high rack locality. This might give primaries with
284 // slightly less locality at first compared to a cost which only
285 // considers the node locality, but should be better in the long run.
286 primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] -
287 totalRackLocality);
289 // Secondary cost aims to favor servers with high node locality and high
290 // rack locality since the tertiary will be chosen from the same rack as
291 // the secondary. This could be negative, but that is okay.
292 secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
294 // Tertiary cost is only concerned with the node locality. It will later
295 // be restricted to only hosts on the same rack as the secondary.
296 tertiaryCost[i][j] = 1 - localityPerServer[i][j];
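// Quick numeric check of the three formulas above (illustrative values): with a node
// locality of 0.9 and a total rack locality of 1.5, primaryCost = 1 - (2*0.9 - 1.5) = 0.7,
// secondaryCost = 2 - (0.9 + 1.5) = -0.4 (negative is fine, see the comment above), and
// tertiaryCost = 1 - 0.9 = 0.1.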
301 if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
302 // We want to minimize the number of regions which move as the result of a
303 // new assignment. Therefore, slightly penalize any placement which is for
304 // a host that is not currently serving the region.
305 for (int i = 0; i < numRegions; i++) {
306 for (int j = 0; j < servers.size(); j++) {
307 ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
308 if (currentAddress != null &&
309 !currentAddress.equals(servers.get(j))) {
310 for (int k = 0; k < slotsPerServer; k++) {
311 primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
318 // Artificially increase cost of last slot of each server to evenly
319 // distribute the slop, otherwise there will be a few servers with too few
320 // regions and many servers with the max number of regions.
321 for (int i = 0; i < numRegions; i++) {
322 for (int j = 0; j < regionSlots; j += slotsPerServer) {
323 primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
324 secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
325 tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
329 RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions,
330 regionSlots);
331 primaryCost = randomizedMatrix.transform(primaryCost);
332 int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
333 primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
335 // Modify the secondary and tertiary costs for each region/server pair to
336 // prevent a region from being assigned to the same rack for both primary
337 // and either one of secondary or tertiary.
338 for (int i = 0; i < numRegions; i++) {
339 int slot = primaryAssignment[i];
340 String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
341 for (int k = 0; k < servers.size(); k++) {
342 if (!rackManager.getRack(servers.get(k)).equals(rack)) {
343 continue;
345 if (k == slot / slotsPerServer) {
346 // Same node, do not place secondary or tertiary here ever.
347 for (int m = 0; m < slotsPerServer; m++) {
348 secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
349 tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
351 } else {
352 // Same rack, do not place secondary or tertiary here if possible.
353 for (int m = 0; m < slotsPerServer; m++) {
354 secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
355 tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
360 if (munkresForSecondaryAndTertiary) {
361 randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
362 secondaryCost = randomizedMatrix.transform(secondaryCost);
363 int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
364 secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
366 // Modify the tertiary costs for each region/server pair to ensure that a
367 // region is assigned to a tertiary server on the same rack as its secondary
368 // server, but not the same server in that rack.
369 for (int i = 0; i < numRegions; i++) {
370 int slot = secondaryAssignment[i];
371 String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
372 for (int k = 0; k < servers.size(); k++) {
373 if (k == slot / slotsPerServer) {
374 // Same node, do not place tertiary here ever.
375 for (int m = 0; m < slotsPerServer; m++) {
376 tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
378 } else {
379 if (rackManager.getRack(servers.get(k)).equals(rack)) {
380 continue;
382 // Different rack, do not place tertiary here if possible.
383 for (int m = 0; m < slotsPerServer; m++) {
384 tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
390 randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
391 tertiaryCost = randomizedMatrix.transform(tertiaryCost);
392 int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
393 tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
395 for (int i = 0; i < numRegions; i++) {
396 List<ServerName> favoredServers
397 = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
398 ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
399 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
400 ServerName.NON_STARTCODE));
402 s = servers.get(secondaryAssignment[i] / slotsPerServer);
403 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
404 ServerName.NON_STARTCODE));
406 s = servers.get(tertiaryAssignment[i] / slotsPerServer);
407 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
408 ServerName.NON_STARTCODE));
409 // Update the assignment plan
410 plan.updateFavoredNodesMap(regions.get(i), favoredServers);
412 LOG.info("Generated the assignment plan for " + numRegions +
413 " regions from table " + tableName + " with " +
414 servers.size() + " region servers");
415 LOG.info("Assignment plan for secondary and tertiary generated " +
416 "using MunkresAssignment");
417 } else {
418 Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
419 for (int i = 0; i < numRegions; i++) {
420 primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
422 FavoredNodeAssignmentHelper favoredNodeHelper =
423 new FavoredNodeAssignmentHelper(servers, conf);
424 favoredNodeHelper.initialize();
425 Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
426 favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
427 for (int i = 0; i < numRegions; i++) {
428 List<ServerName> favoredServers
429 = new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
430 RegionInfo currentRegion = regions.get(i);
431 ServerName s = primaryRSMap.get(currentRegion);
432 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
433 ServerName.NON_STARTCODE));
435 ServerName[] secondaryAndTertiary =
436 secondaryAndTertiaryMap.get(currentRegion);
437 s = secondaryAndTertiary[0];
438 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
439 ServerName.NON_STARTCODE));
441 s = secondaryAndTertiary[1];
442 favoredServers.add(ServerName.valueOf(s.getHostname(), s.getPort(),
443 ServerName.NON_STARTCODE));
444 // Update the assignment plan
445 plan.updateFavoredNodesMap(regions.get(i), favoredServers);
447 LOG.info("Generated the assignment plan for " + numRegions +
448 " regions from table " + tableName + " with " +
449 servers.size() + " region servers");
450 LOG.info("Assignment plan for secondary and tertiary generated " +
451 "using placeSecondaryAndTertiaryWithRestrictions method");
455 public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
456 // Get the current region assignment snapshot by scanning from the META
457 SnapshotOfRegionAssignmentFromMeta assignmentSnapshot =
458 this.getRegionAssignmentSnapshot();
460 // Get the region locality map
461 Map<String, Map<String, Float>> regionLocalityMap = null;
462 if (this.enforceLocality) {
463 regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
465 // Initialize the assignment plan
466 FavoredNodesPlan plan = new FavoredNodesPlan();
468 // Get the table to region mapping
469 Map<TableName, List<RegionInfo>> tableToRegionMap =
470 assignmentSnapshot.getTableToRegionMap();
471 LOG.info("Start to generate the new assignment plan for the " +
472 + tableToRegionMap.keySet().size() + " tables" );
473 for (TableName table : tableToRegionMap.keySet()) {
474 try {
475 if (!this.targetTableSet.isEmpty() &&
476 !this.targetTableSet.contains(table)) {
477 continue;
479 // TODO: maybe run the placement in parallel for each table
480 genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
481 USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY);
482 } catch (Exception e) {
483 LOG.error("Get some exceptions for placing primary region server" +
484 "for table " + table + " because " + e);
487 LOG.info("Finish to generate the new assignment plan for the " +
488 + tableToRegionMap.keySet().size() + " tables" );
489 return plan;
492 @Override
493 public void close() throws IOException {
494 Closeables.close(connection, true);
498 * Some algorithms for solving the assignment problem may traverse workers or
499 * jobs in linear order which may result in skewing the assignments of the
500 * first jobs in the matrix toward the last workers in the matrix if the
501 * costs are uniform. To avoid this kind of clumping, we can randomize the
502 * rows and columns of the cost matrix in a reversible way, such that the
503 * solution to the assignment problem can be interpreted in terms of the
504 * original untransformed cost matrix. Rows and columns are transformed
505 * independently such that the elements contained in any row of the input
506 * matrix are the same as the elements in the corresponding output matrix,
507 * and each row has its elements transformed in the same way. Similarly for
508 * columns.
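* <p>
* A sketch of the intended round trip, mirroring how genAssignmentPlan uses this class
* (illustrative only; variable names are assumptions):
* <pre>
*   RandomizedMatrix rm = new RandomizedMatrix(numRegions, regionSlots);
*   float[][] shuffledCost = rm.transform(cost);        // permute rows and columns
*   int[] assignment = new MunkresAssignment(shuffledCost).solve();
*   assignment = rm.invertIndices(assignment);          // map back to original indices
* </pre>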
510 protected static class RandomizedMatrix {
511 private final int rows;
512 private final int cols;
513 private final int[] rowTransform;
514 private final int[] rowInverse;
515 private final int[] colTransform;
516 private final int[] colInverse;
519 * Create a randomization scheme for a matrix of a given size.
520 * @param rows the number of rows in the matrix
521 * @param cols the number of columns in the matrix
523 public RandomizedMatrix(int rows, int cols) {
524 this.rows = rows;
525 this.cols = cols;
526 Random random = new Random();
527 rowTransform = new int[rows];
528 rowInverse = new int[rows];
529 for (int i = 0; i < rows; i++) {
530 rowTransform[i] = i;
532 // Shuffle the row indices.
533 for (int i = rows - 1; i >= 0; i--) {
534 int r = random.nextInt(i + 1);
535 int temp = rowTransform[r];
536 rowTransform[r] = rowTransform[i];
537 rowTransform[i] = temp;
539 // Generate the inverse row indices.
540 for (int i = 0; i < rows; i++) {
541 rowInverse[rowTransform[i]] = i;
544 colTransform = new int[cols];
545 colInverse = new int[cols];
546 for (int i = 0; i < cols; i++) {
547 colTransform[i] = i;
549 // Shuffle the column indices.
550 for (int i = cols - 1; i >= 0; i--) {
551 int r = random.nextInt(i + 1);
552 int temp = colTransform[r];
553 colTransform[r] = colTransform[i];
554 colTransform[i] = temp;
556 // Generate the inverse column indices.
557 for (int i = 0; i < cols; i++) {
558 colInverse[colTransform[i]] = i;
563 * Copy a given matrix into a new matrix, transforming each row index and
564 * each column index according to the randomization scheme that was created
565 * at construction time.
566 * @param matrix the cost matrix to transform
567 * @return a new matrix with row and column indices transformed
569 public float[][] transform(float[][] matrix) {
570 float[][] result = new float[rows][cols];
571 for (int i = 0; i < rows; i++) {
572 for (int j = 0; j < cols; j++) {
573 result[rowTransform[i]][colTransform[j]] = matrix[i][j];
576 return result;
580 * Copy a given matrix into a new matrix, transforming each row index and
581 * each column index according to the inverse of the randomization scheme
582 * that was created at construction time.
583 * @param matrix the cost matrix to be inverted
584 * @return a new matrix with row and column indices inverted
586 public float[][] invert(float[][] matrix) {
587 float[][] result = new float[rows][cols];
588 for (int i = 0; i < rows; i++) {
589 for (int j = 0; j < cols; j++) {
590 result[rowInverse[i]][colInverse[j]] = matrix[i][j];
593 return result;
597 * Given an array where each element {@code indices[i]} represents the
598 * randomized column index corresponding to randomized row index {@code i},
599 * create a new array with the corresponding inverted indices.
600 * @param indices an array of transformed indices to be inverted
601 * @return an array of inverted indices
603 public int[] invertIndices(int[] indices) {
604 int[] result = new int[indices.length];
605 for (int i = 0; i < indices.length; i++) {
606 result[rowInverse[i]] = colInverse[indices[i]];
608 return result;
613 * Print the assignment plan to the log
614 * @param plan
616 public static void printAssignmentPlan(FavoredNodesPlan plan) {
617 if (plan == null) return;
618 LOG.info("========== Start to print the assignment plan ================");
619 // sort the map based on region info
620 Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());
622 for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
624 String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
625 String regionName = entry.getKey();
626 LOG.info("Region: " + regionName );
627 LOG.info("Its favored nodes: " + serverList);
629 LOG.info("========== Finish to print the assignment plan ================");
633 * Update the assignment plan into hbase:meta
634 * @param plan the assignment plan to be updated into hbase:meta
635 * @throws IOException if it cannot update the assignment plan in hbase:meta
637 public void updateAssignmentPlanToMeta(FavoredNodesPlan plan)
638 throws IOException {
639 try {
640 LOG.info("Start to update the hbase:meta with the new assignment plan");
641 Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
642 Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
643 Map<String, RegionInfo> regionToRegionInfoMap =
644 getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
645 for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
646 planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
649 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
650 LOG.info("Updated the hbase:meta with the new assignment plan");
651 } catch (Exception e) {
652 LOG.error("Failed to update hbase:meta with the new assignment" +
653 "plan because " + e.getMessage());
658 * Update the assignment plan to all the region servers
659 * @param plan
660 * @throws IOException
662 private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan)
663 throws IOException{
664 LOG.info("Start to update the region servers with the new assignment plan");
665 // Get the region to region server map
666 Map<ServerName, List<RegionInfo>> currentAssignment =
667 this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
669 // Keep track of the failed and succeeded updates
670 int succeededNum = 0;
671 Map<ServerName, Exception> failedUpdateMap = new HashMap<>();
673 for (Map.Entry<ServerName, List<RegionInfo>> entry :
674 currentAssignment.entrySet()) {
675 List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
676 try {
677 // Keep track of the favored updates for the current region server
678 FavoredNodesPlan singleServerPlan = null;
679 // Find out all the updates for the current region server
680 for (RegionInfo region : entry.getValue()) {
681 List<ServerName> favoredServerList = plan.getFavoredNodes(region);
682 if (favoredServerList != null &&
683 favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM) {
684 // Create the single server plan if necessary
685 if (singleServerPlan == null) {
686 singleServerPlan = new FavoredNodesPlan();
688 // Update the single server update
689 singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
690 regionUpdateInfos.add(new Pair<>(region, favoredServerList));
693 if (singleServerPlan != null) {
694 // Update the current region server with its updated favored nodes
695 AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey());
696 UpdateFavoredNodesRequest request =
697 RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
698 UpdateFavoredNodesResponse updateFavoredNodesResponse =
699 FutureUtils.get(rsAdmin.updateFavoredNodes(request));
700 LOG.info("Region server " +
701 FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest()))
702 .getServerInfo() +
703 " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
704 singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan");
705 succeededNum++;
707 } catch (Exception e) {
708 failedUpdateMap.put(entry.getKey(), e);
711 // log the succeeded updates
712 LOG.info("Updated " + succeededNum + " region servers with " +
713 "the new assignment plan");
715 // log the failed updates
716 int failedNum = failedUpdateMap.size();
717 if (failedNum != 0) {
718 LOG.error("Failed to update the following + " + failedNum +
719 " region servers with its corresponding favored nodes");
720 for (Map.Entry<ServerName, Exception> entry :
721 failedUpdateMap.entrySet() ) {
722 LOG.error("Failed to update " + entry.getKey().getAddress() +
723 " because of " + entry.getValue().getMessage());
728 public void updateAssignmentPlan(FavoredNodesPlan plan)
729 throws IOException {
730 LOG.info("Start to update the new assignment plan for the hbase:meta table and" +
731 " the region servers");
732 // Update the new assignment plan to META
733 updateAssignmentPlanToMeta(plan);
734 // Update the new assignment plan to Region Servers
735 updateAssignmentPlanToRegionServers(plan);
736 LOG.info("Finish to update the new assignment plan for the hbase:meta table and" +
737 " the region servers");
741 * Return how many regions will move per table since their primary RS will
742 * change
744 * @param newPlan - new AssignmentPlan
745 * @return how many primaries will move per table
747 public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan)
748 throws IOException {
749 Map<TableName, Integer> movesPerTable = new HashMap<>();
750 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
751 Map<TableName, List<RegionInfo>> tableToRegions = snapshot
752 .getTableToRegionMap();
753 FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
754 Set<TableName> tables = snapshot.getTableSet();
755 for (TableName table : tables) {
756 int movedPrimaries = 0;
757 if (!this.targetTableSet.isEmpty()
758 && !this.targetTableSet.contains(table)) {
759 continue;
761 List<RegionInfo> regions = tableToRegions.get(table);
762 for (RegionInfo region : regions) {
763 List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
764 List<ServerName> newServers = newPlan.getFavoredNodes(region);
765 if (oldServers != null && newServers != null) {
766 ServerName oldPrimary = oldServers.get(0);
767 ServerName newPrimary = newServers.get(0);
768 if (oldPrimary.compareTo(newPrimary) != 0) {
769 movedPrimaries++;
773 movesPerTable.put(table, movedPrimaries);
775 return movesPerTable;
779 * Compares two plans and checks whether the locality dropped or increased
780 * (prints the information as a string); also prints the baseline locality
782 * @param movesPerTable - how many primary regions will move per table
783 * @param regionLocalityMap - locality map from FS
784 * @param newPlan - new assignment plan
785 * @throws IOException
787 public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
788 Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
789 throws IOException {
790 // localities for primary, secondary and tertiary
791 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
792 FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
793 Set<TableName> tables = snapshot.getTableSet();
794 Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
795 for (TableName table : tables) {
796 float[] deltaLocality = new float[3];
797 float[] locality = new float[3];
798 if (!this.targetTableSet.isEmpty()
799 && !this.targetTableSet.contains(table)) {
800 continue;
802 List<RegionInfo> regions = tableToRegionsMap.get(table);
803 System.out.println("==================================================");
804 System.out.println("Assignment Plan Projection Report For Table: " + table);
805 System.out.println("\t Total regions: " + regions.size());
806 System.out.println("\t" + movesPerTable.get(table)
807 + " primaries will move due to their primary has changed");
808 for (RegionInfo currentRegion : regions) {
809 Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
810 .getEncodedName());
811 if (regionLocality == null) {
812 continue;
814 List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
815 List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
816 if (newServers != null && oldServers != null) {
817 int i=0;
818 for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
819 ServerName newServer = newServers.get(p.ordinal());
820 ServerName oldServer = oldServers.get(p.ordinal());
821 Float oldLocality = 0f;
822 if (oldServers != null) {
823 oldLocality = regionLocality.get(oldServer.getHostname());
824 if (oldLocality == null) {
825 oldLocality = 0f;
827 locality[i] += oldLocality;
829 Float newLocality = regionLocality.get(newServer.getHostname());
830 if (newLocality == null) {
831 newLocality = 0f;
833 deltaLocality[i] += newLocality - oldLocality;
834 i++;
838 DecimalFormat df = new DecimalFormat("#.##");
839 for (int i = 0; i < deltaLocality.length; i++) {
840 System.out.print("\t\t Baseline locality for ");
841 if (i == 0) {
842 System.out.print("primary ");
843 } else if (i == 1) {
844 System.out.print("secondary ");
845 } else if (i == 2) {
846 System.out.print("tertiary ");
848 System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
849 System.out.print("\t\t Locality will change with the new plan: ");
850 System.out.println(df.format(100 * deltaLocality[i] / regions.size())
851 + "%");
853 System.out.println("\t Baseline dispersion");
854 printDispersionScores(table, snapshot, regions.size(), null, true);
855 System.out.println("\t Projected dispersion");
856 printDispersionScores(table, snapshot, regions.size(), newPlan, true);
860 public void printDispersionScores(TableName table,
861 SnapshotOfRegionAssignmentFromMeta snapshot, int numRegions, FavoredNodesPlan newPlan,
862 boolean simplePrint) {
863 if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
864 return;
866 AssignmentVerificationReport report = new AssignmentVerificationReport();
867 report.fillUpDispersion(table, snapshot, newPlan);
868 List<Float> dispersion = report.getDispersionInformation();
869 if (simplePrint) {
870 DecimalFormat df = new DecimalFormat("#.##");
871 System.out.println("\tAvg dispersion score: "
872 + df.format(dispersion.get(0)) + " hosts;\tMax dispersion score: "
873 + df.format(dispersion.get(1)) + " hosts;\tMin dispersion score: "
874 + df.format(dispersion.get(2)) + " hosts;");
875 } else {
876 LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
877 + " ; The average dispersion score is " + dispersion.get(0));
881 public void printLocalityAndDispersionForCurrentPlan(
882 Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
883 SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
884 FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
885 Set<TableName> tables = snapshot.getTableSet();
886 Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot
887 .getTableToRegionMap();
888 for (TableName table : tables) {
889 float[] locality = new float[3];
890 if (!this.targetTableSet.isEmpty()
891 && !this.targetTableSet.contains(table)) {
892 continue;
894 List<RegionInfo> regions = tableToRegionsMap.get(table);
895 for (RegionInfo currentRegion : regions) {
896 Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion
897 .getEncodedName());
898 if (regionLocality == null) {
899 continue;
901 List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
902 if (servers != null) {
903 int i = 0;
904 for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
905 ServerName server = servers.get(p.ordinal());
906 Float currentLocality = 0f;
907 if (servers != null) {
908 currentLocality = regionLocality.get(server.getHostname());
909 if (currentLocality == null) {
910 currentLocality = 0f;
912 locality[i] += currentLocality;
914 i++;
918 for (int i = 0; i < locality.length; i++) {
919 String copy = null;
920 if (i == 0) {
921 copy = "primary";
922 } else if (i == 1) {
923 copy = "secondary";
924 } else if (i == 2) {
925 copy = "tertiary" ;
927 float avgLocality = 100 * locality[i] / regions.size();
928 LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
929 + " ; The average locality for " + copy+ " is " + avgLocality + " %");
931 printDispersionScores(table, snapshot, regions.size(), null, false);
936 * @param favoredNodesStr The String of favored nodes
937 * @return the list of ServerName parsed from the favored nodes string.
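* <p>
* For example (hostnames and ports are illustrative):
* {@code getFavoredNodeList("rs1.example.com:16020,rs2.example.com:16020,rs3.example.com:16020")}
* returns three ServerName instances, each created with ServerName.NON_STARTCODE.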
939 public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
940 String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
941 if (favoredNodesArray == null)
942 return null;
944 List<ServerName> serverList = new ArrayList<>();
945 for (String hostNameAndPort : favoredNodesArray) {
946 serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
948 return serverList;
951 public static void main(String[] args) throws IOException {
952 Options opt = new Options();
953 opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
954 opt.addOption("u", "update", false,
955 "update the assignments to hbase:meta and RegionServers together");
956 opt.addOption("n", "dry-run", false, "do not write assignments to META");
957 opt.addOption("v", "verify", false, "verify current assignments against META");
958 opt.addOption("p", "print", false, "print the current assignment plan in META");
959 opt.addOption("h", "help", false, "print usage");
960 opt.addOption("d", "verification-details", false,
961 "print the details of verification report");
963 opt.addOption("zk", true, "to set the zookeeper quorum");
964 opt.addOption("fs", true, "to set HDFS");
965 opt.addOption("hbase_root", true, "to set hbase_root directory");
967 opt.addOption("overwrite", false,
968 "overwrite the favored nodes for a single region," +
969 "for example: -update -r regionName -f server1:port,server2:port,server3:port");
970 opt.addOption("r", true, "The region name that needs to be updated");
971 opt.addOption("f", true, "The new favored nodes");
973 opt.addOption("tables", true,
974 "The list of table names splitted by ',' ;" +
975 "For example: -tables: t1,t2,...,tn");
976 opt.addOption("l", "locality", true, "enforce the maximum locality");
977 opt.addOption("m", "min-move", true, "enforce minimum assignment move");
978 opt.addOption("diff", false, "calculate difference between assignment plans");
979 opt.addOption("munkres", false,
980 "use munkres to place secondaries and tertiaries");
981 opt.addOption("ld", "locality-dispersion", false, "print locality and dispersion " +
982 "information for current plan");
983 try {
984 CommandLine cmd = new GnuParser().parse(opt, args);
985 Configuration conf = HBaseConfiguration.create();
987 boolean enforceMinAssignmentMove = true;
988 boolean enforceLocality = true;
989 boolean verificationDetails = false;
991 // Read all the options
992 if ((cmd.hasOption("l") &&
993 cmd.getOptionValue("l").equalsIgnoreCase("false")) ||
994 (cmd.hasOption("locality") &&
995 cmd.getOptionValue("locality").equalsIgnoreCase("false"))) {
996 enforceLocality = false;
999 if ((cmd.hasOption("m") &&
1000 cmd.getOptionValue("m").equalsIgnoreCase("false")) ||
1001 (cmd.hasOption("min-move") &&
1002 cmd.getOptionValue("min-move").equalsIgnoreCase("false"))) {
1003 enforceMinAssignmentMove = false;
1006 if (cmd.hasOption("zk")) {
1007 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
1008 LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
1011 if (cmd.hasOption("fs")) {
1012 conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
1013 LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
1016 if (cmd.hasOption("hbase_root")) {
1017 conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
1018 LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
1021 // Create the region placement obj
1022 try (RegionPlacementMaintainer rp =
1023 new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {
1025 if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
1026 verificationDetails = true;
1029 if (cmd.hasOption("tables")) {
1030 String tableNameListStr = cmd.getOptionValue("tables");
1031 String[] tableNames = StringUtils.split(tableNameListStr, ",");
1032 rp.setTargetTableName(tableNames);
1035 if (cmd.hasOption("munkres")) {
1036 USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
1039 // Read all the modes
1040 if (cmd.hasOption("v") || cmd.hasOption("verify")) {
1041 // Verify the region placement.
1042 rp.verifyRegionPlacement(verificationDetails);
1043 } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
1044 // Generate the assignment plan only without updating the hbase:meta and RS
1045 FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1046 printAssignmentPlan(plan);
1047 } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
1048 // Generate the new assignment plan
1049 FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1050 // Print the new assignment plan
1051 printAssignmentPlan(plan);
1052 // Write the new assignment plan to META
1053 rp.updateAssignmentPlanToMeta(plan);
1054 } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
1055 // Generate the new assignment plan
1056 FavoredNodesPlan plan = rp.getNewAssignmentPlan();
1057 // Print the new assignment plan
1058 printAssignmentPlan(plan);
1059 // Update the assignment to hbase:meta and Region Servers
1060 rp.updateAssignmentPlan(plan);
1061 } else if (cmd.hasOption("diff")) {
1062 FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
1063 Map<String, Map<String, Float>> locality =
1064 FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1065 Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
1066 rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
1067 System.out.println("Do you want to update the assignment plan? [y/n]");
1068 Scanner s = new Scanner(System.in);
1069 String input = s.nextLine().trim();
1070 if (input.equals("y")) {
1071 System.out.println("Updating assignment plan...");
1072 rp.updateAssignmentPlan(newPlan);
1074 s.close();
1075 } else if (cmd.hasOption("ld")) {
1076 Map<String, Map<String, Float>> locality =
1077 FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1078 rp.printLocalityAndDispersionForCurrentPlan(locality);
1079 } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
1080 FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
1081 printAssignmentPlan(plan);
1082 } else if (cmd.hasOption("overwrite")) {
1083 if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
1084 throw new IllegalArgumentException("Please specify: " +
1085 " -update -r regionName -f server1:port,server2:port,server3:port");
1088 String regionName = cmd.getOptionValue("r");
1089 String favoredNodesStr = cmd.getOptionValue("f");
1090 LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
1091 favoredNodesStr);
1092 List<ServerName> favoredNodes = null;
1093 RegionInfo regionInfo =
1094 rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
1095 if (regionInfo == null) {
1096 LOG.error("Cannot find the region " + regionName + " from the META");
1097 } else {
1098 try {
1099 favoredNodes = getFavoredNodeList(favoredNodesStr);
1100 } catch (IllegalArgumentException e) {
1101 LOG.error("Cannot parse the invalid favored nodes because " + e);
1103 FavoredNodesPlan newPlan = new FavoredNodesPlan();
1104 newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
1105 rp.updateAssignmentPlan(newPlan);
1107 } else {
1108 printHelp(opt);
1111 } catch (ParseException e) {
1112 printHelp(opt);