fix: change `fms_diag_accept_data` into a subroutine (#1610)
[FMS.git] / mpp / mpp_efp.F90
blobcdfc7728bd10558ddeab1925609545e2b8ec7d5b
1 !***********************************************************************
2 !*                   GNU Lesser General Public License
3 !*
4 !* This file is part of the GFDL Flexible Modeling System (FMS).
5 !*
6 !* FMS is free software: you can redistribute it and/or modify it under
7 !* the terms of the GNU Lesser General Public License as published by
8 !* the Free Software Foundation, either version 3 of the License, or (at
9 !* your option) any later version.
11 !* FMS is distributed in the hope that it will be useful, but WITHOUT
12 !* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 !* FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 !* for more details.
16 !* You should have received a copy of the GNU Lesser General Public
17 !* License along with FMS.  If not, see <http://www.gnu.org/licenses/>.
18 !***********************************************************************
19 !> @defgroup mpp_efp_mod mpp_efp_mod
20 !> @ingroup mpp
21 !> @brief This module provides interfaces to the non-domain-oriented communication
22 !! subroutines.
24 !> Mainly includes interfaces and type definitions for reproducing operations with extended
25 !! fixed point data.
27 !> @addtogroup mpp_efp_mod
28 !> @{
29 module mpp_efp_mod
31 use mpp_mod, only : mpp_error, FATAL, WARNING, NOTE
32 use mpp_mod, only : mpp_pe, mpp_root_pe, mpp_npes
33 use mpp_mod, only : mpp_sum
34 use platform_mod
36 implicit none ; private
38 public :: mpp_reproducing_sum, mpp_efp_list_sum_across_PEs
39 public :: mpp_efp_plus, mpp_efp_minus, mpp_efp_to_real, mpp_real_to_efp, mpp_efp_real_diff
40 public :: operator(+), operator(-), assignment(=)
41 public :: mpp_query_efp_overflow_error, mpp_reset_efp_overflow_error
43 integer, parameter :: NUMBIT = 46  !< number of bits used in the 64-bit signed integer representation.
44 integer, parameter :: NUMINT = 6   !< The number of long integers to use to represent
45                                    !! a real number.
47 integer(i8_kind), parameter :: prec=2_8**NUMBIT !< The precision of each integer.
48 real(r8_kind), parameter :: r_prec=2.0_8**NUMBIT !< A real version of prec.
49 real(r8_kind), parameter :: I_prec=1.0_8/(2.0_8**NUMBIT) !< The inverse of prec.
50 integer, parameter :: max_count_prec=2**(63-NUMBIT)-1  !< The number of values that can be added together
51                               !! with the current value of prec before there will
52                               !! be roundoff problems.
54 real(r8_kind), parameter, dimension(NUMINT) :: &
55   pr = (/ r_prec**2, r_prec, 1.0_8, 1.0_8/r_prec, 1.0_8/r_prec**2, 1.0_8/r_prec**3 /)
56 real(r8_kind), parameter, dimension(NUMINT) :: &
57   I_pr = (/ 1.0_8/r_prec**2, 1.0_8/r_prec, 1.0_8, r_prec, r_prec**2, r_prec**3 /)
59 logical :: overflow_error = .false., NaN_error = .false.
60 logical :: debug = .false.    !< Making this true enables debugging output.
62 !> @}
64 !> This interface uses a conversion to an integer representation
65 !! of real numbers to give order-invariant sums that will reproduce
66 !! across PE count.
68 !! This idea comes from R. Hallberg and A. Adcroft.
70 !> @ingroup mpp_efp_mod
71 interface mpp_reproducing_sum
72   module procedure mpp_reproducing_sum_r8_2d
73   module procedure mpp_reproducing_sum_r8_3d
74   module procedure mpp_reproducing_sum_r4_2d
75 end interface mpp_reproducing_sum
77 !> The Extended Fixed Point (mpp_efp) type provides a public interface for doing
78 !! sums and taking differences with this type.
79 !> @ingroup mpp_efp_mod
80 type, public :: mpp_efp_type
81   private
82   integer(i8_kind), dimension(NUMINT) :: v
83 end type mpp_efp_type
86 !> Operator override interface for mpp_efp_type
87 !> @ingroup mpp_efp_mod
88 interface operator (+); module procedure mpp_efp_plus  ; end interface
89 !> Operator override interface for mpp_efp_type
90 !> @ingroup mpp_efp_mod
91 interface operator (-); module procedure mpp_efp_minus ; end interface
92 !> Assignment override interface for mpp_efp_type
93 !> @ingroup mpp_efp_mod
94 interface assignment(=); module procedure mpp_efp_assign ; end interface
96 !> @addtogroup mpp_efp_mod
97 !> @{
99 contains
101 !> @brief Calculates a reproducing sum for a 2D, 8-byte real array
102 function mpp_reproducing_sum_r8_2d(array, isr, ier, jsr, jer, EFP_sum, reproducing, &
103                             overflow_check, err) result(sum)
104   real(r8_kind), dimension(:,:), intent(in) :: array
105   integer,        optional,          intent(in) :: isr, ier, jsr, jer
106   type(mpp_efp_type), optional,     intent(out) :: EFP_sum
107   logical,        optional,          intent(in) :: reproducing
108   logical,        optional,          intent(in) :: overflow_check
109   integer,        optional,         intent(out) :: err
110   real(r8_kind)                             :: sum  !< Result
112   integer(i8_kind), dimension(NUMINT)  :: ints_sum
113   integer(i8_kind) :: ival, prec_error
114   real(r8_kind)  :: rsum(1), rs
115   real(r8_kind)  :: max_mag_term
116   logical :: repro, over_check
117   character(len=256) :: mesg
118   integer :: i, j, n, is, ie, js, je, sgn
120   if (mpp_npes() > max_count_prec) call mpp_error(FATAL, &
121     "mpp_reproducing_sum: Too many processors are being used for the value of "//&
122     "prec.  Reduce prec to (2^63-1)/mpp_npes.")
124   prec_error = (2_8**62 + (2_8**62 - 1)) / mpp_npes()
126   is = 1 ; ie = size(array,1) ; js = 1 ; je = size(array,2 )
127   if (present(isr)) then
128     if (isr < is) call mpp_error(FATAL, &
129       "Value of isr too small in mpp_reproducing_sum_2d.")
130     is = isr
131   endif
132   if (present(ier)) then
133     if (ier > ie) call mpp_error(FATAL, &
134       "Value of ier too large in mpp_reproducing_sum_2d.")
135     ie = ier
136   endif
137   if (present(jsr)) then
138     if (jsr < js) call mpp_error(FATAL, &
139       "Value of jsr too small in mpp_reproducing_sum_2d.")
140     js = jsr
141   endif
142   if (present(jer)) then
143     if (jer > je) call mpp_error(FATAL, &
144       "Value of jer too large in mpp_reproducing_sum_2d.")
145     je = jer
146   endif
148   repro = .true. ; if (present(reproducing)) repro = reproducing
149   over_check = .true. ; if (present(overflow_check)) over_check = overflow_check
151   if (repro) then
152     overflow_error = .false. ; NaN_error = .false. ; max_mag_term = 0.0
153     ints_sum(:) = 0
154     if (over_check) then
155       if ((je+1-js)*(ie+1-is) < max_count_prec) then
156         do j=js,je ; do i=is,ie
157           call increment_ints_faster(ints_sum, array(i,j), max_mag_term);
158         enddo ; enddo
159         call carry_overflow(ints_sum, prec_error)
160       elseif ((ie+1-is) < max_count_prec) then
161         do j=js,je
162           do i=is,ie
163             call increment_ints_faster(ints_sum, array(i,j), max_mag_term);
164           enddo
165           call carry_overflow(ints_sum, prec_error)
166         enddo
167       else
168         do j=js,je ; do i=is,ie
169           call increment_ints(ints_sum, real_to_ints(array(i,j), prec_error), &
170                               prec_error);
171         enddo ; enddo
172       endif
173     else
174       do j=js,je ; do i=is,ie
175         sgn = 1 ; if (array(i,j)<0.0) sgn = -1
176         rs = abs(array(i,j))
177         do n=1,NUMINT
178           ival = int(rs*I_pr(n), 8)
179           rs = rs - ival*pr(n)
180           ints_sum(n) = ints_sum(n) + sgn*ival
181         enddo
182       enddo ; enddo
183       call carry_overflow(ints_sum, prec_error)
184     endif
186     if (present(err)) then
187       err = 0
188       if (overflow_error) &
189         err = err+2
190       if (NaN_error) &
191         err = err+4
192       if (err > 0) then ; do n=1,NUMINT ; ints_sum(n) = 0 ; enddo ; endif
193     else
194       if (NaN_error) then
195         call mpp_error(FATAL, "NaN in input field of mpp_reproducing_sum(_2d), this indicates numerical instability")
196       endif
197       if (abs(max_mag_term) >= prec_error*pr(1)) then
198         write(mesg, '(ES13.5)') max_mag_term
199         call mpp_error(FATAL,"Overflow in mpp_reproducing_sum(_2d) conversion of "//trim(mesg))
200       endif
201       if (overflow_error) then
202         call mpp_error(FATAL, "Overflow in mpp_reproducing_sum(_2d).")
203       endif
204     endif
206     call mpp_sum(ints_sum, NUMINT)
208     call regularize_ints(ints_sum)
209     sum = ints_to_real(ints_sum)
210   else
211     rsum(1) = 0.0
212     do j=js,je ; do i=is,ie
213       rsum(1) = rsum(1) + array(i,j);
214     enddo ; enddo
215     call mpp_sum(rsum,1)
216     sum = rsum(1)
218     if (present(err)) then ; err = 0 ; endif
220     if (debug .or. present(EFP_sum)) then
221       overflow_error = .false.
222       ints_sum = real_to_ints(sum, prec_error, overflow_error)
223       if (overflow_error) then
224         if (present(err)) then
225           err = err + 2
226         else
227           write(mesg, '(ES13.5)') sum
228           call mpp_error(FATAL,"Repro_sum_2d: Overflow in real_to_ints conversion of "//trim(mesg))
229         endif
230       endif
231     endif
232   endif
234   if (present(EFP_sum)) EFP_sum%v(:) = ints_sum(:)
236   if (debug) then
237     write(mesg,'("2d RS: ", ES24.16, 6 Z17.16)') sum, ints_sum(1:NUMINT)
238     if(mpp_pe() == mpp_root_pe()) call mpp_error(NOTE, mesg)
239   endif
241 end function mpp_reproducing_sum_r8_2d
243 function mpp_reproducing_sum_r4_2d(array, isr, ier, jsr, jer, EFP_sum, reproducing, &
244                             overflow_check, err) result(sum)
245   real(r4_kind), dimension(:,:), intent(in) :: array
246   integer,        optional,          intent(in) :: isr, ier, jsr, jer
247   type(mpp_efp_type), optional,     intent(out) :: EFP_sum
248   logical,        optional,          intent(in) :: reproducing
249   logical,        optional,          intent(in) :: overflow_check
250   integer,        optional,         intent(out) :: err
251   real(r4_kind)                             :: sum  !< Result
253   real(r8_kind) :: array_r8(size(array,1), size(array,2))
255   array_r8 = array
257   sum = real(mpp_reproducing_sum_r8_2d(array_r8, isr, ier, jsr, jer, EFP_sum, reproducing, &
258                             overflow_check, err), r4_kind)
260   return
262 end function mpp_reproducing_sum_r4_2d
264 !> @brief Reproducing sum for 3d arrays of 8-bit reals
266 !> This function uses a conversion to an integer representation
267 !! of real numbers to give order-invariant sums that will reproduce
268 !! across PE count.  This idea comes from R. Hallberg and A. Adcroft.
269 function mpp_reproducing_sum_r8_3d(array, isr, ier, jsr, jer, sums, EFP_sum, err) &
270                             result(sum)
271   real(r8_kind), dimension(:,:,:),        intent(in) :: array
272   integer,    optional,                       intent(in) :: isr, ier, jsr, jer
273   real(r8_kind), dimension(:), optional, intent(out) :: sums
274   type(mpp_efp_type),     optional,          intent(out) :: EFP_sum
275   integer,            optional,              intent(out) :: err
276   real(r8_kind)                                      :: sum  !< Result
278   real(r8_kind)    :: max_mag_term
279   integer(i8_kind), dimension(NUMINT)  :: ints_sum
280   integer(i8_kind), dimension(NUMINT,size(array,3))  :: ints_sums
281   integer(i8_kind) :: prec_error
282   character(len=256) :: mesg
283   integer :: i, j, k, is, ie, js, je, ke, isz, jsz, n
285   if (mpp_npes() > max_count_prec) call mpp_error(FATAL, &
286     "mpp_reproducing_sum: Too many processors are being used for the value of "//&
287     "prec.  Reduce prec to (2^63-1)/mpp_npes.")
289   prec_error = (2_8**62 + (2_8**62 - 1)) / mpp_npes()
290   max_mag_term = 0.0
292   is = 1 ; ie = size(array,1) ; js = 1 ; je = size(array,2) ; ke = size(array,3)
293   if (present(isr)) then
294     if (isr < is) call mpp_error(FATAL, &
295       "Value of isr too small in mpp_reproducing_sum(_3d).")
296     is = isr
297   endif
298   if (present(ier)) then
299     if (ier > ie) call mpp_error(FATAL, &
300       "Value of ier too large in mpp_reproducing_sum(_3d).")
301     ie = ier
302   endif
303   if (present(jsr)) then
304     if (jsr < js) call mpp_error(FATAL, &
305       "Value of jsr too small in mpp_reproducing_sum(_3d).")
306     js = jsr
307   endif
308   if (present(jer)) then
309     if (jer > je) call mpp_error(FATAL, &
310       "Value of jer too large in mpp_reproducing_sum(_3d).")
311     je = jer
312   endif
313   jsz = je+1-js; isz = ie+1-is
315   if (present(sums)) then
316     if (size(sums) > ke) call mpp_error(FATAL, "Sums is smaller than "//&
317       "the vertical extent of array in mpp_reproducing_sum(_3d).")
318     ints_sums(:,:) = 0
319     overflow_error = .false. ; NaN_error = .false. ; max_mag_term = 0.0
320     if (jsz*isz < max_count_prec) then
321       do k=1,ke
322         do j=js,je ; do i=is,ie
323           call increment_ints_faster(ints_sums(:,k), array(i,j,k), max_mag_term);
324         enddo ; enddo
325         call carry_overflow(ints_sums(:,k), prec_error)
326       enddo
327     elseif (isz < max_count_prec) then
328       do k=1,ke ; do j=js,je
329         do i=is,ie
330           call increment_ints_faster(ints_sums(:,k), array(i,j,k), max_mag_term);
331         enddo
332         call carry_overflow(ints_sums(:,k), prec_error)
333       enddo ; enddo
334     else
335       do k=1,ke ; do j=js,je ; do i=is,ie
336         call increment_ints(ints_sums(:,k), &
337                             real_to_ints(array(i,j,k), prec_error), prec_error);
338       enddo ; enddo ; enddo
339     endif
340     if (present(err)) then
341       err = 0
342       if (abs(max_mag_term) >= prec_error*pr(1)) err = err+1
343       if (overflow_error) err = err+2
344       if (NaN_error) err = err+2
345       if (err > 0) then ; do k=1,ke ; do n=1,NUMINT ; ints_sums(n,k) = 0 ; enddo ; enddo ; endif
346     else
347       if (NaN_error) call mpp_error(FATAL, &
348              "NaN in input field of mpp_reproducing_sum(_3d), this indicates numerical instability")
349       if (abs(max_mag_term) >= prec_error*pr(1)) then
350         write(mesg, '(ES13.5)') max_mag_term
351         call mpp_error(FATAL,"Overflow in mpp_reproducing_sum(_3d) conversion of "//trim(mesg))
352       endif
353       if (overflow_error) call mpp_error(FATAL, "Overflow in mpp_reproducing_sum(_3d).")
354     endif
356     call mpp_sum(ints_sums(:,1:ke), NUMINT*ke)
358     sum = 0.0
359     do k=1,ke
360       call regularize_ints(ints_sums(:,k))
361       sums(k) = ints_to_real(ints_sums(:,k))
362       sum = sum + sums(k)
363     enddo
365     if (present(EFP_sum)) then
366       EFP_sum%v(:) = 0
367       do k=1,ke ; call increment_ints(EFP_sum%v(:), ints_sums(:,k)) ; enddo
368     endif
370     if (debug) then
371       do n=1,NUMINT ; ints_sum(n) = 0 ; enddo
372       do k=1,ke ; do n=1,NUMINT ; ints_sum(n) = ints_sum(n) + ints_sums(n,k) ; enddo ; enddo
373       write(mesg,'("3D RS: ", ES24.16, 6 Z17.16)') sum, ints_sum(1:NUMINT)
374       if(mpp_pe()==mpp_root_pe()) call mpp_error(NOTE, mesg)
375     endif
376   else
377     ints_sum(:) = 0
378     overflow_error = .false. ; NaN_error = .false. ; max_mag_term = 0.0
379     if (jsz*isz < max_count_prec) then
380       do k=1,ke
381         do j=js,je ; do i=is,ie
382           call increment_ints_faster(ints_sum, array(i,j,k), max_mag_term);
383         enddo ; enddo
384         call carry_overflow(ints_sum, prec_error)
385       enddo
386     elseif (isz < max_count_prec) then
387       do k=1,ke ; do j=js,je
388         do i=is,ie
389           call increment_ints_faster(ints_sum, array(i,j,k), max_mag_term);
390         enddo
391         call carry_overflow(ints_sum, prec_error)
392       enddo ; enddo
393     else
394       do k=1,ke ; do j=js,je ; do i=is,ie
395         call increment_ints(ints_sum, real_to_ints(array(i,j,k), prec_error), &
396                             prec_error);
397       enddo ; enddo ; enddo
398     endif
399     if (present(err)) then
400       err = 0
401       if (abs(max_mag_term) >= prec_error*pr(1)) err = err+1
402       if (overflow_error) err = err+2
403       if (NaN_error) err = err+2
404       if (err > 0) then ; do n=1,NUMINT ; ints_sum(n) = 0 ; enddo ; endif
405     else
406       if (NaN_error) call mpp_error(FATAL, &
407           "NaN in input field of mpp_reproducing_sum(_3d), this indicates numerical instability")
408       if (abs(max_mag_term) >= prec_error*pr(1)) then
409         write(mesg, '(ES13.5)') max_mag_term
410         call mpp_error(FATAL,"Overflow in mpp_reproducing_sum(_3d) conversion of "//trim(mesg))
411       endif
412       if (overflow_error) call mpp_error(FATAL, "Overflow in mpp_reproducing_sum(_3d).")
413     endif
415     call mpp_sum(ints_sum, NUMINT)
417     call regularize_ints(ints_sum)
418     sum = ints_to_real(ints_sum)
420     if (present(EFP_sum)) EFP_sum%v(:) = ints_sum(:)
422     if (debug) then
423       write(mesg,'("3d RS: ", ES24.16, 6 Z17.16)') sum, ints_sum(1:NUMINT)
424       if(mpp_pe()==mpp_root_pe()) call mpp_error(NOTE, mesg)
425     endif
426   endif
428 end function mpp_reproducing_sum_r8_3d
430 !> @brief This function converts a real number to an equivalent representation
431 !! using several long integers.
432 function real_to_ints(r, prec_error, overflow) result(ints)
433   real(r8_kind),            intent(in) :: r
434   integer(i8_kind), optional, intent(in) :: prec_error
435   logical,   optional,       intent(inout) :: overflow
436   integer(i8_kind),    dimension(NUMINT) :: ints
438   real(r8_kind) :: rs
439   character(len=80) :: mesg
440   integer(i8_kind) :: ival, prec_err
441   integer :: sgn, i
443   prec_err = prec ; if (present(prec_error)) prec_err = prec_error
444   ints(:) = 0_8
445   if ((r >= 1e30) .eqv. (r < 1e30)) then ; NaN_error = .true. ; return ; endif
447   sgn = 1 ; if (r<0.0) sgn = -1
448   rs = abs(r)
450   if (present(overflow)) then
451     if (.not.(rs < prec_err*pr(1))) overflow = .true.
452     if ((r >= 1e30) .eqv. (r < 1e30)) overflow = .true.
453   elseif (.not.(rs < prec_err*pr(1))) then
454     write(mesg, '(ES13.5)') r
455     call mpp_error(FATAL,"Overflow in real_to_ints conversion of "//trim(mesg))
456   endif
458   do i=1,NUMINT
459     ival = int(rs*I_pr(i), 8)
460     rs = rs - ival*pr(i)
461     ints(i) = sgn*ival
462   enddo
464 end function real_to_ints
466 !> @brief This function reverses the conversion in real_to_ints.
467 function ints_to_real(ints) result(r)
468   integer(i8_kind), dimension(NUMINT), intent(in) :: ints
469   real(r8_kind) :: r
471   integer :: i
473   r = 0.0
474   do i=1,NUMINT ; r = r + pr(i)*ints(i) ; enddo
475 end function ints_to_real
477 !> @brief This subroutine increments a number with another, both using the integer
478 !! representation in real_to_ints.
479 subroutine increment_ints(int_sum, int2, prec_error)
480   integer(i8_kind), dimension(NUMINT), intent(inout) :: int_sum
481   integer(i8_kind), dimension(NUMINT), intent(in)    :: int2
482   integer(i8_kind), optional,      intent(in)    :: prec_error
484   integer :: i
486   do i=NUMINT,2,-1
487     int_sum(i) = int_sum(i) + int2(i)
488     ! Carry the local overflow.
489     if (int_sum(i) > prec) then
490       int_sum(i) = int_sum(i) - prec
491       int_sum(i-1) = int_sum(i-1) + 1
492     elseif (int_sum(i) < -prec) then
493       int_sum(i) = int_sum(i) + prec
494       int_sum(i-1) = int_sum(i-1) - 1
495     endif
496   enddo
497   int_sum(1) = int_sum(1) + int2(1)
498   if (present(prec_error)) then
499     if (abs(int_sum(1)) > prec_error) overflow_error = .true.
500   else
501     if (abs(int_sum(1)) > prec) overflow_error = .true.
502   endif
504 end subroutine increment_ints
506 !> @brief This subroutine increments a number with another, both using the integer
507 !! representation in real_to_ints, but without doing any carrying of overflow.
508 !! The entire operation is embedded in a single call for greater speed.
509 subroutine increment_ints_faster(int_sum, r, max_mag_term)
510   integer(i8_kind), dimension(NUMINT), intent(inout) :: int_sum
511   real(r8_kind),                        intent(in) :: r
512   real(r8_kind),                     intent(inout) :: max_mag_term
514   real(r8_kind) :: rs
515   integer(i8_kind) :: ival
516   integer :: sgn, i
518   if ((r >= 1e30) .eqv. (r < 1e30)) then ; NaN_error = .true. ; return ; endif
519   sgn = 1 ; if (r<0.0) sgn = -1
520   rs = abs(r)
521   if (rs > abs(max_mag_term)) max_mag_term = r
523   do i=1,NUMINT
524     ival = int(rs*I_pr(i), 8)
525     rs = rs - ival*pr(i)
526     int_sum(i) = int_sum(i) + sgn*ival
527   enddo
529 end subroutine increment_ints_faster
531 !> @brief This subroutine handles carrying of the overflow.
532 subroutine carry_overflow(int_sum, prec_error)
533   integer(i8_kind), dimension(NUMINT), intent(inout) :: int_sum
534   integer(i8_kind),                intent(in)    :: prec_error
536   integer :: i, num_carry
538   do i=NUMINT,2,-1 ; if (abs(int_sum(i)) > prec) then
539     num_carry = int(int_sum(i) * I_prec)
540     int_sum(i) = int_sum(i) - num_carry*prec
541     int_sum(i-1) = int_sum(i-1) + num_carry
542   endif ; enddo
543   if (abs(int_sum(1)) > prec_error) then
544     overflow_error = .true.
545   endif
547 end subroutine carry_overflow
549 !> @brief This subroutine carries the overflow, and then makes sure that
550 !! all integers are of the same sign as the overall value.
551 subroutine regularize_ints(int_sum)
552   integer(i8_kind), dimension(NUMINT), intent(inout) :: int_sum
554   logical :: positive
555   integer :: i, num_carry
557   do i=NUMINT,2,-1 ; if (abs(int_sum(i)) > prec) then
558     num_carry = int(int_sum(i) * I_prec)
559     int_sum(i) = int_sum(i) - num_carry*prec
560     int_sum(i-1) = int_sum(i-1) + num_carry
561   endif ; enddo
563   ! Determine the sign of the final number.
564   positive = .true.
565   do i=1,NUMINT
566     if (abs(int_sum(i)) > 0) then
567       if (int_sum(i) < 0) positive = .false.
568       exit
569     endif
570   enddo
572   if (positive) then
573     do i=NUMINT,2,-1 ; if (int_sum(i) < 0) then
574       int_sum(i) = int_sum(i) + prec
575       int_sum(i-1) = int_sum(i-1) - 1
576     endif ; enddo
577   else
578     do i=NUMINT,2,-1 ; if (int_sum(i) > 0) then
579       int_sum(i) = int_sum(i) - prec
580       int_sum(i-1) = int_sum(i-1) + 1
581     endif ; enddo
582   endif
584 end subroutine regularize_ints
586 function mpp_query_efp_overflow_error()
587   logical :: mpp_query_efp_overflow_error
588   mpp_query_efp_overflow_error = overflow_error
589 end function mpp_query_efp_overflow_error
591 subroutine mpp_reset_efp_overflow_error()
592   overflow_error = .false.
593 end subroutine mpp_reset_efp_overflow_error
595 function mpp_efp_plus(EFP1, EFP2)
596   type(mpp_efp_type)             :: mpp_efp_plus
597   type(mpp_efp_type), intent(in) :: EFP1, EFP2
599   mpp_efp_plus = EFP1
601   call increment_ints(mpp_efp_plus%v(:), EFP2%v(:))
602 end function mpp_efp_plus
604 function mpp_efp_minus(EFP1, EFP2)
605   type(mpp_efp_type)             :: mpp_efp_minus
606   type(mpp_efp_type), intent(in) :: EFP1, EFP2
607   integer :: i
609   do i=1,NUMINT ; mpp_efp_minus%v(i) = -1*EFP2%v(i) ; enddo
611   call increment_ints(mpp_efp_minus%v(:), EFP1%v(:))
612 end function mpp_efp_minus
614 !> @brief This subroutine assigns all components of the extended fixed point type
615 !! variable on the RHS (EFP2) to the components of the variable on the LHS
616 !! (EFP1).
617 subroutine mpp_efp_assign(EFP1, EFP2)
618   type(mpp_efp_type), intent(out) :: EFP1
619   type(mpp_efp_type), intent(in)  :: EFP2
620   integer i
622   do i=1,NUMINT ; EFP1%v(i) = EFP2%v(i) ; enddo
623 end subroutine mpp_efp_assign
625 function mpp_efp_to_real(EFP1)
626   type(mpp_efp_type), intent(inout) :: EFP1
627   real(r8_kind) :: mpp_efp_to_real
629   call regularize_ints(EFP1%v)
630   mpp_efp_to_real = ints_to_real(EFP1%v)
631 end function mpp_efp_to_real
633 function mpp_efp_real_diff(EFP1, EFP2)
634   type(mpp_efp_type), intent(in) :: EFP1, EFP2
635   real(r8_kind) :: mpp_efp_real_diff
637   type(mpp_efp_type)             :: EFP_diff
639   EFP_diff = EFP1 - EFP2
640   mpp_efp_real_diff = mpp_efp_to_real(EFP_diff)
642 end function mpp_efp_real_diff
644 function mpp_real_to_efp(val, overflow)
645   real(r8_kind),    intent(in) :: val
646   logical, optional, intent(inout) :: overflow
647   type(mpp_efp_type)               :: mpp_real_to_efp
649   logical :: over
650   character(len=80) :: mesg
652   if (present(overflow)) then
653     mpp_real_to_efp%v(:) = real_to_ints(val, overflow=overflow)
654   else
655     over = .false.
656     mpp_real_to_efp%v(:) = real_to_ints(val, overflow=over)
657     if (over) then
658       write(mesg, '(ES13.5)') val
659       call mpp_error(FATAL,"Overflow in mpp_real_to_efp conversion of "//trim(mesg))
660     endif
661   endif
663 end function mpp_real_to_efp
665 !> This subroutine does a sum across PEs of a list of EFP variables,
666 !! returning the sums in place, with all overflows carried.
667 subroutine mpp_efp_list_sum_across_PEs(EFPs, nval, errors)
668   type(mpp_efp_type), dimension(:), intent(inout) :: EFPs
669   integer, intent(in) :: nval
670   logical, dimension(:), optional, intent(out) :: errors
672   integer(i8_kind), dimension(NUMINT,nval) :: ints
673   integer(i8_kind) :: prec_error
674   logical :: error_found
675   character(len=256) :: mesg
676   integer :: i, n
678   if (mpp_npes() > max_count_prec) call mpp_error(FATAL, &
679     "mpp_efp_list_sum_across_PEs: Too many processors are being used for the value of "//&
680     "prec.  Reduce prec to (2^63-1)/mpp_npes.")
682   prec_error = (2_8**62 + (2_8**62 - 1)) / mpp_npes()
683   ! overflow_error is an overflow error flag for the whole module.
684   overflow_error = .false. ; error_found = .false.
686   do i=1,nval ; do n=1,NUMINT ; ints(n,i) = EFPs(i)%v(n) ; enddo ; enddo
688   call mpp_sum(ints(:,:), NUMINT*nval)
690   if (present(errors)) errors(:) = .false.
691   do i=1,nval
692     overflow_error = .false.
693     call carry_overflow(ints(:,i), prec_error)
694     do n=1,NUMINT ; EFPs(i)%v(n) = ints(n,i) ; enddo
695     if (present(errors)) errors(i) = overflow_error
696     if (overflow_error) then
697       write (mesg,'("mpp_efp_list_sum_across_PEs error at ",i6," val was ",ES13.6, ", prec_error = ",ES13.6)') &
698              i, mpp_efp_to_real(EFPs(i)), real(prec_error)
699       if(mpp_pe()==mpp_root_pe()) call mpp_error(WARNING, mesg)
700     endif
701     error_found = error_found .or. overflow_error
702   enddo
703   if (error_found .and. .not.(present(errors))) then
704     call mpp_error(FATAL, "Overflow in mpp_efp_list_sum_across_PEs.")
705   endif
707 end subroutine mpp_efp_list_sum_across_PEs
709 end module mpp_efp_mod
710 !> @}
711 ! close documentation grouping