2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
;
20 import com
.google
.errorprone
.annotations
.RestrictedApi
;
21 import java
.io
.IOException
;
22 import java
.util
.Arrays
;
23 import java
.util
.concurrent
.TimeUnit
;
24 import org
.apache
.hadoop
.conf
.Configured
;
25 import org
.apache
.hadoop
.hbase
.client
.Admin
;
26 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
27 import org
.apache
.hadoop
.hbase
.client
.Connection
;
28 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
29 import org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
;
30 import org
.apache
.hadoop
.hbase
.client
.Consistency
;
31 import org
.apache
.hadoop
.hbase
.client
.Get
;
32 import org
.apache
.hadoop
.hbase
.client
.Put
;
33 import org
.apache
.hadoop
.hbase
.client
.Result
;
34 import org
.apache
.hadoop
.hbase
.client
.Table
;
35 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
36 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
37 import org
.apache
.hadoop
.hbase
.metrics
.impl
.FastLongHistogram
;
38 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
39 import org
.apache
.hadoop
.hbase
.util
.Threads
;
40 import org
.apache
.hadoop
.util
.Tool
;
41 import org
.apache
.hadoop
.util
.ToolRunner
;
42 import org
.apache
.yetus
.audience
.InterfaceAudience
;
43 import org
.slf4j
.Logger
;
44 import org
.slf4j
.LoggerFactory
;
45 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.CommandLine
;
46 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.DefaultParser
;
47 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.HelpFormatter
;
48 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.Options
;
51 * A tool to evaluating the lag between primary replica and secondary replica.
53 * It simply adds a row to the primary replica, and then check how long before we can read it from
54 * the secondary replica.
56 @InterfaceAudience.Private
57 public class RegionReplicationLagEvaluation
extends Configured
implements Tool
{
59 private static final Logger LOG
= LoggerFactory
.getLogger(RegionReplicationLagEvaluation
.class);
61 public static final String TABLE_NAME
= "TestLagTable";
63 public static final String FAMILY_NAME
= "info";
65 public static final String QUALIFIER_NAME
= "qual";
67 public static final int VALUE_LENGTH
= 256;
69 public static final int ROW_LENGTH
= 16;
71 private static final Options OPTIONS
= new Options().addOption("t", "table", true, "Table name")
72 .addOption("rlen", "rlength", true, "The length of row key")
73 .addOption("vlen", "vlength", true, "The length of value")
74 .addRequiredOption("r", "rows", true, "Number of rows to test");
76 private FastLongHistogram histogram
= new FastLongHistogram();
78 @RestrictedApi(explanation
= "Should only be called in tests", link
= "",
79 allowedOnPath
= ".*/src/test/.*")
80 FastLongHistogram
getHistogram() {
85 public int run(String
[] args
) throws Exception
{
91 CommandLine cli
= new DefaultParser().parse(OPTIONS
, args
);
92 tableName
= TableName
.valueOf(cli
.getOptionValue("t", TABLE_NAME
));
93 rlen
= Integer
.parseInt(cli
.getOptionValue("rlen", String
.valueOf(ROW_LENGTH
)));
94 vlen
= Integer
.parseInt(cli
.getOptionValue("vlen", String
.valueOf(VALUE_LENGTH
)));
95 rows
= Integer
.parseInt(cli
.getOptionValue("r"));
96 } catch (Exception e
) {
97 LOG
.warn("Error parsing command line options", e
);
98 HelpFormatter formatter
= new HelpFormatter();
99 formatter
.printHelp(getClass().getName(), OPTIONS
);
102 exec(tableName
, rlen
, vlen
, rows
);
106 private void createTable(Admin admin
, TableName tableName
) throws IOException
{
107 TableDescriptor td
= TableDescriptorBuilder
.newBuilder(tableName
)
108 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY_NAME
)).setRegionReplication(2)
110 admin
.createTable(td
);
113 private void checkLag(Table table
, int rlen
, int vlen
, int rows
) throws IOException
{
114 byte[] family
= Bytes
.toBytes(FAMILY_NAME
);
115 byte[] qualifier
= Bytes
.toBytes(QUALIFIER_NAME
);
116 LOG
.info("Test replication lag on table {} with {} rows", table
.getName(), rows
);
117 for (int i
= 0; i
< rows
; i
++) {
118 byte[] row
= new byte[rlen
];
120 byte[] value
= new byte[vlen
];
122 table
.put(new Put(row
).addColumn(family
, qualifier
, value
));
123 // get from secondary replica
124 Get get
= new Get(row
).setConsistency(Consistency
.TIMELINE
).setReplicaId(1);
125 long startNs
= System
.nanoTime();
126 for (int retry
= 0;; retry
++) {
127 Result result
= table
.get(get
);
128 byte[] gotValue
= result
.getValue(family
, qualifier
);
129 if (Arrays
.equals(value
, gotValue
)) {
132 long pauseTimeMs
= Math
.min(ConnectionUtils
.getPauseTime(1, retry
), 1000);
133 Threads
.sleepWithoutInterrupt(pauseTimeMs
);
135 long lagMs
= TimeUnit
.NANOSECONDS
.toMillis(System
.nanoTime() - startNs
);
136 histogram
.add(lagMs
, 1);
138 LOG
.info("Test finished, min lag {} ms, max lag {} ms, mean lag {} ms", histogram
.getMin(),
139 histogram
.getMax(), histogram
.getMean());
140 long[] q
= histogram
.getQuantiles(FastLongHistogram
.DEFAULT_QUANTILES
);
141 for (int i
= 0; i
< q
.length
; i
++) {
142 LOG
.info("{}% lag: {} ms", FastLongHistogram
.DEFAULT_QUANTILES
[i
] * 100, q
[i
]);
146 private void exec(TableName tableName
, int rlen
, int vlen
, int rows
) throws IOException
{
147 try (Connection conn
= ConnectionFactory
.createConnection(getConf())) {
148 try (Admin admin
= conn
.getAdmin()) {
149 if (!admin
.tableExists(tableName
)) {
150 createTable(admin
, tableName
);
153 try (Table table
= conn
.getTable(tableName
)) {
154 checkLag(table
, rlen
, vlen
, rows
);
159 public static void main(String
[] args
) throws Exception
{
161 ToolRunner
.run(HBaseConfiguration
.create(), new RegionReplicationLagEvaluation(), args
);