3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 package org
.apache
.hadoop
.hbase
.util
;
22 import java
.util
.HashSet
;
25 import java
.util
.concurrent
.ConcurrentHashMap
;
27 import org
.apache
.hadoop
.conf
.Configuration
;
28 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
29 import org
.apache
.hadoop
.hbase
.HConstants
;
30 import org
.apache
.yetus
.audience
.InterfaceAudience
;
31 import org
.slf4j
.Logger
;
32 import org
.slf4j
.LoggerFactory
;
37 * LossyCounting utility, bounded data structure that maintains approximate high frequency
38 * elements in data stream.
40 * Bucket size is 1 / error rate. (Error rate is 0.02 by default)
41 * Lemma: If an element does not appear in the set, then its frequency is less than e * N
42 * (N is total element counts until now.)
44 * http://www.vldb.org/conf/2002/S10P03.pdf
47 @InterfaceAudience.Public
48 public class LossyCounting
{
49 private static final Logger LOG
= LoggerFactory
.getLogger(LossyCounting
.class);
50 private long bucketSize
;
51 private long currentTerm
;
52 private double errorRate
;
53 private Map
<String
, Integer
> data
;
54 private long totalDataCount
;
/**
 * Creates a counter with an explicit error rate. The bucket size is derived as
 * ceil(1 / errorRate); an error rate of 0 therefore yields the largest possible bucket because
 * the cast of positive infinity saturates at {@code Long.MAX_VALUE}.
 *
 * @param errorRate error rate used to size the buckets; must lie within [0, 1]
 * @throws IllegalArgumentException if {@code errorRate} is NaN or outside [0, 1]
 */
public LossyCounting(double errorRate) {
  // Negated range check: NaN fails every comparison, so it is rejected here as well instead of
  // slipping through and producing a bucket size of 0.
  if (!(errorRate >= 0.0 && errorRate <= 1.0)) {
    throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
  }
  this.errorRate = errorRate;
  this.bucketSize = (long) Math.ceil(1 / errorRate);
  this.totalDataCount = 0;
  this.data = new ConcurrentHashMap<>();
  calculateCurrentTerm();
}
/**
 * Creates a counter whose error rate is read from a newly created HBase configuration, using
 * 0.02 when the configuration does not supply a value.
 *
 * @throws IllegalArgumentException if the configured error rate is outside [0, 1]
 */
public LossyCounting() {
  // Delegate so that a configured rate goes through the same validation and initialization as
  // an explicitly supplied one.
  this(HBaseConfiguration.create()
      .getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02));
}
79 public Set
<String
> addByOne(String key
) {
80 if(data
.containsKey(key
)) {
81 data
.put(key
, data
.get(key
) +1);
86 calculateCurrentTerm();
87 Set
<String
> dataToBeSwept
= new HashSet
<>();
88 if(totalDataCount
% bucketSize
== 0) {
89 dataToBeSwept
= sweep();
95 * sweep low frequency data
96 * @return Names of elements got swept
98 private Set
<String
> sweep() {
99 Set
<String
> dataToBeSwept
= new HashSet
<>();
100 for(Map
.Entry
<String
, Integer
> entry
: data
.entrySet()) {
101 if(entry
.getValue() + errorRate
< currentTerm
) {
102 dataToBeSwept
.add(entry
.getKey());
105 for(String key
: dataToBeSwept
) {
108 LOG
.debug(String
.format("Swept %d of elements.", dataToBeSwept
.size()));
109 return dataToBeSwept
;
113 * Calculate and set current term
115 private void calculateCurrentTerm() {
116 this.currentTerm
= (int) Math
.ceil(1.0 * totalDataCount
/ bucketSize
);
119 public long getBuketSize(){
123 public long getDataSize() {
127 public boolean contains(String key
) {
128 return data
.containsKey(key
);
131 public long getCurrentTerm() {