HBASE-26765 Minor refactor of async scanning code (#4121)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / ZKConnectionRegistry.java
blobbc521d0f7b19b3bac0f7839c93e14de26cb2e4f4
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
21 import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
22 import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
23 import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
24 import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
25 import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
26 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
27 import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
29 import java.io.IOException;
30 import java.util.List;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.stream.Collectors;
33 import org.apache.commons.lang3.mutable.MutableInt;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.ClusterId;
36 import org.apache.hadoop.hbase.HRegionLocation;
37 import org.apache.hadoop.hbase.RegionLocations;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.exceptions.DeserializationException;
40 import org.apache.hadoop.hbase.master.RegionState;
41 import org.apache.hadoop.hbase.util.Pair;
42 import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
43 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
44 import org.apache.yetus.audience.InterfaceAudience;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
49 import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
51 /**
52 * Zookeeper based registry implementation.
54 @InterfaceAudience.Private
55 class ZKConnectionRegistry implements ConnectionRegistry {
57 private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);
59 private final ReadOnlyZKClient zk;
61 private final ZNodePaths znodePaths;
63 ZKConnectionRegistry(Configuration conf) {
64 this.znodePaths = new ZNodePaths(conf);
65 this.zk = new ReadOnlyZKClient(conf);
68 private interface Converter<T> {
69 T convert(byte[] data) throws Exception;
72 private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
73 CompletableFuture<T> future = new CompletableFuture<>();
74 addListener(zk.get(path), (data, error) -> {
75 if (error != null) {
76 future.completeExceptionally(error);
77 return;
79 try {
80 future.complete(converter.convert(data));
81 } catch (Exception e) {
82 future.completeExceptionally(e);
84 });
85 return future;
88 private static String getClusterId(byte[] data) throws DeserializationException {
89 if (data == null || data.length == 0) {
90 return null;
92 data = removeMetaData(data);
93 return ClusterId.parseFrom(data).toString();
96 @Override
97 public CompletableFuture<String> getClusterId() {
98 return tracedFuture(
99 () -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId),
100 "ZKConnectionRegistry.getClusterId");
103 ReadOnlyZKClient getZKClient() {
104 return zk;
107 private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
108 if (data == null || data.length == 0) {
109 return null;
111 data = removeMetaData(data);
112 int prefixLen = lengthOfPBMagic();
113 return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
114 data.length - prefixLen);
117 private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
118 CompletableFuture<RegionLocations> future) {
119 remaining.decrement();
120 if (remaining.intValue() > 0) {
121 return;
123 future.complete(new RegionLocations(locs));
126 private Pair<RegionState.State, ServerName> getStateAndServerName(
127 ZooKeeperProtos.MetaRegionServer proto) {
128 RegionState.State state;
129 if (proto.hasState()) {
130 state = RegionState.State.convert(proto.getState());
131 } else {
132 state = RegionState.State.OPEN;
134 HBaseProtos.ServerName snProto = proto.getServer();
135 return Pair.newPair(state,
136 ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
139 private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
140 List<String> metaReplicaZNodes) {
141 if (metaReplicaZNodes.isEmpty()) {
142 future.completeExceptionally(new IOException("No meta znode available"));
144 HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()];
145 MutableInt remaining = new MutableInt(locs.length);
146 for (String metaReplicaZNode : metaReplicaZNodes) {
147 int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode);
148 String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
149 if (replicaId == DEFAULT_REPLICA_ID) {
150 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
151 if (error != null) {
152 future.completeExceptionally(error);
153 return;
155 if (proto == null) {
156 future.completeExceptionally(new IOException("Meta znode is null"));
157 return;
159 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
160 if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
161 LOG.warn("Meta region is in state " + stateAndServerName.getFirst());
163 locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
164 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
165 tryComplete(remaining, locs, future);
167 } else {
168 addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
169 if (future.isDone()) {
170 return;
172 if (error != null) {
173 LOG.warn("Failed to fetch " + path, error);
174 locs[replicaId] = null;
175 } else if (proto == null) {
176 LOG.warn("Meta znode for replica " + replicaId + " is null");
177 locs[replicaId] = null;
178 } else {
179 Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
180 if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
181 LOG.warn("Meta region for replica " + replicaId + " is in state " +
182 stateAndServerName.getFirst());
183 locs[replicaId] = null;
184 } else {
185 locs[replicaId] =
186 new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
187 stateAndServerName.getSecond());
190 tryComplete(remaining, locs, future);
196 @Override
197 public CompletableFuture<RegionLocations> getMetaRegionLocations() {
198 return tracedFuture(() -> {
199 CompletableFuture<RegionLocations> future = new CompletableFuture<>();
200 addListener(
201 zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
202 .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
203 (metaReplicaZNodes, error) -> {
204 if (error != null) {
205 future.completeExceptionally(error);
206 return;
208 getMetaRegionLocation(future, metaReplicaZNodes);
210 return future;
211 }, "ZKConnectionRegistry.getMetaRegionLocations");
214 private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
215 if (data == null || data.length == 0) {
216 return null;
218 data = removeMetaData(data);
219 int prefixLen = lengthOfPBMagic();
220 return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
223 @Override
224 public CompletableFuture<ServerName> getActiveMaster() {
225 return tracedFuture(
226 () -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
227 .thenApply(proto -> {
228 if (proto == null) {
229 return null;
231 HBaseProtos.ServerName snProto = proto.getMaster();
232 return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
233 snProto.getStartCode());
235 "ZKConnectionRegistry.getActiveMaster");
238 @Override
239 public String getConnectionString() {
240 final String serverList = zk.getConnectString();
241 final String baseZNode = znodePaths.baseZNode;
242 return serverList + ":" + baseZNode;
245 @Override
246 public void close() {
247 zk.close();