/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.SpanDataMatchers
.hasEnded
;
21 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.SpanDataMatchers
.hasExceptionWithType
;
22 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.SpanDataMatchers
.hasName
;
23 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.SpanDataMatchers
.hasParentSpanId
;
24 import static org
.apache
.hadoop
.hbase
.client
.trace
.hamcrest
.SpanDataMatchers
.hasStatusWithCode
;
25 import static org
.hamcrest
.MatcherAssert
.assertThat
;
26 import static org
.hamcrest
.Matchers
.allOf
;
27 import static org
.hamcrest
.Matchers
.hasItem
;
28 import static org
.hamcrest
.Matchers
.startsWith
;
29 import io
.opentelemetry
.api
.trace
.StatusCode
;
30 import io
.opentelemetry
.sdk
.trace
.data
.SpanData
;
31 import java
.util
.List
;
32 import java
.util
.Objects
;
33 import java
.util
.concurrent
.ForkJoinPool
;
34 import java
.util
.function
.Supplier
;
35 import java
.util
.stream
.Collectors
;
36 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
37 import org
.apache
.hadoop
.hbase
.client
.trace
.StringTraceRenderer
;
38 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
39 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
40 import org
.hamcrest
.Matcher
;
41 import org
.junit
.ClassRule
;
42 import org
.junit
.experimental
.categories
.Category
;
43 import org
.junit
.runner
.RunWith
;
44 import org
.junit
.runners
.Parameterized
;
45 import org
.junit
.runners
.Parameterized
.Parameter
;
46 import org
.junit
.runners
.Parameterized
.Parameters
;
47 import org
.slf4j
.Logger
;
48 import org
.slf4j
.LoggerFactory
;
50 @RunWith(Parameterized
.class)
51 @Category({ LargeTests
.class, ClientTests
.class })
52 public class TestAsyncTableScan
extends AbstractTestAsyncTableScan
{
53 private static final Logger logger
= LoggerFactory
.getLogger(TestAsyncTableScan
.class);
56 public static final HBaseClassTestRule CLASS_RULE
=
57 HBaseClassTestRule
.forClass(TestAsyncTableScan
.class);
60 public String scanType
;
63 public Supplier
<Scan
> scanCreater
;
65 @Parameters(name
= "{index}: scan={0}")
66 public static List
<Object
[]> params() {
67 return getScanCreatorParams();
71 protected Scan
createScan() {
72 return scanCreater
.get();
76 protected List
<Result
> doScan(Scan scan
, int closeAfter
) throws Exception
{
77 AsyncTable
<ScanResultConsumer
> table
= connectionRule
.getAsyncConnection()
78 .getTable(TABLE_NAME
, ForkJoinPool
.commonPool());
81 // these tests batch settings with the sample data result in each result being
82 // split in two. so we must allow twice the expected results in order to reach
83 // our true limit. see convertFromBatchResult for details.
84 if (scan
.getBatch() > 0) {
85 closeAfter
= closeAfter
* 2;
87 TracedScanResultConsumer consumer
=
88 new TracedScanResultConsumer(new LimitedScanResultConsumer(closeAfter
));
89 table
.scan(scan
, consumer
);
90 results
= consumer
.getAll();
92 TracedScanResultConsumer consumer
=
93 new TracedScanResultConsumer(new SimpleScanResultConsumerImpl());
94 table
.scan(scan
, consumer
);
95 results
= consumer
.getAll();
97 if (scan
.getBatch() > 0) {
98 results
= convertFromBatchResult(results
);
104 protected void assertTraceContinuity() {
105 final String parentSpanName
= testName
.getMethodName();
106 final Matcher
<SpanData
> parentSpanMatcher
= allOf(
107 hasName(parentSpanName
),
108 hasStatusWithCode(StatusCode
.OK
),
110 waitForSpan(parentSpanMatcher
);
112 final List
<SpanData
> spans
= otelClassRule
.getSpans()
114 .filter(Objects
::nonNull
)
115 .collect(Collectors
.toList());
116 if (logger
.isDebugEnabled()) {
117 StringTraceRenderer stringTraceRenderer
= new StringTraceRenderer(spans
);
118 stringTraceRenderer
.render(logger
::debug
);
121 final String parentSpanId
= spans
.stream()
122 .filter(parentSpanMatcher
::matches
)
123 .map(SpanData
::getSpanId
)
125 .orElseThrow(AssertionError
::new);
127 final Matcher
<SpanData
> scanOperationSpanMatcher
= allOf(
128 hasName(startsWith("SCAN " + TABLE_NAME
.getNameWithNamespaceInclAsString())),
129 hasParentSpanId(parentSpanId
),
130 hasStatusWithCode(StatusCode
.OK
),
132 assertThat(spans
, hasItem(scanOperationSpanMatcher
));
133 final String scanOperationSpanId
= spans
.stream()
134 .filter(scanOperationSpanMatcher
::matches
)
135 .map(SpanData
::getSpanId
)
137 .orElseThrow(AssertionError
::new);
139 final Matcher
<SpanData
> onScanMetricsCreatedMatcher
=
140 hasName("TracedScanResultConsumer#onScanMetricsCreated");
141 assertThat(spans
, hasItem(onScanMetricsCreatedMatcher
));
143 .filter(onScanMetricsCreatedMatcher
::matches
)
144 .forEach(span
-> assertThat(span
, allOf(
145 onScanMetricsCreatedMatcher
,
146 hasParentSpanId(scanOperationSpanId
),
149 final Matcher
<SpanData
> onNextMatcher
= hasName("TracedScanResultConsumer#onNext");
150 assertThat(spans
, hasItem(onNextMatcher
));
152 .filter(onNextMatcher
::matches
)
153 .forEach(span
-> assertThat(span
, allOf(
155 hasParentSpanId(scanOperationSpanId
),
156 hasStatusWithCode(StatusCode
.OK
),
159 final Matcher
<SpanData
> onCompleteMatcher
= hasName("TracedScanResultConsumer#onComplete");
160 assertThat(spans
, hasItem(onCompleteMatcher
));
162 .filter(onCompleteMatcher
::matches
)
163 .forEach(span
-> assertThat(span
, allOf(
165 hasParentSpanId(scanOperationSpanId
),
166 hasStatusWithCode(StatusCode
.OK
),
171 protected void assertTraceError(Matcher
<String
> exceptionTypeNameMatcher
) {
172 final String parentSpanName
= testName
.getMethodName();
173 final Matcher
<SpanData
> parentSpanMatcher
= allOf(hasName(parentSpanName
), hasEnded());
174 waitForSpan(parentSpanMatcher
);
176 final List
<SpanData
> spans
= otelClassRule
.getSpans()
178 .filter(Objects
::nonNull
)
179 .collect(Collectors
.toList());
180 if (logger
.isDebugEnabled()) {
181 StringTraceRenderer stringTraceRenderer
= new StringTraceRenderer(spans
);
182 stringTraceRenderer
.render(logger
::debug
);
185 final String parentSpanId
= spans
.stream()
186 .filter(parentSpanMatcher
::matches
)
187 .map(SpanData
::getSpanId
)
189 .orElseThrow(AssertionError
::new);
191 final Matcher
<SpanData
> scanOperationSpanMatcher
= allOf(
192 hasName(startsWith("SCAN " + TABLE_NAME
.getNameWithNamespaceInclAsString())),
193 hasParentSpanId(parentSpanId
),
194 hasStatusWithCode(StatusCode
.ERROR
),
195 hasExceptionWithType(exceptionTypeNameMatcher
),
197 assertThat(spans
, hasItem(scanOperationSpanMatcher
));
198 final String scanOperationSpanId
= spans
.stream()
199 .filter(scanOperationSpanMatcher
::matches
)
200 .map(SpanData
::getSpanId
)
202 .orElseThrow(AssertionError
::new);
204 final Matcher
<SpanData
> onErrorMatcher
= hasName("TracedScanResultConsumer#onError");
205 assertThat(spans
, hasItem(onErrorMatcher
));
207 .filter(onErrorMatcher
::matches
)
208 .forEach(span
-> assertThat(span
, allOf(
210 hasParentSpanId(scanOperationSpanId
),
211 hasStatusWithCode(StatusCode
.OK
),