2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.SpanDataMatchers
.hasEnded
;
21 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.SpanDataMatchers
.hasKind
;
22 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.SpanDataMatchers
.hasName
;
23 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.SpanDataMatchers
.hasStatusWithCode
;
24 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.TraceTestUtil
.buildConnectionAttributesMatcher
;
25 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.TraceTestUtil
.buildTableAttributesMatcher
;
26 import static org
.hamcrest
.MatcherAssert
.assertThat
;
27 import static org
.hamcrest
.Matchers
.allOf
;
28 import static org
.hamcrest
.Matchers
.containsString
;
29 import static org
.hamcrest
.Matchers
.hasItem
;
30 import static org
.junit
.Assert
.fail
;
31 import static org
.mockito
.ArgumentMatchers
.any
;
32 import static org
.mockito
.ArgumentMatchers
.anyInt
;
33 import static org
.mockito
.ArgumentMatchers
.anyLong
;
34 import static org
.mockito
.Mockito
.doAnswer
;
35 import static org
.mockito
.Mockito
.mock
;
36 import io
.opentelemetry
.api
.trace
.SpanKind
;
37 import io
.opentelemetry
.api
.trace
.StatusCode
;
38 import io
.opentelemetry
.sdk
.testing
.junit4
.OpenTelemetryRule
;
39 import io
.opentelemetry
.sdk
.trace
.data
.SpanData
;
40 import java
.io
.IOException
;
41 import java
.util
.Arrays
;
42 import java
.util
.concurrent
.CompletableFuture
;
43 import java
.util
.concurrent
.ForkJoinPool
;
44 import java
.util
.concurrent
.atomic
.AtomicInteger
;
45 import org
.apache
.hadoop
.conf
.Configuration
;
46 import org
.apache
.hadoop
.hbase
.Cell
;
47 import org
.apache
.hadoop
.hbase
.Cell
.Type
;
48 import org
.apache
.hadoop
.hbase
.CellBuilderFactory
;
49 import org
.apache
.hadoop
.hbase
.CellBuilderType
;
50 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
51 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
52 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
53 import org
.apache
.hadoop
.hbase
.MatcherPredicate
;
54 import org
.apache
.hadoop
.hbase
.ServerName
;
55 import org
.apache
.hadoop
.hbase
.TableName
;
56 import org
.apache
.hadoop
.hbase
.Waiter
;
57 import org
.apache
.hadoop
.hbase
.filter
.PrefixFilter
;
58 import org
.apache
.hadoop
.hbase
.ipc
.HBaseRpcController
;
59 import org
.apache
.hadoop
.hbase
.security
.User
;
60 import org
.apache
.hadoop
.hbase
.security
.UserProvider
;
61 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
62 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
63 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
64 import org
.hamcrest
.Matcher
;
65 import org
.hamcrest
.core
.IsAnything
;
66 import org
.junit
.After
;
67 import org
.junit
.Before
;
68 import org
.junit
.ClassRule
;
69 import org
.junit
.Rule
;
70 import org
.junit
.Test
;
71 import org
.junit
.experimental
.categories
.Category
;
72 import org
.mockito
.invocation
.InvocationOnMock
;
73 import org
.mockito
.stubbing
.Answer
;
74 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.io
.Closeables
;
75 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcCallback
;
76 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
77 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
78 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ClientService
;
79 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.GetRequest
;
80 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.GetResponse
;
81 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutateRequest
;
82 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutateResponse
;
83 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutationProto
;
84 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutationProto
.ColumnValue
;
85 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutationProto
.ColumnValue
.QualifierValue
;
86 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.RegionActionResult
;
87 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ResultOrException
;
88 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ScanRequest
;
89 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ScanResponse
;
91 @Category({ ClientTests
.class, MediumTests
.class })
92 public class TestAsyncTableTracing
{
95 public static final HBaseClassTestRule CLASS_RULE
=
96 HBaseClassTestRule
.forClass(TestAsyncTableTracing
.class);
98 private static Configuration CONF
= HBaseConfiguration
.create();
100 private ClientService
.Interface stub
;
102 private AsyncConnectionImpl conn
;
104 private AsyncTable
<?
> table
;
107 public OpenTelemetryRule traceRule
= OpenTelemetryRule
.create();
110 public void setUp() throws IOException
{
111 stub
= mock(ClientService
.Interface
.class);
112 AtomicInteger scanNextCalled
= new AtomicInteger(0);
113 doAnswer(new Answer
<Void
>() {
116 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
117 ScanRequest req
= invocation
.getArgument(1);
118 RpcCallback
<ScanResponse
> done
= invocation
.getArgument(2);
119 if (!req
.hasScannerId()) {
120 done
.run(ScanResponse
.newBuilder().setScannerId(1).setTtl(800)
121 .setMoreResultsInRegion(true).setMoreResults(true).build());
123 if (req
.hasCloseScanner() && req
.getCloseScanner()) {
124 done
.run(ScanResponse
.getDefaultInstance());
126 Cell cell
= CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
).setType(Type
.Put
)
127 .setRow(Bytes
.toBytes(scanNextCalled
.incrementAndGet()))
128 .setFamily(Bytes
.toBytes("cf")).setQualifier(Bytes
.toBytes("cq"))
129 .setValue(Bytes
.toBytes("v")).build();
130 Result result
= Result
.create(Arrays
.asList(cell
));
131 ScanResponse
.Builder builder
= ScanResponse
.newBuilder().setScannerId(1).setTtl(800)
132 .addResults(ProtobufUtil
.toResult(result
));
133 if (req
.getLimitOfRows() == 1) {
134 builder
.setMoreResultsInRegion(false).setMoreResults(false);
136 builder
.setMoreResultsInRegion(true).setMoreResults(true);
138 ForkJoinPool
.commonPool().execute(() -> done
.run(builder
.build()));
143 }).when(stub
).scan(any(HBaseRpcController
.class), any(ScanRequest
.class), any());
144 doAnswer(new Answer
<Void
>() {
147 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
148 ClientProtos
.MultiRequest req
= invocation
.getArgument(1);
149 ClientProtos
.MultiResponse
.Builder builder
= ClientProtos
.MultiResponse
.newBuilder();
150 for (ClientProtos
.RegionAction regionAction
: req
.getRegionActionList()) {
151 RegionActionResult
.Builder raBuilder
= RegionActionResult
.newBuilder();
152 for (ClientProtos
.Action ignored
: regionAction
.getActionList()) {
153 raBuilder
.addResultOrException(
154 ResultOrException
.newBuilder().setResult(ProtobufUtil
.toResult(new Result())));
156 builder
.addRegionActionResult(raBuilder
);
158 ClientProtos
.MultiResponse resp
= builder
.build();
159 RpcCallback
<ClientProtos
.MultiResponse
> done
= invocation
.getArgument(2);
160 ForkJoinPool
.commonPool().execute(() -> done
.run(resp
));
163 }).when(stub
).multi(any(HBaseRpcController
.class), any(ClientProtos
.MultiRequest
.class), any());
164 doAnswer(new Answer
<Void
>() {
167 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
168 MutationProto req
= ((MutateRequest
) invocation
.getArgument(1)).getMutation();
170 switch (req
.getMutateType()) {
172 ColumnValue value
= req
.getColumnValue(0);
173 QualifierValue qvalue
= value
.getQualifierValue(0);
174 Cell cell
= CellBuilderFactory
.create(CellBuilderType
.SHALLOW_COPY
).setType(Type
.Put
)
175 .setRow(req
.getRow().toByteArray()).setFamily(value
.getFamily().toByteArray())
176 .setQualifier(qvalue
.getQualifier().toByteArray())
177 .setValue(qvalue
.getValue().toByteArray()).build();
178 resp
= MutateResponse
.newBuilder()
179 .setResult(ProtobufUtil
.toResult(Result
.create(Arrays
.asList(cell
)))).build();
182 resp
= MutateResponse
.getDefaultInstance();
185 RpcCallback
<MutateResponse
> done
= invocation
.getArgument(2);
186 ForkJoinPool
.commonPool().execute(() -> done
.run(resp
));
189 }).when(stub
).mutate(any(HBaseRpcController
.class), any(MutateRequest
.class), any());
190 doAnswer(new Answer
<Void
>() {
193 public Void
answer(InvocationOnMock invocation
) throws Throwable
{
194 RpcCallback
<GetResponse
> done
= invocation
.getArgument(2);
195 ForkJoinPool
.commonPool().execute(() -> done
.run(GetResponse
.getDefaultInstance()));
198 }).when(stub
).get(any(HBaseRpcController
.class), any(GetRequest
.class), any());
199 final User user
= UserProvider
.instantiate(CONF
).getCurrent();
200 conn
= new AsyncConnectionImpl(CONF
, new DoNothingConnectionRegistry(CONF
), "test", null,
204 AsyncRegionLocator
getLocator() {
205 AsyncRegionLocator locator
= mock(AsyncRegionLocator
.class);
206 Answer
<CompletableFuture
<HRegionLocation
>> answer
=
207 new Answer
<CompletableFuture
<HRegionLocation
>>() {
210 public CompletableFuture
<HRegionLocation
> answer(InvocationOnMock invocation
)
212 TableName tableName
= invocation
.getArgument(0);
213 RegionInfo info
= RegionInfoBuilder
.newBuilder(tableName
).build();
214 ServerName serverName
= ServerName
.valueOf("rs", 16010, 12345);
215 HRegionLocation loc
= new HRegionLocation(info
, serverName
);
216 return CompletableFuture
.completedFuture(loc
);
219 doAnswer(answer
).when(locator
).getRegionLocation(any(TableName
.class), any(byte[].class),
220 any(RegionLocateType
.class), anyLong());
221 doAnswer(answer
).when(locator
).getRegionLocation(any(TableName
.class), any(byte[].class),
222 anyInt(), any(RegionLocateType
.class), anyLong());
227 ClientService
.Interface
getRegionServerStub(ServerName serverName
) throws IOException
{
231 table
= conn
.getTable(TableName
.valueOf("table"), ForkJoinPool
.commonPool());
235 public void tearDown() throws IOException
{
236 Closeables
.close(conn
, true);
239 private void assertTrace(String tableOperation
) {
240 assertTrace(tableOperation
, new IsAnything
<>());
243 private void assertTrace(String tableOperation
, Matcher
<SpanData
> matcher
) {
244 final TableName tableName
= table
.getName();
245 final Matcher
<SpanData
> spanLocator
= allOf(
246 hasName(containsString(tableOperation
)), hasEnded());
247 final String expectedName
= tableOperation
+ " " + tableName
.getNameWithNamespaceInclAsString();
249 Waiter
.waitFor(CONF
, 1000, new MatcherPredicate
<>(
250 "waiting for span to emit",
251 () -> traceRule
.getSpans(), hasItem(spanLocator
)));
252 SpanData data
= traceRule
.getSpans()
254 .filter(spanLocator
::matches
)
256 .orElseThrow(AssertionError
::new);
257 assertThat(data
, allOf(
258 hasName(expectedName
),
259 hasKind(SpanKind
.CLIENT
),
260 hasStatusWithCode(StatusCode
.OK
),
261 buildConnectionAttributesMatcher(conn
),
262 buildTableAttributesMatcher(tableName
),
267 public void testExists() {
268 table
.exists(new Get(Bytes
.toBytes(0))).join();
273 public void testGet() {
274 table
.get(new Get(Bytes
.toBytes(0))).join();
279 public void testPut() {
280 table
.put(new Put(Bytes
.toBytes(0)).addColumn(Bytes
.toBytes("cf"), Bytes
.toBytes("cq"),
281 Bytes
.toBytes("v"))).join();
286 public void testDelete() {
287 table
.delete(new Delete(Bytes
.toBytes(0))).join();
288 assertTrace("DELETE");
292 public void testAppend() {
293 table
.append(new Append(Bytes
.toBytes(0)).addColumn(Bytes
.toBytes("cf"), Bytes
.toBytes("cq"),
294 Bytes
.toBytes("v"))).join();
295 assertTrace("APPEND");
299 public void testIncrement() {
302 new Increment(Bytes
.toBytes(0)).addColumn(Bytes
.toBytes("cf"), Bytes
.toBytes("cq"), 1))
304 assertTrace("INCREMENT");
308 public void testIncrementColumnValue1() {
309 table
.incrementColumnValue(Bytes
.toBytes(0), Bytes
.toBytes("cf"), Bytes
.toBytes("cq"), 1)
311 assertTrace("INCREMENT");
315 public void testIncrementColumnValue2() {
316 table
.incrementColumnValue(Bytes
.toBytes(0), Bytes
.toBytes("cf"), Bytes
.toBytes("cq"), 1,
317 Durability
.ASYNC_WAL
).join();
318 assertTrace("INCREMENT");
322 public void testCheckAndMutate() {
323 table
.checkAndMutate(CheckAndMutate
.newBuilder(Bytes
.toBytes(0))
324 .ifEquals(Bytes
.toBytes("cf"), Bytes
.toBytes("cq"), Bytes
.toBytes("v"))
325 .build(new Delete(Bytes
.toBytes(0)))).join();
326 assertTrace("CHECK_AND_MUTATE");
330 public void testCheckAndMutateList() {
332 .allOf(table
.checkAndMutate(Arrays
.asList(CheckAndMutate
.newBuilder(Bytes
.toBytes(0))
333 .ifEquals(Bytes
.toBytes("cf"), Bytes
.toBytes("cq"), Bytes
.toBytes("v"))
334 .build(new Delete(Bytes
.toBytes(0))))).toArray(new CompletableFuture
[0]))
336 assertTrace("BATCH");
340 public void testCheckAndMutateAll() {
341 table
.checkAndMutateAll(Arrays
.asList(CheckAndMutate
.newBuilder(Bytes
.toBytes(0))
342 .ifEquals(Bytes
.toBytes("cf"), Bytes
.toBytes("cq"), Bytes
.toBytes("v"))
343 .build(new Delete(Bytes
.toBytes(0))))).join();
344 assertTrace("BATCH");
347 private void testCheckAndMutateBuilder(Row op
) {
348 AsyncTable
.CheckAndMutateBuilder builder
=
349 table
.checkAndMutate(Bytes
.toBytes(0), Bytes
.toBytes("cf"))
350 .qualifier(Bytes
.toBytes("cq"))
351 .ifEquals(Bytes
.toBytes("v"));
352 if (op
instanceof Put
) {
354 builder
.thenPut(put
).join();
355 } else if (op
instanceof Delete
) {
356 Delete delete
= (Delete
) op
;
357 builder
.thenDelete(delete
).join();
358 } else if (op
instanceof RowMutations
) {
359 RowMutations mutations
= (RowMutations
) op
;
360 builder
.thenMutate(mutations
).join();
362 fail("unsupported CheckAndPut operation " + op
);
364 assertTrace("CHECK_AND_MUTATE");
368 public void testCheckAndMutateBuilderThenPut() {
369 Put put
= new Put(Bytes
.toBytes(0))
370 .addColumn(Bytes
.toBytes("f"), Bytes
.toBytes("cq"), Bytes
.toBytes("v"));
371 testCheckAndMutateBuilder(put
);
375 public void testCheckAndMutateBuilderThenDelete() {
376 testCheckAndMutateBuilder(new Delete(Bytes
.toBytes(0)));
380 public void testCheckAndMutateBuilderThenMutations() throws IOException
{
381 RowMutations mutations
= new RowMutations(Bytes
.toBytes(0))
382 .add(new Put(Bytes
.toBytes(0))
383 .addColumn(Bytes
.toBytes("f"), Bytes
.toBytes("cq"), Bytes
.toBytes("v")))
384 .add(new Delete(Bytes
.toBytes(0)));
385 testCheckAndMutateBuilder(mutations
);
388 private void testCheckAndMutateWithFilterBuilder(Row op
) {
389 // use of `PrefixFilter` is completely arbitrary here.
390 AsyncTable
.CheckAndMutateWithFilterBuilder builder
=
391 table
.checkAndMutate(Bytes
.toBytes(0), new PrefixFilter(Bytes
.toBytes(0)));
392 if (op
instanceof Put
) {
394 builder
.thenPut(put
).join();
395 } else if (op
instanceof Delete
) {
396 Delete delete
= (Delete
) op
;
397 builder
.thenDelete(delete
).join();
398 } else if (op
instanceof RowMutations
) {
399 RowMutations mutations
= (RowMutations
) op
;
400 builder
.thenMutate(mutations
).join();
402 fail("unsupported CheckAndPut operation " + op
);
404 assertTrace("CHECK_AND_MUTATE");
408 public void testCheckAndMutateWithFilterBuilderThenPut() {
409 Put put
= new Put(Bytes
.toBytes(0))
410 .addColumn(Bytes
.toBytes("f"), Bytes
.toBytes("cq"), Bytes
.toBytes("v"));
411 testCheckAndMutateWithFilterBuilder(put
);
415 public void testCheckAndMutateWithFilterBuilderThenDelete() {
416 testCheckAndMutateWithFilterBuilder(new Delete(Bytes
.toBytes(0)));
420 public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException
{
421 RowMutations mutations
= new RowMutations(Bytes
.toBytes(0))
422 .add(new Put(Bytes
.toBytes(0))
423 .addColumn(Bytes
.toBytes("f"), Bytes
.toBytes("cq"), Bytes
.toBytes("v")))
424 .add(new Delete(Bytes
.toBytes(0)));
425 testCheckAndMutateWithFilterBuilder(mutations
);
429 public void testMutateRow() throws IOException
{
430 table
.mutateRow(new RowMutations(Bytes
.toBytes(0)).add(new Delete(Bytes
.toBytes(0))));
431 assertTrace("BATCH");
435 public void testScanAll() {
436 table
.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
441 public void testExistsList() {
444 table
.exists(Arrays
.asList(new Get(Bytes
.toBytes(0)))).toArray(new CompletableFuture
[0]))
446 assertTrace("BATCH");
450 public void testExistsAll() {
451 table
.existsAll(Arrays
.asList(new Get(Bytes
.toBytes(0)))).join();
452 assertTrace("BATCH");
456 public void testGetList() {
458 .allOf(table
.get(Arrays
.asList(new Get(Bytes
.toBytes(0)))).toArray(new CompletableFuture
[0]))
460 assertTrace("BATCH");
464 public void testGetAll() {
465 table
.getAll(Arrays
.asList(new Get(Bytes
.toBytes(0)))).join();
466 assertTrace("BATCH");
470 public void testPutList() {
472 .allOf(table
.put(Arrays
.asList(new Put(Bytes
.toBytes(0)).addColumn(Bytes
.toBytes("cf"),
473 Bytes
.toBytes("cq"), Bytes
.toBytes("v")))).toArray(new CompletableFuture
[0]))
475 assertTrace("BATCH");
479 public void testPutAll() {
480 table
.putAll(Arrays
.asList(new Put(Bytes
.toBytes(0)).addColumn(Bytes
.toBytes("cf"),
481 Bytes
.toBytes("cq"), Bytes
.toBytes("v")))).join();
482 assertTrace("BATCH");
486 public void testDeleteList() {
489 table
.delete(Arrays
.asList(new Delete(Bytes
.toBytes(0)))).toArray(new CompletableFuture
[0]))
491 assertTrace("BATCH");
495 public void testDeleteAll() {
496 table
.deleteAll(Arrays
.asList(new Delete(Bytes
.toBytes(0)))).join();
497 assertTrace("BATCH");
501 public void testBatch() {
504 table
.batch(Arrays
.asList(new Delete(Bytes
.toBytes(0)))).toArray(new CompletableFuture
[0]))
506 assertTrace("BATCH");
510 public void testBatchAll() {
511 table
.batchAll(Arrays
.asList(new Delete(Bytes
.toBytes(0)))).join();
512 assertTrace("BATCH");