/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.RegionInfo
.DEFAULT_REPLICA_ID
;
21 import static org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
.FIRST_META_REGIONINFO
;
22 import static org
.apache
.hadoop
.hbase
.client
.RegionReplicaUtil
.getRegionInfoForDefaultReplica
;
23 import static org
.apache
.hadoop
.hbase
.client
.RegionReplicaUtil
.getRegionInfoForReplica
;
24 import static org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
.lengthOfPBMagic
;
25 import static org
.apache
.hadoop
.hbase
.trace
.TraceUtil
.tracedFuture
;
26 import static org
.apache
.hadoop
.hbase
.util
.FutureUtils
.addListener
;
27 import static org
.apache
.hadoop
.hbase
.zookeeper
.ZKMetadata
.removeMetaData
;
29 import java
.io
.IOException
;
30 import java
.util
.List
;
31 import java
.util
.concurrent
.CompletableFuture
;
32 import java
.util
.stream
.Collectors
;
33 import org
.apache
.commons
.lang3
.mutable
.MutableInt
;
34 import org
.apache
.hadoop
.conf
.Configuration
;
35 import org
.apache
.hadoop
.hbase
.ClusterId
;
36 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
37 import org
.apache
.hadoop
.hbase
.RegionLocations
;
38 import org
.apache
.hadoop
.hbase
.ServerName
;
39 import org
.apache
.hadoop
.hbase
.exceptions
.DeserializationException
;
40 import org
.apache
.hadoop
.hbase
.master
.RegionState
;
41 import org
.apache
.hadoop
.hbase
.util
.Pair
;
42 import org
.apache
.hadoop
.hbase
.zookeeper
.ReadOnlyZKClient
;
43 import org
.apache
.hadoop
.hbase
.zookeeper
.ZNodePaths
;
44 import org
.apache
.yetus
.audience
.InterfaceAudience
;
45 import org
.slf4j
.Logger
;
46 import org
.slf4j
.LoggerFactory
;
48 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
;
49 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ZooKeeperProtos
;
/**
 * ZooKeeper based registry implementation.
 */
54 @InterfaceAudience.Private
55 class ZKConnectionRegistry
implements ConnectionRegistry
{
/** Logger shared by all instances of this registry. */
private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);

/** Client through which every znode read in this class ({@code get} / {@code list}) is issued. */
private final ReadOnlyZKClient zk;

/** Znode path layout, built from the same configuration as {@link #zk}. */
private final ZNodePaths znodePaths;

/**
 * Creates a registry from the given configuration.
 * @param conf configuration handed to both {@link ZNodePaths} and {@link ReadOnlyZKClient}
 */
ZKConnectionRegistry(Configuration conf) {
  znodePaths = new ZNodePaths(conf);
  zk = new ReadOnlyZKClient(conf);
}
68 private interface Converter
<T
> {
69 T
convert(byte[] data
) throws Exception
;
72 private <T
> CompletableFuture
<T
> getAndConvert(String path
, Converter
<T
> converter
) {
73 CompletableFuture
<T
> future
= new CompletableFuture
<>();
74 addListener(zk
.get(path
), (data
, error
) -> {
76 future
.completeExceptionally(error
);
80 future
.complete(converter
.convert(data
));
81 } catch (Exception e
) {
82 future
.completeExceptionally(e
);
88 private static String
getClusterId(byte[] data
) throws DeserializationException
{
89 if (data
== null || data
.length
== 0) {
92 data
= removeMetaData(data
);
93 return ClusterId
.parseFrom(data
).toString();
97 public CompletableFuture
<String
> getClusterId() {
99 () -> getAndConvert(znodePaths
.clusterIdZNode
, ZKConnectionRegistry
::getClusterId
),
100 "ZKConnectionRegistry.getClusterId");
103 ReadOnlyZKClient
getZKClient() {
107 private static ZooKeeperProtos
.MetaRegionServer
getMetaProto(byte[] data
) throws IOException
{
108 if (data
== null || data
.length
== 0) {
111 data
= removeMetaData(data
);
112 int prefixLen
= lengthOfPBMagic();
113 return ZooKeeperProtos
.MetaRegionServer
.parser().parseFrom(data
, prefixLen
,
114 data
.length
- prefixLen
);
117 private static void tryComplete(MutableInt remaining
, HRegionLocation
[] locs
,
118 CompletableFuture
<RegionLocations
> future
) {
119 remaining
.decrement();
120 if (remaining
.intValue() > 0) {
123 future
.complete(new RegionLocations(locs
));
126 private Pair
<RegionState
.State
, ServerName
> getStateAndServerName(
127 ZooKeeperProtos
.MetaRegionServer proto
) {
128 RegionState
.State state
;
129 if (proto
.hasState()) {
130 state
= RegionState
.State
.convert(proto
.getState());
132 state
= RegionState
.State
.OPEN
;
134 HBaseProtos
.ServerName snProto
= proto
.getServer();
135 return Pair
.newPair(state
,
136 ServerName
.valueOf(snProto
.getHostName(), snProto
.getPort(), snProto
.getStartCode()));
139 private void getMetaRegionLocation(CompletableFuture
<RegionLocations
> future
,
140 List
<String
> metaReplicaZNodes
) {
141 if (metaReplicaZNodes
.isEmpty()) {
142 future
.completeExceptionally(new IOException("No meta znode available"));
144 HRegionLocation
[] locs
= new HRegionLocation
[metaReplicaZNodes
.size()];
145 MutableInt remaining
= new MutableInt(locs
.length
);
146 for (String metaReplicaZNode
: metaReplicaZNodes
) {
147 int replicaId
= znodePaths
.getMetaReplicaIdFromZNode(metaReplicaZNode
);
148 String path
= ZNodePaths
.joinZNode(znodePaths
.baseZNode
, metaReplicaZNode
);
149 if (replicaId
== DEFAULT_REPLICA_ID
) {
150 addListener(getAndConvert(path
, ZKConnectionRegistry
::getMetaProto
), (proto
, error
) -> {
152 future
.completeExceptionally(error
);
156 future
.completeExceptionally(new IOException("Meta znode is null"));
159 Pair
<RegionState
.State
, ServerName
> stateAndServerName
= getStateAndServerName(proto
);
160 if (stateAndServerName
.getFirst() != RegionState
.State
.OPEN
) {
161 LOG
.warn("Meta region is in state " + stateAndServerName
.getFirst());
163 locs
[DEFAULT_REPLICA_ID
] = new HRegionLocation(
164 getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO
), stateAndServerName
.getSecond());
165 tryComplete(remaining
, locs
, future
);
168 addListener(getAndConvert(path
, ZKConnectionRegistry
::getMetaProto
), (proto
, error
) -> {
169 if (future
.isDone()) {
173 LOG
.warn("Failed to fetch " + path
, error
);
174 locs
[replicaId
] = null;
175 } else if (proto
== null) {
176 LOG
.warn("Meta znode for replica " + replicaId
+ " is null");
177 locs
[replicaId
] = null;
179 Pair
<RegionState
.State
, ServerName
> stateAndServerName
= getStateAndServerName(proto
);
180 if (stateAndServerName
.getFirst() != RegionState
.State
.OPEN
) {
181 LOG
.warn("Meta region for replica " + replicaId
+ " is in state " +
182 stateAndServerName
.getFirst());
183 locs
[replicaId
] = null;
186 new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO
, replicaId
),
187 stateAndServerName
.getSecond());
190 tryComplete(remaining
, locs
, future
);
197 public CompletableFuture
<RegionLocations
> getMetaRegionLocations() {
198 return tracedFuture(() -> {
199 CompletableFuture
<RegionLocations
> future
= new CompletableFuture
<>();
201 zk
.list(znodePaths
.baseZNode
).thenApply(children
-> children
.stream()
202 .filter(c
-> this.znodePaths
.isMetaZNodePrefix(c
)).collect(Collectors
.toList())),
203 (metaReplicaZNodes
, error
) -> {
205 future
.completeExceptionally(error
);
208 getMetaRegionLocation(future
, metaReplicaZNodes
);
211 }, "ZKConnectionRegistry.getMetaRegionLocations");
214 private static ZooKeeperProtos
.Master
getMasterProto(byte[] data
) throws IOException
{
215 if (data
== null || data
.length
== 0) {
218 data
= removeMetaData(data
);
219 int prefixLen
= lengthOfPBMagic();
220 return ZooKeeperProtos
.Master
.parser().parseFrom(data
, prefixLen
, data
.length
- prefixLen
);
224 public CompletableFuture
<ServerName
> getActiveMaster() {
226 () -> getAndConvert(znodePaths
.masterAddressZNode
, ZKConnectionRegistry
::getMasterProto
)
227 .thenApply(proto
-> {
231 HBaseProtos
.ServerName snProto
= proto
.getMaster();
232 return ServerName
.valueOf(snProto
.getHostName(), snProto
.getPort(),
233 snProto
.getStartCode());
235 "ZKConnectionRegistry.getActiveMaster");
239 public String
getConnectionString() {
240 final String serverList
= zk
.getConnectString();
241 final String baseZNode
= znodePaths
.baseZNode
;
242 return serverList
+ ":" + baseZNode
;
246 public void close() {