/* File-local debug switch: a nonzero value enables the DebugPrintf tracing
 * used throughout this file. It is initialised to 0 (tracing off). */
3 static int reduction_debug
= 0;
/* Conditional trace macro: the argument list that follows DebugPrintf is
 * passed to CmiPrintf only when reduction_debug is nonzero.
 * NOTE(review): the expansion is a bare, unbraced `if`, so a DebugPrintf
 * call placed directly before an `else` would capture that `else`. */
5 #define DebugPrintf if (reduction_debug) CmiPrintf
/* Forward declaration of Cpm_megacon_ack, which takes a CpmDestination.
 * It is invoked with CpmSend(0) at the end of reduction_struct.
 * Its definition is not part of this listing. */
7 void Cpm_megacon_ack(CpmDestination
);
/* Char array of CmiMsgHeaderSizeBytes elements.
 * NOTE(review): the enclosing declaration is not visible in this listing;
 * presumably this is the leading header field of struct _mesg - confirm. */
10 char head
[CmiMsgHeaderSizeBytes
];
/* Pack/unpack routine for a struct twoInts.
 *
 * p    - pup_er handle forwarded to each pup_int call.
 * data - pointer treated as a struct twoInts *.
 *
 * Logs the pointer and both fields (when debugging is enabled), then passes
 * the `positive` field followed by the `negative` field through pup_int.
 * It is handed to CmiReduceStruct by broadcast_struct.
 * NOTE(review): the function's closing brace is not visible in this listing. */
19 void pupTwoInts(pup_er p
, void *data
) {
20 struct twoInts
*d
= (struct twoInts
*)data
;
21 DebugPrintf("[%d] called pupTwoInts on %p (%d/%d)\n",CmiMyPe(),data
,d
->positive
,d
->negative
);
/* Field order matters: positive first, then negative. */
22 pup_int(p
, &d
->positive
);
23 pup_int(p
, &d
->negative
);
/* Destructor callback for a struct twoInts, handed to CmiReduceStruct by
 * broadcast_struct.
 *
 * data - pointer treated as a struct twoInts *.
 *
 * The visible lines only log the pointer and both fields when debugging is
 * enabled.
 * NOTE(review): any actual deallocation of `data` and the closing brace are
 * not visible in this listing - confirm against the full file. */
26 void deleteTwoInts(void *data
) {
27 DebugPrintf("[%d] called deleteTwoInts %p (%d/%d)\n",CmiMyPe(),data
,
28 ((struct twoInts
*)data
)->positive
,((struct twoInts
*)data
)->negative
);
/* Merge callback for the struct reduction, handed to CmiReduceStruct by
 * broadcast_struct.
 *
 * size   - not used in the visible lines.
 * data   - the local contribution, treated as a struct twoInts *.
 * remote - array of `count` pointers, each treated as a struct twoInts *.
 * count  - number of entries in `remote`.
 *
 * Adds every remote entry's `positive` and `negative` fields into the local
 * struct in place, logging each contribution when debugging is enabled.
 * NOTE(review): the declaration of `i`, the end of the loop, the return
 * statement and the closing brace are not visible in this listing;
 * presumably the accumulated local pointer is returned - confirm. */
32 void *mergeTwoInts(int *size
, void *data
, void **remote
, int count
) {
34 struct twoInts
*local
= (struct twoInts
*)data
;
35 DebugPrintf("[%d] called mergeTwoInts with local(%d/%d), %d remote(",CmiMyPe(),
36 local
->positive
,local
->negative
,count
);
/* Fold each remote contribution into the local accumulator. */
37 for (i
=0; i
<count
; ++i
) {
38 struct twoInts
*d
= (struct twoInts
*)remote
[i
];
39 DebugPrintf("%d/%d,",d
->positive
,d
->negative
);
40 local
->positive
+= d
->positive
;
41 local
->negative
+= d
->negative
;
/* Four int variables declared through CpvDeclare. broadcast_msg_idx,
 * reduction_msg_idx and broadcast_struct_idx receive the values returned by
 * CmiRegisterHandler in reduction_moduleinit and are read back through
 * CpvAccess when handlers are set on messages.
 * NOTE(review): reduction_struct_idx is declared here but is not initialised
 * or accessed anywhere in the visible listing. */
47 CpvDeclare(int, broadcast_msg_idx
);
48 CpvDeclare(int, reduction_msg_idx
);
49 CpvDeclare(int, broadcast_struct_idx
);
50 CpvDeclare(int, reduction_struct_idx
);
/* Merge callback for the message reduction, passed to CmiReduce by
 * broadcast_msg.
 *
 * size   - not used in the visible lines.
 * data   - the local message, treated as a mesg.
 * remote - array of `count` pointers, each treated as a mesg.
 * count  - number of entries in `remote`.
 *
 * Adds every remote message's `sum` field into the local message's `sum`,
 * logging each contribution when debugging is enabled.
 * NOTE(review): the declaration of `i`, the end of the loop, the return
 * statement and the closing brace are not visible in this listing;
 * presumably the accumulated local message is returned - confirm. */
52 void * addMessage(int *size
, void *data
, void **remote
, int count
) {
53 mesg msg
= (mesg
)data
;
55 DebugPrintf("[%d] called addMessage with local(%d), %d remote(",CmiMyPe(),msg
->sum
,count
);
/* Fold each remote sum into the local message. */
56 for (i
=0; i
<count
; ++i
) {
57 DebugPrintf("%d,",((mesg
)remote
[i
])->sum
);
58 msg
->sum
+= ((mesg
)remote
[i
])->sum
;
/* Handler registered under reduction_msg_idx; receives the result of the
 * message reduction started in broadcast_msg.
 *
 * m - the reduced message.
 *
 * Asserts that it runs on PE 0, accumulates (i + 1) for every
 * i in [0, CmiNumPes()) into `sum`, and reports a mismatch between m->sum
 * and that value. It then re-targets `m` at the broadcast_struct handler and
 * broadcasts it to all PEs; the broadcast call also frees the message.
 * NOTE(review): the declarations/initialisation of `i` and `sum`, the
 * condition guarding the mismatch message and the closing brace are not
 * visible in this listing; `sum` presumably starts at 0 - confirm. */
64 void reduction_msg(mesg m
) {
66 CmiAssert(CmiMyPe() == 0);
/* Expected total: 1 + 2 + ... + CmiNumPes(), given sum starts at 0. */
67 for (i
=0; i
<CmiNumPes(); ++i
) sum
+= i
+1;
69 CmiPrintf("Sum not matching: received %d, expecting %d\n", m
->sum
, sum
);
/* Reuse the message to kick off the struct-reduction phase. */
72 CmiSetHandler(m
, CpvAccess(broadcast_struct_idx
));
73 CmiSyncBroadcastAllAndFree(sizeof(struct _mesg
),m
);
/* Handler registered under broadcast_msg_idx; runs on receipt of the
 * broadcast sent by reduction_init.
 *
 * m - the received message.
 *
 * Points the message at the reduction_msg handler and contributes it to a
 * reduction via CmiReduce, with addMessage as the merge function.
 * NOTE(review): the line that sets this PE's contribution in m->sum and the
 * closing brace are not visible in this listing. */
76 void broadcast_msg(mesg m
) {
78 CmiSetHandler(m
, CpvAccess(reduction_msg_idx
));
79 CmiReduce(m
, sizeof(struct _mesg
), addMessage
);
/* Final callback of the struct reduction, handed to CmiReduceStruct by
 * broadcast_struct.
 *
 * data - the reduced value, treated as a struct twoInts *.
 *
 * Asserts that it runs on PE 0 and accumulates (i + 1) for every
 * i in [0, CmiNumPes()) into `sum`. It checks that `positive` equals that
 * sum and `negative` equals -2 * sum, matching the per-PE contributions
 * CmiMyPe()+1 and -2*(CmiMyPe()+1) set in broadcast_struct. A mismatch is
 * printed. Finally it calls Cpm_megacon_ack(CpmSend(0)).
 * NOTE(review): the declarations/initialisation of `i` and `sum`, the end of
 * the `if` body and the closing brace are not visible in this listing;
 * `sum` presumably starts at 0 - confirm. */
82 void reduction_struct(void *data
) {
84 struct twoInts
*two
= (struct twoInts
*)data
;
85 CmiAssert(CmiMyPe() == 0);
/* Expected total: 1 + 2 + ... + CmiNumPes(), given sum starts at 0. */
86 for (i
=0; i
<CmiNumPes(); ++i
) sum
+= i
+1;
87 if (two
->positive
!= sum
|| two
->negative
!= -2*sum
) {
88 CmiPrintf("Sum not matching: received %d/%d, expecting %d/%d\n",
89 two
->positive
, two
->negative
, sum
, -2*sum
);
92 Cpm_megacon_ack(CpmSend(0));
/* Handler registered under broadcast_struct_idx; runs on receipt of the
 * broadcast sent by reduction_msg.
 *
 * m - the received message; it is not used in the visible lines.
 *
 * Allocates a struct twoInts with malloc, sets positive = CmiMyPe() + 1 and
 * negative = -2 * (CmiMyPe() + 1), and contributes it to CmiReduceStruct
 * with pupTwoInts, mergeTwoInts, reduction_struct and deleteTwoInts as the
 * callbacks.
 * NOTE(review): the malloc result is dereferenced without a NULL check in
 * the visible lines.
 * NOTE(review): `two` is passed to a %p conversion without a cast to void *.
 * NOTE(review): whether `m` is freed, and the closing brace, are not visible
 * in this listing. */
95 void broadcast_struct(mesg m
) {
96 struct twoInts
*two
= (struct twoInts
*)malloc(sizeof(struct twoInts
));
98 DebugPrintf("[%d] allocated struct %p\n",CmiMyPe(),two
);
/* Per-PE contribution; reduction_struct checks the totals against these. */
99 two
->positive
= CmiMyPe()+1;
100 two
->negative
= -2*(CmiMyPe()+1);
101 CmiReduceStruct(two
, pupTwoInts
, mergeTwoInts
, reduction_struct
, deleteTwoInts
);
/* Starts the test sequence: allocates a message of sizeof(struct _mesg) with
 * CmiAlloc, targets it at the broadcast_msg handler and broadcasts it to all
 * PEs. The broadcast call also frees the message.
 * NOTE(review): the function's braces are not visible in this listing. */
104 void reduction_init(void)
106 mesg msg
= (mesg
)CmiAlloc(sizeof(struct _mesg
));
107 CmiSetHandler(msg
, CpvAccess(broadcast_msg_idx
));
108 CmiSyncBroadcastAllAndFree(sizeof(struct _mesg
),msg
);
/* Module setup: initialises broadcast_msg_idx, reduction_msg_idx and
 * broadcast_struct_idx with CpvInitialize, then stores in each one the
 * handler index returned by CmiRegisterHandler for broadcast_msg,
 * reduction_msg and broadcast_struct respectively.
 * NOTE(review): reduction_struct_idx is declared in this file but is not
 * initialised in the visible lines.
 * NOTE(review): the function's braces are not visible in this listing, and
 * the body may continue past the last visible line. */
111 void reduction_moduleinit()
113 CpvInitialize(int, broadcast_msg_idx
);
114 CpvInitialize(int, reduction_msg_idx
);
115 CpvInitialize(int, broadcast_struct_idx
);
116 CpvAccess(broadcast_msg_idx
) = CmiRegisterHandler((CmiHandler
)broadcast_msg
);
117 CpvAccess(reduction_msg_idx
) = CmiRegisterHandler((CmiHandler
)reduction_msg
);
118 CpvAccess(broadcast_struct_idx
) = CmiRegisterHandler((CmiHandler
)broadcast_struct
);