2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.wal
;
20 import java
.io
.IOException
;
21 import java
.util
.ArrayList
;
22 import java
.util
.List
;
24 import java
.util
.NavigableMap
;
25 import java
.util
.TreeMap
;
26 import java
.util
.UUID
;
27 import org
.apache
.hadoop
.hbase
.HBaseInterfaceAudience
;
28 import org
.apache
.hadoop
.hbase
.HConstants
;
29 import org
.apache
.hadoop
.hbase
.TableName
;
30 import org
.apache
.hadoop
.hbase
.regionserver
.MultiVersionConcurrencyControl
;
31 import org
.apache
.hadoop
.hbase
.regionserver
.SequenceId
;
32 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.CompressionContext
;
33 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.WALCellCodec
;
34 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
35 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
36 import org
.apache
.yetus
.audience
.InterfaceAudience
;
38 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.annotations
.VisibleForTesting
;
39 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ByteString
;
41 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.HBaseProtos
;
42 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
;
43 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.FamilyScope
;
44 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.WALProtos
.ScopeType
;
47 * Default implementation of Key for an Entry in the WAL.
48 * For internal use only though Replication needs to have access.
 * The log intermingles edits to many tables and rows, so each log entry
 * identifies the appropriate table and row. Keys of the same region are
 * ordered by sequence id and then by write time (see compareTo).
54 * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an associated row.
57 // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
58 // purposes. They need to be merged into WALEntry.
59 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience
.REPLICATION
})
60 public class WALKeyImpl
implements WALKey
{
61 public static final WALKeyImpl EMPTY_WALKEYIMPL
= new WALKeyImpl();
63 public MultiVersionConcurrencyControl
getMvcc() {
68 * Use it to complete mvcc transaction. This WALKeyImpl was part of
69 * (the transaction is started when you call append; see the comment on FSHLog#append). To
71 * {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
72 * or {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
73 * @return A WriteEntry gotten from local WAL subsystem.
74 * @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry)
76 public MultiVersionConcurrencyControl
.WriteEntry
getWriteEntry() {
77 return this.writeEntry
;
80 public void setWriteEntry(MultiVersionConcurrencyControl
.WriteEntry writeEntry
) {
81 assert this.writeEntry
== null;
82 this.writeEntry
= writeEntry
;
83 // Set our sequenceid now using WriteEntry.
84 this.sequenceId
= writeEntry
.getWriteNumber();
87 private byte [] encodedRegionName
;
89 private TableName tablename
;
92 * SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is
93 * NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized.
95 private long sequenceId
;
98 * Used during WAL replay; the sequenceId of the edit when it came into the system.
100 private long origLogSeqNum
= 0;
102 /** Time at which this edit was written. */
103 private long writeTime
;
105 /** The first element in the list is the cluster id on which the change has originated */
106 private List
<UUID
> clusterIds
;
108 private NavigableMap
<byte[], Integer
> replicationScope
;
110 private long nonceGroup
= HConstants
.NO_NONCE
;
111 private long nonce
= HConstants
.NO_NONCE
;
112 private MultiVersionConcurrencyControl mvcc
;
115 * Set in a way visible to multiple threads; e.g. synchronized getter/setters.
117 private MultiVersionConcurrencyControl
.WriteEntry writeEntry
;
/**
 * Creates an empty key: no region, no table, sequence id 0, write time
 * {@link HConstants#LATEST_TIMESTAMP}, an empty mutable cluster id list, no nonces, no mvcc
 * and no replication scope.
 */
public WALKeyImpl() {
  // Same initialization as the replication-scope constructor, just without a scope.
  this((NavigableMap<byte[], Integer>) null);
}
124 public WALKeyImpl(final NavigableMap
<byte[], Integer
> replicationScope
) {
125 init(null, null, 0L, HConstants
.LATEST_TIMESTAMP
,
126 new ArrayList
<>(), HConstants
.NO_NONCE
, HConstants
.NO_NONCE
, null, replicationScope
);
/**
 * Creates a key with an explicit sequence number and a single originating cluster, without
 * nonces, mvcc or replication scope.
 * @param encodedRegionName encoded name of the region
 * @param tablename name of the table
 * @param logSeqNum log sequence number
 * @param now time at which this edit was written
 * @param clusterId id of the cluster on which the change originated
 */
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
    final long now, UUID clusterId) {
  // Identical to the mvcc-taking variant with no mvcc supplied.
  this(encodedRegionName, tablename, logSeqNum, now, clusterId,
    (MultiVersionConcurrencyControl) null);
}
139 public WALKeyImpl(final byte[] encodedRegionName
, final TableName tablename
, long logSeqNum
,
140 final long now
, UUID clusterId
, MultiVersionConcurrencyControl mvcc
) {
141 List
<UUID
> clusterIds
= new ArrayList
<>(1);
142 clusterIds
.add(clusterId
);
143 init(encodedRegionName
, tablename
, logSeqNum
, now
, clusterIds
, HConstants
.NO_NONCE
,
144 HConstants
.NO_NONCE
, mvcc
, null);
147 // TODO: Fix being able to pass in sequenceid.
148 public WALKeyImpl(final byte[] encodedRegionName
, final TableName tablename
, final long now
) {
149 init(encodedRegionName
,
// TODO: Fix being able to pass in sequenceid.
/**
 * Creates a key with no sequence id assigned yet, no cluster ids, no nonces and no mvcc.
 * @param encodedRegionName encoded name of the region
 * @param tablename name of the table
 * @param now time at which this edit was written
 * @param replicationScope replication scope of the column families; may be null
 */
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
    final NavigableMap<byte[], Integer> replicationScope) {
  // Identical to the mvcc-taking variant with no mvcc supplied.
  this(encodedRegionName, tablename, now, (MultiVersionConcurrencyControl) null,
    replicationScope);
}
/**
 * Creates a key with no sequence id assigned yet, no cluster ids and no nonces.
 * @param encodedRegionName encoded name of the region
 * @param tablename name of the table
 * @param now time at which this edit was written
 * @param mvcc the mvcc associated with this key; may be null
 * @param replicationScope replication scope of the column families; may be null
 */
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
    MultiVersionConcurrencyControl mvcc,
    final NavigableMap<byte[], Integer> replicationScope) {
  init(
    encodedRegionName,
    tablename,
    NO_SEQUENCE_ID,
    now,
    EMPTY_UUIDS,
    HConstants.NO_NONCE,
    HConstants.NO_NONCE,
    mvcc,
    replicationScope);
}
172 public WALKeyImpl(final byte[] encodedRegionName
,
173 final TableName tablename
,
175 MultiVersionConcurrencyControl mvcc
) {
176 init(encodedRegionName
,
/**
 * Create the log key for writing to somewhere.
 * We maintain the tablename mainly for debugging purposes.
 * A regionName is always a sub-table object.
 * <p>Used by log splitting and snapshots.
 *
 * @param encodedRegionName Encoded name of the region as returned by
 *   <code>HRegionInfo#getEncodedNameAsBytes()</code>.
 * @param tablename name of table
 * @param logSeqNum log sequence number
 * @param now Time at which this edit was written.
 * @param clusterIds the clusters that have consumed the change (used in Replication)
 * @param nonceGroup the nonceGroup
 * @param nonce the nonce
 * @param mvcc the mvcc associated with this WALKeyImpl
 * @param replicationScope the non-default replication scope
 *   associated with the region's column families
 */
// TODO: Fix being able to pass in sequenceid.
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
    final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
    MultiVersionConcurrencyControl mvcc,
    final NavigableMap<byte[], Integer> replicationScope) {
  // Every argument is forwarded unchanged.
  init(
    encodedRegionName,
    tablename,
    logSeqNum,
    now,
    clusterIds,
    nonceGroup,
    nonce,
    mvcc,
    replicationScope);
}
213 * Create the log key for writing to somewhere.
214 * We maintain the tablename mainly for debugging purposes.
215 * A regionName is always a sub-table object.
216 * <p>Used by log splitting and snapshots.
218 * @param encodedRegionName Encoded name of the region as returned by
219 * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
220 * @param tablename - name of table
221 * @param logSeqNum - log sequence number
222 * @param now Time at which this edit was written.
223 * @param clusterIds the clusters that have consumed the change(used in Replication)
225 // TODO: Fix being able to pass in sequenceid.
226 public WALKeyImpl(final byte[] encodedRegionName
,
227 final TableName tablename
,
230 List
<UUID
> clusterIds
,
233 MultiVersionConcurrencyControl mvcc
) {
234 init(encodedRegionName
, tablename
, logSeqNum
, now
, clusterIds
, nonceGroup
, nonce
, mvcc
, null);
/**
 * Create the log key for writing to somewhere.
 * We maintain the tablename mainly for debugging purposes.
 * A regionName is always a sub-table object.
 * <p>The sequence id is left unassigned and no replication scope is set.
 *
 * @param encodedRegionName Encoded name of the region as returned by
 *   <code>HRegionInfo#getEncodedNameAsBytes()</code>.
 * @param tablename the tablename
 * @param now Time at which this edit was written.
 * @param clusterIds the clusters that have consumed the change (used in Replication)
 * @param nonceGroup the nonceGroup
 * @param nonce the nonce
 * @param mvcc mvcc control used to generate sequence numbers and control read/write points
 */
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
    final long now, List<UUID> clusterIds, long nonceGroup,
    final long nonce, final MultiVersionConcurrencyControl mvcc) {
  init(
    encodedRegionName,
    tablename,
    NO_SEQUENCE_ID,
    now,
    clusterIds,
    nonceGroup,
    nonce,
    mvcc,
    null);
}
/**
 * Create the log key for writing to somewhere.
 * We maintain the tablename mainly for debugging purposes.
 * A regionName is always a sub-table object.
 * <p>The sequence id is left unassigned.
 *
 * @param encodedRegionName Encoded name of the region as returned by
 *   <code>HRegionInfo#getEncodedNameAsBytes()</code>.
 * @param tablename the tablename
 * @param now Time at which this edit was written.
 * @param clusterIds the clusters that have consumed the change (used in Replication)
 * @param nonceGroup the nonceGroup
 * @param nonce the nonce
 * @param mvcc mvcc control used to generate sequence numbers and control read/write points
 * @param replicationScope the non-default replication scope of the column families
 */
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
    final long now, List<UUID> clusterIds, long nonceGroup,
    final long nonce, final MultiVersionConcurrencyControl mvcc,
    NavigableMap<byte[], Integer> replicationScope) {
  init(
    encodedRegionName,
    tablename,
    NO_SEQUENCE_ID,
    now,
    clusterIds,
    nonceGroup,
    nonce,
    mvcc,
    replicationScope);
}
282 * Create the log key for writing to somewhere.
283 * We maintain the tablename mainly for debugging purposes.
284 * A regionName is always a sub-table object.
286 * @param encodedRegionName Encoded name of the region as returned by
287 * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
293 // TODO: Fix being able to pass in sequenceid.
294 public WALKeyImpl(final byte[] encodedRegionName
,
295 final TableName tablename
,
299 final MultiVersionConcurrencyControl mvcc
) {
300 init(encodedRegionName
,
303 EnvironmentEdgeManager
.currentTime(),
310 @InterfaceAudience.Private
311 protected void init(final byte[] encodedRegionName
,
312 final TableName tablename
,
315 List
<UUID
> clusterIds
,
318 MultiVersionConcurrencyControl mvcc
,
319 NavigableMap
<byte[], Integer
> replicationScope
) {
320 this.sequenceId
= logSeqNum
;
321 this.writeTime
= now
;
322 this.clusterIds
= clusterIds
;
323 this.encodedRegionName
= encodedRegionName
;
324 this.tablename
= tablename
;
325 this.nonceGroup
= nonceGroup
;
328 if (logSeqNum
!= NO_SEQUENCE_ID
) {
329 setSequenceId(logSeqNum
);
331 this.replicationScope
= replicationScope
;
334 // For deserialization. DO NOT USE. See setWriteEntry below.
335 @InterfaceAudience.Private
336 protected void setSequenceId(long sequenceId
) {
337 this.sequenceId
= sequenceId
;
340 /** @return encoded region name */
342 public byte [] getEncodedRegionName() {
343 return encodedRegionName
;
346 /** @return table name */
348 public TableName
getTableName() {
352 /** @return log sequence number
353 * @deprecated Use {@link #getSequenceId()}
356 public long getLogSeqNum() {
357 return getSequenceId();
361 * Used to set original sequenceId for WALKeyImpl during WAL replay
363 public void setOrigLogSeqNum(final long sequenceId
) {
364 this.origLogSeqNum
= sequenceId
;
368 * Return a positive long if current WALKeyImpl is created from a replay edit; a replay edit is an
369 * edit that came in when replaying WALs of a crashed server.
370 * @return original sequence number of the WALEdit
373 public long getOrigLogSeqNum() {
374 return this.origLogSeqNum
;
378 * SequenceId is only available post WAL-assign. Calls before this will get you a
379 * {@link SequenceId#NO_SEQUENCE_ID}. See the comment on FSHLog#append and #getWriteNumber in this
380 * method for more on when this sequenceId comes available.
381 * @return long the new assigned sequence number
384 public long getSequenceId() {
385 return this.sequenceId
;
389 * @return the write time
392 public long getWriteTime() {
393 return this.writeTime
;
396 public NavigableMap
<byte[], Integer
> getReplicationScopes() {
397 return replicationScope
;
400 /** @return The nonce group */
402 public long getNonceGroup() {
406 /** @return The nonce */
408 public long getNonce() {
412 private void setReplicationScope(NavigableMap
<byte[], Integer
> replicationScope
) {
413 this.replicationScope
= replicationScope
;
416 public void clearReplicationScope() {
417 setReplicationScope(null);
421 * Marks that the cluster with the given clusterId has consumed the change
423 public void addClusterId(UUID clusterId
) {
424 if (!clusterIds
.contains(clusterId
)) {
425 clusterIds
.add(clusterId
);
430 * @return the set of cluster Ids that have consumed the change
432 public List
<UUID
> getClusterIds() {
437 * @return the cluster id on which the change has originated. It there is no such cluster, it
438 * returns DEFAULT_CLUSTER_ID (cases where replication is not enabled)
441 public UUID
getOriginatingClusterId(){
442 return clusterIds
.isEmpty()? HConstants
.DEFAULT_CLUSTER_ID
: clusterIds
.get(0);
446 public String
toString() {
447 return tablename
+ "/" + Bytes
.toString(encodedRegionName
) + "/" + sequenceId
;
451 public boolean equals(Object obj
) {
455 if (obj
== null || getClass() != obj
.getClass()) {
458 return compareTo((WALKey
)obj
) == 0;
462 public int hashCode() {
463 int result
= Bytes
.hashCode(this.encodedRegionName
);
464 result
= (int) (result ^
getSequenceId());
465 result
= (int) (result ^
this.writeTime
);
470 public int compareTo(WALKey o
) {
471 int result
= Bytes
.compareTo(this.encodedRegionName
, o
.getEncodedRegionName());
473 long sid
= getSequenceId();
474 long otherSid
= o
.getSequenceId();
475 if (sid
< otherSid
) {
477 } else if (sid
> otherSid
) {
481 if (this.writeTime
< o
.getWriteTime()) {
483 } else if (this.writeTime
> o
.getWriteTime()) {
488 // why isn't cluster id accounted for?
493 * Drop this instance's tablename byte array and instead
494 * hold a reference to the provided tablename. This is not
495 * meant to be a general purpose setter - it's only used
496 * to collapse references to conserve memory.
498 void internTableName(TableName tablename
) {
499 // We should not use this as a setter - only to swap
500 // in a new reference to the same table name.
501 assert tablename
.equals(this.tablename
);
502 this.tablename
= tablename
;
506 * Drop this instance's region name byte array and instead
507 * hold a reference to the provided region name. This is not
508 * meant to be a general purpose setter - it's only used
509 * to collapse references to conserve memory.
511 void internEncodedRegionName(byte []encodedRegionName
) {
512 // We should not use this as a setter - only to swap
513 // in a new reference to the same table name.
514 assert Bytes
.equals(this.encodedRegionName
, encodedRegionName
);
515 this.encodedRegionName
= encodedRegionName
;
518 public WALProtos
.WALKey
.Builder
getBuilder(WALCellCodec
.ByteStringCompressor compressor
)
520 WALProtos
.WALKey
.Builder builder
= WALProtos
.WALKey
.newBuilder();
521 builder
.setEncodedRegionName(
522 compressor
.compress(this.encodedRegionName
, CompressionContext
.DictionaryIndex
.REGION
));
523 builder
.setTableName(
524 compressor
.compress(this.tablename
.getName(), CompressionContext
.DictionaryIndex
.TABLE
));
525 builder
.setLogSequenceNumber(getSequenceId());
526 builder
.setWriteTime(writeTime
);
527 if (this.origLogSeqNum
> 0) {
528 builder
.setOrigSequenceNumber(this.origLogSeqNum
);
530 if (this.nonce
!= HConstants
.NO_NONCE
) {
531 builder
.setNonce(nonce
);
533 if (this.nonceGroup
!= HConstants
.NO_NONCE
) {
534 builder
.setNonceGroup(nonceGroup
);
536 HBaseProtos
.UUID
.Builder uuidBuilder
= HBaseProtos
.UUID
.newBuilder();
537 for (UUID clusterId
: clusterIds
) {
538 uuidBuilder
.setLeastSigBits(clusterId
.getLeastSignificantBits());
539 uuidBuilder
.setMostSigBits(clusterId
.getMostSignificantBits());
540 builder
.addClusterIds(uuidBuilder
.build());
542 if (replicationScope
!= null) {
543 for (Map
.Entry
<byte[], Integer
> e
: replicationScope
.entrySet()) {
545 compressor
.compress(e
.getKey(), CompressionContext
.DictionaryIndex
.FAMILY
);
546 builder
.addScopes(FamilyScope
.newBuilder().setFamily(family
)
547 .setScopeType(ScopeType
.forNumber(e
.getValue())));
553 public void readFieldsFromPb(WALProtos
.WALKey walKey
,
554 WALCellCodec
.ByteStringUncompressor uncompressor
) throws IOException
{
555 this.encodedRegionName
= uncompressor
.uncompress(walKey
.getEncodedRegionName(),
556 CompressionContext
.DictionaryIndex
.REGION
);
557 byte[] tablenameBytes
=
558 uncompressor
.uncompress(walKey
.getTableName(), CompressionContext
.DictionaryIndex
.TABLE
);
559 this.tablename
= TableName
.valueOf(tablenameBytes
);
561 for (HBaseProtos
.UUID clusterId
: walKey
.getClusterIdsList()) {
562 clusterIds
.add(new UUID(clusterId
.getMostSigBits(), clusterId
.getLeastSigBits()));
564 if (walKey
.hasNonceGroup()) {
565 this.nonceGroup
= walKey
.getNonceGroup();
567 if (walKey
.hasNonce()) {
568 this.nonce
= walKey
.getNonce();
570 this.replicationScope
= null;
571 if (walKey
.getScopesCount() > 0) {
572 this.replicationScope
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
573 for (FamilyScope scope
: walKey
.getScopesList()) {
575 uncompressor
.uncompress(scope
.getFamily(), CompressionContext
.DictionaryIndex
.FAMILY
);
576 this.replicationScope
.put(family
, scope
.getScopeType().getNumber());
579 setSequenceId(walKey
.getLogSequenceNumber());
580 this.writeTime
= walKey
.getWriteTime();
581 if (walKey
.hasOrigSequenceNumber()) {
582 this.origLogSeqNum
= walKey
.getOrigSequenceNumber();
587 public long estimatedSerializedSizeOf() {
588 long size
= encodedRegionName
!= null ? encodedRegionName
.length
: 0;
589 size
+= tablename
!= null ? tablename
.toBytes().length
: 0;
590 if (clusterIds
!= null) {
591 size
+= 16 * clusterIds
.size();
593 if (nonceGroup
!= HConstants
.NO_NONCE
) {
594 size
+= Bytes
.SIZEOF_LONG
; // nonce group
596 if (nonce
!= HConstants
.NO_NONCE
) {
597 size
+= Bytes
.SIZEOF_LONG
; // nonce
599 if (replicationScope
!= null) {
600 for (Map
.Entry
<byte[], Integer
> scope
: replicationScope
.entrySet()) {
601 size
+= scope
.getKey().length
;
602 size
+= Bytes
.SIZEOF_INT
;
605 size
+= Bytes
.SIZEOF_LONG
; // sequence number
606 size
+= Bytes
.SIZEOF_LONG
; // write time
607 if (origLogSeqNum
> 0) {
608 size
+= Bytes
.SIZEOF_LONG
; // original sequence number