HBASE-26474 Implement connection-level attributes (addendum)
[hbase.git] / hbase-client / src / test / java / org / apache / hadoop / hbase / client / TestAsyncTableTracing.java
blobd8a645349cb077c64c32225582f8a32704e67056
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
21 import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
22 import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
23 import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
24 import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
25 import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
26 import static org.hamcrest.MatcherAssert.assertThat;
27 import static org.hamcrest.Matchers.allOf;
28 import static org.hamcrest.Matchers.containsString;
29 import static org.hamcrest.Matchers.hasItem;
30 import static org.junit.Assert.fail;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.anyInt;
33 import static org.mockito.ArgumentMatchers.anyLong;
34 import static org.mockito.Mockito.doAnswer;
35 import static org.mockito.Mockito.mock;
36 import io.opentelemetry.api.trace.SpanKind;
37 import io.opentelemetry.api.trace.StatusCode;
38 import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
39 import io.opentelemetry.sdk.trace.data.SpanData;
40 import java.io.IOException;
41 import java.util.Arrays;
42 import java.util.concurrent.CompletableFuture;
43 import java.util.concurrent.ForkJoinPool;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.hbase.Cell;
47 import org.apache.hadoop.hbase.Cell.Type;
48 import org.apache.hadoop.hbase.CellBuilderFactory;
49 import org.apache.hadoop.hbase.CellBuilderType;
50 import org.apache.hadoop.hbase.HBaseClassTestRule;
51 import org.apache.hadoop.hbase.HBaseConfiguration;
52 import org.apache.hadoop.hbase.HRegionLocation;
53 import org.apache.hadoop.hbase.MatcherPredicate;
54 import org.apache.hadoop.hbase.ServerName;
55 import org.apache.hadoop.hbase.TableName;
56 import org.apache.hadoop.hbase.Waiter;
57 import org.apache.hadoop.hbase.filter.PrefixFilter;
58 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
59 import org.apache.hadoop.hbase.security.User;
60 import org.apache.hadoop.hbase.security.UserProvider;
61 import org.apache.hadoop.hbase.testclassification.ClientTests;
62 import org.apache.hadoop.hbase.testclassification.MediumTests;
63 import org.apache.hadoop.hbase.util.Bytes;
64 import org.hamcrest.Matcher;
65 import org.hamcrest.core.IsAnything;
66 import org.junit.After;
67 import org.junit.Before;
68 import org.junit.ClassRule;
69 import org.junit.Rule;
70 import org.junit.Test;
71 import org.junit.experimental.categories.Category;
72 import org.mockito.invocation.InvocationOnMock;
73 import org.mockito.stubbing.Answer;
74 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
75 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
76 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
77 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
78 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
79 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
80 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
81 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
82 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
83 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
84 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
85 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
86 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
87 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
88 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
89 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
91 @Category({ ClientTests.class, MediumTests.class })
92 public class TestAsyncTableTracing {
94 @ClassRule
95 public static final HBaseClassTestRule CLASS_RULE =
96 HBaseClassTestRule.forClass(TestAsyncTableTracing.class);
98 private static Configuration CONF = HBaseConfiguration.create();
100 private ClientService.Interface stub;
102 private AsyncConnectionImpl conn;
104 private AsyncTable<?> table;
106 @Rule
107 public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
109 @Before
110 public void setUp() throws IOException {
111 stub = mock(ClientService.Interface.class);
112 AtomicInteger scanNextCalled = new AtomicInteger(0);
113 doAnswer(new Answer<Void>() {
115 @Override
116 public Void answer(InvocationOnMock invocation) throws Throwable {
117 ScanRequest req = invocation.getArgument(1);
118 RpcCallback<ScanResponse> done = invocation.getArgument(2);
119 if (!req.hasScannerId()) {
120 done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
121 .setMoreResultsInRegion(true).setMoreResults(true).build());
122 } else {
123 if (req.hasCloseScanner() && req.getCloseScanner()) {
124 done.run(ScanResponse.getDefaultInstance());
125 } else {
126 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
127 .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
128 .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
129 .setValue(Bytes.toBytes("v")).build();
130 Result result = Result.create(Arrays.asList(cell));
131 ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
132 .addResults(ProtobufUtil.toResult(result));
133 if (req.getLimitOfRows() == 1) {
134 builder.setMoreResultsInRegion(false).setMoreResults(false);
135 } else {
136 builder.setMoreResultsInRegion(true).setMoreResults(true);
138 ForkJoinPool.commonPool().execute(() -> done.run(builder.build()));
141 return null;
143 }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
144 doAnswer(new Answer<Void>() {
146 @Override
147 public Void answer(InvocationOnMock invocation) throws Throwable {
148 ClientProtos.MultiRequest req = invocation.getArgument(1);
149 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
150 for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
151 RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
152 for (ClientProtos.Action ignored : regionAction.getActionList()) {
153 raBuilder.addResultOrException(
154 ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
156 builder.addRegionActionResult(raBuilder);
158 ClientProtos.MultiResponse resp = builder.build();
159 RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
160 ForkJoinPool.commonPool().execute(() -> done.run(resp));
161 return null;
163 }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
164 doAnswer(new Answer<Void>() {
166 @Override
167 public Void answer(InvocationOnMock invocation) throws Throwable {
168 MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
169 MutateResponse resp;
170 switch (req.getMutateType()) {
171 case INCREMENT:
172 ColumnValue value = req.getColumnValue(0);
173 QualifierValue qvalue = value.getQualifierValue(0);
174 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
175 .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
176 .setQualifier(qvalue.getQualifier().toByteArray())
177 .setValue(qvalue.getValue().toByteArray()).build();
178 resp = MutateResponse.newBuilder()
179 .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
180 break;
181 default:
182 resp = MutateResponse.getDefaultInstance();
183 break;
185 RpcCallback<MutateResponse> done = invocation.getArgument(2);
186 ForkJoinPool.commonPool().execute(() -> done.run(resp));
187 return null;
189 }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
190 doAnswer(new Answer<Void>() {
192 @Override
193 public Void answer(InvocationOnMock invocation) throws Throwable {
194 RpcCallback<GetResponse> done = invocation.getArgument(2);
195 ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance()));
196 return null;
198 }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
199 final User user = UserProvider.instantiate(CONF).getCurrent();
200 conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null,
201 user) {
203 @Override
204 AsyncRegionLocator getLocator() {
205 AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
206 Answer<CompletableFuture<HRegionLocation>> answer =
207 new Answer<CompletableFuture<HRegionLocation>>() {
209 @Override
210 public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
211 throws Throwable {
212 TableName tableName = invocation.getArgument(0);
213 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
214 ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
215 HRegionLocation loc = new HRegionLocation(info, serverName);
216 return CompletableFuture.completedFuture(loc);
219 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
220 any(RegionLocateType.class), anyLong());
221 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
222 anyInt(), any(RegionLocateType.class), anyLong());
223 return locator;
226 @Override
227 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
228 return stub;
231 table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool());
234 @After
235 public void tearDown() throws IOException {
236 Closeables.close(conn, true);
239 private void assertTrace(String tableOperation) {
240 assertTrace(tableOperation, new IsAnything<>());
243 private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
244 final TableName tableName = table.getName();
245 final Matcher<SpanData> spanLocator = allOf(
246 hasName(containsString(tableOperation)), hasEnded());
247 final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
249 Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
250 "waiting for span to emit",
251 () -> traceRule.getSpans(), hasItem(spanLocator)));
252 SpanData data = traceRule.getSpans()
253 .stream()
254 .filter(spanLocator::matches)
255 .findFirst()
256 .orElseThrow(AssertionError::new);
257 assertThat(data, allOf(
258 hasName(expectedName),
259 hasKind(SpanKind.CLIENT),
260 hasStatusWithCode(StatusCode.OK),
261 buildConnectionAttributesMatcher(conn),
262 buildTableAttributesMatcher(tableName),
263 matcher));
266 @Test
267 public void testExists() {
268 table.exists(new Get(Bytes.toBytes(0))).join();
269 assertTrace("GET");
272 @Test
273 public void testGet() {
274 table.get(new Get(Bytes.toBytes(0))).join();
275 assertTrace("GET");
278 @Test
279 public void testPut() {
280 table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
281 Bytes.toBytes("v"))).join();
282 assertTrace("PUT");
285 @Test
286 public void testDelete() {
287 table.delete(new Delete(Bytes.toBytes(0))).join();
288 assertTrace("DELETE");
291 @Test
292 public void testAppend() {
293 table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
294 Bytes.toBytes("v"))).join();
295 assertTrace("APPEND");
298 @Test
299 public void testIncrement() {
300 table
301 .increment(
302 new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
303 .join();
304 assertTrace("INCREMENT");
307 @Test
308 public void testIncrementColumnValue1() {
309 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
310 .join();
311 assertTrace("INCREMENT");
314 @Test
315 public void testIncrementColumnValue2() {
316 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
317 Durability.ASYNC_WAL).join();
318 assertTrace("INCREMENT");
321 @Test
322 public void testCheckAndMutate() {
323 table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
324 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
325 .build(new Delete(Bytes.toBytes(0)))).join();
326 assertTrace("CHECK_AND_MUTATE");
329 @Test
330 public void testCheckAndMutateList() {
331 CompletableFuture
332 .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
333 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
334 .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
335 .join();
336 assertTrace("BATCH");
339 @Test
340 public void testCheckAndMutateAll() {
341 table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
342 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
343 .build(new Delete(Bytes.toBytes(0))))).join();
344 assertTrace("BATCH");
347 private void testCheckAndMutateBuilder(Row op) {
348 AsyncTable.CheckAndMutateBuilder builder =
349 table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
350 .qualifier(Bytes.toBytes("cq"))
351 .ifEquals(Bytes.toBytes("v"));
352 if (op instanceof Put) {
353 Put put = (Put) op;
354 builder.thenPut(put).join();
355 } else if (op instanceof Delete) {
356 Delete delete = (Delete) op;
357 builder.thenDelete(delete).join();
358 } else if (op instanceof RowMutations) {
359 RowMutations mutations = (RowMutations) op;
360 builder.thenMutate(mutations).join();
361 } else {
362 fail("unsupported CheckAndPut operation " + op);
364 assertTrace("CHECK_AND_MUTATE");
367 @Test
368 public void testCheckAndMutateBuilderThenPut() {
369 Put put = new Put(Bytes.toBytes(0))
370 .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
371 testCheckAndMutateBuilder(put);
374 @Test
375 public void testCheckAndMutateBuilderThenDelete() {
376 testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0)));
379 @Test
380 public void testCheckAndMutateBuilderThenMutations() throws IOException {
381 RowMutations mutations = new RowMutations(Bytes.toBytes(0))
382 .add(new Put(Bytes.toBytes(0))
383 .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
384 .add(new Delete(Bytes.toBytes(0)));
385 testCheckAndMutateBuilder(mutations);
388 private void testCheckAndMutateWithFilterBuilder(Row op) {
389 // use of `PrefixFilter` is completely arbitrary here.
390 AsyncTable.CheckAndMutateWithFilterBuilder builder =
391 table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
392 if (op instanceof Put) {
393 Put put = (Put) op;
394 builder.thenPut(put).join();
395 } else if (op instanceof Delete) {
396 Delete delete = (Delete) op;
397 builder.thenDelete(delete).join();
398 } else if (op instanceof RowMutations) {
399 RowMutations mutations = (RowMutations) op;
400 builder.thenMutate(mutations).join();
401 } else {
402 fail("unsupported CheckAndPut operation " + op);
404 assertTrace("CHECK_AND_MUTATE");
407 @Test
408 public void testCheckAndMutateWithFilterBuilderThenPut() {
409 Put put = new Put(Bytes.toBytes(0))
410 .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
411 testCheckAndMutateWithFilterBuilder(put);
414 @Test
415 public void testCheckAndMutateWithFilterBuilderThenDelete() {
416 testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0)));
419 @Test
420 public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
421 RowMutations mutations = new RowMutations(Bytes.toBytes(0))
422 .add(new Put(Bytes.toBytes(0))
423 .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
424 .add(new Delete(Bytes.toBytes(0)));
425 testCheckAndMutateWithFilterBuilder(mutations);
428 @Test
429 public void testMutateRow() throws IOException {
430 table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0))));
431 assertTrace("BATCH");
434 @Test
435 public void testScanAll() {
436 table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
437 assertTrace("SCAN");
440 @Test
441 public void testExistsList() {
442 CompletableFuture
443 .allOf(
444 table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
445 .join();
446 assertTrace("BATCH");
449 @Test
450 public void testExistsAll() {
451 table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
452 assertTrace("BATCH");
455 @Test
456 public void testGetList() {
457 CompletableFuture
458 .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
459 .join();
460 assertTrace("BATCH");
463 @Test
464 public void testGetAll() {
465 table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
466 assertTrace("BATCH");
469 @Test
470 public void testPutList() {
471 CompletableFuture
472 .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
473 Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
474 .join();
475 assertTrace("BATCH");
478 @Test
479 public void testPutAll() {
480 table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
481 Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
482 assertTrace("BATCH");
485 @Test
486 public void testDeleteList() {
487 CompletableFuture
488 .allOf(
489 table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
490 .join();
491 assertTrace("BATCH");
494 @Test
495 public void testDeleteAll() {
496 table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
497 assertTrace("BATCH");
500 @Test
501 public void testBatch() {
502 CompletableFuture
503 .allOf(
504 table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
505 .join();
506 assertTrace("BATCH");
509 @Test
510 public void testBatchAll() {
511 table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
512 assertTrace("BATCH");