HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / RawAsyncHBaseAdmin.java
blob572eb0960ea1b3dd4ca1fab8ff962995b5d1ebea
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
21 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
22 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
23 import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
25 import edu.umd.cs.findbugs.annotations.Nullable;
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.EnumSet;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Optional;
35 import java.util.Set;
36 import java.util.concurrent.CompletableFuture;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentLinkedQueue;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import java.util.function.BiConsumer;
42 import java.util.function.Consumer;
43 import java.util.function.Function;
44 import java.util.function.Supplier;
45 import java.util.regex.Pattern;
46 import java.util.stream.Collectors;
47 import java.util.stream.Stream;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.hbase.CacheEvictionStats;
50 import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
51 import org.apache.hadoop.hbase.CatalogFamilyFormat;
52 import org.apache.hadoop.hbase.ClientMetaTableAccessor;
53 import org.apache.hadoop.hbase.ClusterMetrics;
54 import org.apache.hadoop.hbase.ClusterMetrics.Option;
55 import org.apache.hadoop.hbase.ClusterMetricsBuilder;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionLocation;
58 import org.apache.hadoop.hbase.NamespaceDescriptor;
59 import org.apache.hadoop.hbase.RegionLocations;
60 import org.apache.hadoop.hbase.RegionMetrics;
61 import org.apache.hadoop.hbase.RegionMetricsBuilder;
62 import org.apache.hadoop.hbase.ServerName;
63 import org.apache.hadoop.hbase.TableExistsException;
64 import org.apache.hadoop.hbase.TableName;
65 import org.apache.hadoop.hbase.TableNotDisabledException;
66 import org.apache.hadoop.hbase.TableNotEnabledException;
67 import org.apache.hadoop.hbase.TableNotFoundException;
68 import org.apache.hadoop.hbase.UnknownRegionException;
69 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
70 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
71 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
72 import org.apache.hadoop.hbase.client.Scan.ReadType;
73 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
74 import org.apache.hadoop.hbase.client.replication.TableCFs;
75 import org.apache.hadoop.hbase.client.security.SecurityCapability;
76 import org.apache.hadoop.hbase.exceptions.DeserializationException;
77 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
78 import org.apache.hadoop.hbase.net.Address;
79 import org.apache.hadoop.hbase.quotas.QuotaFilter;
80 import org.apache.hadoop.hbase.quotas.QuotaSettings;
81 import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
82 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
83 import org.apache.hadoop.hbase.replication.ReplicationException;
84 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
85 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
86 import org.apache.hadoop.hbase.replication.SyncReplicationState;
87 import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
88 import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
89 import org.apache.hadoop.hbase.security.access.Permission;
90 import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
91 import org.apache.hadoop.hbase.security.access.UserPermission;
92 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
93 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
94 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
95 import org.apache.hadoop.hbase.util.Bytes;
96 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
97 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
98 import org.apache.hadoop.hbase.util.Pair;
99 import org.apache.yetus.audience.InterfaceAudience;
100 import org.slf4j.Logger;
101 import org.slf4j.LoggerFactory;
103 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
104 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
105 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
106 import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
107 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
108 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
109 import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
110 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
112 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
113 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
114 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
115 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
116 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
117 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
118 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
119 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
120 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
121 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
122 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
123 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
124 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
125 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
126 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
127 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
128 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
129 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
130 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
131 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
132 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
133 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
134 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
135 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
136 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
137 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
138 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
139 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
140 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
141 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
142 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
143 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
144 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
145 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
146 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
147 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
148 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
149 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
150 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
151 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
152 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
153 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
154 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
155 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
156 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
157 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
158 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
159 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
160 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
161 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
162 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
163 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
164 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
165 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
166 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
167 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
168 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
169 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
170 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
171 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
172 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
173 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
174 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
175 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
176 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
177 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
178 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
179 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
180 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
181 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
182 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
183 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
184 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
185 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
186 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
187 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
188 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
189 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
190 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
191 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
192 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
193 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
194 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
195 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
196 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
197 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
198 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
199 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
200 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
201 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
202 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
203 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
204 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
205 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
206 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
207 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
208 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
209 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
210 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
211 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
212 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
213 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
214 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
215 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
216 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
217 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
218 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
219 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
220 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
221 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
222 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
223 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
224 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
225 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
226 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
227 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
228 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
229 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
230 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
231 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
232 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
233 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
234 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
235 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
236 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
237 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
238 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
239 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
240 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
241 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
242 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
243 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
244 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
245 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
246 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
247 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
248 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
249 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
250 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
251 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
252 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
253 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
254 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
255 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
256 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
257 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
258 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
259 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
260 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
261 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
262 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
263 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
264 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
265 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
266 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
267 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
268 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
269 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
270 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
271 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
272 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
273 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
274 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
275 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
276 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
277 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
278 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
279 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
280 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
281 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
282 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
283 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
284 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
285 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
286 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
287 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
288 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
289 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
290 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
291 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
292 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
293 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
294 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
295 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
296 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
297 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
298 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
299 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
300 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
301 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
302 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
303 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
304 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
305 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
306 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
307 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
308 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
309 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
310 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
311 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
312 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
313 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
314 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
315 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
316 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
317 import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
318 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
319 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
320 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
321 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
322 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
323 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
324 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
325 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
326 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
327 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
328 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
329 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
330 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
331 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
332 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
333 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
334 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
/**
 * The implementation of AsyncAdmin.
 * <p>
 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
 * be finished inside the rpc framework thread, which means that the callbacks registered to the
 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
 * this class should not try to do time-consuming tasks in the callbacks.
 * @since 2.0.0
 * @see AsyncHBaseAdmin
 * @see AsyncConnection#getAdmin()
 * @see AsyncConnection#getAdminBuilder()
 */
348 @InterfaceAudience.Private
349 class RawAsyncHBaseAdmin implements AsyncAdmin {
351 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
353 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
355 private final AsyncConnectionImpl connection;
357 private final HashedWheelTimer retryTimer;
359 private final AsyncTable<AdvancedScanResultConsumer> metaTable;
361 private final long rpcTimeoutNs;
363 private final long operationTimeoutNs;
365 private final long pauseNs;
367 private final long pauseForCQTBENs;
369 private final int maxAttempts;
371 private final int startLogErrorsCnt;
373 private final NonceGenerator ng;
375 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
376 AsyncAdminBuilderBase builder) {
377 this.connection = connection;
378 this.retryTimer = retryTimer;
379 this.metaTable = connection.getTable(META_TABLE_NAME);
380 this.rpcTimeoutNs = builder.rpcTimeoutNs;
381 this.operationTimeoutNs = builder.operationTimeoutNs;
382 this.pauseNs = builder.pauseNs;
383 if (builder.pauseForCQTBENs < builder.pauseNs) {
384 LOG.warn(
385 "Configured value of pauseForCQTBENs is {} ms, which is less than" +
386 " the normal pause value {} ms, use the greater one instead",
387 TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
388 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
389 this.pauseForCQTBENs = builder.pauseNs;
390 } else {
391 this.pauseForCQTBENs = builder.pauseForCQTBENs;
393 this.maxAttempts = builder.maxAttempts;
394 this.startLogErrorsCnt = builder.startLogErrorsCnt;
395 this.ng = connection.getNonceGenerator();
398 <T> MasterRequestCallerBuilder<T> newMasterCaller() {
399 return this.connection.callerFactory.<T> masterRequest()
400 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
401 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
402 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
403 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
406 private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
407 return this.connection.callerFactory.<T> adminRequest()
408 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
409 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
410 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
411 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
414 @FunctionalInterface
415 private interface MasterRpcCall<RESP, REQ> {
416 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
417 RpcCallback<RESP> done);
420 @FunctionalInterface
421 private interface AdminRpcCall<RESP, REQ> {
422 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
423 RpcCallback<RESP> done);
426 @FunctionalInterface
427 private interface Converter<D, S> {
428 D convert(S src) throws IOException;
431 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
432 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
433 Converter<RESP, PRESP> respConverter) {
434 CompletableFuture<RESP> future = new CompletableFuture<>();
435 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
437 @Override
438 public void run(PRESP resp) {
439 if (controller.failed()) {
440 future.completeExceptionally(controller.getFailed());
441 } else {
442 try {
443 future.complete(respConverter.convert(resp));
444 } catch (IOException e) {
445 future.completeExceptionally(e);
450 return future;
453 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
454 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
455 Converter<RESP, PRESP> respConverter) {
456 CompletableFuture<RESP> future = new CompletableFuture<>();
457 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
459 @Override
460 public void run(PRESP resp) {
461 if (controller.failed()) {
462 future.completeExceptionally(controller.getFailed());
463 } else {
464 try {
465 future.complete(respConverter.convert(resp));
466 } catch (IOException e) {
467 future.completeExceptionally(e);
472 return future;
475 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
476 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
477 ProcedureBiConsumer consumer) {
478 return procedureCall(b -> {
479 }, preq, rpcCall, respConverter, consumer);
482 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
483 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
484 ProcedureBiConsumer consumer) {
485 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
488 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
489 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
490 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
491 ProcedureBiConsumer consumer) {
492 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
493 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
494 prioritySetter.accept(builder);
495 CompletableFuture<Long> procFuture = builder.call();
496 CompletableFuture<Void> future = waitProcedureResult(procFuture);
497 addListener(future, consumer);
498 return future;
501 @FunctionalInterface
502 private interface TableOperator {
503 CompletableFuture<Void> operate(TableName table);
506 @Override
507 public CompletableFuture<Boolean> tableExists(TableName tableName) {
508 if (TableName.isMetaTableName(tableName)) {
509 return CompletableFuture.completedFuture(true);
511 return ClientMetaTableAccessor.tableExists(metaTable, tableName);
514 @Override
515 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
516 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
517 includeSysTables));
521 * {@link #listTableDescriptors(boolean)}
523 @Override
524 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
525 boolean includeSysTables) {
526 Preconditions.checkNotNull(pattern,
527 "pattern is null. If you don't specify a pattern, "
528 + "use listTableDescriptors(boolean) instead");
529 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
530 includeSysTables));
533 @Override
534 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
535 Preconditions.checkNotNull(tableNames,
536 "tableNames is null. If you don't specify tableNames, "
537 + "use listTableDescriptors(boolean) instead");
538 if (tableNames.isEmpty()) {
539 return CompletableFuture.completedFuture(Collections.emptyList());
541 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
544 private CompletableFuture<List<TableDescriptor>>
545 getTableDescriptors(GetTableDescriptorsRequest request) {
546 return this.<List<TableDescriptor>> newMasterCaller()
547 .action((controller, stub) -> this
548 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
549 controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
550 (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
551 .call();
554 @Override
555 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
556 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
559 @Override
560 public CompletableFuture<List<TableName>>
561 listTableNames(Pattern pattern, boolean includeSysTables) {
562 Preconditions.checkNotNull(pattern,
563 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
564 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
567 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
568 return this
569 .<List<TableName>> newMasterCaller()
570 .action(
571 (controller, stub) -> this
572 .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
573 stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
574 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
577 @Override
578 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
579 return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
580 .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
581 List<TableDescriptor>> call(
582 controller, stub,
583 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
584 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
585 (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
586 .call();
589 @Override
590 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
591 return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
592 .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
593 List<TableName>> call(
594 controller, stub,
595 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
596 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
597 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
598 .call();
601 @Override
602 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
603 CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
604 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
605 .action((controller, stub) -> this
606 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
607 controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
608 (s, c, req, done) -> s.getTableDescriptors(c, req, done),
609 (resp) -> resp.getTableSchemaList()))
610 .call(), (tableSchemas, error) -> {
611 if (error != null) {
612 future.completeExceptionally(error);
613 return;
615 if (!tableSchemas.isEmpty()) {
616 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
617 } else {
618 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
621 return future;
624 @Override
625 public CompletableFuture<Void> createTable(TableDescriptor desc) {
626 return createTable(desc.getTableName(),
627 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
630 @Override
631 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
632 int numRegions) {
633 try {
634 return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
635 } catch (IllegalArgumentException e) {
636 return failedFuture(e);
640 @Override
641 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
642 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
643 + " use createTable(TableDescriptor) instead");
644 try {
645 verifySplitKeys(splitKeys);
646 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
647 splitKeys, ng.getNonceGroup(), ng.newNonce()));
648 } catch (IllegalArgumentException e) {
649 return failedFuture(e);
653 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
654 Preconditions.checkNotNull(tableName, "table name is null");
655 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
656 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
657 new CreateTableProcedureBiConsumer(tableName));
660 @Override
661 public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
662 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
663 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
664 ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
665 (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
668 @Override
669 public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
670 return this
671 .<ModifyTableStoreFileTrackerRequest, ModifyTableStoreFileTrackerResponse> procedureCall(
672 tableName,
673 RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
674 ng.getNonceGroup(), ng.newNonce()),
675 (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
676 (resp) -> resp.getProcId(),
677 new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
680 @Override
681 public CompletableFuture<Void> deleteTable(TableName tableName) {
682 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
683 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
684 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
685 new DeleteTableProcedureBiConsumer(tableName));
688 @Override
689 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
690 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
691 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
692 ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
693 (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
696 @Override
697 public CompletableFuture<Void> enableTable(TableName tableName) {
698 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
699 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
700 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
701 new EnableTableProcedureBiConsumer(tableName));
704 @Override
705 public CompletableFuture<Void> disableTable(TableName tableName) {
706 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
707 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
708 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
709 new DisableTableProcedureBiConsumer(tableName));
713 * Utility for completing passed TableState {@link CompletableFuture} <code>future</code>
714 * using passed parameters. Sets error or boolean result ('true' if table matches
715 * the passed-in targetState).
717 private static CompletableFuture<Boolean> completeCheckTableState(
718 CompletableFuture<Boolean> future, TableState tableState, Throwable error,
719 TableState.State targetState, TableName tableName) {
720 if (error != null) {
721 future.completeExceptionally(error);
722 } else {
723 if (tableState != null) {
724 future.complete(tableState.inStates(targetState));
725 } else {
726 future.completeExceptionally(new TableNotFoundException(tableName));
729 return future;
732 @Override
733 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
734 if (TableName.isMetaTableName(tableName)) {
735 return CompletableFuture.completedFuture(true);
737 CompletableFuture<Boolean> future = new CompletableFuture<>();
738 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
739 (tableState, error) -> {
740 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
741 TableState.State.ENABLED, tableName);
743 return future;
746 @Override
747 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
748 if (TableName.isMetaTableName(tableName)) {
749 return CompletableFuture.completedFuture(false);
751 CompletableFuture<Boolean> future = new CompletableFuture<>();
752 addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
753 (tableState, error) -> {
754 completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
755 TableState.State.DISABLED, tableName);
757 return future;
760 @Override
761 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
762 if (TableName.isMetaTableName(tableName)) {
763 return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
764 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
766 CompletableFuture<Boolean> future = new CompletableFuture<>();
767 addListener(isTableEnabled(tableName), (enabled, error) -> {
768 if (error != null) {
769 if (error instanceof TableNotFoundException) {
770 future.complete(false);
771 } else {
772 future.completeExceptionally(error);
774 return;
776 if (!enabled) {
777 future.complete(false);
778 } else {
779 addListener(
780 ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
781 (locations, error1) -> {
782 if (error1 != null) {
783 future.completeExceptionally(error1);
784 return;
786 List<HRegionLocation> notDeployedRegions = locations.stream()
787 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
788 if (notDeployedRegions.size() > 0) {
789 if (LOG.isDebugEnabled()) {
790 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
792 future.complete(false);
793 return;
795 future.complete(true);
799 return future;
802 @Override
803 public CompletableFuture<Void> addColumnFamily(
804 TableName tableName, ColumnFamilyDescriptor columnFamily) {
805 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
806 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
807 ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
808 new AddColumnFamilyProcedureBiConsumer(tableName));
811 @Override
812 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
813 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
814 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
815 ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
816 (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
819 @Override
820 public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
821 ColumnFamilyDescriptor columnFamily) {
822 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
823 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
824 ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
825 (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
828 @Override
829 public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
830 byte[] family, String dstSFT) {
831 return this
832 .<ModifyColumnStoreFileTrackerRequest, ModifyColumnStoreFileTrackerResponse> procedureCall(
833 tableName,
834 RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
835 ng.getNonceGroup(), ng.newNonce()),
836 (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
837 (resp) -> resp.getProcId(),
838 new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
841 @Override
842 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
843 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
844 RequestConverter.buildCreateNamespaceRequest(descriptor),
845 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
846 new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
849 @Override
850 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
851 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
852 RequestConverter.buildModifyNamespaceRequest(descriptor),
853 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
854 new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
857 @Override
858 public CompletableFuture<Void> deleteNamespace(String name) {
859 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
860 RequestConverter.buildDeleteNamespaceRequest(name),
861 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
862 new DeleteNamespaceProcedureBiConsumer(name));
865 @Override
866 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
867 return this
868 .<NamespaceDescriptor> newMasterCaller()
869 .action(
870 (controller, stub) -> this
871 .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor>
872 call(controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name),
873 (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done), (resp)
874 -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
877 @Override
878 public CompletableFuture<List<String>> listNamespaces() {
879 return this
880 .<List<String>> newMasterCaller()
881 .action(
882 (controller, stub) -> this
883 .<ListNamespacesRequest, ListNamespacesResponse, List<String>> call(
884 controller, stub, ListNamespacesRequest.newBuilder().build(), (s, c, req,
885 done) -> s.listNamespaces(c, req, done),
886 (resp) -> resp.getNamespaceNameList())).call();
889 @Override
890 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
891 return this
892 .<List<NamespaceDescriptor>> newMasterCaller().action((controller, stub) -> this
893 .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse,
894 List<NamespaceDescriptor>> call(controller, stub,
895 ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) ->
896 s.listNamespaceDescriptors(c, req, done),
897 (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp))).call();
900 @Override
901 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
902 return this.<List<RegionInfo>> newAdminCaller()
903 .action((controller, stub) -> this
904 .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
905 controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
906 (s, c, req, done) -> s.getOnlineRegion(c, req, done),
907 resp -> ProtobufUtil.getRegionInfos(resp)))
908 .serverName(serverName).call();
911 @Override
912 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
913 if (tableName.equals(META_TABLE_NAME)) {
914 return connection.registry.getMetaRegionLocations()
915 .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
916 .collect(Collectors.toList()));
917 } else {
918 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
919 .thenApply(
920 locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
923 @Override
924 public CompletableFuture<Void> flush(TableName tableName) {
925 return flush(tableName, null);
928 @Override
929 public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
930 CompletableFuture<Void> future = new CompletableFuture<>();
931 addListener(tableExists(tableName), (exists, err) -> {
932 if (err != null) {
933 future.completeExceptionally(err);
934 } else if (!exists) {
935 future.completeExceptionally(new TableNotFoundException(tableName));
936 } else {
937 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
938 if (err2 != null) {
939 future.completeExceptionally(err2);
940 } else if (!tableEnabled) {
941 future.completeExceptionally(new TableNotEnabledException(tableName));
942 } else {
943 Map<String, String> props = new HashMap<>();
944 if (columnFamily != null) {
945 props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
947 addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
948 props), (ret, err3) -> {
949 if (err3 != null) {
950 future.completeExceptionally(err3);
951 } else {
952 future.complete(ret);
959 return future;
962 @Override
963 public CompletableFuture<Void> flushRegion(byte[] regionName) {
964 return flushRegionInternal(regionName, null, false).thenAccept(r -> {
968 @Override
969 public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
970 Preconditions.checkNotNull(columnFamily, "columnFamily is null."
971 + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
972 return flushRegionInternal(regionName, columnFamily, false)
973 .thenAccept(r -> {});
977 * This method is for internal use only, where we need the response of the flush.
978 * <p/>
979 * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
980 * API.
982 CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
983 byte[] columnFamily, boolean writeFlushWALMarker) {
984 CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
985 addListener(getRegionLocation(regionName), (location, err) -> {
986 if (err != null) {
987 future.completeExceptionally(err);
988 return;
990 ServerName serverName = location.getServerName();
991 if (serverName == null) {
992 future
993 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
994 return;
996 addListener(
997 flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
998 (ret, err2) -> {
999 if (err2 != null) {
1000 future.completeExceptionally(err2);
1001 } else {
1002 future.complete(ret);
1003 }});
1005 return future;
1008 private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1009 byte[] columnFamily, boolean writeFlushWALMarker) {
1010 return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1011 .action((controller, stub) -> this
1012 .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
1013 RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(),
1014 columnFamily, writeFlushWALMarker),
1015 (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1016 .call();
1019 @Override
1020 public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1021 CompletableFuture<Void> future = new CompletableFuture<>();
1022 addListener(getRegions(sn), (hRegionInfos, err) -> {
1023 if (err != null) {
1024 future.completeExceptionally(err);
1025 return;
1027 List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1028 if (hRegionInfos != null) {
1029 hRegionInfos.forEach(
1030 region -> compactFutures.add(
1031 flush(sn, region, null, false).thenAccept(r -> {})
1035 addListener(CompletableFuture.allOf(
1036 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1037 if (err2 != null) {
1038 future.completeExceptionally(err2);
1039 } else {
1040 future.complete(ret);
1044 return future;
1047 @Override
1048 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1049 return compact(tableName, null, false, compactType);
1052 @Override
1053 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1054 CompactType compactType) {
1055 Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1056 + "If you don't specify a columnFamily, use compact(TableName) instead");
1057 return compact(tableName, columnFamily, false, compactType);
1060 @Override
1061 public CompletableFuture<Void> compactRegion(byte[] regionName) {
1062 return compactRegion(regionName, null, false);
1065 @Override
1066 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1067 Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1068 + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1069 return compactRegion(regionName, columnFamily, false);
1072 @Override
1073 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1074 return compact(tableName, null, true, compactType);
1077 @Override
1078 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1079 CompactType compactType) {
1080 Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1081 + "If you don't specify a columnFamily, use compact(TableName) instead");
1082 return compact(tableName, columnFamily, true, compactType);
1085 @Override
1086 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1087 return compactRegion(regionName, null, true);
1090 @Override
1091 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1092 Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1093 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1094 return compactRegion(regionName, columnFamily, true);
1097 @Override
1098 public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1099 return compactRegionServer(sn, false);
1102 @Override
1103 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1104 return compactRegionServer(sn, true);
1107 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1108 CompletableFuture<Void> future = new CompletableFuture<>();
1109 addListener(getRegions(sn), (hRegionInfos, err) -> {
1110 if (err != null) {
1111 future.completeExceptionally(err);
1112 return;
1114 List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1115 if (hRegionInfos != null) {
1116 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1118 addListener(CompletableFuture.allOf(
1119 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1120 if (err2 != null) {
1121 future.completeExceptionally(err2);
1122 } else {
1123 future.complete(ret);
1127 return future;
1130 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1131 boolean major) {
1132 CompletableFuture<Void> future = new CompletableFuture<>();
1133 addListener(getRegionLocation(regionName), (location, err) -> {
1134 if (err != null) {
1135 future.completeExceptionally(err);
1136 return;
1138 ServerName serverName = location.getServerName();
1139 if (serverName == null) {
1140 future
1141 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1142 return;
1144 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1145 (ret, err2) -> {
1146 if (err2 != null) {
1147 future.completeExceptionally(err2);
1148 } else {
1149 future.complete(ret);
1153 return future;
1157 * List all region locations for the specific table.
1159 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1160 if (TableName.META_TABLE_NAME.equals(tableName)) {
1161 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1162 addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1163 if (err != null) {
1164 future.completeExceptionally(err);
1165 } else if (metaRegions == null || metaRegions.isEmpty() ||
1166 metaRegions.getDefaultRegionLocation() == null) {
1167 future.completeExceptionally(new IOException("meta region does not found"));
1168 } else {
1169 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1172 return future;
1173 } else {
1174 // For non-meta table, we fetch all locations by scanning hbase:meta table
1175 return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1180 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1182 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1183 CompactType compactType) {
1184 CompletableFuture<Void> future = new CompletableFuture<>();
1186 switch (compactType) {
1187 case MOB:
1188 addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1189 if (err != null) {
1190 future.completeExceptionally(err);
1191 return;
1193 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1194 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1195 if (err2 != null) {
1196 future.completeExceptionally(err2);
1197 } else {
1198 future.complete(ret);
1202 break;
1203 case NORMAL:
1204 addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1205 if (err != null) {
1206 future.completeExceptionally(err);
1207 return;
1209 if (locations == null || locations.isEmpty()) {
1210 future.completeExceptionally(new TableNotFoundException(tableName));
1212 CompletableFuture<?>[] compactFutures =
1213 locations.stream().filter(l -> l.getRegion() != null)
1214 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1215 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1216 .toArray(CompletableFuture<?>[]::new);
1217 // future complete unless all of the compact futures are completed.
1218 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1219 if (err2 != null) {
1220 future.completeExceptionally(err2);
1221 } else {
1222 future.complete(ret);
1226 break;
1227 default:
1228 throw new IllegalArgumentException("Unknown compactType: " + compactType);
1230 return future;
1234 * Compact the region at specific region server.
1236 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1237 final boolean major, byte[] columnFamily) {
1238 return this
1239 .<Void> newAdminCaller()
1240 .serverName(sn)
1241 .action(
1242 (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1243 controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1244 major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1245 resp -> null)).call();
1248 private byte[] toEncodeRegionName(byte[] regionName) {
1249 return RegionInfo.isEncodedRegionName(regionName) ? regionName :
1250 Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1253 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1254 CompletableFuture<TableName> result) {
1255 addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1256 if (err != null) {
1257 result.completeExceptionally(err);
1258 return;
1260 RegionInfo regionInfo = location.getRegion();
1261 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1262 result.completeExceptionally(
1263 new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1264 return;
1266 if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1267 if (!tableName.get().equals(regionInfo.getTable())) {
1268 // tables of this two region should be same.
1269 result.completeExceptionally(
1270 new IllegalArgumentException("Cannot merge regions from two different tables " +
1271 tableName.get() + " and " + regionInfo.getTable()));
1272 } else {
1273 result.complete(tableName.get());
1279 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1280 AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1281 CompletableFuture<TableName> future = new CompletableFuture<>();
1282 for (byte[] encodedRegionName : encodedRegionNames) {
1283 checkAndGetTableName(encodedRegionName, tableNameRef, future);
1285 return future;
1288 @Override
1289 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1290 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1293 @Override
1294 public CompletableFuture<Boolean> isMergeEnabled() {
1295 return isSplitOrMergeOn(MasterSwitchType.MERGE);
1298 @Override
1299 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1300 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1303 @Override
1304 public CompletableFuture<Boolean> isSplitEnabled() {
1305 return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1308 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1309 MasterSwitchType switchType) {
1310 SetSplitOrMergeEnabledRequest request =
1311 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1312 return this.<Boolean> newMasterCaller()
1313 .action((controller, stub) -> this
1314 .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1315 stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1316 (resp) -> resp.getPrevValueList().get(0)))
1317 .call();
1320 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1321 IsSplitOrMergeEnabledRequest request =
1322 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1323 return this
1324 .<Boolean> newMasterCaller()
1325 .action(
1326 (controller, stub) -> this
1327 .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1328 controller, stub, request,
1329 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1330 (resp) -> resp.getEnabled())).call();
1333 @Override
1334 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1335 if (nameOfRegionsToMerge.size() < 2) {
1336 return failedFuture(new IllegalArgumentException(
1337 "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1339 CompletableFuture<Void> future = new CompletableFuture<>();
1340 byte[][] encodedNameOfRegionsToMerge =
1341 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1343 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1344 if (err != null) {
1345 future.completeExceptionally(err);
1346 return;
1349 final MergeTableRegionsRequest request;
1350 try {
1351 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1352 forcible, ng.getNonceGroup(), ng.newNonce());
1353 } catch (DeserializationException e) {
1354 future.completeExceptionally(e);
1355 return;
1358 addListener(
1359 this.procedureCall(tableName, request,
1360 MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
1361 new MergeTableRegionProcedureBiConsumer(tableName)),
1362 (ret, err2) -> {
1363 if (err2 != null) {
1364 future.completeExceptionally(err2);
1365 } else {
1366 future.complete(ret);
1370 return future;
1373 @Override
1374 public CompletableFuture<Void> split(TableName tableName) {
1375 CompletableFuture<Void> future = new CompletableFuture<>();
1376 addListener(tableExists(tableName), (exist, error) -> {
1377 if (error != null) {
1378 future.completeExceptionally(error);
1379 return;
1381 if (!exist) {
1382 future.completeExceptionally(new TableNotFoundException(tableName));
1383 return;
1385 addListener(metaTable
1386 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1387 .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1388 ClientMetaTableAccessor.QueryType.REGION))
1389 .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1390 ClientMetaTableAccessor.QueryType.REGION))),
1391 (results, err2) -> {
1392 if (err2 != null) {
1393 future.completeExceptionally(err2);
1394 return;
1396 if (results != null && !results.isEmpty()) {
1397 List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1398 for (Result r : results) {
1399 if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1400 continue;
1402 RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1403 if (rl != null) {
1404 for (HRegionLocation h : rl.getRegionLocations()) {
1405 if (h != null && h.getServerName() != null) {
1406 RegionInfo hri = h.getRegion();
1407 if (hri == null || hri.isSplitParent() ||
1408 hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1409 continue;
1411 splitFutures.add(split(hri, null));
1416 addListener(
1417 CompletableFuture
1418 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1419 (ret, exception) -> {
1420 if (exception != null) {
1421 future.completeExceptionally(exception);
1422 return;
1424 future.complete(ret);
1426 } else {
1427 future.complete(null);
1431 return future;
1434 @Override
1435 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1436 CompletableFuture<Void> result = new CompletableFuture<>();
1437 if (splitPoint == null) {
1438 return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1440 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1441 (loc, err) -> {
1442 if (err != null) {
1443 result.completeExceptionally(err);
1444 } else if (loc == null || loc.getRegion() == null) {
1445 result.completeExceptionally(new IllegalArgumentException(
1446 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1447 } else {
1448 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1449 if (err2 != null) {
1450 result.completeExceptionally(err2);
1451 } else {
1452 result.complete(ret);
1458 return result;
1461 @Override
1462 public CompletableFuture<Void> splitRegion(byte[] regionName) {
1463 CompletableFuture<Void> future = new CompletableFuture<>();
1464 addListener(getRegionLocation(regionName), (location, err) -> {
1465 if (err != null) {
1466 future.completeExceptionally(err);
1467 return;
1469 RegionInfo regionInfo = location.getRegion();
1470 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1471 future
1472 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1473 "Replicas are auto-split when their primary is split."));
1474 return;
1476 ServerName serverName = location.getServerName();
1477 if (serverName == null) {
1478 future
1479 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1480 return;
1482 addListener(split(regionInfo, null), (ret, err2) -> {
1483 if (err2 != null) {
1484 future.completeExceptionally(err2);
1485 } else {
1486 future.complete(ret);
1490 return future;
1493 @Override
1494 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1495 Preconditions.checkNotNull(splitPoint,
1496 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1497 CompletableFuture<Void> future = new CompletableFuture<>();
1498 addListener(getRegionLocation(regionName), (location, err) -> {
1499 if (err != null) {
1500 future.completeExceptionally(err);
1501 return;
1503 RegionInfo regionInfo = location.getRegion();
1504 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1505 future
1506 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1507 "Replicas are auto-split when their primary is split."));
1508 return;
1510 ServerName serverName = location.getServerName();
1511 if (serverName == null) {
1512 future
1513 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1514 return;
1516 if (regionInfo.getStartKey() != null &&
1517 Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1518 future.completeExceptionally(
1519 new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1520 return;
1522 addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1523 if (err2 != null) {
1524 future.completeExceptionally(err2);
1525 } else {
1526 future.complete(ret);
1530 return future;
1533 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1534 CompletableFuture<Void> future = new CompletableFuture<>();
1535 TableName tableName = hri.getTable();
1536 final SplitTableRegionRequest request;
1537 try {
1538 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1539 ng.newNonce());
1540 } catch (DeserializationException e) {
1541 future.completeExceptionally(e);
1542 return future;
1545 addListener(
1546 this.procedureCall(tableName,
1547 request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
1548 new SplitTableRegionProcedureBiConsumer(tableName)),
1549 (ret, err2) -> {
1550 if (err2 != null) {
1551 future.completeExceptionally(err2);
1552 } else {
1553 future.complete(ret);
1556 return future;
1559 @Override
1560 public CompletableFuture<Void> assign(byte[] regionName) {
1561 CompletableFuture<Void> future = new CompletableFuture<>();
1562 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1563 if (err != null) {
1564 future.completeExceptionally(err);
1565 return;
1567 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1568 .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1569 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1570 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1571 .call(), (ret, err2) -> {
1572 if (err2 != null) {
1573 future.completeExceptionally(err2);
1574 } else {
1575 future.complete(ret);
1579 return future;
1582 @Override
1583 public CompletableFuture<Void> unassign(byte[] regionName) {
1584 CompletableFuture<Void> future = new CompletableFuture<>();
1585 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1586 if (err != null) {
1587 future.completeExceptionally(err);
1588 return;
1590 addListener(
1591 this.<Void> newMasterCaller().priority(regionInfo.getTable())
1592 .action(((controller, stub) -> this
1593 .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1594 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1595 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1596 .call(),
1597 (ret, err2) -> {
1598 if (err2 != null) {
1599 future.completeExceptionally(err2);
1600 } else {
1601 future.complete(ret);
1605 return future;
1608 @Override
1609 public CompletableFuture<Void> offline(byte[] regionName) {
1610 CompletableFuture<Void> future = new CompletableFuture<>();
1611 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1612 if (err != null) {
1613 future.completeExceptionally(err);
1614 return;
1616 addListener(
1617 this.<Void> newMasterCaller().priority(regionInfo.getTable())
1618 .action(((controller, stub) -> this
1619 .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1620 RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1621 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1622 .call(),
1623 (ret, err2) -> {
1624 if (err2 != null) {
1625 future.completeExceptionally(err2);
1626 } else {
1627 future.complete(ret);
1631 return future;
1634 @Override
1635 public CompletableFuture<Void> move(byte[] regionName) {
1636 CompletableFuture<Void> future = new CompletableFuture<>();
1637 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1638 if (err != null) {
1639 future.completeExceptionally(err);
1640 return;
1642 addListener(
1643 moveRegion(regionInfo,
1644 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1645 (ret, err2) -> {
1646 if (err2 != null) {
1647 future.completeExceptionally(err2);
1648 } else {
1649 future.complete(ret);
1653 return future;
1656 @Override
1657 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1658 Preconditions.checkNotNull(destServerName,
1659 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1660 CompletableFuture<Void> future = new CompletableFuture<>();
1661 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1662 if (err != null) {
1663 future.completeExceptionally(err);
1664 return;
1666 addListener(
1667 moveRegion(regionInfo, RequestConverter
1668 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1669 (ret, err2) -> {
1670 if (err2 != null) {
1671 future.completeExceptionally(err2);
1672 } else {
1673 future.complete(ret);
1677 return future;
1680 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1681 return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1682 .action(
1683 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1684 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1685 .call();
1688 @Override
1689 public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1690 return this
1691 .<Void> newMasterCaller()
1692 .action(
1693 (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1694 stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1695 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1698 @Override
1699 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1700 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1701 Scan scan = QuotaTableUtil.makeScan(filter);
1702 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1703 .scan(scan, new AdvancedScanResultConsumer() {
1704 List<QuotaSettings> settings = new ArrayList<>();
1706 @Override
1707 public void onNext(Result[] results, ScanController controller) {
1708 for (Result result : results) {
1709 try {
1710 QuotaTableUtil.parseResultToCollection(result, settings);
1711 } catch (IOException e) {
1712 controller.terminate();
1713 future.completeExceptionally(e);
1718 @Override
1719 public void onError(Throwable error) {
1720 future.completeExceptionally(error);
1723 @Override
1724 public void onComplete() {
1725 future.complete(settings);
1728 return future;
1731 @Override
1732 public CompletableFuture<Void> addReplicationPeer(String peerId,
1733 ReplicationPeerConfig peerConfig, boolean enabled) {
1734 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1735 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1736 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1737 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1740 @Override
1741 public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1742 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1743 RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1744 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1745 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1748 @Override
1749 public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1750 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1751 RequestConverter.buildEnableReplicationPeerRequest(peerId),
1752 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1753 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1756 @Override
1757 public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1758 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1759 RequestConverter.buildDisableReplicationPeerRequest(peerId),
1760 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1761 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1764 @Override
1765 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1766 return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1767 .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
1768 call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1769 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1770 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
1773 @Override
1774 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1775 ReplicationPeerConfig peerConfig) {
1776 return this
1777 .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1778 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1779 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1780 (resp) -> resp.getProcId(),
1781 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1784 @Override
1785 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1786 SyncReplicationState clusterState) {
1787 return this.<TransitReplicationPeerSyncReplicationStateRequest,
1788 TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1789 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1790 clusterState),
1791 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1792 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1793 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1796 @Override
1797 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1798 Map<TableName, List<String>> tableCfs) {
1799 if (tableCfs == null) {
1800 return failedFuture(new ReplicationException("tableCfs is null"));
1803 CompletableFuture<Void> future = new CompletableFuture<Void>();
1804 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1805 if (!completeExceptionally(future, error)) {
1806 ReplicationPeerConfig newPeerConfig =
1807 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1808 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1809 if (!completeExceptionally(future, error)) {
1810 future.complete(result);
1815 return future;
1818 @Override
1819 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1820 Map<TableName, List<String>> tableCfs) {
1821 if (tableCfs == null) {
1822 return failedFuture(new ReplicationException("tableCfs is null"));
1825 CompletableFuture<Void> future = new CompletableFuture<Void>();
1826 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1827 if (!completeExceptionally(future, error)) {
1828 ReplicationPeerConfig newPeerConfig = null;
1829 try {
1830 newPeerConfig = ReplicationPeerConfigUtil
1831 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1832 } catch (ReplicationException e) {
1833 future.completeExceptionally(e);
1834 return;
1836 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1837 if (!completeExceptionally(future, error)) {
1838 future.complete(result);
1843 return future;
1846 @Override
1847 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1848 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1851 @Override
1852 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1853 Preconditions.checkNotNull(pattern,
1854 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1855 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1858 private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1859 ListReplicationPeersRequest request) {
1860 return this
1861 .<List<ReplicationPeerDescription>> newMasterCaller()
1862 .action(
1863 (controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1864 List<ReplicationPeerDescription>> call(controller, stub, request,
1865 (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1866 (resp) -> resp.getPeerDescList().stream()
1867 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1868 .collect(Collectors.toList()))).call();
1871 @Override
1872 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1873 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1874 addListener(listTableDescriptors(), (tables, error) -> {
1875 if (!completeExceptionally(future, error)) {
1876 List<TableCFs> replicatedTableCFs = new ArrayList<>();
1877 tables.forEach(table -> {
1878 Map<String, Integer> cfs = new HashMap<>();
1879 Stream.of(table.getColumnFamilies())
1880 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1881 .forEach(column -> {
1882 cfs.put(column.getNameAsString(), column.getScope());
1884 if (!cfs.isEmpty()) {
1885 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1888 future.complete(replicatedTableCFs);
1891 return future;
1894 @Override
1895 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1896 SnapshotProtos.SnapshotDescription snapshot =
1897 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1898 try {
1899 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1900 } catch (IllegalArgumentException e) {
1901 return failedFuture(e);
1903 CompletableFuture<Void> future = new CompletableFuture<>();
1904 final SnapshotRequest request =
1905 SnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
1906 .setNonce(ng.newNonce()).build();
1907 addListener(this.<SnapshotResponse> newMasterCaller()
1908 .action((controller, stub) ->
1909 this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(controller, stub,
1910 request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
1911 .call(), (resp, err) -> {
1912 if (err != null) {
1913 future.completeExceptionally(err);
1914 return;
1916 waitSnapshotFinish(snapshotDesc, future, resp);
1918 return future;
1921 // This is for keeping compatibility with old implementation.
1922 // If there is a procId field in the response, then the snapshot will be operated with a
1923 // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
1924 private void waitSnapshotFinish(SnapshotDescription snapshot,
1925 CompletableFuture<Void> future, SnapshotResponse resp) {
1926 if (resp.hasProcId()) {
1927 getProcedureResult(resp.getProcId(), future, 0);
1928 addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
1929 } else {
1930 long expectedTimeout = resp.getExpectedTimeout();
1931 TimerTask pollingTask = new TimerTask() {
1932 int tries = 0;
1933 long startTime = EnvironmentEdgeManager.currentTime();
1934 long endTime = startTime + expectedTimeout;
1935 long maxPauseTime = expectedTimeout / maxAttempts;
1937 @Override
1938 public void run(Timeout timeout) throws Exception {
1939 if (EnvironmentEdgeManager.currentTime() < endTime) {
1940 addListener(isSnapshotFinished(snapshot), (done, err2) -> {
1941 if (err2 != null) {
1942 future.completeExceptionally(err2);
1943 } else if (done) {
1944 future.complete(null);
1945 } else {
1946 // retry again after pauseTime.
1947 long pauseTime = ConnectionUtils
1948 .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1949 pauseTime = Math.min(pauseTime, maxPauseTime);
1950 AsyncConnectionImpl.RETRY_TIMER
1951 .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
1954 } else {
1955 future.completeExceptionally(new SnapshotCreationException(
1956 "Snapshot '" + snapshot.getName() + "' wasn't completed in expectedTime:"
1957 + expectedTimeout + " ms", snapshot));
1961 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1965 @Override
1966 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1967 return this
1968 .<Boolean> newMasterCaller()
1969 .action(
1970 (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1971 controller,
1972 stub,
1973 IsSnapshotDoneRequest.newBuilder()
1974 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1975 req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1978 @Override
1979 public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1980 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1981 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1982 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1983 return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1986 @Override
1987 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1988 boolean restoreAcl) {
1989 CompletableFuture<Void> future = new CompletableFuture<>();
1990 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1991 if (err != null) {
1992 future.completeExceptionally(err);
1993 return;
1995 TableName tableName = null;
1996 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1997 for (SnapshotDescription snap : snapshotDescriptions) {
1998 if (snap.getName().equals(snapshotName)) {
1999 tableName = snap.getTableName();
2000 break;
2004 if (tableName == null) {
2005 future.completeExceptionally(new RestoreSnapshotException(
2006 "Unable to find the table name for snapshot=" + snapshotName));
2007 return;
2009 final TableName finalTableName = tableName;
2010 addListener(tableExists(finalTableName), (exists, err2) -> {
2011 if (err2 != null) {
2012 future.completeExceptionally(err2);
2013 } else if (!exists) {
2014 // if table does not exist, then just clone snapshot into new table.
2015 completeConditionalOnFuture(future,
2016 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2017 } else {
2018 addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2019 if (err4 != null) {
2020 future.completeExceptionally(err4);
2021 } else if (!disabled) {
2022 future.completeExceptionally(new TableNotDisabledException(finalTableName));
2023 } else {
2024 completeConditionalOnFuture(future,
2025 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2031 return future;
2034 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2035 boolean takeFailSafeSnapshot, boolean restoreAcl) {
2036 if (takeFailSafeSnapshot) {
2037 CompletableFuture<Void> future = new CompletableFuture<>();
2038 // Step.1 Take a snapshot of the current state
2039 String failSafeSnapshotSnapshotNameFormat =
2040 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2041 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2042 final String failSafeSnapshotSnapshotName =
2043 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2044 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2045 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2046 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2047 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2048 if (err != null) {
2049 future.completeExceptionally(err);
2050 } else {
2051 // Step.2 Restore snapshot
2052 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2053 (void2, err2) -> {
2054 if (err2 != null) {
2055 // Step.3.a Something went wrong during the restore and try to rollback.
2056 addListener(
2057 internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl,
2058 null),
2059 (void3, err3) -> {
2060 if (err3 != null) {
2061 future.completeExceptionally(err3);
2062 } else {
2063 String msg =
2064 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
2065 failSafeSnapshotSnapshotName + " succeeded.";
2066 future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2069 } else {
2070 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2071 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2072 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2073 if (err3 != null) {
2074 LOG.error(
2075 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2076 err3);
2078 future.complete(ret3);
2084 return future;
2085 } else {
2086 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2090 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2091 CompletableFuture<T> parentFuture) {
2092 addListener(parentFuture, (res, err) -> {
2093 if (err != null) {
2094 dependentFuture.completeExceptionally(err);
2095 } else {
2096 dependentFuture.complete(res);
2101 @Override
2102 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2103 boolean restoreAcl, String customSFT) {
2104 CompletableFuture<Void> future = new CompletableFuture<>();
2105 addListener(tableExists(tableName), (exists, err) -> {
2106 if (err != null) {
2107 future.completeExceptionally(err);
2108 } else if (exists) {
2109 future.completeExceptionally(new TableExistsException(tableName));
2110 } else {
2111 completeConditionalOnFuture(future,
2112 internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2115 return future;
2118 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2119 boolean restoreAcl, String customSFT) {
2120 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2121 .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2122 try {
2123 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2124 } catch (IllegalArgumentException e) {
2125 return failedFuture(e);
2127 RestoreSnapshotRequest.Builder builder =
2128 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2129 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2130 if(customSFT != null){
2131 builder.setCustomSFT(customSFT);
2133 return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2134 .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2135 builder.build(),
2136 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2137 .call());
2140 @Override
2141 public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2142 return getCompletedSnapshots(null);
2145 @Override
2146 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2147 Preconditions.checkNotNull(pattern,
2148 "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2149 return getCompletedSnapshots(pattern);
2152 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2153 return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2154 .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2155 call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2156 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2157 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2158 .call();
2161 @Override
2162 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2163 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2164 + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2165 return getCompletedSnapshots(tableNamePattern, null);
2168 @Override
2169 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2170 Pattern snapshotNamePattern) {
2171 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2172 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2173 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2174 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2175 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2178 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2179 Pattern tableNamePattern, Pattern snapshotNamePattern) {
2180 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2181 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2182 if (err != null) {
2183 future.completeExceptionally(err);
2184 return;
2186 if (tableNames == null || tableNames.size() <= 0) {
2187 future.complete(Collections.emptyList());
2188 return;
2190 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2191 if (err2 != null) {
2192 future.completeExceptionally(err2);
2193 return;
2195 if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2196 future.complete(Collections.emptyList());
2197 return;
2199 future.complete(snapshotDescList.stream()
2200 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2201 .collect(Collectors.toList()));
2204 return future;
2207 @Override
2208 public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2209 return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2212 @Override
2213 public CompletableFuture<Void> deleteSnapshots() {
2214 return internalDeleteSnapshots(null, null);
2217 @Override
2218 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2219 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2220 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2221 return internalDeleteSnapshots(null, snapshotNamePattern);
2224 @Override
2225 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2226 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2227 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2228 return internalDeleteSnapshots(tableNamePattern, null);
2231 @Override
2232 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2233 Pattern snapshotNamePattern) {
2234 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2235 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2236 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2237 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2238 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2241 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2242 Pattern snapshotNamePattern) {
2243 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2244 if (tableNamePattern == null) {
2245 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2246 } else {
2247 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2249 CompletableFuture<Void> future = new CompletableFuture<>();
2250 addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2251 if (err != null) {
2252 future.completeExceptionally(err);
2253 return;
2255 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2256 future.complete(null);
2257 return;
2259 addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2260 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2261 if (e != null) {
2262 future.completeExceptionally(e);
2263 } else {
2264 future.complete(v);
2267 }));
2268 return future;
2271 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2272 return this
2273 .<Void> newMasterCaller()
2274 .action(
2275 (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2276 controller,
2277 stub,
2278 DeleteSnapshotRequest.newBuilder()
2279 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2280 req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2283 @Override
2284 public CompletableFuture<Void> execProcedure(String signature, String instance,
2285 Map<String, String> props) {
2286 CompletableFuture<Void> future = new CompletableFuture<>();
2287 ProcedureDescription procDesc =
2288 ProtobufUtil.buildProcedureDescription(signature, instance, props);
2289 addListener(this.<Long> newMasterCaller()
2290 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2291 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2292 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2293 .call(), (expectedTimeout, err) -> {
2294 if (err != null) {
2295 future.completeExceptionally(err);
2296 return;
2298 TimerTask pollingTask = new TimerTask() {
2299 int tries = 0;
2300 long startTime = EnvironmentEdgeManager.currentTime();
2301 long endTime = startTime + expectedTimeout;
2302 long maxPauseTime = expectedTimeout / maxAttempts;
2304 @Override
2305 public void run(Timeout timeout) throws Exception {
2306 if (EnvironmentEdgeManager.currentTime() < endTime) {
2307 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2308 if (err2 != null) {
2309 future.completeExceptionally(err2);
2310 return;
2312 if (done) {
2313 future.complete(null);
2314 } else {
2315 // retry again after pauseTime.
2316 long pauseTime =
2317 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2318 pauseTime = Math.min(pauseTime, maxPauseTime);
2319 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2320 TimeUnit.MICROSECONDS);
2323 } else {
2324 future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2325 instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2329 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2330 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2332 return future;
2335 @Override
2336 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2337 Map<String, String> props) {
2338 ProcedureDescription proDesc =
2339 ProtobufUtil.buildProcedureDescription(signature, instance, props);
2340 return this.<byte[]> newMasterCaller()
2341 .action(
2342 (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2343 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2344 (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2345 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2346 .call();
2349 @Override
2350 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2351 Map<String, String> props) {
2352 ProcedureDescription proDesc =
2353 ProtobufUtil.buildProcedureDescription(signature, instance, props);
2354 return this.<Boolean> newMasterCaller()
2355 .action((controller, stub) -> this
2356 .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2357 IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2358 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2359 .call();
2362 @Override
2363 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2364 return this.<Boolean> newMasterCaller().action(
2365 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2366 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2367 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2368 .call();
2371 @Override
2372 public CompletableFuture<String> getProcedures() {
2373 return this
2374 .<String> newMasterCaller()
2375 .action(
2376 (controller, stub) -> this
2377 .<GetProceduresRequest, GetProceduresResponse, String> call(
2378 controller, stub, GetProceduresRequest.newBuilder().build(),
2379 (s, c, req, done) -> s.getProcedures(c, req, done),
2380 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2383 @Override
2384 public CompletableFuture<String> getLocks() {
2385 return this
2386 .<String> newMasterCaller()
2387 .action(
2388 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2389 controller, stub, GetLocksRequest.newBuilder().build(),
2390 (s, c, req, done) -> s.getLocks(c, req, done),
2391 resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2394 @Override
2395 public CompletableFuture<Void> decommissionRegionServers(
2396 List<ServerName> servers, boolean offload) {
2397 return this.<Void> newMasterCaller()
2398 .action((controller, stub) -> this
2399 .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2400 controller, stub,
2401 RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2402 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2403 .call();
2406 @Override
2407 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2408 return this.<List<ServerName>> newMasterCaller()
2409 .action((controller, stub) -> this
2410 .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2411 List<ServerName>> call(
2412 controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2413 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2414 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2415 .collect(Collectors.toList())))
2416 .call();
2419 @Override
2420 public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2421 List<byte[]> encodedRegionNames) {
2422 return this.<Void> newMasterCaller()
2423 .action((controller, stub) ->
2424 this.<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(
2425 controller, stub, RequestConverter.buildRecommissionRegionServerRequest(
2426 server, encodedRegionNames), (s, c, req, done) -> s.recommissionRegionServer(
2427 c, req, done), resp -> null)).call();
2431 * Get the region location for the passed region name. The region name may be a full region name
2432 * or encoded region name. If the region does not found, then it'll throw an
2433 * UnknownRegionException wrapped by a {@link CompletableFuture}
2434 * @param regionNameOrEncodedRegionName region name or encoded region name
2435 * @return region location, wrapped by a {@link CompletableFuture}
2437 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2438 if (regionNameOrEncodedRegionName == null) {
2439 return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2442 CompletableFuture<Optional<HRegionLocation>> future;
2443 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2444 String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2445 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2446 // old format encodedName, should be meta region
2447 future = connection.registry.getMetaRegionLocations()
2448 .thenApply(locs -> Stream.of(locs.getRegionLocations())
2449 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2450 } else {
2451 future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2452 regionNameOrEncodedRegionName);
2454 } else {
2455 // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2456 // it needs to throw out IllegalArgumentException in case tableName is passed in.
2457 RegionInfo regionInfo;
2458 try {
2459 regionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(
2460 regionNameOrEncodedRegionName);
2461 } catch (IOException ioe) {
2462 return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2465 if (regionInfo.isMetaRegion()) {
2466 future = connection.registry.getMetaRegionLocations()
2467 .thenApply(locs -> Stream.of(locs.getRegionLocations())
2468 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2469 .findFirst());
2470 } else {
2471 future =
2472 ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2476 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2477 addListener(future, (location, err) -> {
2478 if (err != null) {
2479 returnedFuture.completeExceptionally(err);
2480 return;
2482 if (!location.isPresent() || location.get().getRegion() == null) {
2483 returnedFuture.completeExceptionally(
2484 new UnknownRegionException("Invalid region name or encoded region name: " +
2485 Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2486 } else {
2487 returnedFuture.complete(location.get());
2490 return returnedFuture;
2494 * Get the region info for the passed region name. The region name may be a full region name or
2495 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2496 * wrapped by a {@link CompletableFuture}
2497 * @return region info, wrapped by a {@link CompletableFuture}
2499 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2500 if (regionNameOrEncodedRegionName == null) {
2501 return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2504 if (Bytes.equals(regionNameOrEncodedRegionName,
2505 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2506 Bytes.equals(regionNameOrEncodedRegionName,
2507 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2508 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2511 CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2512 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2513 if (err != null) {
2514 future.completeExceptionally(err);
2515 } else {
2516 future.complete(location.getRegion());
2519 return future;
2522 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2523 if (numRegions < 3) {
2524 throw new IllegalArgumentException("Must create at least three regions");
2525 } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2526 throw new IllegalArgumentException("Start key must be smaller than end key");
2528 if (numRegions == 3) {
2529 return new byte[][] { startKey, endKey };
2531 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2532 if (splitKeys == null || splitKeys.length != numRegions - 1) {
2533 throw new IllegalArgumentException("Unable to split key range into enough regions");
2535 return splitKeys;
2538 private void verifySplitKeys(byte[][] splitKeys) {
2539 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2540 // Verify there are no duplicate split keys
2541 byte[] lastKey = null;
2542 for (byte[] splitKey : splitKeys) {
2543 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2544 throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2546 if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2547 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2548 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2550 lastKey = splitKey;
2554 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2556 abstract void onFinished();
2558 abstract void onError(Throwable error);
2560 @Override
2561 public void accept(Void v, Throwable error) {
2562 if (error != null) {
2563 onError(error);
2564 return;
2566 onFinished();
2570 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2571 protected final TableName tableName;
2573 TableProcedureBiConsumer(TableName tableName) {
2574 this.tableName = tableName;
2577 abstract String getOperationType();
2579 String getDescription() {
2580 return "Operation: " + getOperationType() + ", " + "Table Name: "
2581 + tableName.getNameWithNamespaceInclAsString();
2584 @Override
2585 void onFinished() {
2586 LOG.info(getDescription() + " completed");
2589 @Override
2590 void onError(Throwable error) {
2591 LOG.info(getDescription() + " failed with " + error.getMessage());
2595 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2596 protected final String namespaceName;
2598 NamespaceProcedureBiConsumer(String namespaceName) {
2599 this.namespaceName = namespaceName;
2602 abstract String getOperationType();
2604 String getDescription() {
2605 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2608 @Override
2609 void onFinished() {
2610 LOG.info(getDescription() + " completed");
2613 @Override
2614 void onError(Throwable error) {
2615 LOG.info(getDescription() + " failed with " + error.getMessage());
2619 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2621 CreateTableProcedureBiConsumer(TableName tableName) {
2622 super(tableName);
2625 @Override
2626 String getOperationType() {
2627 return "CREATE";
2631 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2633 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2634 super(tableName);
2637 @Override
2638 String getOperationType() {
2639 return "MODIFY";
2643 private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2644 extends TableProcedureBiConsumer {
2646 ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2647 super(tableName);
2650 @Override
2651 String getOperationType() {
2652 return "MODIFY_TABLE_STORE_FILE_TRACKER";
2656 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2658 DeleteTableProcedureBiConsumer(TableName tableName) {
2659 super(tableName);
2662 @Override
2663 String getOperationType() {
2664 return "DELETE";
2667 @Override
2668 void onFinished() {
2669 connection.getLocator().clearCache(this.tableName);
2670 super.onFinished();
2674 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2676 TruncateTableProcedureBiConsumer(TableName tableName) {
2677 super(tableName);
2680 @Override
2681 String getOperationType() {
2682 return "TRUNCATE";
2686 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2688 EnableTableProcedureBiConsumer(TableName tableName) {
2689 super(tableName);
2692 @Override
2693 String getOperationType() {
2694 return "ENABLE";
2698 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2700 DisableTableProcedureBiConsumer(TableName tableName) {
2701 super(tableName);
2704 @Override
2705 String getOperationType() {
2706 return "DISABLE";
2710 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2712 AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2713 super(tableName);
2716 @Override
2717 String getOperationType() {
2718 return "ADD_COLUMN_FAMILY";
2722 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2724 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2725 super(tableName);
2728 @Override
2729 String getOperationType() {
2730 return "DELETE_COLUMN_FAMILY";
2734 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2736 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2737 super(tableName);
2740 @Override
2741 String getOperationType() {
2742 return "MODIFY_COLUMN_FAMILY";
2746 private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2747 extends TableProcedureBiConsumer {
2749 ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2750 super(tableName);
2753 @Override
2754 String getOperationType() {
2755 return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2759 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2761 CreateNamespaceProcedureBiConsumer(String namespaceName) {
2762 super(namespaceName);
2765 @Override
2766 String getOperationType() {
2767 return "CREATE_NAMESPACE";
2771 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2773 DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2774 super(namespaceName);
2777 @Override
2778 String getOperationType() {
2779 return "DELETE_NAMESPACE";
2783 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2785 ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2786 super(namespaceName);
2789 @Override
2790 String getOperationType() {
2791 return "MODIFY_NAMESPACE";
2795 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2797 MergeTableRegionProcedureBiConsumer(TableName tableName) {
2798 super(tableName);
2801 @Override
2802 String getOperationType() {
2803 return "MERGE_REGIONS";
2807 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2809 SplitTableRegionProcedureBiConsumer(TableName tableName) {
2810 super(tableName);
2813 @Override
2814 String getOperationType() {
2815 return "SPLIT_REGION";
2819 private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
2820 SnapshotProcedureBiConsumer(TableName tableName) {
2821 super(tableName);
2824 @Override
2825 String getOperationType() {
2826 return "SNAPSHOT";
2831 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2832 private final String peerId;
2833 private final Supplier<String> getOperation;
2835 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2836 this.peerId = peerId;
2837 this.getOperation = getOperation;
2840 String getDescription() {
2841 return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2844 @Override
2845 void onFinished() {
2846 LOG.info(getDescription() + " completed");
2849 @Override
2850 void onError(Throwable error) {
2851 LOG.info(getDescription() + " failed with " + error.getMessage());
2855 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2856 CompletableFuture<Void> future = new CompletableFuture<>();
2857 addListener(procFuture, (procId, error) -> {
2858 if (error != null) {
2859 future.completeExceptionally(error);
2860 return;
2862 getProcedureResult(procId, future, 0);
2864 return future;
2867 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2868 addListener(
2869 this.<GetProcedureResultResponse> newMasterCaller()
2870 .action((controller, stub) -> this
2871 .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2872 controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2873 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2874 .call(),
2875 (response, error) -> {
2876 if (error != null) {
2877 LOG.warn("failed to get the procedure result procId={}", procId,
2878 ConnectionUtils.translateException(error));
2879 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2880 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2881 return;
2883 if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2884 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2885 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2886 return;
2888 if (response.hasException()) {
2889 IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2890 future.completeExceptionally(ioe);
2891 } else {
2892 future.complete(null);
2897 private <T> CompletableFuture<T> failedFuture(Throwable error) {
2898 CompletableFuture<T> future = new CompletableFuture<>();
2899 future.completeExceptionally(error);
2900 return future;
2903 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2904 if (error != null) {
2905 future.completeExceptionally(error);
2906 return true;
2908 return false;
2911 @Override
2912 public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2913 return getClusterMetrics(EnumSet.allOf(Option.class));
2916 @Override
2917 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2918 return this
2919 .<ClusterMetrics> newMasterCaller()
2920 .action(
2921 (controller, stub) -> this
2922 .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2923 stub, RequestConverter.buildGetClusterStatusRequest(options),
2924 (s, c, req, done) -> s.getClusterStatus(c, req, done),
2925 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2928 @Override
2929 public CompletableFuture<Void> shutdown() {
2930 return this.<Void> newMasterCaller().priority(HIGH_QOS)
2931 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2932 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2933 resp -> null))
2934 .call();
2937 @Override
2938 public CompletableFuture<Void> stopMaster() {
2939 return this.<Void> newMasterCaller().priority(HIGH_QOS)
2940 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2941 controller, stub, StopMasterRequest.newBuilder().build(),
2942 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2943 .call();
2946 @Override
2947 public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2948 StopServerRequest request = RequestConverter
2949 .buildStopServerRequest("Called by admin client " + this.connection.toString());
2950 return this.<Void> newAdminCaller().priority(HIGH_QOS)
2951 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2952 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2953 resp -> null))
2954 .serverName(serverName).call();
2957 @Override
2958 public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2959 return this
2960 .<Void> newAdminCaller()
2961 .action(
2962 (controller, stub) -> this
2963 .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2964 controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2965 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2966 .serverName(serverName).call();
2969 @Override
2970 public CompletableFuture<Void> updateConfiguration() {
2971 CompletableFuture<Void> future = new CompletableFuture<Void>();
2972 addListener(
2973 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2974 (status, err) -> {
2975 if (err != null) {
2976 future.completeExceptionally(err);
2977 } else {
2978 List<CompletableFuture<Void>> futures = new ArrayList<>();
2979 status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2980 futures.add(updateConfiguration(status.getMasterName()));
2981 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2982 addListener(
2983 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2984 (result, err2) -> {
2985 if (err2 != null) {
2986 future.completeExceptionally(err2);
2987 } else {
2988 future.complete(result);
2993 return future;
2996 @Override
2997 public CompletableFuture<Void> updateConfiguration(String groupName) {
2998 CompletableFuture<Void> future = new CompletableFuture<Void>();
2999 addListener(
3000 getRSGroup(groupName),
3001 (rsGroupInfo, err) -> {
3002 if (err != null) {
3003 future.completeExceptionally(err);
3004 } else if (rsGroupInfo == null) {
3005 future.completeExceptionally(
3006 new IllegalArgumentException("Group does not exist: " + groupName));
3007 } else {
3008 addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3009 if (err2 != null) {
3010 future.completeExceptionally(err2);
3011 } else {
3012 List<CompletableFuture<Void>> futures = new ArrayList<>();
3013 List<ServerName> groupServers = status.getServersName().stream().filter(
3014 s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3015 groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3016 addListener(
3017 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3018 (result, err3) -> {
3019 if (err3 != null) {
3020 future.completeExceptionally(err3);
3021 } else {
3022 future.complete(result);
3029 return future;
3032 @Override
3033 public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3034 return this
3035 .<Void> newAdminCaller()
3036 .action(
3037 (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
3038 controller, stub, RequestConverter.buildRollWALWriterRequest(),
3039 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3040 .serverName(serverName).call();
3043 @Override
3044 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3045 return this
3046 .<Void> newAdminCaller()
3047 .action(
3048 (controller, stub) -> this
3049 .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
3050 controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
3051 c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3052 .serverName(serverName).call();
3055 @Override
3056 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3057 return this
3058 .<List<SecurityCapability>> newMasterCaller()
3059 .action(
3060 (controller, stub) -> this
3061 .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>>
3062 call(controller, stub, SecurityCapabilitiesRequest.newBuilder().build(),
3063 (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3064 (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3065 .call();
3068 @Override
3069 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3070 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3073 @Override
3074 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3075 TableName tableName) {
3076 Preconditions.checkNotNull(tableName,
3077 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3078 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3081 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3082 ServerName serverName) {
3083 return this.<List<RegionMetrics>> newAdminCaller()
3084 .action((controller, stub) -> this
3085 .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
3086 adminCall(controller, stub, request, (s, c, req, done) ->
3087 s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
3088 .serverName(serverName).call();
3091 @Override
3092 public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3093 return this
3094 .<Boolean> newMasterCaller()
3095 .action(
3096 (controller, stub) -> this
3097 .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
3098 stub, IsInMaintenanceModeRequest.newBuilder().build(),
3099 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3100 resp -> resp.getInMaintenanceMode())).call();
3103 @Override
3104 public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3105 CompactType compactType) {
3106 CompletableFuture<CompactionState> future = new CompletableFuture<>();
3108 switch (compactType) {
3109 case MOB:
3110 addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3111 if (err != null) {
3112 future.completeExceptionally(err);
3113 return;
3115 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3117 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3118 .action((controller, stub) -> this
3119 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3120 controller, stub,
3121 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3122 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3123 .call(), (resp2, err2) -> {
3124 if (err2 != null) {
3125 future.completeExceptionally(err2);
3126 } else {
3127 if (resp2.hasCompactionState()) {
3128 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3129 } else {
3130 future.complete(CompactionState.NONE);
3135 break;
3136 case NORMAL:
3137 addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3138 if (err != null) {
3139 future.completeExceptionally(err);
3140 return;
3142 ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3143 List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3144 locations.stream().filter(loc -> loc.getServerName() != null)
3145 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3146 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3147 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3148 // If any region compaction state is MAJOR_AND_MINOR
3149 // the table compaction state is MAJOR_AND_MINOR, too.
3150 if (err2 != null) {
3151 future.completeExceptionally(unwrapCompletionException(err2));
3152 } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3153 future.complete(regionState);
3154 } else {
3155 regionStates.add(regionState);
3157 }));
3159 addListener(
3160 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3161 (ret, err3) -> {
3162 // If future not completed, check all regions's compaction state
3163 if (!future.isCompletedExceptionally() && !future.isDone()) {
3164 CompactionState state = CompactionState.NONE;
3165 for (CompactionState regionState : regionStates) {
3166 switch (regionState) {
3167 case MAJOR:
3168 if (state == CompactionState.MINOR) {
3169 future.complete(CompactionState.MAJOR_AND_MINOR);
3170 } else {
3171 state = CompactionState.MAJOR;
3173 break;
3174 case MINOR:
3175 if (state == CompactionState.MAJOR) {
3176 future.complete(CompactionState.MAJOR_AND_MINOR);
3177 } else {
3178 state = CompactionState.MINOR;
3180 break;
3181 case NONE:
3182 default:
3185 if (!future.isDone()) {
3186 future.complete(state);
3191 break;
3192 default:
3193 throw new IllegalArgumentException("Unknown compactType: " + compactType);
3196 return future;
3199 @Override
3200 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3201 CompletableFuture<CompactionState> future = new CompletableFuture<>();
3202 addListener(getRegionLocation(regionName), (location, err) -> {
3203 if (err != null) {
3204 future.completeExceptionally(err);
3205 return;
3207 ServerName serverName = location.getServerName();
3208 if (serverName == null) {
3209 future
3210 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3211 return;
3213 addListener(
3214 this.<GetRegionInfoResponse> newAdminCaller()
3215 .action((controller, stub) -> this
3216 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3217 controller, stub,
3218 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3219 true),
3220 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3221 .serverName(serverName).call(),
3222 (resp2, err2) -> {
3223 if (err2 != null) {
3224 future.completeExceptionally(err2);
3225 } else {
3226 if (resp2.hasCompactionState()) {
3227 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3228 } else {
3229 future.complete(CompactionState.NONE);
3234 return future;
3237 @Override
3238 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3239 MajorCompactionTimestampRequest request =
3240 MajorCompactionTimestampRequest.newBuilder()
3241 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3242 return this.<Optional<Long>> newMasterCaller().action((controller, stub) ->
3243 this.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>>
3244 call(controller, stub, request, (s, c, req, done) -> s.getLastMajorCompactionTimestamp(
3245 c, req, done), ProtobufUtil::toOptionalTimestamp)).call();
3248 @Override
3249 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3250 byte[] regionName) {
3251 CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3252 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3253 addListener(getRegionInfo(regionName), (region, err) -> {
3254 if (err != null) {
3255 future.completeExceptionally(err);
3256 return;
3258 MajorCompactionTimestampForRegionRequest.Builder builder =
3259 MajorCompactionTimestampForRegionRequest.newBuilder();
3260 builder.setRegion(
3261 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3262 addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3263 .<MajorCompactionTimestampForRegionRequest,
3264 MajorCompactionTimestampResponse, Optional<Long>> call(
3265 controller, stub, builder.build(),
3266 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3267 ProtobufUtil::toOptionalTimestamp))
3268 .call(), (timestamp, err2) -> {
3269 if (err2 != null) {
3270 future.completeExceptionally(err2);
3271 } else {
3272 future.complete(timestamp);
3276 return future;
3279 @Override
3280 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3281 List<String> serverNamesList) {
3282 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3283 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3284 if (err != null) {
3285 future.completeExceptionally(err);
3286 return;
3288 // Accessed by multiple threads.
3289 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3290 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3291 serverNames.stream().forEach(serverName -> {
3292 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3293 if (err2 != null) {
3294 future.completeExceptionally(unwrapCompletionException(err2));
3295 } else {
3296 serverStates.put(serverName, serverState);
3298 }));
3300 addListener(
3301 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3302 (ret, err3) -> {
3303 if (!future.isCompletedExceptionally()) {
3304 if (err3 != null) {
3305 future.completeExceptionally(err3);
3306 } else {
3307 future.complete(serverStates);
3312 return future;
3315 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3316 CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3317 if (serverNamesList.isEmpty()) {
3318 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3319 getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3320 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3321 if (err != null) {
3322 future.completeExceptionally(err);
3323 } else {
3324 future.complete(clusterMetrics.getServersName());
3327 return future;
3328 } else {
3329 List<ServerName> serverList = new ArrayList<>();
3330 for (String regionServerName : serverNamesList) {
3331 ServerName serverName = null;
3332 try {
3333 serverName = ServerName.valueOf(regionServerName);
3334 } catch (Exception e) {
3335 future.completeExceptionally(
3336 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3338 if (serverName == null) {
3339 future.completeExceptionally(
3340 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3341 } else {
3342 serverList.add(serverName);
3345 future.complete(serverList);
3347 return future;
3350 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3351 return this
3352 .<Boolean>newAdminCaller()
3353 .serverName(serverName)
3354 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3355 Boolean>adminCall(controller, stub,
3356 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3357 s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3360 @Override
3361 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3362 return this.<Boolean> newMasterCaller()
3363 .action((controller, stub) -> this
3364 .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3365 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3366 (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3367 (resp) -> resp.getPrevBalanceValue()))
3368 .call();
3371 @Override
3372 public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3373 return this
3374 .<BalanceResponse> newMasterCaller()
3375 .action(
3376 (controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse, BalanceResponse> call(controller,
3377 stub, ProtobufUtil.toBalanceRequest(request),
3378 (s, c, req, done) -> s.balance(c, req, done), (resp) -> ProtobufUtil.toBalanceResponse(resp))).call();
3382 @Override
3383 public CompletableFuture<Boolean> isBalancerEnabled() {
3384 return this
3385 .<Boolean> newMasterCaller()
3386 .action((controller, stub) ->
3387 this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(controller,
3388 stub, RequestConverter.buildIsBalancerEnabledRequest(), (s, c, req, done)
3389 -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled())).call();
3392 @Override
3393 public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3394 return this
3395 .<Boolean> newMasterCaller()
3396 .action(
3397 (controller, stub) -> this
3398 .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3399 controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3400 req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3401 .getPrevNormalizerValue())).call();
3404 @Override
3405 public CompletableFuture<Boolean> isNormalizerEnabled() {
3406 return this
3407 .<Boolean> newMasterCaller()
3408 .action(
3409 (controller, stub) -> this
3410 .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3411 stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3412 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3413 (resp) -> resp.getEnabled())).call();
3416 @Override
3417 public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3418 return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3421 private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3422 return this
3423 .<Boolean> newMasterCaller()
3424 .action(
3425 (controller, stub) -> this.call(
3426 controller, stub, request, MasterService.Interface::normalize,
3427 NormalizeResponse::getNormalizerRan))
3428 .call();
3431 @Override
3432 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3433 return this
3434 .<Boolean> newMasterCaller()
3435 .action(
3436 (controller, stub) -> this
3437 .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3438 controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3439 c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3440 .getPrevValue())).call();
3443 @Override
3444 public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3445 return this
3446 .<Boolean> newMasterCaller()
3447 .action(
3448 (controller, stub) -> this
3449 .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3450 controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3451 done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3452 .call();
3455 @Override
3456 public CompletableFuture<Boolean> runCleanerChore() {
3457 return this
3458 .<Boolean> newMasterCaller()
3459 .action(
3460 (controller, stub) -> this
3461 .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3462 RequestConverter.buildRunCleanerChoreRequest(),
3463 (s, c, req, done) -> s.runCleanerChore(c, req, done),
3464 (resp) -> resp.getCleanerChoreRan())).call();
3467 @Override
3468 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3469 return this
3470 .<Boolean> newMasterCaller()
3471 .action(
3472 (controller, stub) -> this
3473 .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3474 controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3475 c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3476 .getPrevValue())).call();
3479 @Override
3480 public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3481 return this
3482 .<Boolean> newMasterCaller()
3483 .action(
3484 (controller, stub) -> this
3485 .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3486 controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3487 req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3488 .getValue())).call();
3491 @Override
3492 public CompletableFuture<Integer> runCatalogJanitor() {
3493 return this
3494 .<Integer> newMasterCaller()
3495 .action(
3496 (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3497 controller, stub, RequestConverter.buildCatalogScanRequest(),
3498 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3499 .call();
3502 @Override
3503 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3504 ServiceCaller<S, R> callable) {
3505 MasterCoprocessorRpcChannelImpl channel =
3506 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3507 S stub = stubMaker.apply(channel);
3508 CompletableFuture<R> future = new CompletableFuture<>();
3509 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3510 callable.call(stub, controller, resp -> {
3511 if (controller.failed()) {
3512 future.completeExceptionally(controller.getFailed());
3513 } else {
3514 future.complete(resp);
3517 return future;
3520 @Override
3521 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3522 ServiceCaller<S, R> callable, ServerName serverName) {
3523 RegionServerCoprocessorRpcChannelImpl channel =
3524 new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3525 serverName));
3526 S stub = stubMaker.apply(channel);
3527 CompletableFuture<R> future = new CompletableFuture<>();
3528 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3529 callable.call(stub, controller, resp -> {
3530 if (controller.failed()) {
3531 future.completeExceptionally(controller.getFailed());
3532 } else {
3533 future.complete(resp);
3536 return future;
3539 @Override
3540 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3541 return this.<List<ServerName>> newMasterCaller()
3542 .action((controller, stub) -> this
3543 .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3544 controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3545 (s, c, req, done) -> s.clearDeadServers(c, req, done),
3546 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3547 .call();
3550 <T> ServerRequestCallerBuilder<T> newServerCaller() {
3551 return this.connection.callerFactory.<T> serverRequest()
3552 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3553 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3554 .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
3555 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3558 @Override
3559 public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3560 if (tableName == null) {
3561 return failedFuture(new IllegalArgumentException("Table name is null"));
3563 CompletableFuture<Void> future = new CompletableFuture<>();
3564 addListener(tableExists(tableName), (exist, err) -> {
3565 if (err != null) {
3566 future.completeExceptionally(err);
3567 return;
3569 if (!exist) {
3570 future.completeExceptionally(new TableNotFoundException(
3571 "Table '" + tableName.getNameAsString() + "' does not exists."));
3572 return;
3574 addListener(getTableSplits(tableName), (splits, err1) -> {
3575 if (err1 != null) {
3576 future.completeExceptionally(err1);
3577 } else {
3578 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3579 if (err2 != null) {
3580 future.completeExceptionally(err2);
3581 } else {
3582 addListener(setTableReplication(tableName, true), (result3, err3) -> {
3583 if (err3 != null) {
3584 future.completeExceptionally(err3);
3585 } else {
3586 future.complete(result3);
3594 return future;
3597 @Override
3598 public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3599 if (tableName == null) {
3600 return failedFuture(new IllegalArgumentException("Table name is null"));
3602 CompletableFuture<Void> future = new CompletableFuture<>();
3603 addListener(tableExists(tableName), (exist, err) -> {
3604 if (err != null) {
3605 future.completeExceptionally(err);
3606 return;
3608 if (!exist) {
3609 future.completeExceptionally(new TableNotFoundException(
3610 "Table '" + tableName.getNameAsString() + "' does not exists."));
3611 return;
3613 addListener(setTableReplication(tableName, false), (result, err2) -> {
3614 if (err2 != null) {
3615 future.completeExceptionally(err2);
3616 } else {
3617 future.complete(result);
3621 return future;
3624 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3625 CompletableFuture<byte[][]> future = new CompletableFuture<>();
3626 addListener(
3627 getRegions(tableName).thenApply(regions -> regions.stream()
3628 .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3629 (regions, err2) -> {
3630 if (err2 != null) {
3631 future.completeExceptionally(err2);
3632 return;
3634 if (regions.size() == 1) {
3635 future.complete(null);
3636 } else {
3637 byte[][] splits = new byte[regions.size() - 1][];
3638 for (int i = 1; i < regions.size(); i++) {
3639 splits[i - 1] = regions.get(i).getStartKey();
3641 future.complete(splits);
3644 return future;
3648 * Connect to peer and check the table descriptor on peer:
3649 * <ol>
3650 * <li>Create the same table on peer when not exist.</li>
3651 * <li>Throw an exception if the table already has replication enabled on any of the column
3652 * families.</li>
3653 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3654 * </ol>
3655 * @param tableName name of the table to sync to the peer
3656 * @param splits table split keys
3658 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3659 byte[][] splits) {
3660 CompletableFuture<Void> future = new CompletableFuture<>();
3661 addListener(listReplicationPeers(), (peers, err) -> {
3662 if (err != null) {
3663 future.completeExceptionally(err);
3664 return;
3666 if (peers == null || peers.size() <= 0) {
3667 future.completeExceptionally(
3668 new IllegalArgumentException("Found no peer cluster for replication."));
3669 return;
3671 List<CompletableFuture<Void>> futures = new ArrayList<>();
3672 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3673 .forEach(peer -> {
3674 futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3676 addListener(
3677 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3678 (result, err2) -> {
3679 if (err2 != null) {
3680 future.completeExceptionally(err2);
3681 } else {
3682 future.complete(result);
3686 return future;
3689 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3690 ReplicationPeerDescription peer) {
3691 Configuration peerConf = null;
3692 try {
3693 peerConf =
3694 ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3695 } catch (IOException e) {
3696 return failedFuture(e);
3698 CompletableFuture<Void> future = new CompletableFuture<>();
3699 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3700 if (err != null) {
3701 future.completeExceptionally(err);
3702 return;
3704 addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3705 if (err1 != null) {
3706 future.completeExceptionally(err1);
3707 return;
3709 AsyncAdmin peerAdmin = conn.getAdmin();
3710 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3711 if (err2 != null) {
3712 future.completeExceptionally(err2);
3713 return;
3715 if (!exist) {
3716 CompletableFuture<Void> createTableFuture = null;
3717 if (splits == null) {
3718 createTableFuture = peerAdmin.createTable(tableDesc);
3719 } else {
3720 createTableFuture = peerAdmin.createTable(tableDesc, splits);
3722 addListener(createTableFuture, (result, err3) -> {
3723 if (err3 != null) {
3724 future.completeExceptionally(err3);
3725 } else {
3726 future.complete(result);
3729 } else {
3730 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3731 (result, err4) -> {
3732 if (err4 != null) {
3733 future.completeExceptionally(err4);
3734 } else {
3735 future.complete(result);
3742 return future;
3745 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3746 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3747 CompletableFuture<Void> future = new CompletableFuture<>();
3748 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3749 if (err != null) {
3750 future.completeExceptionally(err);
3751 return;
3753 if (peerTableDesc == null) {
3754 future.completeExceptionally(
3755 new IllegalArgumentException("Failed to get table descriptor for table " +
3756 tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3757 return;
3759 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3760 future.completeExceptionally(new IllegalArgumentException(
3761 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3762 ", but the table descriptors are not same when compared with source cluster." +
3763 " Thus can not enable the table's replication switch."));
3764 return;
3766 future.complete(null);
3768 return future;
3772 * Set the table's replication switch if the table's replication switch is already not set.
3773 * @param tableName name of the table
3774 * @param enableRep is replication switch enable or disable
3776 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3777 CompletableFuture<Void> future = new CompletableFuture<>();
3778 addListener(getDescriptor(tableName), (tableDesc, err) -> {
3779 if (err != null) {
3780 future.completeExceptionally(err);
3781 return;
3783 if (!tableDesc.matchReplicationScope(enableRep)) {
3784 int scope =
3785 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3786 TableDescriptor newTableDesc =
3787 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3788 addListener(modifyTable(newTableDesc), (result, err2) -> {
3789 if (err2 != null) {
3790 future.completeExceptionally(err2);
3791 } else {
3792 future.complete(result);
3795 } else {
3796 future.complete(null);
3799 return future;
3802 @Override
3803 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3804 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3805 addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3806 if (err != null) {
3807 future.completeExceptionally(err);
3808 return;
3810 Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3811 locations.stream().filter(l -> l.getRegion() != null)
3812 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3813 .collect(Collectors.groupingBy(l -> l.getServerName(),
3814 Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3815 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3816 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3817 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3818 futures
3819 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3820 if (err2 != null) {
3821 future.completeExceptionally(unwrapCompletionException(err2));
3822 } else {
3823 aggregator.append(stats);
3825 }));
3827 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3828 (ret, err3) -> {
3829 if (err3 != null) {
3830 future.completeExceptionally(unwrapCompletionException(err3));
3831 } else {
3832 future.complete(aggregator.sum());
3836 return future;
3839 @Override
3840 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3841 boolean preserveSplits) {
3842 CompletableFuture<Void> future = new CompletableFuture<>();
3843 addListener(tableExists(tableName), (exist, err) -> {
3844 if (err != null) {
3845 future.completeExceptionally(err);
3846 return;
3848 if (!exist) {
3849 future.completeExceptionally(new TableNotFoundException(tableName));
3850 return;
3852 addListener(tableExists(newTableName), (exist1, err1) -> {
3853 if (err1 != null) {
3854 future.completeExceptionally(err1);
3855 return;
3857 if (exist1) {
3858 future.completeExceptionally(new TableExistsException(newTableName));
3859 return;
3861 addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3862 if (err2 != null) {
3863 future.completeExceptionally(err2);
3864 return;
3866 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3867 if (preserveSplits) {
3868 addListener(getTableSplits(tableName), (splits, err3) -> {
3869 if (err3 != null) {
3870 future.completeExceptionally(err3);
3871 } else {
3872 addListener(
3873 splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3874 (result, err4) -> {
3875 if (err4 != null) {
3876 future.completeExceptionally(err4);
3877 } else {
3878 future.complete(result);
3883 } else {
3884 addListener(createTable(newTableDesc), (result, err5) -> {
3885 if (err5 != null) {
3886 future.completeExceptionally(err5);
3887 } else {
3888 future.complete(result);
3895 return future;
3898 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3899 List<RegionInfo> hris) {
3900 return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3901 .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3902 controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3903 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3904 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3905 .serverName(serverName).call();
3908 @Override
3909 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3910 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3911 .action((controller, stub) -> this
3912 .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3913 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3914 (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3915 resp -> resp.getPreviousRpcThrottleEnabled()))
3916 .call();
3917 return future;
3920 @Override
3921 public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3922 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3923 .action((controller, stub) -> this
3924 .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3925 stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3926 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3927 resp -> resp.getRpcThrottleEnabled()))
3928 .call();
3929 return future;
3932 @Override
3933 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3934 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3935 .action((controller, stub) -> this
3936 .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3937 controller, stub,
3938 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3939 .build(),
3940 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3941 resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3942 .call();
3943 return future;
3946 @Override
3947 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3948 return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3949 .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3950 Map<TableName, Long>> call(controller, stub,
3951 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3952 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3953 resp -> resp.getSizesList().stream().collect(Collectors
3954 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3955 .call();
3958 @Override
3959 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3960 ServerName serverName) {
3961 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3962 .action((controller, stub) -> this
3963 .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3964 Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3965 RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3966 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3967 resp -> resp.getSnapshotsList().stream()
3968 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3969 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3970 .serverName(serverName).call();
3973 private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3974 Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3975 return this.<SpaceQuotaSnapshot> newMasterCaller()
3976 .action((controller, stub) -> this
3977 .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3978 RequestConverter.buildGetQuotaStatesRequest(),
3979 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3980 .call();
3983 @Override
3984 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3985 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3986 .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3987 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3990 @Override
3991 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3992 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3993 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3994 .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3995 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3998 @Override
3999 public CompletableFuture<Void> grant(UserPermission userPermission,
4000 boolean mergeExistingPermissions) {
4001 return this.<Void> newMasterCaller()
4002 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
4003 stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4004 (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4005 .call();
4008 @Override
4009 public CompletableFuture<Void> revoke(UserPermission userPermission) {
4010 return this.<Void> newMasterCaller()
4011 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4012 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4013 (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4014 .call();
4017 @Override
4018 public CompletableFuture<List<UserPermission>>
4019 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4020 return this.<List<UserPermission>> newMasterCaller().action((controller,
4021 stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
4022 List<UserPermission>> call(controller, stub,
4023 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4024 (s, c, req, done) -> s.getUserPermissions(c, req, done),
4025 resp -> resp.getUserPermissionList().stream()
4026 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4027 .collect(Collectors.toList())))
4028 .call();
4031 @Override
4032 public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4033 List<Permission> permissions) {
4034 return this.<List<Boolean>> newMasterCaller()
4035 .action((controller, stub) -> this
4036 .<HasUserPermissionsRequest, HasUserPermissionsResponse, List<Boolean>> call(controller,
4037 stub, ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4038 (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4039 resp -> resp.getHasUserPermissionList()))
4040 .call();
4043 @Override
4044 public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
4045 final boolean sync) {
4046 return this.<Boolean>newMasterCaller().action((controller, stub) -> this
4047 .call(controller, stub, RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4048 MasterService.Interface::switchSnapshotCleanup,
4049 SetSnapshotCleanupResponse::getPrevSnapshotCleanup)).call();
4052 @Override
4053 public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4054 return this.<Boolean>newMasterCaller().action((controller, stub) -> this
4055 .call(controller, stub, RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4056 MasterService.Interface::isSnapshotCleanupEnabled,
4057 IsSnapshotCleanupEnabledResponse::getEnabled)).call();
4060 @Override
4061 public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4062 return this.<Void> newMasterCaller()
4063 .action((controller, stub) -> this.
4064 <MoveServersRequest, MoveServersResponse, Void> call(controller, stub,
4065 RequestConverter.buildMoveServersRequest(servers, groupName),
4066 (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4067 .call();
4070 @Override
4071 public CompletableFuture<Void> addRSGroup(String groupName) {
4072 return this.<Void> newMasterCaller()
4073 .action(((controller, stub) -> this.
4074 <AddRSGroupRequest, AddRSGroupResponse, Void> call(controller, stub,
4075 AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4076 (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null)))
4077 .call();
4080 @Override
4081 public CompletableFuture<Void> removeRSGroup(String groupName) {
4082 return this.<Void> newMasterCaller()
4083 .action((controller, stub) -> this.
4084 <RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(controller, stub,
4085 RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4086 (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4087 .call();
4090 @Override
4091 public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4092 BalanceRequest request) {
4093 return this.<BalanceResponse>newMasterCaller().action(
4094 (controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse, BalanceResponse>call(
4095 controller, stub, ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4096 MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4097 .call();
4100 @Override
4101 public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4102 return this.<List<RSGroupInfo>> newMasterCaller()
4103 .action((controller, stub) -> this
4104 .<ListRSGroupInfosRequest, ListRSGroupInfosResponse, List<RSGroupInfo>> call(
4105 controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4106 (s, c, req, done) -> s.listRSGroupInfos(c, req, done),
4107 resp -> resp.getRSGroupInfoList().stream()
4108 .map(r -> ProtobufUtil.toGroupInfo(r))
4109 .collect(Collectors.toList())))
4110 .call();
4113 private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4114 final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4115 final String logType) {
4116 if (CollectionUtils.isEmpty(serverNames)) {
4117 return CompletableFuture.completedFuture(Collections.emptyList());
4119 return CompletableFuture.supplyAsync(() -> serverNames.stream()
4120 .map((ServerName serverName) ->
4121 getSlowLogResponseFromServer(serverName, filterParams, limit, logType))
4122 .map(CompletableFuture::join)
4123 .flatMap(List::stream)
4124 .collect(Collectors.toList()));
4127 private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4128 Map<String, Object> filterParams, int limit, String logType) {
4129 return this.<List<LogEntry>>newAdminCaller().action((controller, stub) -> this
4130 .adminCall(controller, stub,
4131 RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4132 AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4133 .serverName(serverName).call();
4136 @Override
4137 public CompletableFuture<List<Boolean>> clearSlowLogResponses(
4138 @Nullable Set<ServerName> serverNames) {
4139 if (CollectionUtils.isEmpty(serverNames)) {
4140 return CompletableFuture.completedFuture(Collections.emptyList());
4142 List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
4143 .map(this::clearSlowLogsResponses)
4144 .collect(Collectors.toList());
4145 return convertToFutureOfList(clearSlowLogResponseList);
4148 private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4149 return this.<Boolean>newAdminCaller()
4150 .action(((controller, stub) -> this
4151 .adminCall(
4152 controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
4153 AdminService.Interface::clearSlowLogsResponses,
4154 ProtobufUtil::toClearSlowLogPayload))
4155 ).serverName(serverName).call();
4158 private static <T> CompletableFuture<List<T>> convertToFutureOfList(
4159 List<CompletableFuture<T>> futures) {
4160 CompletableFuture<Void> allDoneFuture =
4161 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4162 return allDoneFuture.thenApply(v ->
4163 futures.stream()
4164 .map(CompletableFuture::join)
4165 .collect(Collectors.toList())
4169 @Override
4170 public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4171 return this.<List<TableName>> newMasterCaller()
4172 .action((controller, stub) -> this
4173 .<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse, List<TableName>> call(controller,
4174 stub, ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4175 (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4176 .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4177 .call();
4180 @Override
4181 public CompletableFuture<Pair<List<String>, List<TableName>>>
4182 getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4183 return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4184 .action((controller, stub) -> this
4185 .<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4186 GetConfiguredNamespacesAndTablesInRSGroupResponse,
4187 Pair<List<String>, List<TableName>>> call(controller, stub,
4188 GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4189 .build(),
4190 (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4191 resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4192 .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4193 .call();
4196 @Override
4197 public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4198 return this.<RSGroupInfo> newMasterCaller()
4199 .action(((controller, stub) -> this
4200 .<GetRSGroupInfoOfServerRequest, GetRSGroupInfoOfServerResponse, RSGroupInfo> call(
4201 controller, stub,
4202 GetRSGroupInfoOfServerRequest.newBuilder()
4203 .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4204 .setPort(hostPort.getPort()).build())
4205 .build(),
4206 (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4207 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4208 .call();
4211 @Override
4212 public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4213 return this.<Void> newMasterCaller()
4214 .action((controller, stub) -> this.
4215 <RemoveServersRequest, RemoveServersResponse, Void> call(controller, stub,
4216 RequestConverter.buildRemoveServersRequest(servers),
4217 (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4218 .call();
4221 @Override
4222 public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4223 CompletableFuture<Void> future = new CompletableFuture<>();
4224 for (TableName tableName : tables) {
4225 addListener(tableExists(tableName), (exist, err) -> {
4226 if (err != null) {
4227 future.completeExceptionally(err);
4228 return;
4230 if (!exist) {
4231 future.completeExceptionally(new TableNotFoundException(tableName));
4232 return;
4236 addListener(listTableDescriptors(new ArrayList<>(tables)), ((tableDescriptions, err) -> {
4237 if (err != null) {
4238 future.completeExceptionally(err);
4239 return;
4241 if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4242 future.complete(null);
4243 return;
4245 List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4246 for (TableDescriptor td : tableDescriptions) {
4247 newTableDescriptors
4248 .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4250 addListener(CompletableFuture.allOf(
4251 newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4252 (v, e) -> {
4253 if (e != null) {
4254 future.completeExceptionally(e);
4255 } else {
4256 future.complete(v);
4259 }));
4260 return future;
4263 @Override
4264 public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4265 return this.<RSGroupInfo> newMasterCaller().action(((controller, stub) -> this
4266 .<GetRSGroupInfoOfTableRequest, GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller,
4267 stub,
4268 GetRSGroupInfoOfTableRequest.newBuilder().setTableName(ProtobufUtil.toProtoTableName(table))
4269 .build(),
4270 (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4271 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4272 .call();
4275 @Override
4276 public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4277 return this.<RSGroupInfo> newMasterCaller()
4278 .action(((controller, stub) -> this
4279 .<GetRSGroupInfoRequest, GetRSGroupInfoResponse, RSGroupInfo> call(controller, stub,
4280 GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4281 (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4282 resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null)))
4283 .call();
4286 @Override
4287 public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4288 return this.<Void> newMasterCaller()
4289 .action(
4291 (controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4292 controller,
4293 stub,
4294 RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName).setNewRsgroupName(newName)
4295 .build(),
4296 (s, c, req, done) -> s.renameRSGroup(c, req, done),
4297 resp -> null
4300 ).call();
4303 @Override
4304 public CompletableFuture<Void>
4305 updateRSGroupConfig(String groupName, Map<String, String> configuration) {
4306 UpdateRSGroupConfigRequest.Builder request = UpdateRSGroupConfigRequest.newBuilder()
4307 .setGroupName(groupName);
4308 if (configuration != null) {
4309 configuration.entrySet().forEach(e ->
4310 request.addConfiguration(NameStringPair.newBuilder().setName(e.getKey())
4311 .setValue(e.getValue()).build()));
4313 return this.<Void> newMasterCaller()
4314 .action(((controller, stub) ->
4315 this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, Void> call(
4316 controller, stub, request.build(),
4317 (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4318 ).call();
4321 private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4322 return this.<List<LogEntry>>newMasterCaller()
4323 .action((controller, stub) ->
4324 this.call(controller, stub,
4325 ProtobufUtil.toBalancerDecisionRequest(limit),
4326 MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerDecisionResponse))
4327 .call();
4330 private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4331 return this.<List<LogEntry>>newMasterCaller()
4332 .action((controller, stub) ->
4333 this.call(controller, stub,
4334 ProtobufUtil.toBalancerRejectionRequest(limit),
4335 MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerRejectionResponse))
4336 .call();
4339 @Override
4340 public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4341 String logType, ServerType serverType, int limit,
4342 Map<String, Object> filterParams) {
4343 if (logType == null || serverType == null) {
4344 throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4346 switch (logType){
4347 case "SLOW_LOG":
4348 case "LARGE_LOG":
4349 if (ServerType.MASTER.equals(serverType)) {
4350 throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4352 return getSlowLogResponses(filterParams, serverNames, limit, logType);
4353 case "BALANCER_DECISION":
4354 if (ServerType.REGION_SERVER.equals(serverType)) {
4355 throw new IllegalArgumentException(
4356 "Balancer Decision logs are not maintained by HRegionServer");
4358 return getBalancerDecisions(limit);
4359 case "BALANCER_REJECTION":
4360 if (ServerType.REGION_SERVER.equals(serverType)) {
4361 throw new IllegalArgumentException(
4362 "Balancer Rejection logs are not maintained by HRegionServer");
4364 return getBalancerRejections(limit);
4365 default:
4366 return CompletableFuture.completedFuture(Collections.emptyList());