2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.mapreduce
;
21 import java
.io
.IOException
;
22 import java
.util
.ArrayList
;
23 import java
.util
.List
;
25 import java
.util
.NavigableMap
;
26 import java
.util
.TreeMap
;
27 import org
.apache
.hadoop
.conf
.Configurable
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.fs
.FileUtil
;
30 import org
.apache
.hadoop
.hbase
.Cell
;
31 import org
.apache
.hadoop
.hbase
.CellUtil
;
32 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
33 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
34 import org
.apache
.hadoop
.hbase
.TableName
;
35 import org
.apache
.hadoop
.hbase
.client
.Admin
;
36 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
37 import org
.apache
.hadoop
.hbase
.client
.Connection
;
38 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
39 import org
.apache
.hadoop
.hbase
.client
.Durability
;
40 import org
.apache
.hadoop
.hbase
.client
.Put
;
41 import org
.apache
.hadoop
.hbase
.client
.Result
;
42 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
43 import org
.apache
.hadoop
.hbase
.client
.Scan
;
44 import org
.apache
.hadoop
.hbase
.client
.Table
;
45 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
46 import org
.apache
.hadoop
.hbase
.io
.ImmutableBytesWritable
;
47 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
48 import org
.apache
.hadoop
.hbase
.testclassification
.MapReduceTests
;
49 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
50 import org
.apache
.hadoop
.io
.MapWritable
;
51 import org
.apache
.hadoop
.io
.Text
;
52 import org
.apache
.hadoop
.mapreduce
.Job
;
53 import org
.apache
.hadoop
.mapreduce
.lib
.output
.NullOutputFormat
;
54 import org
.junit
.AfterClass
;
55 import org
.junit
.Before
;
56 import org
.junit
.BeforeClass
;
57 import org
.junit
.ClassRule
;
58 import org
.junit
.Test
;
59 import org
.junit
.experimental
.categories
.Category
;
60 import org
.slf4j
.Logger
;
61 import org
.slf4j
.LoggerFactory
;
63 @Category({MapReduceTests
.class, LargeTests
.class})
64 public class TestTimeRangeMapRed
{
// Class-level test rule instance obtained from HBaseClassTestRule.forClass for this test class.
67 public static final HBaseClassTestRule CLASS_RULE
=
68 HBaseClassTestRule
.forClass(TestTimeRangeMapRed
.class);
// SLF4J logger; verify() uses it to dump every scanned cell at debug level.
70 private final static Logger log
= LoggerFactory
.getLogger(TestTimeRangeMapRed
.class);
// Shared testing utility: starts/stops the mini cluster and supplies the Admin,
// Connection and Configuration used by the methods below.
71 private static final HBaseTestingUtility UTIL
=
72 new HBaseTestingUtility();
// Row key used for every Put that testTimeRangeMapRed builds.
75 private static final byte [] KEY
= Bytes
.toBytes("row1");
// Cell timestamp -> boolean value that verify() expects to read back for a cell
// carrying that timestamp. The entries marked "include" are exactly the ones
// lying between MINSTAMP and MAXSTAMP.
76 private static final NavigableMap
<Long
, Boolean
> TIMESTAMP
= new TreeMap
<>();
78 TIMESTAMP
.put((long)1245620000, false);
79 TIMESTAMP
.put((long)1245620005, true); // include
80 TIMESTAMP
.put((long)1245620010, true); // include
81 TIMESTAMP
.put((long)1245620055, true); // include
82 TIMESTAMP
.put((long)1245620100, true); // include
83 TIMESTAMP
.put((long)1245620150, false);
84 TIMESTAMP
.put((long)1245620250, false);
// Lower bound handed to Scan.setTimeRange in runTestOnTable; equals the smallest
// timestamp marked "include" above.
86 static final long MINSTAMP
= 1245620005;
// Upper bound handed to Scan.setTimeRange in runTestOnTable; one past the largest
// timestamp marked "include" above.
87 static final long MAXSTAMP
= 1245620100 + 1; // maxStamp itself is excluded. so increment it.
// Table, column family and column qualifier shared by the writer, the mapper,
// the MapReduce scan and the verification scan.
89 static final TableName TABLE_NAME
= TableName
.valueOf("table123");
90 static final byte[] FAMILY_NAME
= Bytes
.toBytes("text");
91 static final byte[] COLUMN_NAME
= Bytes
.toBytes("input");
/**
 * Starts the HBase mini cluster through the shared testing utility.
 *
 * @throws Exception if the mini cluster cannot be started
 */
94 public static void beforeClass() throws Exception
{
95 UTIL
.startMiniCluster();
/**
 * Shuts down the HBase mini cluster through the shared testing utility.
 *
 * @throws Exception if the mini cluster cannot be shut down
 */
99 public static void afterClass() throws Exception
{
100 UTIL
.shutdownMiniCluster();
/**
 * Fetches an Admin from the testing utility and stores it in {@code this.admin};
 * testTimeRangeMapRed uses that Admin to create the test table.
 *
 * @throws Exception if the Admin cannot be obtained
 */
104 public void before() throws Exception
{
105 this.admin
= UTIL
.getAdmin();
/**
 * Table mapper used by runTestOnTable. For every row it receives, it builds one
 * Put per returned cell timestamp that stores {@code true} in
 * FAMILY_NAME:COLUMN_NAME at that same timestamp.
 * It implements Configurable so that setConf can open the target Table.
 */
108 private static class ProcessTimeRangeMapper
109 extends TableMapper
<ImmutableBytesWritable
, MapWritable
>
110 implements Configurable
{
// Configuration handed in through setConf.
112 private Configuration conf
= null;
// Table handle for TABLE_NAME, opened in setConf.
113 private Table table
= null;
/**
 * Builds, for the row identified by {@code key}, one Put per cell timestamp found
 * in {@code result}; each Put stores {@code true} in FAMILY_NAME:COLUMN_NAME at
 * that timestamp.
 * NOTE(review): the statements that collect the Puts into {@code puts} and apply
 * them are not visible in this view; presumably they are written to {@code table}
 * - confirm.
 *
 * @param key    row key of the scanned row
 * @param result cells returned for that row by the job's scan
 */
116 public void map(ImmutableBytesWritable key
, Result result
,
// First pass: record the timestamp of every cell returned for this row.
119 List
<Long
> tsList
= new ArrayList
<>();
120 for (Cell kv
: result
.listCells()) {
121 tsList
.add(kv
.getTimestamp());
// Second pass: build one Put per recorded timestamp.
124 List
<Put
> puts
= new ArrayList
<>();
125 for (Long ts
: tsList
) {
126 Put put
= new Put(key
.get());
// This Put is marked to skip the write-ahead log.
127 put
.setDurability(Durability
.SKIP_WAL
);
// Same family, qualifier and timestamp as the scanned cell, but with value true.
128 put
.addColumn(FAMILY_NAME
, COLUMN_NAME
, ts
, Bytes
.toBytes(true));
/**
 * Configuration accessor of this Configurable mapper.
 * NOTE(review): the body is not visible in this view; presumably it returns the
 * {@code conf} stored by setConf - confirm.
 */
135 public Configuration
getConf() {
/**
 * Stores the given configuration, then opens a Connection from it and a Table
 * handle for TABLE_NAME that is kept in {@code table}.
 * An IOException raised while doing so is caught; the handler body is not visible
 * in this view.
 * NOTE(review): the Connection is held only in a local variable, so nothing
 * visible here can close it later - confirm whether it is released elsewhere.
 *
 * @param configuration configuration to store and to connect with
 */
140 public void setConf(Configuration configuration
) {
141 this.conf
= configuration
;
// Open the connection from the configuration that was just stored.
143 Connection connection
= ConnectionFactory
.createConnection(conf
);
144 table
= connection
.getTable(TABLE_NAME
);
145 } catch (IOException e
) {
/**
 * Creates TABLE_NAME with a single family FAMILY_NAME that retains
 * Integer.MAX_VALUE versions, builds one Put per TIMESTAMP entry storing
 * {@code false} at that entry's timestamp, and obtains a Table handle for the new
 * table.
 * NOTE(review): the statements that write the Puts and that run the job and the
 * verification are not visible in this view - presumably runTestOnTable() and
 * verify(table) follow; confirm.
 *
 * @throws IOException            declared by this method
 * @throws InterruptedException   declared by this method
 * @throws ClassNotFoundException declared by this method
 */
152 public void testTimeRangeMapRed()
153 throws IOException
, InterruptedException
, ClassNotFoundException
{
154 final TableDescriptorBuilder
.ModifyableTableDescriptor tableDescriptor
=
155 new TableDescriptorBuilder
.ModifyableTableDescriptor(TABLE_NAME
);
156 final ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor familyDescriptor
=
157 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(FAMILY_NAME
);
// Retain every version: the test stores several timestamps in one column.
158 familyDescriptor
.setMaxVersions(Integer
.MAX_VALUE
);
159 tableDescriptor
.setColumnFamily(familyDescriptor
);
160 admin
.createTable(tableDescriptor
);
// One Put per TIMESTAMP entry, all on row KEY, each storing false at the entry's timestamp.
161 List
<Put
> puts
= new ArrayList
<>();
162 for (Map
.Entry
<Long
, Boolean
> entry
: TIMESTAMP
.entrySet()) {
163 Put put
= new Put(KEY
);
164 put
.setDurability(Durability
.SKIP_WAL
);
165 put
.addColumn(FAMILY_NAME
, COLUMN_NAME
, entry
.getKey(), Bytes
.toBytes(false));
// Table handle on the freshly created table, taken from the utility's shared connection.
168 Table table
= UTIL
.getConnection().getTable(tableDescriptor
.getTableName());
/**
 * Configures and runs the map-only job "test123": ProcessTimeRangeMapper is fed by
 * a scan of FAMILY_NAME:COLUMN_NAME on TABLE_NAME restricted to the time range
 * MINSTAMP..MAXSTAMP, with NullOutputFormat and zero reduce tasks. Afterwards the
 * directory named by the job configuration's "hadoop.tmp.dir" is deleted.
 * NOTE(review): an IOException raised while building or running the job is caught
 * here; the handler body is not visible in this view - confirm that a failed job
 * still fails the test.
 *
 * @throws IOException            declared by this method
 * @throws InterruptedException   declared by this method
 * @throws ClassNotFoundException declared by this method
 */
175 private void runTestOnTable()
176 throws IOException
, InterruptedException
, ClassNotFoundException
{
179 job
= new Job(UTIL
.getConfiguration(), "test123");
// Map-only job with no file output.
180 job
.setOutputFormatClass(NullOutputFormat
.class);
181 job
.setNumReduceTasks(0);
182 Scan scan
= new Scan();
183 scan
.addColumn(FAMILY_NAME
, COLUMN_NAME
);
// Only cells whose timestamp lies in the MINSTAMP..MAXSTAMP range reach the mapper.
184 scan
.setTimeRange(MINSTAMP
, MAXSTAMP
);
// No-argument form: per the HBase Scan API this requests all versions of each column.
185 scan
.setMaxVersions();
// NOTE(review): the output key/value classes passed here are Text/Text, while
// ProcessTimeRangeMapper is declared with ImmutableBytesWritable/MapWritable - confirm
// this mismatch is intentional.
186 TableMapReduceUtil
.initTableMapperJob(TABLE_NAME
,
187 scan
, ProcessTimeRangeMapper
.class, Text
.class, Text
.class, job
);
188 job
.waitForCompletion(true);
189 } catch (IOException e
) {
190 // TODO Auto-generated catch block
// Clean-up: remove the directory that the job configuration names as hadoop.tmp.dir.
194 FileUtil
.fullyDelete(
195 new File(job
.getConfiguration().get("hadoop.tmp.dir")));
/**
 * Scans FAMILY_NAME:COLUMN_NAME of {@code table}, logs every returned cell at
 * debug level, and asserts that each cell's boolean value equals the value
 * registered in TIMESTAMP for that cell's timestamp.
 * NOTE(review): the scan asks for setMaxVersions(1); presumably only the newest
 * version of the column is then returned, so the timestamps inside the
 * MINSTAMP..MAXSTAMP range may never be checked - confirm this is intended.
 *
 * @param table table to scan
 * @throws IOException declared by this method
 */
200 private void verify(final Table table
) throws IOException
{
201 Scan scan
= new Scan();
202 scan
.addColumn(FAMILY_NAME
, COLUMN_NAME
);
203 scan
.setMaxVersions(1);
204 ResultScanner scanner
= table
.getScanner(scan
);
205 for (Result r
: scanner
) {
206 for (Cell kv
: r
.listCells()) {
// Debug dump: row, family, qualifier, timestamp and boolean value, tab-separated.
207 log
.debug(Bytes
.toString(r
.getRow()) + "\t" + Bytes
.toString(CellUtil
.cloneFamily(kv
))
208 + "\t" + Bytes
.toString(CellUtil
.cloneQualifier(kv
))
209 + "\t" + kv
.getTimestamp() + "\t" + Bytes
.toBoolean(CellUtil
.cloneValue(kv
)));
// Expected value comes from TIMESTAMP, keyed by the cell's timestamp.
210 org
.junit
.Assert
.assertEquals(TIMESTAMP
.get(kv
.getTimestamp()),
211 Bytes
.toBoolean(CellUtil
.cloneValue(kv
)));