2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.rest
;
20 import java
.io
.IOException
;
21 import java
.io
.OutputStream
;
22 import java
.util
.List
;
23 import org
.apache
.hadoop
.hbase
.Cell
;
24 import org
.apache
.hadoop
.hbase
.CellUtil
;
25 import org
.apache
.hadoop
.hbase
.client
.Result
;
26 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
27 import org
.apache
.hadoop
.hbase
.rest
.model
.CellModel
;
28 import org
.apache
.hadoop
.hbase
.rest
.model
.CellSetModel
;
29 import org
.apache
.hadoop
.hbase
.rest
.model
.RowModel
;
30 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
31 import org
.apache
.yetus
.audience
.InterfaceAudience
;
32 import org
.slf4j
.Logger
;
33 import org
.slf4j
.LoggerFactory
;
35 import org
.apache
.hbase
.thirdparty
.javax
.ws
.rs
.WebApplicationException
;
36 import org
.apache
.hbase
.thirdparty
.javax
.ws
.rs
.core
.StreamingOutput
;
38 @InterfaceAudience.Private
39 public class ProtobufStreamingOutput
implements StreamingOutput
{
40 private static final Logger LOG
= LoggerFactory
.getLogger(ProtobufStreamingOutput
.class);
42 private String contentType
;
43 private ResultScanner resultScanner
;
45 private int fetchSize
;
47 protected ProtobufStreamingOutput(ResultScanner scanner
, String type
, int limit
, int fetchSize
) {
48 this.resultScanner
= scanner
;
49 this.contentType
= type
;
51 this.fetchSize
= fetchSize
;
52 if (LOG
.isTraceEnabled()) {
53 LOG
.trace("Created StreamingOutput with content type = " + this.contentType
54 + " user limit : " + this.limit
+ " scan fetch size : " + this.fetchSize
);
59 public void write(OutputStream outStream
) throws IOException
, WebApplicationException
{
61 if(limit
< fetchSize
){
62 rowsToSend
= this.resultScanner
.next(limit
);
63 writeToStream(createModelFromResults(rowsToSend
), this.contentType
, outStream
);
67 if (count
< fetchSize
) {
68 rowsToSend
= this.resultScanner
.next(count
);
70 rowsToSend
= this.resultScanner
.next(this.fetchSize
);
72 if(rowsToSend
.length
== 0){
75 count
= count
- rowsToSend
.length
;
76 writeToStream(createModelFromResults(rowsToSend
), this.contentType
, outStream
);
81 private void writeToStream(CellSetModel model
, String contentType
, OutputStream outStream
)
83 byte[] objectBytes
= model
.createProtobufOutput();
84 outStream
.write(Bytes
.toBytes((short)objectBytes
.length
));
85 outStream
.write(objectBytes
);
87 if (LOG
.isTraceEnabled()) {
88 LOG
.trace("Wrote " + model
.getRows().size() + " rows to stream successfully.");
92 private CellSetModel
createModelFromResults(Result
[] results
) {
93 CellSetModel cellSetModel
= new CellSetModel();
94 for (Result rs
: results
) {
95 byte[] rowKey
= rs
.getRow();
96 RowModel rModel
= new RowModel(rowKey
);
97 List
<Cell
> kvs
= rs
.listCells();
99 rModel
.addCell(new CellModel(CellUtil
.cloneFamily(kv
), CellUtil
.cloneQualifier(kv
), kv
100 .getTimestamp(), CellUtil
.cloneValue(kv
)));
102 cellSetModel
.addRow(rModel
);