2 * JVSTM: a Java library for Software Transactional Memory
3 * Copyright (C) 2005 INESC-ID Software Engineering Group
4 * http://www.esw.inesc-id.pt
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * INESC-ID Software Engineering Group
28 import java
.util
.HashMap
;
30 import java
.util
.concurrent
.atomic
.AtomicInteger
;
32 import jvstm
.util
.Cons
;
35 * Parallel Nested Transaction is used to represent a part of a transaction that is
36 * running (potentially) in parallel with other subparts of the same
37 * transaction. The programmer is responsible for identifying the parts of a
38 * transaction that he wants to run concurrently. Consequently, those parts may
39 * not run in program order. The only guarantee is that their execution will be
40 * equivalent to some sequential order (plus the properties of opacity). If that
41 * guarantee is already provided by the disjoint accesses of each subpart,
42 * consider using UnsafeParallelTransaction.
47 public class ParallelNestedTransaction
extends ReadWriteTransaction
{
49 protected static final ExecuteParallelNestedTxSequentiallyException EXECUTE_SEQUENTIALLY_EXCEPTION
= new ExecuteParallelNestedTxSequentiallyException();
51 protected ThreadLocal
<AtomicInteger
> blocksFree
= new ThreadLocal
<AtomicInteger
>() {
53 protected AtomicInteger
initialValue() {
54 return new AtomicInteger(0);
58 protected ThreadLocal
<Cons
<ReadBlock
>> blocksPool
= new ThreadLocal
<Cons
<ReadBlock
>>() {
60 protected Cons
<ReadBlock
> initialValue() {
65 protected Cons
<ReadBlock
> globalReads
;
66 protected Map
<VBox
, InplaceWrite
> nestedReads
;
68 public ParallelNestedTransaction(ReadWriteTransaction parent
) {
71 int[] parentVers
= parent
.ancVersions
;
72 super.ancVersions
= new int[parentVers
.length
+ 1];
73 super.ancVersions
[0] = parent
.nestedCommitQueue
.commitNumber
;
74 for (int i
= 0; i
< parentVers
.length
; i
++) {
75 this.ancVersions
[i
+ 1] = parentVers
[i
];
78 this.nestedReads
= new HashMap
<VBox
, InplaceWrite
>();
79 this.globalReads
= Cons
.empty();
80 this.boxesWritten
= parent
.boxesWritten
;
83 public ParallelNestedTransaction(ReadWriteTransaction parent
, boolean multithreaded
) {
85 super.ancVersions
= EMPTY_VERSIONS
;
86 this.nestedReads
= ReadWriteTransaction
.EMPTY_MAP
;
87 this.globalReads
= Cons
.empty();
91 public Transaction
makeUnsafeMultithreaded() {
92 throw new Error("An Unsafe Parallel Transaction may only be spawned by another Unsafe or a Top-Level transaction");
96 public Transaction
makeNestedTransaction(boolean readOnly
) {
98 "A Parallel Nested Transaction cannot spawn a Linear Nested Transaction yet. Consider using a single Parallel Nested Transaction instead.");
102 protected Transaction
commitAndBeginTx(boolean readOnly
) {
104 return beginWithActiveRecord(readOnly
, null);
107 // Returns -2 if self; -1 if not anc; >= 0 as version on anc otherwise
108 protected int retrieveAncestorVersion(Transaction tx
) {
113 Transaction nextParent
= parent
;
114 while (nextParent
!= null) {
115 if (nextParent
== tx
) {
116 return ancVersions
[i
];
118 nextParent
= nextParent
.parent
;
124 private Transaction
retrieveLowestCommonAncestor(Transaction tx
) {
125 Transaction current
= tx
;
126 while (current
!= null) {
127 if (retrieveAncestorVersion(current
) >= 0) {
130 current
= current
.parent
;
136 public void abortTx() {
137 if (this.orec
.version
!= OwnershipRecord
.ABORTED
) {
140 Transaction
.current
.set(parent
);
143 private void manualAbort() {
144 ReadWriteTransaction parent
= getRWParent();
145 while (parent
!= null) {
146 for (ParallelNestedTransaction mergedIntoParent
: parent
.mergedTxs
) {
147 for (VBox vboxMergedIntoParent
: mergedIntoParent
.boxesWrittenInPlace
) {
148 revertOverwrite(vboxMergedIntoParent
);
151 for (VBox vboxMergedIntoParent
: parent
.boxesWrittenInPlace
) {
152 revertOverwrite(vboxMergedIntoParent
);
154 parent
= parent
.getRWParent();
157 this.orec
.version
= OwnershipRecord
.ABORTED
;
158 for (ReadWriteTransaction child
: mergedTxs
) {
159 child
.orec
.version
= OwnershipRecord
.ABORTED
;
161 super.boxesWritten
= null;
164 for (ReadBlock block
: globalReads
) {
168 blocksFree
.get().addAndGet(i
);
170 this.globalReads
= null;
171 this.nestedReads
= null;
172 super.mergedTxs
= null;
175 protected void revertOverwrite(VBox vboxWritten
) {
176 InplaceWrite write
= vboxWritten
.inplace
;
177 if (write
.orec
.owner
!= this) {
180 InplaceWrite overwritten
= write
;
181 while (overwritten
.next
!= null) {
182 overwritten
= overwritten
.next
;
183 if (overwritten
.orec
.owner
!= this && overwritten
.orec
.version
== OwnershipRecord
.RUNNING
) {
184 write
.tempValue
= overwritten
.tempValue
;
185 write
.next
= overwritten
.next
;
186 overwritten
.orec
.owner
= overwritten
.orec
.owner
; // enforce
188 write
.orec
= overwritten
.orec
;
194 protected <T
> T
readGlobal(VBox
<T
> vbox
) {
195 VBoxBody
<T
> body
= vbox
.body
;
196 if (body
.version
> number
) {
197 TransactionSignaller
.SIGNALLER
.signalEarlyAbort();
200 ReadBlock readBlock
= null;
202 if (blocksFree
.get().get() > 0) {
203 for (ReadBlock poolBlock
: blocksPool
.get()) {
204 if (poolBlock
.free
) {
205 poolBlock
.free
= false;
206 readBlock
= poolBlock
;
207 blocksFree
.get().decrementAndGet();
212 readBlock
= new ReadBlock(blocksFree
.get());
215 globalReads
= globalReads
.cons(readBlock
);
217 readBlock
= globalReads
.first();
219 readBlock
.entries
[next
--] = vbox
;
224 public <T
> T
getBoxValue(VBox
<T
> vbox
) {
225 InplaceWrite
<T
> inplaceWrite
= vbox
.inplace
;
226 T value
= inplaceWrite
.tempValue
;
227 OwnershipRecord inplaceOrec
= inplaceWrite
.orec
;
229 if (inplaceOrec
.version
> 0 && inplaceOrec
.version
<= number
) {
230 value
= readGlobal(vbox
);
235 int entryNestedVersion
= inplaceOrec
.nestedVersion
;
236 int versionOnAnc
= retrieveAncestorVersion(inplaceOrec
.owner
);
237 if (versionOnAnc
>= 0) {
238 if (entryNestedVersion
> versionOnAnc
) {
239 // eager w-r conflict, may restart immediately
241 TransactionSignaller
.SIGNALLER
.signalCommitFail(inplaceOrec
.owner
);
243 nestedReads
.put(vbox
, inplaceWrite
);
244 return (value
== NULL_VALUE
) ?
null : value
;
246 if (versionOnAnc
== -2) {
247 return (value
== NULL_VALUE
) ?
null : value
;
249 inplaceWrite
= inplaceWrite
.next
;
250 if (inplaceWrite
== null) {
253 value
= inplaceWrite
.tempValue
;
254 inplaceOrec
= inplaceWrite
.orec
;
257 if (boxesWritten
!= EMPTY_MAP
) {
258 value
= (T
) boxesWritten
.get(vbox
);
260 return (value
== NULL_VALUE
) ?
null : value
;
264 value
= readGlobal(vbox
);
270 public <T
> void setBoxValue(jvstm
.VBox
<T
> vbox
, T value
) {
271 InplaceWrite
<T
> inplaceWrite
= vbox
.inplace
;
272 OwnershipRecord currentOwner
= inplaceWrite
.orec
;
273 if (currentOwner
.owner
== this) { // we are already the current writer
274 inplaceWrite
.tempValue
= (value
== null ?
(T
) NULL_VALUE
: value
);
279 if (currentOwner
.version
!= 0) {
280 if (currentOwner
.version
<= this.number
) {
281 if (inplaceWrite
.CASowner(currentOwner
, this.orec
)) {
282 inplaceWrite
.tempValue
= (value
== null ?
(T
) NULL_VALUE
: value
);
283 boxesWrittenInPlace
= boxesWrittenInPlace
.cons(vbox
);
286 currentOwner
= inplaceWrite
.orec
;
289 // more recent than my number
292 if (retrieveAncestorVersion(currentOwner
.owner
) >= 0) {
293 if (vbox
.CASinplace(inplaceWrite
, new InplaceWrite
<T
>(this.orec
, (value
== null ?
(T
) NULL_VALUE
: value
),
297 inplaceWrite
= vbox
.inplace
;
298 currentOwner
= inplaceWrite
.orec
;
306 throw EXECUTE_SEQUENTIALLY_EXCEPTION
;
311 * Here we ensure that the local array read over ancestors is consistent with concurrent nested commits
312 * This procedure is blocking, accordingly to the support provided to VArrays.
315 protected <T
> T
getLocalArrayValue(VArrayEntry
<T
> entry
) {
316 if (this.arrayWrites
!= EMPTY_MAP
) {
317 VArrayEntry
<T
> wsEntry
= (VArrayEntry
<T
>) this.arrayWrites
.get(entry
);
318 if (wsEntry
!= null) {
319 return (wsEntry
.getWriteValue() == null ?
(T
) NULL_VALUE
: wsEntry
.getWriteValue());
323 ReadWriteTransaction iter
= getRWParent();
324 while (iter
!= null) {
325 synchronized (iter
) {
326 if (iter
.arrayWrites
!= EMPTY_MAP
) {
327 VArrayEntry
<T
> wsEntry
= (VArrayEntry
<T
>) iter
.arrayWrites
.get(entry
);
328 if (wsEntry
== null) {
329 iter
= iter
.getRWParent();
333 if (wsEntry
.nestedVersion
<= retrieveAncestorVersion(iter
)) {
334 this.arraysRead
= this.arraysRead
.cons(entry
);
335 entry
.setReadOwner(iter
);
336 return (wsEntry
.getWriteValue() == null ?
(T
) NULL_VALUE
: wsEntry
.getWriteValue());
338 TransactionSignaller
.SIGNALLER
.signalCommitFail(iter
);
342 iter
= iter
.getRWParent();
349 * Both parallel nested transactions and perTxBoxes may be seen as alternatives to work
350 * around inherently-conflicting workloads. An important question may be posed if we put
351 * them together: when should a perTxBox be committed, if write by a parallel nested
352 * transaction? Is it solving a conflict at top-level, or nested level of parallelism?
354 * Should the need for perTxBoxes arise in parNesting, that question shall have to be addressed.
357 public <T
> T
getPerTxValue(PerTxBox
<T
> box
, T initial
) {
358 throw new RuntimeException("Parallel Nested Transactions do not support PerTxBoxes");
362 public <T
> void setPerTxValue(PerTxBox
<T
> box
, T value
) {
363 throw new RuntimeException("Parallel Nested Transactions do not support PerTxBoxes");
367 protected void finish() {
374 protected void doCommit() {
377 perTxValues
= EMPTY_MAP
;
382 protected void cleanUp() {
383 boxesWrittenInPlace
= null;
385 for (ReadBlock block
: globalReads
) {
387 block
.freeBlocks
.incrementAndGet();
393 protected NestedCommitRecord
helpCommitAll(NestedCommitRecord start
) {
394 NestedCommitRecord lastSeen
= start
;
395 NestedCommitRecord current
= lastSeen
.next
.get();
396 while (current
!= null) {
397 if (!current
.recordCommitted
) {
398 current
.helpCommit();
401 current
= current
.next
.get();
407 protected void tryCommit() {
408 ReadWriteTransaction parent
= getRWParent();
409 NestedCommitRecord lastSeen
;
410 NestedCommitRecord newCommit
;
413 lastSeen
= helpCommitAll(parent
.nestedCommitQueue
);
414 snapshotValidation(lastSeen
.commitNumber
);
415 Cons
<VArrayEntry
<?
>> varrayReadsToPropagate
= validateNestedArrayReads();
416 newCommit
= new NestedCommitRecord(this, this.mergedTxs
, parent
.mergedTxs
, varrayReadsToPropagate
, arrayWrites
, arrayWritesCount
, lastSeen
.commitNumber
+ 1);
417 } while (!lastSeen
.next
.compareAndSet(null, newCommit
));
419 lastSeen
= parent
.nestedCommitQueue
;
420 while ((lastSeen
!= null) && (lastSeen
.commitNumber
<= newCommit
.commitNumber
)) {
421 if (!lastSeen
.recordCommitted
) {
422 lastSeen
.helpCommit();
423 parent
.nestedCommitQueue
= lastSeen
;
425 lastSeen
= lastSeen
.next
.get();
431 protected void snapshotValidation(int lastSeenNumber
) {
432 if (retrieveAncestorVersion(parent
) == lastSeenNumber
) {
436 for (Map
.Entry
<VBox
, InplaceWrite
> read
: nestedReads
.entrySet()) {
437 validateNestedRead(read
);
440 for (ParallelNestedTransaction mergedTx
: mergedTxs
) {
441 for (Map
.Entry
<VBox
, InplaceWrite
> read
: mergedTx
.nestedReads
.entrySet()) {
442 validateNestedRead(read
);
446 if (!this.globalReads
.isEmpty()) {
447 validateGlobalReads(globalReads
, next
);
450 for (ParallelNestedTransaction mergedTx
: mergedTxs
) {
451 if (!mergedTx
.globalReads
.isEmpty()) {
452 validateGlobalReads(mergedTx
.globalReads
, mergedTx
.next
);
459 * Validate a single read that was a read-after-write over some ancestor
460 * write. Iterate over the inplace writes of that VBox: if an entry is found
461 * belonging to an ancestor, it must be the one that it was read, in which
462 * case the search stops.
464 protected void validateNestedRead(Map
.Entry
<VBox
, InplaceWrite
> read
) {
465 InplaceWrite inplaceRead
= read
.getValue();
466 InplaceWrite iter
= read
.getKey().inplace
;
468 if (iter
== inplaceRead
) {
471 int maxVersion
= retrieveAncestorVersion(iter
.orec
.owner
);
472 if (maxVersion
>= 0) {
474 TransactionSignaller
.SIGNALLER
.signalCommitFail(iter
.orec
.owner
);
477 } while (iter
!= null);
481 * Validate a single read that obtained a VBoxBody Iterate over the inplace
482 * writes of that VBox: no entry may be found that belonged to an ancestor
484 protected void validateGlobalReads(Cons
<ReadBlock
> reads
, int startIdx
) {
485 VBox
[] array
= reads
.first().entries
;
486 // the first may not be full
487 for (int i
= startIdx
+ 1; i
< array
.length
; i
++) {
488 InplaceWrite iter
= array
[i
].inplace
;
490 int maxVersion
= retrieveAncestorVersion(iter
.orec
.owner
);
491 if (maxVersion
>= 0) {
493 TransactionSignaller
.SIGNALLER
.signalCommitFail(iter
.orec
.owner
);
496 } while (iter
!= null);
500 for (ReadBlock block
: reads
.rest()) {
501 array
= block
.entries
;
502 for (int i
= 0; i
< array
.length
; i
++) {
503 InplaceWrite iter
= array
[i
].inplace
;
505 int maxVersion
= retrieveAncestorVersion(iter
.orec
.owner
);
506 if (maxVersion
>= 0) {
508 TransactionSignaller
.SIGNALLER
.signalCommitFail(iter
.orec
.owner
);
511 } while (iter
!= null);
516 protected Cons
<VArrayEntry
<?
>> validateNestedArrayReads() {
517 Map
<VArrayEntry
<?
>, VArrayEntry
<?
>> parentArrayWrites
= getRWParent().arrayWrites
;
518 Cons
<VArrayEntry
<?
>> parentArrayReads
= getRWParent().arraysRead
;
519 int maxVersionOnParent
= retrieveAncestorVersion(parent
);
520 for (VArrayEntry
<?
> entry
: arraysRead
) {
522 // If the read was performed on an ancestor of the parent, then
523 // propagate it for further validation
524 if (entry
.owner
!= parent
) {
525 parentArrayReads
= parentArrayReads
.cons(entry
);
528 synchronized (parent
) {
529 if (parentArrayWrites
!= EMPTY_MAP
) {
530 // Verify if the parent contains a more recent write for the
531 // read that we performed somewhere in our ancestors
532 VArrayEntry
<?
> parentWrite
= parentArrayWrites
.get(entry
);
533 if (parentWrite
== null) {
536 if (parentWrite
.nestedVersion
> maxVersionOnParent
) {
537 TransactionSignaller
.SIGNALLER
.signalCommitFail(parent
);
543 return parentArrayReads
;