HBASE-22157 Include the cause when constructing RestoreSnapshotException in restoreSn...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / RawAsyncHBaseAdmin.java
blob6c89fc88b0b147f7261cac2b57f9fbd1917bbbd0
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
21 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
22 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
23 import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
25 import com.google.protobuf.Message;
26 import com.google.protobuf.RpcChannel;
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.EnumSet;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Optional;
36 import java.util.Set;
37 import java.util.concurrent.CompletableFuture;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import java.util.function.BiConsumer;
42 import java.util.function.Consumer;
43 import java.util.function.Function;
44 import java.util.function.Supplier;
45 import java.util.regex.Pattern;
46 import java.util.stream.Collectors;
47 import java.util.stream.Stream;
48 import org.apache.commons.io.IOUtils;
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
51 import org.apache.hadoop.hbase.CacheEvictionStats;
52 import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
53 import org.apache.hadoop.hbase.ClusterMetrics;
54 import org.apache.hadoop.hbase.ClusterMetrics.Option;
55 import org.apache.hadoop.hbase.ClusterMetricsBuilder;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionLocation;
58 import org.apache.hadoop.hbase.MetaTableAccessor;
59 import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
60 import org.apache.hadoop.hbase.NamespaceDescriptor;
61 import org.apache.hadoop.hbase.RegionLocations;
62 import org.apache.hadoop.hbase.RegionMetrics;
63 import org.apache.hadoop.hbase.RegionMetricsBuilder;
64 import org.apache.hadoop.hbase.ServerName;
65 import org.apache.hadoop.hbase.TableExistsException;
66 import org.apache.hadoop.hbase.TableName;
67 import org.apache.hadoop.hbase.TableNotDisabledException;
68 import org.apache.hadoop.hbase.TableNotEnabledException;
69 import org.apache.hadoop.hbase.TableNotFoundException;
70 import org.apache.hadoop.hbase.UnknownRegionException;
71 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
72 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
73 import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
74 import org.apache.hadoop.hbase.client.Scan.ReadType;
75 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
76 import org.apache.hadoop.hbase.client.replication.TableCFs;
77 import org.apache.hadoop.hbase.client.security.SecurityCapability;
78 import org.apache.hadoop.hbase.exceptions.DeserializationException;
79 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
80 import org.apache.hadoop.hbase.quotas.QuotaFilter;
81 import org.apache.hadoop.hbase.quotas.QuotaSettings;
82 import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
83 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
84 import org.apache.hadoop.hbase.replication.ReplicationException;
85 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
86 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
87 import org.apache.hadoop.hbase.replication.SyncReplicationState;
88 import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
89 import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
90 import org.apache.hadoop.hbase.security.access.UserPermission;
91 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
92 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
93 import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
94 import org.apache.hadoop.hbase.util.Bytes;
95 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
96 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
97 import org.apache.yetus.audience.InterfaceAudience;
98 import org.slf4j.Logger;
99 import org.slf4j.LoggerFactory;
101 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
102 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
103 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
104 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
105 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
106 import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
108 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
109 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
110 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
111 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
112 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
113 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
114 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
115 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
116 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
117 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
118 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
119 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
120 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
121 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
122 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
123 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
124 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
125 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
126 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
127 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
128 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
129 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
130 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
131 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
132 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
133 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
134 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
135 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
136 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
137 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
138 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
139 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
140 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
141 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
142 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
143 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
144 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
145 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
146 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
147 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
148 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
149 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
150 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
151 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
152 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
153 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
154 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
155 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
156 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
157 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
158 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
159 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
160 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
161 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
162 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
163 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
164 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
165 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
166 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
167 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
168 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
169 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
170 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
171 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
172 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
173 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
174 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
175 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
176 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
177 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
178 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
179 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
180 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
181 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
182 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
183 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
184 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
185 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
186 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
187 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
188 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
189 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
190 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
191 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
192 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
193 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
194 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
195 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
196 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
197 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
198 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
199 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
200 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
201 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
202 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
203 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
204 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
205 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
206 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
207 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
208 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
209 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
210 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
211 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
212 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
213 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
214 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
215 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
216 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
217 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
218 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
219 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
220 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
221 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
222 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
223 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
224 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
225 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
226 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
227 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
228 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
229 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
230 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
231 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
232 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
233 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
234 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
235 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
236 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
237 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
238 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
239 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
240 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
241 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
242 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
243 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
244 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
245 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
246 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
247 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
248 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
249 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
250 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
251 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
252 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
253 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
254 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
255 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
256 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
257 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
258 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
259 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
260 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
261 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
262 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
263 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
264 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
265 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
266 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
267 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
268 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
269 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
270 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
271 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
272 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
273 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
274 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
275 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
276 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
277 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
278 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
279 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
280 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
281 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
282 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
283 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
284 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
285 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
286 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
287 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
288 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
289 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
290 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
291 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
292 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
293 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
294 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
297 * The implementation of AsyncAdmin.
298 * <p>
299 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
300 * be finished inside the rpc framework thread, which means that the callbacks registered to the
301 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
302 * this class should not try to do time consuming tasks in the callbacks.
303 * @since 2.0.0
304 * @see AsyncHBaseAdmin
305 * @see AsyncConnection#getAdmin()
306 * @see AsyncConnection#getAdminBuilder()
308 @InterfaceAudience.Private
309 class RawAsyncHBaseAdmin implements AsyncAdmin {
310 public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
312 private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
314 private final AsyncConnectionImpl connection;
316 private final HashedWheelTimer retryTimer;
318 private final AsyncTable<AdvancedScanResultConsumer> metaTable;
320 private final long rpcTimeoutNs;
322 private final long operationTimeoutNs;
324 private final long pauseNs;
326 private final int maxAttempts;
328 private final int startLogErrorsCnt;
330 private final NonceGenerator ng;
332 RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
333 AsyncAdminBuilderBase builder) {
334 this.connection = connection;
335 this.retryTimer = retryTimer;
336 this.metaTable = connection.getTable(META_TABLE_NAME);
337 this.rpcTimeoutNs = builder.rpcTimeoutNs;
338 this.operationTimeoutNs = builder.operationTimeoutNs;
339 this.pauseNs = builder.pauseNs;
340 this.maxAttempts = builder.maxAttempts;
341 this.startLogErrorsCnt = builder.startLogErrorsCnt;
342 this.ng = connection.getNonceGenerator();
345 private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
346 return this.connection.callerFactory.<T> masterRequest()
347 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
348 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
349 .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
350 .startLogErrorsCnt(startLogErrorsCnt);
353 private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
354 return this.connection.callerFactory.<T> adminRequest()
355 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
356 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
357 .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
358 .startLogErrorsCnt(startLogErrorsCnt);
361 @FunctionalInterface
362 private interface MasterRpcCall<RESP, REQ> {
363 void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
364 RpcCallback<RESP> done);
367 @FunctionalInterface
368 private interface AdminRpcCall<RESP, REQ> {
369 void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
370 RpcCallback<RESP> done);
373 @FunctionalInterface
374 private interface Converter<D, S> {
375 D convert(S src) throws IOException;
378 private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
379 MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
380 Converter<RESP, PRESP> respConverter) {
381 CompletableFuture<RESP> future = new CompletableFuture<>();
382 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
384 @Override
385 public void run(PRESP resp) {
386 if (controller.failed()) {
387 future.completeExceptionally(controller.getFailed());
388 } else {
389 try {
390 future.complete(respConverter.convert(resp));
391 } catch (IOException e) {
392 future.completeExceptionally(e);
397 return future;
400 private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
401 AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
402 Converter<RESP, PRESP> respConverter) {
403 CompletableFuture<RESP> future = new CompletableFuture<>();
404 rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
406 @Override
407 public void run(PRESP resp) {
408 if (controller.failed()) {
409 future.completeExceptionally(new IOException(controller.errorText()));
410 } else {
411 try {
412 future.complete(respConverter.convert(resp));
413 } catch (IOException e) {
414 future.completeExceptionally(e);
419 return future;
422 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
423 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
424 ProcedureBiConsumer consumer) {
425 return procedureCall(b -> {
426 }, preq, rpcCall, respConverter, consumer);
429 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
430 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
431 ProcedureBiConsumer consumer) {
432 return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
435 private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
436 Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
437 MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
438 ProcedureBiConsumer consumer) {
439 MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
440 stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
441 prioritySetter.accept(builder);
442 CompletableFuture<Long> procFuture = builder.call();
443 CompletableFuture<Void> future = waitProcedureResult(procFuture);
444 addListener(future, consumer);
445 return future;
448 @FunctionalInterface
449 private interface TableOperator {
450 CompletableFuture<Void> operate(TableName table);
453 @Override
454 public CompletableFuture<Boolean> tableExists(TableName tableName) {
455 if (TableName.isMetaTableName(tableName)) {
456 return CompletableFuture.completedFuture(true);
458 return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
461 @Override
462 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
463 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(null,
464 includeSysTables));
468 * {@link #listTableDescriptors(boolean)}
470 @Override
471 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
472 boolean includeSysTables) {
473 Preconditions.checkNotNull(pattern,
474 "pattern is null. If you don't specify a pattern, use listTables(boolean) instead");
475 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(pattern,
476 includeSysTables));
479 @Override
480 public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
481 Preconditions.checkNotNull(tableNames,
482 "tableNames is null. If you don't specify tableNames, " + "use listTables(boolean) instead");
483 if (tableNames.isEmpty()) {
484 return CompletableFuture.completedFuture(Collections.emptyList());
486 return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
489 private CompletableFuture<List<TableDescriptor>>
490 getTableDescriptors(GetTableDescriptorsRequest request) {
491 return this.<List<TableDescriptor>> newMasterCaller()
492 .action((controller, stub) -> this
493 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
494 controller, stub, request, (s, c, req, done) -> s.getTableDescriptors(c, req, done),
495 (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
496 .call();
499 @Override
500 public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
501 return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
504 @Override
505 public CompletableFuture<List<TableName>>
506 listTableNames(Pattern pattern, boolean includeSysTables) {
507 Preconditions.checkNotNull(pattern,
508 "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
509 return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
512 private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
513 return this
514 .<List<TableName>> newMasterCaller()
515 .action(
516 (controller, stub) -> this
517 .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller,
518 stub, request, (s, c, req, done) -> s.getTableNames(c, req, done),
519 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))).call();
522 @Override
523 public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
524 return this.<List<TableDescriptor>> newMasterCaller().action((controller, stub) -> this
525 .<ListTableDescriptorsByNamespaceRequest, ListTableDescriptorsByNamespaceResponse,
526 List<TableDescriptor>> call(
527 controller, stub,
528 ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
529 (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
530 (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
531 .call();
534 @Override
535 public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
536 return this.<List<TableName>> newMasterCaller().action((controller, stub) -> this
537 .<ListTableNamesByNamespaceRequest, ListTableNamesByNamespaceResponse,
538 List<TableName>> call(
539 controller, stub,
540 ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
541 (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
542 (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
543 .call();
546 @Override
547 public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
548 CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
549 addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
550 .action((controller, stub) -> this
551 .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
552 controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
553 (s, c, req, done) -> s.getTableDescriptors(c, req, done),
554 (resp) -> resp.getTableSchemaList()))
555 .call(), (tableSchemas, error) -> {
556 if (error != null) {
557 future.completeExceptionally(error);
558 return;
560 if (!tableSchemas.isEmpty()) {
561 future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
562 } else {
563 future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
566 return future;
569 @Override
570 public CompletableFuture<Void> createTable(TableDescriptor desc) {
571 return createTable(desc.getTableName(),
572 RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
575 @Override
576 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
577 int numRegions) {
578 try {
579 return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
580 } catch (IllegalArgumentException e) {
581 return failedFuture(e);
585 @Override
586 public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
587 Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
588 + " use createTable(TableDescriptor) instead");
589 try {
590 verifySplitKeys(splitKeys);
591 return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
592 splitKeys, ng.getNonceGroup(), ng.newNonce()));
593 } catch (IllegalArgumentException e) {
594 return failedFuture(e);
598 private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
599 Preconditions.checkNotNull(tableName, "table name is null");
600 return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
601 (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
602 new CreateTableProcedureBiConsumer(tableName));
605 @Override
606 public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
607 return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
608 RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
609 ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
610 (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
613 @Override
614 public CompletableFuture<Void> deleteTable(TableName tableName) {
615 return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
616 RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
617 (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
618 new DeleteTableProcedureBiConsumer(tableName));
621 @Override
622 public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
623 return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
624 RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
625 ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
626 (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName));
629 @Override
630 public CompletableFuture<Void> enableTable(TableName tableName) {
631 return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
632 RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
633 (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
634 new EnableTableProcedureBiConsumer(tableName));
637 @Override
638 public CompletableFuture<Void> disableTable(TableName tableName) {
639 return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
640 RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
641 (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
642 new DisableTableProcedureBiConsumer(tableName));
645 @Override
646 public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
647 if (TableName.isMetaTableName(tableName)) {
648 return CompletableFuture.completedFuture(true);
650 CompletableFuture<Boolean> future = new CompletableFuture<>();
651 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
652 if (error != null) {
653 future.completeExceptionally(error);
654 return;
656 if (state.isPresent()) {
657 future.complete(state.get().inStates(TableState.State.ENABLED));
658 } else {
659 future.completeExceptionally(new TableNotFoundException(tableName));
662 return future;
665 @Override
666 public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
667 if (TableName.isMetaTableName(tableName)) {
668 return CompletableFuture.completedFuture(false);
670 CompletableFuture<Boolean> future = new CompletableFuture<>();
671 addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
672 if (error != null) {
673 future.completeExceptionally(error);
674 return;
676 if (state.isPresent()) {
677 future.complete(state.get().inStates(TableState.State.DISABLED));
678 } else {
679 future.completeExceptionally(new TableNotFoundException(tableName));
682 return future;
685 @Override
686 public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
687 return isTableAvailable(tableName, Optional.empty());
690 private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
691 Optional<byte[][]> splitKeys) {
692 if (TableName.isMetaTableName(tableName)) {
693 return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream
694 .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
696 CompletableFuture<Boolean> future = new CompletableFuture<>();
697 addListener(isTableEnabled(tableName), (enabled, error) -> {
698 if (error != null) {
699 if (error instanceof TableNotFoundException) {
700 future.complete(false);
701 } else {
702 future.completeExceptionally(error);
704 return;
706 if (!enabled) {
707 future.complete(false);
708 } else {
709 addListener(
710 AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
711 (locations, error1) -> {
712 if (error1 != null) {
713 future.completeExceptionally(error1);
714 return;
716 List<HRegionLocation> notDeployedRegions = locations.stream()
717 .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
718 if (notDeployedRegions.size() > 0) {
719 if (LOG.isDebugEnabled()) {
720 LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
722 future.complete(false);
723 return;
726 Optional<Boolean> available =
727 splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
728 future.complete(available.orElse(true));
732 return future;
735 private boolean compareRegionsWithSplitKeys(List<HRegionLocation> locations, byte[][] splitKeys) {
736 int regionCount = 0;
737 for (HRegionLocation location : locations) {
738 RegionInfo info = location.getRegion();
739 if (Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
740 regionCount++;
741 continue;
743 for (byte[] splitKey : splitKeys) {
744 // Just check if the splitkey is available
745 if (Bytes.equals(info.getStartKey(), splitKey)) {
746 regionCount++;
747 break;
751 return regionCount == splitKeys.length + 1;
754 @Override
755 public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
756 return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
757 RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
758 ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
759 new AddColumnFamilyProcedureBiConsumer(tableName));
762 @Override
763 public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
764 return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
765 RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
766 ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
767 (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName));
770 @Override
771 public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
772 ColumnFamilyDescriptor columnFamily) {
773 return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
774 RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
775 ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
776 (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName));
779 @Override
780 public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
781 return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
782 RequestConverter.buildCreateNamespaceRequest(descriptor),
783 (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
784 new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
787 @Override
788 public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
789 return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
790 RequestConverter.buildModifyNamespaceRequest(descriptor),
791 (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
792 new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
795 @Override
796 public CompletableFuture<Void> deleteNamespace(String name) {
797 return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
798 RequestConverter.buildDeleteNamespaceRequest(name),
799 (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
800 new DeleteNamespaceProcedureBiConsumer(name));
803 @Override
804 public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
805 return this
806 .<NamespaceDescriptor> newMasterCaller()
807 .action(
808 (controller, stub) -> this
809 .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
810 controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
811 req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
812 .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
815 @Override
816 public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
817 return this
818 .<List<NamespaceDescriptor>> newMasterCaller()
819 .action(
820 (controller, stub) -> this
821 .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
822 controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
823 done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
824 .toNamespaceDescriptorList(resp))).call();
827 @Override
828 public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
829 return this.<List<RegionInfo>> newAdminCaller()
830 .action((controller, stub) -> this
831 .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<RegionInfo>> adminCall(
832 controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
833 (s, c, req, done) -> s.getOnlineRegion(c, req, done),
834 resp -> ProtobufUtil.getRegionInfos(resp)))
835 .serverName(serverName).call();
838 @Override
839 public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
840 if (tableName.equals(META_TABLE_NAME)) {
841 return connection.getLocator().getRegionLocation(tableName, null, null, operationTimeoutNs)
842 .thenApply(loc -> Collections.singletonList(loc.getRegion()));
843 } else {
844 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
845 .thenApply(
846 locs -> locs.stream().map(loc -> loc.getRegion()).collect(Collectors.toList()));
850 @Override
851 public CompletableFuture<Void> flush(TableName tableName) {
852 CompletableFuture<Void> future = new CompletableFuture<>();
853 addListener(tableExists(tableName), (exists, err) -> {
854 if (err != null) {
855 future.completeExceptionally(err);
856 } else if (!exists) {
857 future.completeExceptionally(new TableNotFoundException(tableName));
858 } else {
859 addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
860 if (err2 != null) {
861 future.completeExceptionally(err2);
862 } else if (!tableEnabled) {
863 future.completeExceptionally(new TableNotEnabledException(tableName));
864 } else {
865 addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
866 new HashMap<>()), (ret, err3) -> {
867 if (err3 != null) {
868 future.completeExceptionally(err3);
869 } else {
870 future.complete(ret);
877 return future;
880 @Override
881 public CompletableFuture<Void> flushRegion(byte[] regionName) {
882 CompletableFuture<Void> future = new CompletableFuture<>();
883 addListener(getRegionLocation(regionName), (location, err) -> {
884 if (err != null) {
885 future.completeExceptionally(err);
886 return;
888 ServerName serverName = location.getServerName();
889 if (serverName == null) {
890 future
891 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
892 return;
894 addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
895 if (err2 != null) {
896 future.completeExceptionally(err2);
897 } else {
898 future.complete(ret);
902 return future;
905 private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
906 return this.<Void> newAdminCaller()
907 .serverName(serverName)
908 .action(
909 (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
910 controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
911 .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
912 resp -> null))
913 .call();
916 @Override
917 public CompletableFuture<Void> flushRegionServer(ServerName sn) {
918 CompletableFuture<Void> future = new CompletableFuture<>();
919 addListener(getRegions(sn), (hRegionInfos, err) -> {
920 if (err != null) {
921 future.completeExceptionally(err);
922 return;
924 List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
925 if (hRegionInfos != null) {
926 hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
928 addListener(CompletableFuture.allOf(
929 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
930 if (err2 != null) {
931 future.completeExceptionally(err2);
932 } else {
933 future.complete(ret);
937 return future;
940 @Override
941 public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
942 return compact(tableName, null, false, compactType);
945 @Override
946 public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
947 CompactType compactType) {
948 Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
949 + "If you don't specify a columnFamily, use compact(TableName) instead");
950 return compact(tableName, columnFamily, false, compactType);
953 @Override
954 public CompletableFuture<Void> compactRegion(byte[] regionName) {
955 return compactRegion(regionName, null, false);
958 @Override
959 public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
960 Preconditions.checkNotNull(columnFamily, "columnFamily is null."
961 + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
962 return compactRegion(regionName, columnFamily, false);
965 @Override
966 public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
967 return compact(tableName, null, true, compactType);
970 @Override
971 public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
972 CompactType compactType) {
973 Preconditions.checkNotNull(columnFamily, "columnFamily is null."
974 + "If you don't specify a columnFamily, use compact(TableName) instead");
975 return compact(tableName, columnFamily, true, compactType);
978 @Override
979 public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
980 return compactRegion(regionName, null, true);
983 @Override
984 public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
985 Preconditions.checkNotNull(columnFamily, "columnFamily is null."
986 + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
987 return compactRegion(regionName, columnFamily, true);
990 @Override
991 public CompletableFuture<Void> compactRegionServer(ServerName sn) {
992 return compactRegionServer(sn, false);
995 @Override
996 public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
997 return compactRegionServer(sn, true);
1000 private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1001 CompletableFuture<Void> future = new CompletableFuture<>();
1002 addListener(getRegions(sn), (hRegionInfos, err) -> {
1003 if (err != null) {
1004 future.completeExceptionally(err);
1005 return;
1007 List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1008 if (hRegionInfos != null) {
1009 hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1011 addListener(CompletableFuture.allOf(
1012 compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1013 if (err2 != null) {
1014 future.completeExceptionally(err2);
1015 } else {
1016 future.complete(ret);
1020 return future;
1023 private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1024 boolean major) {
1025 CompletableFuture<Void> future = new CompletableFuture<>();
1026 addListener(getRegionLocation(regionName), (location, err) -> {
1027 if (err != null) {
1028 future.completeExceptionally(err);
1029 return;
1031 ServerName serverName = location.getServerName();
1032 if (serverName == null) {
1033 future
1034 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1035 return;
1037 addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1038 (ret, err2) -> {
1039 if (err2 != null) {
1040 future.completeExceptionally(err2);
1041 } else {
1042 future.complete(ret);
1046 return future;
1050 * List all region locations for the specific table.
1052 private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1053 if (TableName.META_TABLE_NAME.equals(tableName)) {
1054 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1055 // For meta table, we use zk to fetch all locations.
1056 AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
1057 addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
1058 if (err != null) {
1059 future.completeExceptionally(err);
1060 } else if (metaRegions == null || metaRegions.isEmpty() ||
1061 metaRegions.getDefaultRegionLocation() == null) {
1062 future.completeExceptionally(new IOException("meta region does not found"));
1063 } else {
1064 future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1066 // close the registry.
1067 IOUtils.closeQuietly(registry);
1069 return future;
1070 } else {
1071 // For non-meta table, we fetch all locations by scanning hbase:meta table
1072 return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
1077 * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1079 private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1080 CompactType compactType) {
1081 CompletableFuture<Void> future = new CompletableFuture<>();
1083 switch (compactType) {
1084 case MOB:
1085 addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
1086 if (err != null) {
1087 future.completeExceptionally(err);
1088 return;
1090 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1091 addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1092 if (err2 != null) {
1093 future.completeExceptionally(err2);
1094 } else {
1095 future.complete(ret);
1099 break;
1100 case NORMAL:
1101 addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1102 if (err != null) {
1103 future.completeExceptionally(err);
1104 return;
1106 if (locations == null || locations.isEmpty()) {
1107 future.completeExceptionally(new TableNotFoundException(tableName));
1109 CompletableFuture<?>[] compactFutures =
1110 locations.stream().filter(l -> l.getRegion() != null)
1111 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1112 .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1113 .toArray(CompletableFuture<?>[]::new);
1114 // future complete unless all of the compact futures are completed.
1115 addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1116 if (err2 != null) {
1117 future.completeExceptionally(err2);
1118 } else {
1119 future.complete(ret);
1123 break;
1124 default:
1125 throw new IllegalArgumentException("Unknown compactType: " + compactType);
1127 return future;
1131 * Compact the region at specific region server.
1133 private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1134 final boolean major, byte[] columnFamily) {
1135 return this
1136 .<Void> newAdminCaller()
1137 .serverName(sn)
1138 .action(
1139 (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
1140 controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
1141 major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
1142 resp -> null)).call();
1145 private byte[] toEncodeRegionName(byte[] regionName) {
1146 try {
1147 return RegionInfo.isEncodedRegionName(regionName) ? regionName
1148 : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1149 } catch (IOException e) {
1150 return regionName;
1154 private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1155 CompletableFuture<TableName> result) {
1156 addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1157 if (err != null) {
1158 result.completeExceptionally(err);
1159 return;
1161 RegionInfo regionInfo = location.getRegion();
1162 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1163 result.completeExceptionally(
1164 new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1165 return;
1167 if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1168 if (!tableName.get().equals(regionInfo.getTable())) {
1169 // tables of this two region should be same.
1170 result.completeExceptionally(
1171 new IllegalArgumentException("Cannot merge regions from two different tables " +
1172 tableName.get() + " and " + regionInfo.getTable()));
1173 } else {
1174 result.complete(tableName.get());
1180 private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1181 AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1182 CompletableFuture<TableName> future = new CompletableFuture<>();
1183 for (byte[] encodedRegionName : encodedRegionNames) {
1184 checkAndGetTableName(encodedRegionName, tableNameRef, future);
1186 return future;
1189 @Override
1190 public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1191 return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1194 @Override
1195 public CompletableFuture<Boolean> isMergeEnabled() {
1196 return isSplitOrMergeOn(MasterSwitchType.MERGE);
1199 @Override
1200 public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1201 return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1204 @Override
1205 public CompletableFuture<Boolean> isSplitEnabled() {
1206 return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1209 private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1210 MasterSwitchType switchType) {
1211 SetSplitOrMergeEnabledRequest request =
1212 RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1213 return this.<Boolean> newMasterCaller()
1214 .action((controller, stub) -> this
1215 .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(controller,
1216 stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1217 (resp) -> resp.getPrevValueList().get(0)))
1218 .call();
1221 private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1222 IsSplitOrMergeEnabledRequest request =
1223 RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1224 return this
1225 .<Boolean> newMasterCaller()
1226 .action(
1227 (controller, stub) -> this
1228 .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
1229 controller, stub, request,
1230 (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
1231 (resp) -> resp.getEnabled())).call();
1234 @Override
1235 public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1236 if (nameOfRegionsToMerge.size() < 2) {
1237 return failedFuture(new IllegalArgumentException(
1238 "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1240 CompletableFuture<Void> future = new CompletableFuture<>();
1241 byte[][] encodedNameOfRegionsToMerge =
1242 nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1244 addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1245 if (err != null) {
1246 future.completeExceptionally(err);
1247 return;
1250 MergeTableRegionsRequest request = null;
1251 try {
1252 request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1253 forcible, ng.getNonceGroup(), ng.newNonce());
1254 } catch (DeserializationException e) {
1255 future.completeExceptionally(e);
1256 return;
1259 addListener(
1260 this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request,
1261 (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
1262 new MergeTableRegionProcedureBiConsumer(tableName)),
1263 (ret, err2) -> {
1264 if (err2 != null) {
1265 future.completeExceptionally(err2);
1266 } else {
1267 future.complete(ret);
1271 return future;
1274 @Override
1275 public CompletableFuture<Void> split(TableName tableName) {
1276 CompletableFuture<Void> future = new CompletableFuture<>();
1277 addListener(tableExists(tableName), (exist, error) -> {
1278 if (error != null) {
1279 future.completeExceptionally(error);
1280 return;
1282 if (!exist) {
1283 future.completeExceptionally(new TableNotFoundException(tableName));
1284 return;
1286 addListener(
1287 metaTable
1288 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1289 .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
1290 .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
1291 (results, err2) -> {
1292 if (err2 != null) {
1293 future.completeExceptionally(err2);
1294 return;
1296 if (results != null && !results.isEmpty()) {
1297 List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1298 for (Result r : results) {
1299 if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
1300 continue;
1302 RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
1303 if (rl != null) {
1304 for (HRegionLocation h : rl.getRegionLocations()) {
1305 if (h != null && h.getServerName() != null) {
1306 RegionInfo hri = h.getRegion();
1307 if (hri == null || hri.isSplitParent() ||
1308 hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1309 continue;
1311 splitFutures.add(split(hri, null));
1316 addListener(
1317 CompletableFuture
1318 .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1319 (ret, exception) -> {
1320 if (exception != null) {
1321 future.completeExceptionally(exception);
1322 return;
1324 future.complete(ret);
1326 } else {
1327 future.complete(null);
1331 return future;
1334 @Override
1335 public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1336 CompletableFuture<Void> result = new CompletableFuture<>();
1337 if (splitPoint == null) {
1338 return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1340 addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint),
1341 (loc, err) -> {
1342 if (err != null) {
1343 result.completeExceptionally(err);
1344 } else if (loc == null || loc.getRegion() == null) {
1345 result.completeExceptionally(new IllegalArgumentException(
1346 "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1347 } else {
1348 addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1349 if (err2 != null) {
1350 result.completeExceptionally(err2);
1351 } else {
1352 result.complete(ret);
1358 return result;
1361 @Override
1362 public CompletableFuture<Void> splitRegion(byte[] regionName) {
1363 CompletableFuture<Void> future = new CompletableFuture<>();
1364 addListener(getRegionLocation(regionName), (location, err) -> {
1365 RegionInfo regionInfo = location.getRegion();
1366 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1367 future
1368 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1369 "Replicas are auto-split when their primary is split."));
1370 return;
1372 ServerName serverName = location.getServerName();
1373 if (serverName == null) {
1374 future
1375 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1376 return;
1378 addListener(split(regionInfo, null), (ret, err2) -> {
1379 if (err2 != null) {
1380 future.completeExceptionally(err2);
1381 } else {
1382 future.complete(ret);
1386 return future;
1389 @Override
1390 public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1391 Preconditions.checkNotNull(splitPoint,
1392 "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1393 CompletableFuture<Void> future = new CompletableFuture<>();
1394 addListener(getRegionLocation(regionName), (location, err) -> {
1395 RegionInfo regionInfo = location.getRegion();
1396 if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1397 future
1398 .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
1399 "Replicas are auto-split when their primary is split."));
1400 return;
1402 ServerName serverName = location.getServerName();
1403 if (serverName == null) {
1404 future
1405 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1406 return;
1408 if (regionInfo.getStartKey() != null &&
1409 Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
1410 future.completeExceptionally(
1411 new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1412 return;
1414 addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1415 if (err2 != null) {
1416 future.completeExceptionally(err2);
1417 } else {
1418 future.complete(ret);
1422 return future;
1425 private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1426 CompletableFuture<Void> future = new CompletableFuture<>();
1427 TableName tableName = hri.getTable();
1428 SplitTableRegionRequest request = null;
1429 try {
1430 request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1431 ng.newNonce());
1432 } catch (DeserializationException e) {
1433 future.completeExceptionally(e);
1434 return future;
1437 addListener(
1438 this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
1439 request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
1440 new SplitTableRegionProcedureBiConsumer(tableName)),
1441 (ret, err2) -> {
1442 if (err2 != null) {
1443 future.completeExceptionally(err2);
1444 } else {
1445 future.complete(ret);
1448 return future;
1451 @Override
1452 public CompletableFuture<Void> assign(byte[] regionName) {
1453 CompletableFuture<Void> future = new CompletableFuture<>();
1454 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1455 if (err != null) {
1456 future.completeExceptionally(err);
1457 return;
1459 addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1460 .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1461 controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1462 (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
1463 .call(), (ret, err2) -> {
1464 if (err2 != null) {
1465 future.completeExceptionally(err2);
1466 } else {
1467 future.complete(ret);
1471 return future;
1474 @Override
1475 public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
1476 CompletableFuture<Void> future = new CompletableFuture<>();
1477 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1478 if (err != null) {
1479 future.completeExceptionally(err);
1480 return;
1482 addListener(
1483 this.<Void> newMasterCaller().priority(regionInfo.getTable())
1484 .action(((controller, stub) -> this
1485 .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
1486 RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
1487 (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
1488 .call(),
1489 (ret, err2) -> {
1490 if (err2 != null) {
1491 future.completeExceptionally(err2);
1492 } else {
1493 future.complete(ret);
1497 return future;
1500 @Override
1501 public CompletableFuture<Void> offline(byte[] regionName) {
1502 CompletableFuture<Void> future = new CompletableFuture<>();
1503 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1504 if (err != null) {
1505 future.completeExceptionally(err);
1506 return;
1508 addListener(
1509 this.<Void> newMasterCaller().priority(regionInfo.getTable())
1510 .action(((controller, stub) -> this
1511 .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
1512 RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1513 (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
1514 .call(),
1515 (ret, err2) -> {
1516 if (err2 != null) {
1517 future.completeExceptionally(err2);
1518 } else {
1519 future.complete(ret);
1523 return future;
1526 @Override
1527 public CompletableFuture<Void> move(byte[] regionName) {
1528 CompletableFuture<Void> future = new CompletableFuture<>();
1529 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1530 if (err != null) {
1531 future.completeExceptionally(err);
1532 return;
1534 addListener(
1535 moveRegion(regionInfo,
1536 RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1537 (ret, err2) -> {
1538 if (err2 != null) {
1539 future.completeExceptionally(err2);
1540 } else {
1541 future.complete(ret);
1545 return future;
1548 @Override
1549 public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1550 Preconditions.checkNotNull(destServerName,
1551 "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1552 CompletableFuture<Void> future = new CompletableFuture<>();
1553 addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1554 if (err != null) {
1555 future.completeExceptionally(err);
1556 return;
1558 addListener(
1559 moveRegion(regionInfo, RequestConverter
1560 .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1561 (ret, err2) -> {
1562 if (err2 != null) {
1563 future.completeExceptionally(err2);
1564 } else {
1565 future.complete(ret);
1569 return future;
1572 private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1573 return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1574 .action(
1575 (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1576 stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1577 .call();
1580 @Override
1581 public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1582 return this
1583 .<Void> newMasterCaller()
1584 .action(
1585 (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1586 stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1587 (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
1590 @Override
1591 public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1592 CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1593 Scan scan = QuotaTableUtil.makeScan(filter);
1594 this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
1595 .scan(scan, new AdvancedScanResultConsumer() {
1596 List<QuotaSettings> settings = new ArrayList<>();
1598 @Override
1599 public void onNext(Result[] results, ScanController controller) {
1600 for (Result result : results) {
1601 try {
1602 QuotaTableUtil.parseResultToCollection(result, settings);
1603 } catch (IOException e) {
1604 controller.terminate();
1605 future.completeExceptionally(e);
1610 @Override
1611 public void onError(Throwable error) {
1612 future.completeExceptionally(error);
1615 @Override
1616 public void onComplete() {
1617 future.complete(settings);
1620 return future;
1623 @Override
1624 public CompletableFuture<Void> addReplicationPeer(String peerId,
1625 ReplicationPeerConfig peerConfig, boolean enabled) {
1626 return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1627 RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1628 (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1629 new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1632 @Override
1633 public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1634 return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1635 RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1636 (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1637 new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1640 @Override
1641 public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1642 return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1643 RequestConverter.buildEnableReplicationPeerRequest(peerId),
1644 (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1645 new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1648 @Override
1649 public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1650 return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1651 RequestConverter.buildDisableReplicationPeerRequest(peerId),
1652 (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1653 new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1656 @Override
1657 public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1658 return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
1659 .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
1660 controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1661 (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1662 (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1663 .call();
1666 @Override
1667 public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1668 ReplicationPeerConfig peerConfig) {
1669 return this
1670 .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall(
1671 RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1672 (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1673 (resp) -> resp.getProcId(),
1674 new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1677 @Override
1678 public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1679 SyncReplicationState clusterState) {
1680 return this
1681 .<TransitReplicationPeerSyncReplicationStateRequest, TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1682 RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1683 clusterState),
1684 (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1685 (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1686 () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1689 @Override
1690 public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1691 Map<TableName, List<String>> tableCfs) {
1692 if (tableCfs == null) {
1693 return failedFuture(new ReplicationException("tableCfs is null"));
1696 CompletableFuture<Void> future = new CompletableFuture<Void>();
1697 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1698 if (!completeExceptionally(future, error)) {
1699 ReplicationPeerConfig newPeerConfig =
1700 ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1701 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1702 if (!completeExceptionally(future, error)) {
1703 future.complete(result);
1708 return future;
1711 @Override
1712 public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1713 Map<TableName, List<String>> tableCfs) {
1714 if (tableCfs == null) {
1715 return failedFuture(new ReplicationException("tableCfs is null"));
1718 CompletableFuture<Void> future = new CompletableFuture<Void>();
1719 addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1720 if (!completeExceptionally(future, error)) {
1721 ReplicationPeerConfig newPeerConfig = null;
1722 try {
1723 newPeerConfig = ReplicationPeerConfigUtil
1724 .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1725 } catch (ReplicationException e) {
1726 future.completeExceptionally(e);
1727 return;
1729 addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1730 if (!completeExceptionally(future, error)) {
1731 future.complete(result);
1736 return future;
1739 @Override
1740 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1741 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1744 @Override
1745 public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1746 Preconditions.checkNotNull(pattern,
1747 "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1748 return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1751 private CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
1752 ListReplicationPeersRequest request) {
1753 return this
1754 .<List<ReplicationPeerDescription>> newMasterCaller()
1755 .action(
1756 (controller, stub) -> this
1757 .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
1758 controller,
1759 stub,
1760 request,
1761 (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1762 (resp) -> resp.getPeerDescList().stream()
1763 .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1764 .collect(Collectors.toList()))).call();
1767 @Override
1768 public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1769 CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1770 addListener(listTableDescriptors(), (tables, error) -> {
1771 if (!completeExceptionally(future, error)) {
1772 List<TableCFs> replicatedTableCFs = new ArrayList<>();
1773 tables.forEach(table -> {
1774 Map<String, Integer> cfs = new HashMap<>();
1775 Stream.of(table.getColumnFamilies())
1776 .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1777 .forEach(column -> {
1778 cfs.put(column.getNameAsString(), column.getScope());
1780 if (!cfs.isEmpty()) {
1781 replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1784 future.complete(replicatedTableCFs);
1787 return future;
1790 @Override
1791 public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1792 SnapshotProtos.SnapshotDescription snapshot =
1793 ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1794 try {
1795 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1796 } catch (IllegalArgumentException e) {
1797 return failedFuture(e);
1799 CompletableFuture<Void> future = new CompletableFuture<>();
1800 final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
1801 addListener(this.<Long> newMasterCaller()
1802 .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
1803 stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
1804 resp -> resp.getExpectedTimeout()))
1805 .call(), (expectedTimeout, err) -> {
1806 if (err != null) {
1807 future.completeExceptionally(err);
1808 return;
1810 TimerTask pollingTask = new TimerTask() {
1811 int tries = 0;
1812 long startTime = EnvironmentEdgeManager.currentTime();
1813 long endTime = startTime + expectedTimeout;
1814 long maxPauseTime = expectedTimeout / maxAttempts;
1816 @Override
1817 public void run(Timeout timeout) throws Exception {
1818 if (EnvironmentEdgeManager.currentTime() < endTime) {
1819 addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
1820 if (err2 != null) {
1821 future.completeExceptionally(err2);
1822 } else if (done) {
1823 future.complete(null);
1824 } else {
1825 // retry again after pauseTime.
1826 long pauseTime =
1827 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1828 pauseTime = Math.min(pauseTime, maxPauseTime);
1829 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
1830 TimeUnit.MILLISECONDS);
1833 } else {
1834 future.completeExceptionally(
1835 new SnapshotCreationException("Snapshot '" + snapshot.getName() +
1836 "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
1840 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1842 return future;
1845 @Override
1846 public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1847 return this
1848 .<Boolean> newMasterCaller()
1849 .action(
1850 (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
1851 controller,
1852 stub,
1853 IsSnapshotDoneRequest.newBuilder()
1854 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
1855 req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
1858 @Override
1859 public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1860 boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
1861 HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
1862 HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
1863 return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
1866 @Override
1867 public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
1868 boolean restoreAcl) {
1869 CompletableFuture<Void> future = new CompletableFuture<>();
1870 addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
1871 if (err != null) {
1872 future.completeExceptionally(err);
1873 return;
1875 TableName tableName = null;
1876 if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
1877 for (SnapshotDescription snap : snapshotDescriptions) {
1878 if (snap.getName().equals(snapshotName)) {
1879 tableName = snap.getTableName();
1880 break;
1884 if (tableName == null) {
1885 future.completeExceptionally(new RestoreSnapshotException(
1886 "Unable to find the table name for snapshot=" + snapshotName));
1887 return;
1889 final TableName finalTableName = tableName;
1890 addListener(tableExists(finalTableName), (exists, err2) -> {
1891 if (err2 != null) {
1892 future.completeExceptionally(err2);
1893 } else if (!exists) {
1894 // if table does not exist, then just clone snapshot into new table.
1895 completeConditionalOnFuture(future,
1896 internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl));
1897 } else {
1898 addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
1899 if (err4 != null) {
1900 future.completeExceptionally(err4);
1901 } else if (!disabled) {
1902 future.completeExceptionally(new TableNotDisabledException(finalTableName));
1903 } else {
1904 completeConditionalOnFuture(future,
1905 restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
1911 return future;
1914 private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
1915 boolean takeFailSafeSnapshot, boolean restoreAcl) {
1916 if (takeFailSafeSnapshot) {
1917 CompletableFuture<Void> future = new CompletableFuture<>();
1918 // Step.1 Take a snapshot of the current state
1919 String failSafeSnapshotSnapshotNameFormat =
1920 this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
1921 HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
1922 final String failSafeSnapshotSnapshotName =
1923 failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
1924 .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
1925 .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
1926 LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1927 addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
1928 if (err != null) {
1929 future.completeExceptionally(err);
1930 } else {
1931 // Step.2 Restore snapshot
1932 addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl),
1933 (void2, err2) -> {
1934 if (err2 != null) {
1935 // Step.3.a Something went wrong during the restore and try to rollback.
1936 addListener(
1937 internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName, restoreAcl),
1938 (void3, err3) -> {
1939 if (err3 != null) {
1940 future.completeExceptionally(err3);
1941 } else {
1942 String msg =
1943 "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
1944 failSafeSnapshotSnapshotName + " succeeded.";
1945 future.completeExceptionally(new RestoreSnapshotException(msg, err2));
1948 } else {
1949 // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
1950 LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
1951 addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
1952 if (err3 != null) {
1953 LOG.error(
1954 "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
1955 err3);
1956 future.completeExceptionally(err3);
1957 } else {
1958 future.complete(ret3);
1965 return future;
1966 } else {
1967 return internalRestoreSnapshot(snapshotName, tableName, restoreAcl);
1971 private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
1972 CompletableFuture<T> parentFuture) {
1973 addListener(parentFuture, (res, err) -> {
1974 if (err != null) {
1975 dependentFuture.completeExceptionally(err);
1976 } else {
1977 dependentFuture.complete(res);
1982 @Override
1983 public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
1984 boolean restoreAcl) {
1985 CompletableFuture<Void> future = new CompletableFuture<>();
1986 addListener(tableExists(tableName), (exists, err) -> {
1987 if (err != null) {
1988 future.completeExceptionally(err);
1989 } else if (exists) {
1990 future.completeExceptionally(new TableExistsException(tableName));
1991 } else {
1992 completeConditionalOnFuture(future,
1993 internalRestoreSnapshot(snapshotName, tableName, restoreAcl));
1996 return future;
1999 private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2000 boolean restoreAcl) {
2001 SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2002 .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2003 try {
2004 ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2005 } catch (IllegalArgumentException e) {
2006 return failedFuture(e);
2008 return waitProcedureResult(this.<Long> newMasterCaller().action((controller, stub) -> this
2009 .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub,
2010 RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2011 .setNonce(ng.newNonce()).setRestoreACL(restoreAcl).build(),
2012 (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2013 .call());
2016 @Override
2017 public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2018 return getCompletedSnapshots(null);
2021 @Override
2022 public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2023 Preconditions.checkNotNull(pattern,
2024 "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2025 return getCompletedSnapshots(pattern);
2028 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2029 return this.<List<SnapshotDescription>> newMasterCaller().action((controller, stub) -> this
2030 .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>>
2031 call(controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
2032 (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2033 resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2034 .call();
2037 @Override
2038 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2039 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2040 + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2041 return getCompletedSnapshots(tableNamePattern, null);
2044 @Override
2045 public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2046 Pattern snapshotNamePattern) {
2047 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2048 + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2049 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2050 + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2051 return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2054 private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
2055 Pattern tableNamePattern, Pattern snapshotNamePattern) {
2056 CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2057 addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2058 if (err != null) {
2059 future.completeExceptionally(err);
2060 return;
2062 if (tableNames == null || tableNames.size() <= 0) {
2063 future.complete(Collections.emptyList());
2064 return;
2066 addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2067 if (err2 != null) {
2068 future.completeExceptionally(err2);
2069 return;
2071 if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2072 future.complete(Collections.emptyList());
2073 return;
2075 future.complete(snapshotDescList.stream()
2076 .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2077 .collect(Collectors.toList()));
2080 return future;
2083 @Override
2084 public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2085 return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2088 @Override
2089 public CompletableFuture<Void> deleteSnapshots() {
2090 return internalDeleteSnapshots(null, null);
2093 @Override
2094 public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2095 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2096 + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2097 return internalDeleteSnapshots(null, snapshotNamePattern);
2100 @Override
2101 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2102 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2103 + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2104 return internalDeleteSnapshots(tableNamePattern, null);
2107 @Override
2108 public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2109 Pattern snapshotNamePattern) {
2110 Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2111 + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2112 Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2113 + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2114 return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2117 private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2118 Pattern snapshotNamePattern) {
2119 CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2120 if (tableNamePattern == null) {
2121 listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2122 } else {
2123 listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2125 CompletableFuture<Void> future = new CompletableFuture<>();
2126 addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
2127 if (err != null) {
2128 future.completeExceptionally(err);
2129 return;
2131 if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2132 future.complete(null);
2133 return;
2135 addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2136 .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2137 if (e != null) {
2138 future.completeExceptionally(e);
2139 } else {
2140 future.complete(v);
2143 }));
2144 return future;
2147 private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2148 return this
2149 .<Void> newMasterCaller()
2150 .action(
2151 (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2152 controller,
2153 stub,
2154 DeleteSnapshotRequest.newBuilder()
2155 .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
2156 req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
2159 @Override
2160 public CompletableFuture<Void> execProcedure(String signature, String instance,
2161 Map<String, String> props) {
2162 CompletableFuture<Void> future = new CompletableFuture<>();
2163 ProcedureDescription procDesc =
2164 ProtobufUtil.buildProcedureDescription(signature, instance, props);
2165 addListener(this.<Long> newMasterCaller()
2166 .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2167 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2168 (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2169 .call(), (expectedTimeout, err) -> {
2170 if (err != null) {
2171 future.completeExceptionally(err);
2172 return;
2174 TimerTask pollingTask = new TimerTask() {
2175 int tries = 0;
2176 long startTime = EnvironmentEdgeManager.currentTime();
2177 long endTime = startTime + expectedTimeout;
2178 long maxPauseTime = expectedTimeout / maxAttempts;
2180 @Override
2181 public void run(Timeout timeout) throws Exception {
2182 if (EnvironmentEdgeManager.currentTime() < endTime) {
2183 addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2184 if (err2 != null) {
2185 future.completeExceptionally(err2);
2186 return;
2188 if (done) {
2189 future.complete(null);
2190 } else {
2191 // retry again after pauseTime.
2192 long pauseTime =
2193 ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2194 pauseTime = Math.min(pauseTime, maxPauseTime);
2195 AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2196 TimeUnit.MICROSECONDS);
2199 } else {
2200 future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
2201 instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2205 // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2206 AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2208 return future;
2211 @Override
2212 public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2213 Map<String, String> props) {
2214 ProcedureDescription proDesc =
2215 ProtobufUtil.buildProcedureDescription(signature, instance, props);
2216 return this.<byte[]> newMasterCaller()
2217 .action(
2218 (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2219 controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2220 (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2221 resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2222 .call();
2225 @Override
2226 public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2227 Map<String, String> props) {
2228 ProcedureDescription proDesc =
2229 ProtobufUtil.buildProcedureDescription(signature, instance, props);
2230 return this.<Boolean> newMasterCaller()
2231 .action((controller, stub) -> this
2232 .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
2233 IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2234 (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2235 .call();
2238 @Override
2239 public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2240 return this.<Boolean> newMasterCaller().action(
2241 (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2242 controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2243 (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2244 .call();
2247 @Override
2248 public CompletableFuture<String> getProcedures() {
2249 return this
2250 .<String> newMasterCaller()
2251 .action(
2252 (controller, stub) -> this
2253 .<GetProceduresRequest, GetProceduresResponse, String> call(
2254 controller, stub, GetProceduresRequest.newBuilder().build(),
2255 (s, c, req, done) -> s.getProcedures(c, req, done),
2256 resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList()))).call();
2259 @Override
2260 public CompletableFuture<String> getLocks() {
2261 return this
2262 .<String> newMasterCaller()
2263 .action(
2264 (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(
2265 controller, stub, GetLocksRequest.newBuilder().build(),
2266 (s, c, req, done) -> s.getLocks(c, req, done),
2267 resp -> ProtobufUtil.toLockJson(resp.getLockList()))).call();
2270 @Override
2271 public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers, boolean offload) {
2272 return this.<Void> newMasterCaller()
2273 .action((controller, stub) -> this
2274 .<DecommissionRegionServersRequest, DecommissionRegionServersResponse, Void> call(
2275 controller, stub, RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2276 (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2277 .call();
2280 @Override
2281 public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2282 return this.<List<ServerName>> newMasterCaller()
2283 .action((controller, stub) -> this
2284 .<ListDecommissionedRegionServersRequest, ListDecommissionedRegionServersResponse,
2285 List<ServerName>> call(
2286 controller, stub, ListDecommissionedRegionServersRequest.newBuilder().build(),
2287 (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2288 resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2289 .collect(Collectors.toList())))
2290 .call();
2293 @Override
2294 public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2295 List<byte[]> encodedRegionNames) {
2296 return this.<Void> newMasterCaller()
2297 .action((controller, stub) -> this
2298 .<RecommissionRegionServerRequest, RecommissionRegionServerResponse, Void> call(controller,
2299 stub, RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2300 (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2301 .call();
2305 * Get the region location for the passed region name. The region name may be a full region name
2306 * or encoded region name. If the region does not found, then it'll throw an
2307 * UnknownRegionException wrapped by a {@link CompletableFuture}
2308 * @param regionNameOrEncodedRegionName region name or encoded region name
2309 * @return region location, wrapped by a {@link CompletableFuture}
2311 @VisibleForTesting
2312 CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2313 if (regionNameOrEncodedRegionName == null) {
2314 return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2316 try {
2317 CompletableFuture<Optional<HRegionLocation>> future;
2318 if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2319 String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2320 if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2321 // old format encodedName, should be meta region
2322 future = connection.registry.getMetaRegionLocation()
2323 .thenApply(locs -> Stream.of(locs.getRegionLocations())
2324 .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2325 } else {
2326 future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2327 regionNameOrEncodedRegionName);
2329 } else {
2330 RegionInfo regionInfo =
2331 MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2332 if (regionInfo.isMetaRegion()) {
2333 future = connection.registry.getMetaRegionLocation()
2334 .thenApply(locs -> Stream.of(locs.getRegionLocations())
2335 .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2336 .findFirst());
2337 } else {
2338 future =
2339 AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2343 CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2344 addListener(future, (location, err) -> {
2345 if (err != null) {
2346 returnedFuture.completeExceptionally(err);
2347 return;
2349 if (!location.isPresent() || location.get().getRegion() == null) {
2350 returnedFuture.completeExceptionally(
2351 new UnknownRegionException("Invalid region name or encoded region name: " +
2352 Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2353 } else {
2354 returnedFuture.complete(location.get());
2357 return returnedFuture;
2358 } catch (IOException e) {
2359 return failedFuture(e);
2364 * Get the region info for the passed region name. The region name may be a full region name or
2365 * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2366 * wrapped by a {@link CompletableFuture}
2367 * @param regionNameOrEncodedRegionName
2368 * @return region info, wrapped by a {@link CompletableFuture}
2370 private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2371 if (regionNameOrEncodedRegionName == null) {
2372 return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2375 if (Bytes.equals(regionNameOrEncodedRegionName,
2376 RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
2377 Bytes.equals(regionNameOrEncodedRegionName,
2378 RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
2379 return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2382 CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2383 addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2384 if (err != null) {
2385 future.completeExceptionally(err);
2386 } else {
2387 future.complete(location.getRegion());
2390 return future;
2393 private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2394 if (numRegions < 3) {
2395 throw new IllegalArgumentException("Must create at least three regions");
2396 } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2397 throw new IllegalArgumentException("Start key must be smaller than end key");
2399 if (numRegions == 3) {
2400 return new byte[][] { startKey, endKey };
2402 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2403 if (splitKeys == null || splitKeys.length != numRegions - 1) {
2404 throw new IllegalArgumentException("Unable to split key range into enough regions");
2406 return splitKeys;
2409 private void verifySplitKeys(byte[][] splitKeys) {
2410 Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2411 // Verify there are no duplicate split keys
2412 byte[] lastKey = null;
2413 for (byte[] splitKey : splitKeys) {
2414 if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2415 throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2417 if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2418 throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2419 + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2421 lastKey = splitKey;
2425 private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2427 abstract void onFinished();
2429 abstract void onError(Throwable error);
2431 @Override
2432 public void accept(Void v, Throwable error) {
2433 if (error != null) {
2434 onError(error);
2435 return;
2437 onFinished();
2441 private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2442 protected final TableName tableName;
2444 TableProcedureBiConsumer(TableName tableName) {
2445 this.tableName = tableName;
2448 abstract String getOperationType();
2450 String getDescription() {
2451 return "Operation: " + getOperationType() + ", " + "Table Name: "
2452 + tableName.getNameWithNamespaceInclAsString();
2455 @Override
2456 void onFinished() {
2457 LOG.info(getDescription() + " completed");
2460 @Override
2461 void onError(Throwable error) {
2462 LOG.info(getDescription() + " failed with " + error.getMessage());
2466 private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2467 protected final String namespaceName;
2469 NamespaceProcedureBiConsumer(String namespaceName) {
2470 this.namespaceName = namespaceName;
2473 abstract String getOperationType();
2475 String getDescription() {
2476 return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2479 @Override
2480 void onFinished() {
2481 LOG.info(getDescription() + " completed");
2484 @Override
2485 void onError(Throwable error) {
2486 LOG.info(getDescription() + " failed with " + error.getMessage());
2490 private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2492 CreateTableProcedureBiConsumer(TableName tableName) {
2493 super(tableName);
2496 @Override
2497 String getOperationType() {
2498 return "CREATE";
2502 private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2504 ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2505 super(tableName);
2508 @Override
2509 String getOperationType() {
2510 return "ENABLE";
2514 private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2516 DeleteTableProcedureBiConsumer(TableName tableName) {
2517 super(tableName);
2520 @Override
2521 String getOperationType() {
2522 return "DELETE";
2525 @Override
2526 void onFinished() {
2527 connection.getLocator().clearCache(this.tableName);
2528 super.onFinished();
2532 private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2534 TruncateTableProcedureBiConsumer(TableName tableName) {
2535 super(tableName);
2538 @Override
2539 String getOperationType() {
2540 return "TRUNCATE";
2544 private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2546 EnableTableProcedureBiConsumer(TableName tableName) {
2547 super(tableName);
2550 @Override
2551 String getOperationType() {
2552 return "ENABLE";
2556 private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2558 DisableTableProcedureBiConsumer(TableName tableName) {
2559 super(tableName);
2562 @Override
2563 String getOperationType() {
2564 return "DISABLE";
2568 private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2570 AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2571 super(tableName);
2574 @Override
2575 String getOperationType() {
2576 return "ADD_COLUMN_FAMILY";
2580 private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2582 DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2583 super(tableName);
2586 @Override
2587 String getOperationType() {
2588 return "DELETE_COLUMN_FAMILY";
2592 private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2594 ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2595 super(tableName);
2598 @Override
2599 String getOperationType() {
2600 return "MODIFY_COLUMN_FAMILY";
2604 private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2606 CreateNamespaceProcedureBiConsumer(String namespaceName) {
2607 super(namespaceName);
2610 @Override
2611 String getOperationType() {
2612 return "CREATE_NAMESPACE";
2616 private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2618 DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2619 super(namespaceName);
2622 @Override
2623 String getOperationType() {
2624 return "DELETE_NAMESPACE";
2628 private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2630 ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2631 super(namespaceName);
2634 @Override
2635 String getOperationType() {
2636 return "MODIFY_NAMESPACE";
2640 private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2642 MergeTableRegionProcedureBiConsumer(TableName tableName) {
2643 super(tableName);
2646 @Override
2647 String getOperationType() {
2648 return "MERGE_REGIONS";
2652 private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2654 SplitTableRegionProcedureBiConsumer(TableName tableName) {
2655 super(tableName);
2658 @Override
2659 String getOperationType() {
2660 return "SPLIT_REGION";
2664 private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2665 private final String peerId;
2666 private final Supplier<String> getOperation;
2668 ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2669 this.peerId = peerId;
2670 this.getOperation = getOperation;
2673 String getDescription() {
2674 return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2677 @Override
2678 void onFinished() {
2679 LOG.info(getDescription() + " completed");
2682 @Override
2683 void onError(Throwable error) {
2684 LOG.info(getDescription() + " failed with " + error.getMessage());
2688 private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2689 CompletableFuture<Void> future = new CompletableFuture<>();
2690 addListener(procFuture, (procId, error) -> {
2691 if (error != null) {
2692 future.completeExceptionally(error);
2693 return;
2695 getProcedureResult(procId, future, 0);
2697 return future;
2700 private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2701 addListener(
2702 this.<GetProcedureResultResponse> newMasterCaller()
2703 .action((controller, stub) -> this
2704 .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
2705 controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2706 (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2707 .call(),
2708 (response, error) -> {
2709 if (error != null) {
2710 LOG.warn("failed to get the procedure result procId={}", procId,
2711 ConnectionUtils.translateException(error));
2712 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2713 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2714 return;
2716 if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2717 retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2718 ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2719 return;
2721 if (response.hasException()) {
2722 IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2723 future.completeExceptionally(ioe);
2724 } else {
2725 future.complete(null);
2730 private <T> CompletableFuture<T> failedFuture(Throwable error) {
2731 CompletableFuture<T> future = new CompletableFuture<>();
2732 future.completeExceptionally(error);
2733 return future;
2736 private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2737 if (error != null) {
2738 future.completeExceptionally(error);
2739 return true;
2741 return false;
2744 @Override
2745 public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2746 return getClusterMetrics(EnumSet.allOf(Option.class));
2749 @Override
2750 public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2751 return this
2752 .<ClusterMetrics> newMasterCaller()
2753 .action(
2754 (controller, stub) -> this
2755 .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterMetrics> call(controller,
2756 stub, RequestConverter.buildGetClusterStatusRequest(options),
2757 (s, c, req, done) -> s.getClusterStatus(c, req, done),
2758 resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus()))).call();
2761 @Override
2762 public CompletableFuture<Void> shutdown() {
2763 return this.<Void> newMasterCaller().priority(HIGH_QOS)
2764 .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2765 stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2766 resp -> null))
2767 .call();
2770 @Override
2771 public CompletableFuture<Void> stopMaster() {
2772 return this.<Void> newMasterCaller().priority(HIGH_QOS)
2773 .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2774 controller, stub, StopMasterRequest.newBuilder().build(),
2775 (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2776 .call();
2779 @Override
2780 public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2781 StopServerRequest request = RequestConverter
2782 .buildStopServerRequest("Called by admin client " + this.connection.toString());
2783 return this.<Void> newAdminCaller().priority(HIGH_QOS)
2784 .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2785 controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2786 resp -> null))
2787 .serverName(serverName).call();
2790 @Override
2791 public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2792 return this
2793 .<Void> newAdminCaller()
2794 .action(
2795 (controller, stub) -> this
2796 .<UpdateConfigurationRequest, UpdateConfigurationResponse, Void> adminCall(
2797 controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2798 (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2799 .serverName(serverName).call();
2802 @Override
2803 public CompletableFuture<Void> updateConfiguration() {
2804 CompletableFuture<Void> future = new CompletableFuture<Void>();
2805 addListener(
2806 getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)),
2807 (status, err) -> {
2808 if (err != null) {
2809 future.completeExceptionally(err);
2810 } else {
2811 List<CompletableFuture<Void>> futures = new ArrayList<>();
2812 status.getLiveServerMetrics().keySet()
2813 .forEach(server -> futures.add(updateConfiguration(server)));
2814 futures.add(updateConfiguration(status.getMasterName()));
2815 status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2816 addListener(
2817 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2818 (result, err2) -> {
2819 if (err2 != null) {
2820 future.completeExceptionally(err2);
2821 } else {
2822 future.complete(result);
2827 return future;
2830 @Override
2831 public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
2832 return this
2833 .<Void> newAdminCaller()
2834 .action(
2835 (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
2836 controller, stub, RequestConverter.buildRollWALWriterRequest(),
2837 (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
2838 .serverName(serverName).call();
2841 @Override
2842 public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
2843 return this
2844 .<Void> newAdminCaller()
2845 .action(
2846 (controller, stub) -> this
2847 .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
2848 controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
2849 c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
2850 .serverName(serverName).call();
2853 @Override
2854 public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
2855 return this
2856 .<List<SecurityCapability>> newMasterCaller()
2857 .action(
2858 (controller, stub) -> this
2859 .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
2860 controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
2861 done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
2862 .toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
2865 @Override
2866 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
2867 return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
2870 @Override
2871 public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
2872 TableName tableName) {
2873 Preconditions.checkNotNull(tableName,
2874 "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
2875 return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
2878 private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
2879 ServerName serverName) {
2880 return this.<List<RegionMetrics>> newAdminCaller()
2881 .action((controller, stub) -> this
2882 .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionMetrics>>
2883 adminCall(controller, stub, request, (s, c, req, done) ->
2884 s.getRegionLoad(controller, req, done), RegionMetricsBuilder::toRegionMetrics))
2885 .serverName(serverName).call();
2888 @Override
2889 public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
2890 return this
2891 .<Boolean> newMasterCaller()
2892 .action(
2893 (controller, stub) -> this
2894 .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
2895 stub, IsInMaintenanceModeRequest.newBuilder().build(),
2896 (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
2897 resp -> resp.getInMaintenanceMode())).call();
2900 @Override
2901 public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
2902 CompactType compactType) {
2903 CompletableFuture<CompactionState> future = new CompletableFuture<>();
2905 switch (compactType) {
2906 case MOB:
2907 addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
2908 if (err != null) {
2909 future.completeExceptionally(err);
2910 return;
2912 RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
2914 addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
2915 .action((controller, stub) -> this
2916 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
2917 controller, stub,
2918 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
2919 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
2920 .call(), (resp2, err2) -> {
2921 if (err2 != null) {
2922 future.completeExceptionally(err2);
2923 } else {
2924 if (resp2.hasCompactionState()) {
2925 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
2926 } else {
2927 future.complete(CompactionState.NONE);
2932 break;
2933 case NORMAL:
2934 addListener(getTableHRegionLocations(tableName), (locations, err) -> {
2935 if (err != null) {
2936 future.completeExceptionally(err);
2937 return;
2939 List<CompactionState> regionStates = new ArrayList<>();
2940 List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
2941 locations.stream().filter(loc -> loc.getServerName() != null)
2942 .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
2943 .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
2944 futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
2945 // If any region compaction state is MAJOR_AND_MINOR
2946 // the table compaction state is MAJOR_AND_MINOR, too.
2947 if (err2 != null) {
2948 future.completeExceptionally(unwrapCompletionException(err2));
2949 } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
2950 future.complete(regionState);
2951 } else {
2952 regionStates.add(regionState);
2954 }));
2956 addListener(
2957 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2958 (ret, err3) -> {
2959 // If future not completed, check all regions's compaction state
2960 if (!future.isCompletedExceptionally() && !future.isDone()) {
2961 CompactionState state = CompactionState.NONE;
2962 for (CompactionState regionState : regionStates) {
2963 switch (regionState) {
2964 case MAJOR:
2965 if (state == CompactionState.MINOR) {
2966 future.complete(CompactionState.MAJOR_AND_MINOR);
2967 } else {
2968 state = CompactionState.MAJOR;
2970 break;
2971 case MINOR:
2972 if (state == CompactionState.MAJOR) {
2973 future.complete(CompactionState.MAJOR_AND_MINOR);
2974 } else {
2975 state = CompactionState.MINOR;
2977 break;
2978 case NONE:
2979 default:
2981 if (!future.isDone()) {
2982 future.complete(state);
2988 break;
2989 default:
2990 throw new IllegalArgumentException("Unknown compactType: " + compactType);
2993 return future;
2996 @Override
2997 public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
2998 CompletableFuture<CompactionState> future = new CompletableFuture<>();
2999 addListener(getRegionLocation(regionName), (location, err) -> {
3000 if (err != null) {
3001 future.completeExceptionally(err);
3002 return;
3004 ServerName serverName = location.getServerName();
3005 if (serverName == null) {
3006 future
3007 .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3008 return;
3010 addListener(
3011 this.<GetRegionInfoResponse> newAdminCaller()
3012 .action((controller, stub) -> this
3013 .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
3014 controller, stub,
3015 RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3016 true),
3017 (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3018 .serverName(serverName).call(),
3019 (resp2, err2) -> {
3020 if (err2 != null) {
3021 future.completeExceptionally(err2);
3022 } else {
3023 if (resp2.hasCompactionState()) {
3024 future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3025 } else {
3026 future.complete(CompactionState.NONE);
3031 return future;
3034 @Override
3035 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3036 MajorCompactionTimestampRequest request =
3037 MajorCompactionTimestampRequest.newBuilder()
3038 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3039 return this
3040 .<Optional<Long>> newMasterCaller()
3041 .action(
3042 (controller, stub) -> this
3043 .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
3044 controller, stub, request,
3045 (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3046 ProtobufUtil::toOptionalTimestamp)).call();
3049 @Override
3050 public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
3051 byte[] regionName) {
3052 CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3053 // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3054 addListener(getRegionInfo(regionName), (region, err) -> {
3055 if (err != null) {
3056 future.completeExceptionally(err);
3057 return;
3059 MajorCompactionTimestampForRegionRequest.Builder builder =
3060 MajorCompactionTimestampForRegionRequest.newBuilder();
3061 builder.setRegion(
3062 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3063 addListener(this.<Optional<Long>> newMasterCaller().action((controller, stub) -> this
3064 .<MajorCompactionTimestampForRegionRequest,
3065 MajorCompactionTimestampResponse, Optional<Long>> call(
3066 controller, stub, builder.build(),
3067 (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3068 ProtobufUtil::toOptionalTimestamp))
3069 .call(), (timestamp, err2) -> {
3070 if (err2 != null) {
3071 future.completeExceptionally(err2);
3072 } else {
3073 future.complete(timestamp);
3077 return future;
3080 @Override
3081 public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3082 List<String> serverNamesList) {
3083 CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3084 addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3085 if (err != null) {
3086 future.completeExceptionally(err);
3087 return;
3089 // Accessed by multiple threads.
3090 Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3091 List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3092 serverNames.stream().forEach(serverName -> {
3093 futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3094 if (err2 != null) {
3095 future.completeExceptionally(unwrapCompletionException(err2));
3096 } else {
3097 serverStates.put(serverName, serverState);
3099 }));
3101 addListener(
3102 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3103 (ret, err3) -> {
3104 if (!future.isCompletedExceptionally()) {
3105 if (err3 != null) {
3106 future.completeExceptionally(err3);
3107 } else {
3108 future.complete(serverStates);
3113 return future;
3116 private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3117 CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3118 if (serverNamesList.isEmpty()) {
3119 CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3120 getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
3121 addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3122 if (err != null) {
3123 future.completeExceptionally(err);
3124 } else {
3125 future.complete(new ArrayList<>(clusterMetrics.getLiveServerMetrics().keySet()));
3128 return future;
3129 } else {
3130 List<ServerName> serverList = new ArrayList<>();
3131 for (String regionServerName : serverNamesList) {
3132 ServerName serverName = null;
3133 try {
3134 serverName = ServerName.valueOf(regionServerName);
3135 } catch (Exception e) {
3136 future.completeExceptionally(
3137 new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3139 if (serverName == null) {
3140 future.completeExceptionally(
3141 new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3144 future.complete(serverList);
3146 return future;
3149 private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3150 return this
3151 .<Boolean>newAdminCaller()
3152 .serverName(serverName)
3153 .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3154 Boolean>adminCall(controller, stub,
3155 CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
3156 s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
3159 @Override
3160 public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3161 return this.<Boolean> newMasterCaller()
3162 .action((controller, stub) -> this
3163 .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, stub,
3164 RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3165 (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3166 (resp) -> resp.getPrevBalanceValue()))
3167 .call();
3170 @Override
3171 public CompletableFuture<Boolean> balance(boolean forcible) {
3172 return this
3173 .<Boolean> newMasterCaller()
3174 .action(
3175 (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
3176 stub, RequestConverter.buildBalanceRequest(forcible),
3177 (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
3180 @Override
3181 public CompletableFuture<Boolean> isBalancerEnabled() {
3182 return this
3183 .<Boolean> newMasterCaller()
3184 .action(
3185 (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
3186 controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3187 (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3188 .call();
3191 @Override
3192 public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3193 return this
3194 .<Boolean> newMasterCaller()
3195 .action(
3196 (controller, stub) -> this
3197 .<SetNormalizerRunningRequest, SetNormalizerRunningResponse, Boolean> call(
3198 controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on), (s, c,
3199 req, done) -> s.setNormalizerRunning(c, req, done), (resp) -> resp
3200 .getPrevNormalizerValue())).call();
3203 @Override
3204 public CompletableFuture<Boolean> isNormalizerEnabled() {
3205 return this
3206 .<Boolean> newMasterCaller()
3207 .action(
3208 (controller, stub) -> this
3209 .<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse, Boolean> call(controller,
3210 stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3211 (s, c, req, done) -> s.isNormalizerEnabled(c, req, done),
3212 (resp) -> resp.getEnabled())).call();
3215 @Override
3216 public CompletableFuture<Boolean> normalize() {
3217 return this
3218 .<Boolean> newMasterCaller()
3219 .action(
3220 (controller, stub) -> this.<NormalizeRequest, NormalizeResponse, Boolean> call(
3221 controller, stub, RequestConverter.buildNormalizeRequest(),
3222 (s, c, req, done) -> s.normalize(c, req, done), (resp) -> resp.getNormalizerRan()))
3223 .call();
3226 @Override
3227 public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3228 return this
3229 .<Boolean> newMasterCaller()
3230 .action(
3231 (controller, stub) -> this
3232 .<SetCleanerChoreRunningRequest, SetCleanerChoreRunningResponse, Boolean> call(
3233 controller, stub, RequestConverter.buildSetCleanerChoreRunningRequest(enabled), (s,
3234 c, req, done) -> s.setCleanerChoreRunning(c, req, done), (resp) -> resp
3235 .getPrevValue())).call();
3238 @Override
3239 public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3240 return this
3241 .<Boolean> newMasterCaller()
3242 .action(
3243 (controller, stub) -> this
3244 .<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse, Boolean> call(
3245 controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(), (s, c, req,
3246 done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3247 .call();
3250 @Override
3251 public CompletableFuture<Boolean> runCleanerChore() {
3252 return this
3253 .<Boolean> newMasterCaller()
3254 .action(
3255 (controller, stub) -> this
3256 .<RunCleanerChoreRequest, RunCleanerChoreResponse, Boolean> call(controller, stub,
3257 RequestConverter.buildRunCleanerChoreRequest(),
3258 (s, c, req, done) -> s.runCleanerChore(c, req, done),
3259 (resp) -> resp.getCleanerChoreRan())).call();
3262 @Override
3263 public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3264 return this
3265 .<Boolean> newMasterCaller()
3266 .action(
3267 (controller, stub) -> this
3268 .<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse, Boolean> call(
3269 controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled), (s,
3270 c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp
3271 .getPrevValue())).call();
3274 @Override
3275 public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3276 return this
3277 .<Boolean> newMasterCaller()
3278 .action(
3279 (controller, stub) -> this
3280 .<IsCatalogJanitorEnabledRequest, IsCatalogJanitorEnabledResponse, Boolean> call(
3281 controller, stub, RequestConverter.buildIsCatalogJanitorEnabledRequest(), (s, c,
3282 req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp
3283 .getValue())).call();
3286 @Override
3287 public CompletableFuture<Integer> runCatalogJanitor() {
3288 return this
3289 .<Integer> newMasterCaller()
3290 .action(
3291 (controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse, Integer> call(
3292 controller, stub, RequestConverter.buildCatalogScanRequest(),
3293 (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3294 .call();
3297 @Override
3298 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3299 ServiceCaller<S, R> callable) {
3300 MasterCoprocessorRpcChannelImpl channel =
3301 new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3302 S stub = stubMaker.apply(channel);
3303 CompletableFuture<R> future = new CompletableFuture<>();
3304 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3305 callable.call(stub, controller, resp -> {
3306 if (controller.failed()) {
3307 future.completeExceptionally(controller.getFailed());
3308 } else {
3309 future.complete(resp);
3312 return future;
3315 @Override
3316 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3317 ServiceCaller<S, R> callable, ServerName serverName) {
3318 RegionServerCoprocessorRpcChannelImpl channel =
3319 new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
3320 serverName));
3321 S stub = stubMaker.apply(channel);
3322 CompletableFuture<R> future = new CompletableFuture<>();
3323 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3324 callable.call(stub, controller, resp -> {
3325 if (controller.failed()) {
3326 future.completeExceptionally(controller.getFailed());
3327 } else {
3328 future.complete(resp);
3331 return future;
3334 @Override
3335 public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3336 return this.<List<ServerName>> newMasterCaller()
3337 .action((controller, stub) -> this
3338 .<ClearDeadServersRequest, ClearDeadServersResponse, List<ServerName>> call(
3339 controller, stub, RequestConverter.buildClearDeadServersRequest(servers),
3340 (s, c, req, done) -> s.clearDeadServers(c, req, done),
3341 (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3342 .call();
3345 private <T> ServerRequestCallerBuilder<T> newServerCaller() {
3346 return this.connection.callerFactory.<T> serverRequest()
3347 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3348 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3349 .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
3350 .startLogErrorsCnt(startLogErrorsCnt);
3353 @Override
3354 public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3355 if (tableName == null) {
3356 return failedFuture(new IllegalArgumentException("Table name is null"));
3358 CompletableFuture<Void> future = new CompletableFuture<>();
3359 addListener(tableExists(tableName), (exist, err) -> {
3360 if (err != null) {
3361 future.completeExceptionally(err);
3362 return;
3364 if (!exist) {
3365 future.completeExceptionally(new TableNotFoundException(
3366 "Table '" + tableName.getNameAsString() + "' does not exists."));
3367 return;
3369 addListener(getTableSplits(tableName), (splits, err1) -> {
3370 if (err1 != null) {
3371 future.completeExceptionally(err1);
3372 } else {
3373 addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3374 if (err2 != null) {
3375 future.completeExceptionally(err2);
3376 } else {
3377 addListener(setTableReplication(tableName, true), (result3, err3) -> {
3378 if (err3 != null) {
3379 future.completeExceptionally(err3);
3380 } else {
3381 future.complete(result3);
3389 return future;
3392 @Override
3393 public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3394 if (tableName == null) {
3395 return failedFuture(new IllegalArgumentException("Table name is null"));
3397 CompletableFuture<Void> future = new CompletableFuture<>();
3398 addListener(tableExists(tableName), (exist, err) -> {
3399 if (err != null) {
3400 future.completeExceptionally(err);
3401 return;
3403 if (!exist) {
3404 future.completeExceptionally(new TableNotFoundException(
3405 "Table '" + tableName.getNameAsString() + "' does not exists."));
3406 return;
3408 addListener(setTableReplication(tableName, false), (result, err2) -> {
3409 if (err2 != null) {
3410 future.completeExceptionally(err2);
3411 } else {
3412 future.complete(result);
3416 return future;
3419 private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3420 CompletableFuture<byte[][]> future = new CompletableFuture<>();
3421 addListener(getRegions(tableName), (regions, err2) -> {
3422 if (err2 != null) {
3423 future.completeExceptionally(err2);
3424 return;
3426 if (regions.size() == 1) {
3427 future.complete(null);
3428 } else {
3429 byte[][] splits = new byte[regions.size() - 1][];
3430 for (int i = 1; i < regions.size(); i++) {
3431 splits[i - 1] = regions.get(i).getStartKey();
3433 future.complete(splits);
3436 return future;
3440 * Connect to peer and check the table descriptor on peer:
3441 * <ol>
3442 * <li>Create the same table on peer when not exist.</li>
3443 * <li>Throw an exception if the table already has replication enabled on any of the column
3444 * families.</li>
3445 * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3446 * </ol>
3447 * @param tableName name of the table to sync to the peer
3448 * @param splits table split keys
3450 private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3451 byte[][] splits) {
3452 CompletableFuture<Void> future = new CompletableFuture<>();
3453 addListener(listReplicationPeers(), (peers, err) -> {
3454 if (err != null) {
3455 future.completeExceptionally(err);
3456 return;
3458 if (peers == null || peers.size() <= 0) {
3459 future.completeExceptionally(
3460 new IllegalArgumentException("Found no peer cluster for replication."));
3461 return;
3463 List<CompletableFuture<Void>> futures = new ArrayList<>();
3464 peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3465 .forEach(peer -> {
3466 futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3468 addListener(
3469 CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3470 (result, err2) -> {
3471 if (err2 != null) {
3472 future.completeExceptionally(err2);
3473 } else {
3474 future.complete(result);
3478 return future;
3481 private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3482 ReplicationPeerDescription peer) {
3483 Configuration peerConf = null;
3484 try {
3485 peerConf =
3486 ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3487 } catch (IOException e) {
3488 return failedFuture(e);
3490 CompletableFuture<Void> future = new CompletableFuture<>();
3491 addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3492 if (err != null) {
3493 future.completeExceptionally(err);
3494 return;
3496 addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3497 if (err1 != null) {
3498 future.completeExceptionally(err1);
3499 return;
3501 AsyncAdmin peerAdmin = conn.getAdmin();
3502 addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3503 if (err2 != null) {
3504 future.completeExceptionally(err2);
3505 return;
3507 if (!exist) {
3508 CompletableFuture<Void> createTableFuture = null;
3509 if (splits == null) {
3510 createTableFuture = peerAdmin.createTable(tableDesc);
3511 } else {
3512 createTableFuture = peerAdmin.createTable(tableDesc, splits);
3514 addListener(createTableFuture, (result, err3) -> {
3515 if (err3 != null) {
3516 future.completeExceptionally(err3);
3517 } else {
3518 future.complete(result);
3521 } else {
3522 addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3523 (result, err4) -> {
3524 if (err4 != null) {
3525 future.completeExceptionally(err4);
3526 } else {
3527 future.complete(result);
3534 return future;
3537 private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3538 TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3539 CompletableFuture<Void> future = new CompletableFuture<>();
3540 addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3541 if (err != null) {
3542 future.completeExceptionally(err);
3543 return;
3545 if (peerTableDesc == null) {
3546 future.completeExceptionally(
3547 new IllegalArgumentException("Failed to get table descriptor for table " +
3548 tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3549 return;
3551 if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3552 future.completeExceptionally(new IllegalArgumentException(
3553 "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() +
3554 ", but the table descriptors are not same when compared with source cluster." +
3555 " Thus can not enable the table's replication switch."));
3556 return;
3558 future.complete(null);
3560 return future;
3564 * Set the table's replication switch if the table's replication switch is already not set.
3565 * @param tableName name of the table
3566 * @param enableRep is replication switch enable or disable
3568 private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3569 CompletableFuture<Void> future = new CompletableFuture<>();
3570 addListener(getDescriptor(tableName), (tableDesc, err) -> {
3571 if (err != null) {
3572 future.completeExceptionally(err);
3573 return;
3575 if (!tableDesc.matchReplicationScope(enableRep)) {
3576 int scope =
3577 enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3578 TableDescriptor newTableDesc =
3579 TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3580 addListener(modifyTable(newTableDesc), (result, err2) -> {
3581 if (err2 != null) {
3582 future.completeExceptionally(err2);
3583 } else {
3584 future.complete(result);
3587 } else {
3588 future.complete(null);
3591 return future;
3594 @Override
3595 public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3596 CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3597 addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3598 if (err != null) {
3599 future.completeExceptionally(err);
3600 return;
3602 Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3603 locations.stream().filter(l -> l.getRegion() != null)
3604 .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3605 .collect(Collectors.groupingBy(l -> l.getServerName(),
3606 Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3607 List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3608 CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3609 for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3610 futures
3611 .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3612 if (err2 != null) {
3613 future.completeExceptionally(unwrapCompletionException(err2));
3614 } else {
3615 aggregator.append(stats);
3617 }));
3619 addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3620 (ret, err3) -> {
3621 if (err3 != null) {
3622 future.completeExceptionally(unwrapCompletionException(err3));
3623 } else {
3624 future.complete(aggregator.sum());
3628 return future;
3631 @Override
3632 public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3633 boolean preserveSplits) {
3634 CompletableFuture<Void> future = new CompletableFuture<>();
3635 addListener(tableExists(tableName), (exist, err) -> {
3636 if (err != null) {
3637 future.completeExceptionally(err);
3638 return;
3640 if (!exist) {
3641 future.completeExceptionally(new TableNotFoundException(tableName));
3642 return;
3644 addListener(tableExists(newTableName), (exist1, err1) -> {
3645 if (err1 != null) {
3646 future.completeExceptionally(err1);
3647 return;
3649 if (exist1) {
3650 future.completeExceptionally(new TableExistsException(newTableName));
3651 return;
3653 addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3654 if (err2 != null) {
3655 future.completeExceptionally(err2);
3656 return;
3658 TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3659 if (preserveSplits) {
3660 addListener(getTableSplits(tableName), (splits, err3) -> {
3661 if (err3 != null) {
3662 future.completeExceptionally(err3);
3663 } else {
3664 addListener(createTable(newTableDesc, splits), (result, err4) -> {
3665 if (err4 != null) {
3666 future.completeExceptionally(err4);
3667 } else {
3668 future.complete(result);
3673 } else {
3674 addListener(createTable(newTableDesc), (result, err5) -> {
3675 if (err5 != null) {
3676 future.completeExceptionally(err5);
3677 } else {
3678 future.complete(result);
3685 return future;
3688 private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3689 List<RegionInfo> hris) {
3690 return this.<CacheEvictionStats> newAdminCaller().action((controller, stub) -> this
3691 .<ClearRegionBlockCacheRequest, ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(
3692 controller, stub, RequestConverter.buildClearRegionBlockCacheRequest(hris),
3693 (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3694 resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3695 .serverName(serverName).call();
3698 @Override
3699 public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3700 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3701 .action((controller, stub) -> this
3702 .<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse, Boolean> call(controller, stub,
3703 SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3704 (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3705 resp -> resp.getPreviousRpcThrottleEnabled()))
3706 .call();
3707 return future;
3710 @Override
3711 public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3712 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3713 .action((controller, stub) -> this
3714 .<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse, Boolean> call(controller,
3715 stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3716 (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3717 resp -> resp.getRpcThrottleEnabled()))
3718 .call();
3719 return future;
3722 @Override
3723 public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3724 CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3725 .action((controller, stub) -> this
3726 .<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
3727 controller, stub,
3728 SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3729 .build(),
3730 (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3731 resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3732 .call();
3733 return future;
3736 @Override
3737 public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3738 return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
3739 .<GetSpaceQuotaRegionSizesRequest, GetSpaceQuotaRegionSizesResponse,
3740 Map<TableName, Long>> call(controller, stub,
3741 RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
3742 (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
3743 resp -> resp.getSizesList().stream().collect(Collectors
3744 .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
3745 .call();
3748 @Override
3749 public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>> getRegionServerSpaceQuotaSnapshots(
3750 ServerName serverName) {
3751 return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
3752 .action((controller, stub) -> this
3753 .<GetSpaceQuotaSnapshotsRequest, GetSpaceQuotaSnapshotsResponse,
3754 Map<TableName, SpaceQuotaSnapshot>> adminCall(controller, stub,
3755 RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
3756 (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
3757 resp -> resp.getSnapshotsList().stream()
3758 .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
3759 snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
3760 .serverName(serverName).call();
3763 private CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(
3764 Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
3765 return this.<SpaceQuotaSnapshot> newMasterCaller()
3766 .action((controller, stub) -> this
3767 .<GetQuotaStatesRequest, GetQuotaStatesResponse, SpaceQuotaSnapshot> call(controller, stub,
3768 RequestConverter.buildGetQuotaStatesRequest(),
3769 (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
3770 .call();
3773 @Override
3774 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
3775 return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
3776 .filter(s -> s.getNamespace().equals(namespace)).findFirst()
3777 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3780 @Override
3781 public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
3782 HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
3783 return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
3784 .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
3785 .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
3788 @Override
3789 public CompletableFuture<Void> grant(UserPermission userPermission,
3790 boolean mergeExistingPermissions) {
3791 return this.<Void> newMasterCaller()
3792 .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller,
3793 stub, ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
3794 (s, c, req, done) -> s.grant(c, req, done), resp -> null))
3795 .call();
3798 @Override
3799 public CompletableFuture<Void> revoke(UserPermission userPermission) {
3800 return this.<Void> newMasterCaller()
3801 .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
3802 stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
3803 (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
3804 .call();
3807 @Override
3808 public CompletableFuture<List<UserPermission>>
3809 getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
3810 return this.<List<UserPermission>> newMasterCaller().action((controller,
3811 stub) -> this.<AccessControlProtos.GetUserPermissionsRequest, GetUserPermissionsResponse,
3812 List<UserPermission>> call(controller, stub,
3813 ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
3814 (s, c, req, done) -> s.getUserPermissions(c, req, done),
3815 resp -> resp.getUserPermissionList().stream()
3816 .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
3817 .collect(Collectors.toList())))
3818 .call();