2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
;
20 import java
.util
.List
;
21 import java
.util
.Objects
;
22 import java
.util
.StringJoiner
;
23 import java
.util
.concurrent
.CompletableFuture
;
24 import java
.util
.function
.Supplier
;
25 import java
.util
.stream
.Collectors
;
26 import org
.apache
.hadoop
.hbase
.client
.AsyncAdmin
;
27 import org
.apache
.hadoop
.hbase
.client
.AsyncConnection
;
28 import org
.junit
.ClassRule
;
29 import org
.junit
.Rule
;
30 import org
.junit
.rules
.ExternalResource
;
31 import org
.junit
.rules
.TestRule
;
32 import org
.slf4j
.Logger
;
33 import org
.slf4j
.LoggerFactory
;
36 * A {@link TestRule} that clears all user namespaces and tables
37 * {@link ExternalResource#before() before} the test executes. Can be used in either the
38 * {@link Rule} or {@link ClassRule} positions. Lazily realizes the provided
39 * {@link AsyncConnection} so as to avoid initialization races with other {@link Rule Rules}.
 * <b>Does not</b> {@link AsyncConnection#close() close()} the provided connection instance when the test finishes.
43 * Use in combination with {@link MiniClusterRule} and {@link ConnectionRule}, for example:
46 * public class TestMyClass {
48 * public static final MiniClusterRule miniClusterRule = new MiniClusterRule();
50 * private final ConnectionRule connectionRule =
51 * new ConnectionRule(miniClusterRule::createConnection);
52 * private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule =
53 * new ClearUserNamespacesAndTablesRule(connectionRule::getConnection);
56 * public TestRule rule = RuleChain
57 * .outerRule(connectionRule)
58 * .around(clearUserNamespacesAndTablesRule);
62 public class ClearUserNamespacesAndTablesRule
extends ExternalResource
{
/** Logger for the trace-level progress messages emitted by this rule. */
private static final Logger logger =
  LoggerFactory.getLogger(ClearUserNamespacesAndTablesRule.class);

/** Source of the connection; stored by the constructor and only invoked from before(). */
private final Supplier<AsyncConnection> connectionSupplier;

/**
 * Admin handle taken from the supplied connection. Not final because it is assigned in
 * before() rather than in the constructor; the list/disable/delete helpers read it.
 */
private AsyncAdmin admin;
/**
 * Creates a rule that obtains its connection from the given supplier. The supplier is only
 * stored here; it is not invoked until {@code before()} runs.
 *
 * @param connectionSupplier provides the connection to operate on; must not be {@code null}
 * @throws NullPointerException if {@code connectionSupplier} is {@code null}
 */
public ClearUserNamespacesAndTablesRule(final Supplier<AsyncConnection> connectionSupplier) {
  // Reject a null supplier here so the failure points at the misconfigured rule instead of
  // surfacing later as an unlabelled NullPointerException inside before().
  this.connectionSupplier = Objects.requireNonNull(connectionSupplier, "connectionSupplier");
}
74 protected void before() throws Throwable
{
75 final AsyncConnection connection
= Objects
.requireNonNull(connectionSupplier
.get());
76 admin
= connection
.getAdmin();
78 clearTablesAndNamespaces().join();
81 private CompletableFuture
<Void
> clearTablesAndNamespaces() {
82 return deleteUserTables().thenCompose(_void
-> deleteUserNamespaces());
85 private CompletableFuture
<Void
> deleteUserTables() {
86 return listTableNames()
87 .thenApply(tableNames
-> tableNames
.stream()
88 .map(tableName
-> disableIfEnabled(tableName
).thenCompose(_void
-> deleteTable(tableName
)))
89 .toArray(CompletableFuture
[]::new))
90 .thenCompose(CompletableFuture
::allOf
);
93 private CompletableFuture
<List
<TableName
>> listTableNames() {
94 return CompletableFuture
95 .runAsync(() -> logger
.trace("listing tables"))
96 .thenCompose(_void
-> admin
.listTableNames(false))
97 .thenApply(tableNames
-> {
98 if (logger
.isTraceEnabled()) {
99 final StringJoiner joiner
= new StringJoiner(", ", "[", "]");
100 tableNames
.stream().map(TableName
::getNameAsString
).forEach(joiner
::add
);
101 logger
.trace("found existing tables {}", joiner
.toString());
107 private CompletableFuture
<Boolean
> isTableEnabled(final TableName tableName
) {
108 return admin
.isTableEnabled(tableName
)
109 .thenApply(isEnabled
-> {
110 logger
.trace("table {} is enabled.", tableName
);
115 private CompletableFuture
<Void
> disableIfEnabled(final TableName tableName
) {
116 return isTableEnabled(tableName
)
117 .thenCompose(isEnabled
-> isEnabled
118 ?
disableTable(tableName
)
119 : CompletableFuture
.completedFuture(null));
122 private CompletableFuture
<Void
> disableTable(final TableName tableName
) {
123 return CompletableFuture
124 .runAsync(() -> logger
.trace("disabling enabled table {}", tableName
))
125 .thenCompose(_void
-> admin
.disableTable(tableName
));
128 private CompletableFuture
<Void
> deleteTable(final TableName tableName
) {
129 return CompletableFuture
130 .runAsync(() -> logger
.trace("deleting disabled table {}", tableName
))
131 .thenCompose(_void
-> admin
.deleteTable(tableName
));
134 private CompletableFuture
<List
<String
>> listUserNamespaces() {
135 return CompletableFuture
136 .runAsync(() -> logger
.trace("listing namespaces"))
137 .thenCompose(_void
-> admin
.listNamespaceDescriptors())
138 .thenApply(namespaceDescriptors
-> {
139 final StringJoiner joiner
= new StringJoiner(", ", "[", "]");
140 final List
<String
> names
= namespaceDescriptors
.stream()
141 .map(NamespaceDescriptor
::getName
)
143 .collect(Collectors
.toList());
144 logger
.trace("found existing namespaces {}", joiner
);
147 .thenApply(namespaces
-> namespaces
.stream()
148 .filter(namespace
-> !Objects
.equals(
149 namespace
, NamespaceDescriptor
.SYSTEM_NAMESPACE
.getName()))
150 .filter(namespace
-> !Objects
.equals(
151 namespace
, NamespaceDescriptor
.DEFAULT_NAMESPACE
.getName()))
152 .collect(Collectors
.toList()));
155 private CompletableFuture
<Void
> deleteNamespace(final String namespace
) {
156 return CompletableFuture
157 .runAsync(() -> logger
.trace("deleting namespace {}", namespace
))
158 .thenCompose(_void
-> admin
.deleteNamespace(namespace
));
161 private CompletableFuture
<Void
> deleteUserNamespaces() {
162 return listUserNamespaces()
163 .thenCompose(namespaces
-> CompletableFuture
.allOf(namespaces
.stream()
164 .map(this::deleteNamespace
)
165 .toArray(CompletableFuture
[]::new)));