Merge remote-tracking branch 'origin/release-v4.6.1'
[WRF.git] / hydro / MPP / module_mpp_ReachLS.F90
blobef027c1c338437fb2f8fce7873bbff10b9e578b3
1 !  Program Name:
2 !  Author(s)/Contact(s):
3 !  Abstract:
4 !  History Log:
6 !  Usage:
7 !  Parameters: <Specify typical arguments passed>
8 !  Input Files:
9 !        <list file names and briefly describe the data they include>
10 !  Output Files:
11 !        <list file names and briefly describe the information they include>
13 !  Condition codes:
14 !        <list exit condition or error codes returned >
15 !        If appropriate, descriptive troubleshooting instructions or
16 !        likely causes for failures could be mentioned here with the
17 !        appropriate error code
19 !  User controllable options: <if applicable>
21 !   This is used as a coupler with the WRF model.
22 MODULE MODULE_mpp_ReachLS
24   use module_mpp_land, only:  io_id, my_id, mpp_status, mpp_land_max_int1, mpp_land_sync, HYDRO_COMM_WORLD
25   use hashtable
26   use mpi
27   implicit none
30   TYPE Grid2ReachMap
31       real,allocatable, dimension(:) :: sv
32       real,allocatable, dimension(:) :: rv
33       real,allocatable, dimension(:) :: rvId
34       real,allocatable, dimension(:) :: snId
35   end TYPE Grid2ReachMap
37   interface ReachLS_decomp
38      module procedure ReachLS_decompReal
39      module procedure ReachLS_decompReal8
40      module procedure ReachLS_decompInt
41      module procedure ReachLS_decompInt8
42      module procedure ReachLS_decompChar
43   end interface
45   interface ReachLS_write_io
46      module procedure ReachLS_wReal
47      module procedure ReachLS_wReal2
48      module procedure ReachLS_wReal8
49      module procedure ReachLS_wInt
50      module procedure ReachLS_wInt2
51      module procedure ReachLS_wInt8
52      module procedure ReachLS_wChar
53   end interface
55   interface gBcastValue
56      module procedure gbcastReal
57      module procedure gbcastInt
58      module procedure gbcastReal2
59      module procedure gbcastInt8
60   end interface
62   interface updateLinkV
63      module procedure updateLinkV8_mem
64      module procedure updateLinkV4_mem
65   end interface
70   integer,allocatable,dimension(:) :: sDataRec  ! sending data size
71   integer,allocatable,dimension(:) :: rDataRec  ! receiving data size
72   integer,allocatable,dimension(:) :: linkls_s  ! receiving data size
73   integer,allocatable,dimension(:) :: linkls_e  ! receiving data size
74   integer,allocatable,dimension(:) :: ToInd  ! size of toInd
76   integer ::  numprocs
77   integer, allocatable, dimension(:) :: LLINKIDINDX, aLinksl
78   integer :: LLINKLEN, gNlinksl, tmpnlinksl, l_nlinksl, max_nlinkSL
80   contains
83   subroutine updateLinkV8_mem(LinkV, outV)
84 ! for big memory data
85      implicit none
86      real, dimension(:) :: outV
87      real*8, dimension(:) :: LinkV
88      real, allocatable, dimension(:) :: gLinkV_r4
89      real*8, allocatable,dimension(:) ::  tmpBuf, gLinkV_r8
90      integer :: ierr, i, tag, k,m,lsize
91      integer, allocatable,dimension(:) :: lindex
92      if(my_id .eq. io_id) then
93            allocate(gLinkV_r4(gnlinksl))
94            allocate(gLinkV_r8(gnlinksl))
95            gLinkV_r4 = 0.0
96            gLinkV_r8 = 0.0
97            do i = 1, LLINKLEN
98                gLinkV_r8(LLINKIDINDX(i)) = LinkV(i)
99            end do
100      endif
102      if(my_id .ne. IO_id) then
104           tag = 101
105           call mpi_send(LLINKLEN,1,MPI_INTEGER, IO_id,     &
106                 tag,HYDRO_COMM_WORLD,ierr)
107           if(LLINKLEN .gt. 0) then
108               tag = 102
109               call mpi_send(LLINKIDINDX,LLINKLEN,MPI_INTEGER, IO_id,     &
110                     tag,HYDRO_COMM_WORLD,ierr)
111               tag = 103
112               call mpi_send(LinkV,LLINKLEN,MPI_DOUBLE_PRECISION, IO_id,     &
113                    tag,HYDRO_COMM_WORLD,ierr)
114           endif
115       else
116           do i = 0, numprocs - 1
117             if(i .ne. IO_id) then
118                 tag = 101
119                 call mpi_recv(lsize,1,MPI_INTEGER, i,     &
120                             tag,HYDRO_COMM_WORLD,mpp_status,ierr)
121                 if(lsize .gt. 0) then
122                       allocate(lindex(lsize) )
123                       allocate(tmpBuf(lsize) )
124                       tag = 102
125                       call mpi_recv(lindex,lsize,MPI_INTEGER, i,     &
126                             tag,HYDRO_COMM_WORLD,mpp_status,ierr)
127                       tag = 103
128                       call mpi_recv(tmpBuf,lsize,&
129                             MPI_DOUBLE_PRECISION,i,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
130                       do k = 1, lsize
131                           gLinkV_r8(lindex(k)) = gLinkV_r8(lindex(k)) + tmpBuf(k)
132                       end do
133                       if(allocated(lindex)) deallocate(lindex)
134                       if(allocated(tmpBuf)) deallocate(tmpBuf)
135                endif
136             end if
137           end do
138           gLinkV_r4 = gLinkV_r8
139           if(allocated(gLinkV_r8)) deallocate(gLinkV_r8)
140       end if
142       call ReachLS_decompReal(gLinkV_r4,outV)
144       if(my_id .eq. io_id) then
145          if(allocated(gLinkV_r4))  deallocate(gLinkV_r4)
146       endif
147   end subroutine updateLinkV8_mem
149   subroutine updateLinkV4_mem(LinkV, outV)
150 ! for big memory data
151      implicit none
152      real, dimension(:) :: outV
153      real, dimension(:) :: LinkV
154      real, allocatable, dimension(:) :: gLinkV_r4
155      real, allocatable,dimension(:) ::  tmpBuf
156      integer :: ierr, i, tag, k,m,lsize
157      integer, allocatable,dimension(:) :: lindex
158      if(my_id .eq. io_id) then
159            allocate(gLinkV_r4(gnlinksl))
160            gLinkV_r4 = 0.0
161            do i = 1, LLINKLEN
162                gLinkV_r4(LLINKIDINDX(i)) = LinkV(i)
163            end do
164      endif
166      if(my_id .ne. IO_id) then
168           tag = 101
169           call mpi_send(LLINKLEN,1,MPI_INTEGER, IO_id,     &
170                 tag,HYDRO_COMM_WORLD,ierr)
171           if(LLINKLEN .gt. 0) then
172               tag = 102
173               call mpi_send(LLINKIDINDX,LLINKLEN,MPI_INTEGER, IO_id,     &
174                     tag,HYDRO_COMM_WORLD,ierr)
175               tag = 103
176               call mpi_send(LinkV,LLINKLEN,MPI_REAL, IO_id,     &
177                    tag,HYDRO_COMM_WORLD,ierr)
178           endif
179       else
180           do i = 0, numprocs - 1
181             if(i .ne. IO_id) then
182                 tag = 101
183                 call mpi_recv(lsize,1,MPI_INTEGER, i,     &
184                             tag,HYDRO_COMM_WORLD,mpp_status,ierr)
185                 if(lsize .gt. 0) then
186                       allocate(lindex(lsize) )
187                       allocate(tmpBuf(lsize) )
188                       tag = 102
189                       call mpi_recv(lindex,lsize,MPI_INTEGER, i,     &
190                             tag,HYDRO_COMM_WORLD,mpp_status,ierr)
191                       tag = 103
192                       call mpi_recv(tmpBuf,lsize,&
193                             MPI_REAL,i,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
194                       do k = 1, lsize
195                           gLinkV_r4(lindex(k)) = gLinkV_r4(lindex(k)) + tmpBuf(k)
196                       end do
197                       if(allocated(lindex)) deallocate(lindex)
198                       if(allocated(tmpBuf)) deallocate(tmpBuf)
199                endif
200             end if
201           end do
202       end if
204       call ReachLS_decompReal(gLinkV_r4,outV)
206       if(my_id .eq. io_id) then
207           if(allocated(gLinkV_r4)) deallocate(gLinkV_r4)
208       endif
209   end subroutine updateLinkV4_mem
212   subroutine updateLinkV8(LinkV, outV)
213      implicit none
214      real, dimension(:) :: outV
215      real*8, dimension(:) :: LinkV
216      real*8, dimension(gNlinksl) :: gLinkV,gLinkV_r
217      real, dimension(gNlinksl) :: gLinkV_r4
218      integer :: ierr, i, tag
219      gLinkV = 0.0
220      gLinkV_r = 0.0
221      do i = 1, LLINKLEN
222          gLinkV(LLINKIDINDX(i)) = LinkV(i)
223      end do
225      if(my_id .ne. IO_id) then
226           tag = 102
227           call mpi_send(gLinkV,gnlinksl,MPI_DOUBLE_PRECISION, IO_id,     &
228                 tag,HYDRO_COMM_WORLD,ierr)
229       else
230           gLinkV_r = gLinkV
231           do i = 0, numprocs - 1
232             if(i .ne. IO_id) then
233                tag = 102
234                call mpi_recv(gLinkV,gnlinksl,&
235                    MPI_DOUBLE_PRECISION,i,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
236                gLinkV_r = gLinkV_r + gLinkV
237             end if
238           end do
239       end if
240       gLinkV_r4 = gLinkV_r
242       call ReachLS_decompReal(gLinkV_r4,outV)
243   end subroutine updateLinkV8
245   subroutine updateLinkV4(LinkV, outV)
246      implicit none
247      real, dimension(:) :: outV
248      real, dimension(:) :: LinkV
249      real, dimension(gNlinksl) :: gLinkV,gLinkV_r
250      real, dimension(gNlinksl) :: gLinkV_r4
251      integer :: ierr, i, tag
252      gLinkV = 0.0
253      gLinkV_r = 0.0
254      do i = 1, LLINKLEN
255          gLinkV(LLINKIDINDX(i)) = LinkV(i)
256      end do
258      if(my_id .ne. IO_id) then
259           tag = 102
260           call mpi_send(gLinkV,gnlinksl,MPI_REAL, IO_id,     &
261                 tag,HYDRO_COMM_WORLD,ierr)
262       else
263           gLinkV_r = gLinkV
264           do i = 0, numprocs - 1
265             if(i .ne. IO_id) then
266                tag = 102
267                call mpi_recv(gLinkV,gnlinksl,&
268                    MPI_REAL,i,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
269                gLinkV_r = gLinkV_r + gLinkV
270             end if
271           end do
272       end if
273       gLinkV_r4 = gLinkV_r
274       call ReachLS_decompReal(gLinkV_r4,outV)
275   end subroutine updateLinkV4
277   subroutine gbcastReal(inV, outV)
278      implicit none
279      real, dimension(:) :: outV
280      real, dimension(:) :: inV
281      integer :: ierr
282      call ReachLS_write_io(inV,outV)
283      call mpi_bcast(outV(1:gnlinksl),gnlinksl,MPI_REAL,   &
284             IO_id,HYDRO_COMM_WORLD,ierr)
285   end subroutine gbcastReal
287   subroutine gbcastReal2_old(index,size1,inV, insize, outV)
288      implicit none
289      integer :: size1, insize
290      integer,dimension(:) :: index
291      real, dimension(:) :: outV
292      real, dimension(:) :: inV
293      real, dimension(max_nlinkSL) :: tmpV
294      integer :: ierr, k, i, m, j, bsize
295      outV = 0
296      do i = 0, numprocs -1
297             bsize = linkls_e(i+1) - linkls_s(i+1) + 1
298          if(linkls_e(i+1) .gt. 0) then
299             if(my_id .eq. i) tmpV(1:bsize) = inV(1:bsize)
300             call mpi_bcast(tmpV(1:bsize),bsize,MPI_REAL,   &
301                 i,HYDRO_COMM_WORLD,ierr)
302             do j = 1, size1
303                 do k = 1, bsize
304                    if(index(j) .eq. (linkls_s(i+1) + k -1) ) then
305                       outV(j) = tmpV(k)
306                       goto  100
307                    endif
308                 end do
309  100            continue
310             end do
312          endif
313      end do
314   end subroutine gbcastReal2_old
316   subroutine gbcastReal2(index,size1,inV, insize, outV)
317      implicit none
318      integer :: size1, insize
319      integer,dimension(:) :: index
320      real, dimension(:) :: outV
321      real, dimension(:) :: inV
322 !     real, dimension(max_nlinkSL) :: tmpV
323      real, dimension(gnlinksl) :: gbuf
324      integer :: ierr, k, i, m, j, bsize
325      outV = 0
326      call ReachLS_write_io(inV,gbuf)
327      call mpi_bcast(gbuf,gnlinksl,MPI_REAL,   &
328             IO_id,HYDRO_COMM_WORLD,ierr)
329      do j = 1, size1
330         outV(j) = gbuf(index(j))
331      end do
332   end subroutine gbcastReal2
337   subroutine gbcastInt(inV, outV)
338      implicit none
339      integer, dimension(:) :: outV
340      integer, dimension(:) :: inV
341      integer :: ierr
342      call ReachLS_write_io(inV,outV)
343      call mpi_bcast(outV(1:gnlinksl),gnlinksl,MPI_INTEGER,   &
344             IO_id,HYDRO_COMM_WORLD,ierr)
345   end subroutine gbcastInt
347   subroutine gbcastInt8(inV, outV)
348       implicit none
349       integer(kind=int64), dimension(:) :: outV
350       integer(kind=int64), dimension(:) :: inV
351       integer :: ierr
352       call ReachLS_write_io(inV,outV)
353       call mpi_bcast(outV(1:gnlinksl),gnlinksl,MPI_INTEGER8,   &
354               IO_id,HYDRO_COMM_WORLD,ierr)
355   end subroutine gbcastInt8
357   subroutine getLocalIndx(glinksl,LINKID, LLINKID)
358        implicit none
359        integer(kind=int64), dimension(:) :: LINKID, LLINKID
360        integer :: i,k, glinksl, ierr
361        integer(kind=int64) :: gLinkId(glinksl)
363        LLINKLEN = size(LLINKID,1)
364        allocate(LLINKIDINDX(LLINKLEN))
365        LLINKIDINDX = 0
366        gNlinksl = glinksl
368        call ReachLS_write_io(LINKID,gLinkId)
370        call mpi_bcast(gLinkId(1:glinksl),glinksl,MPI_INTEGER8,   &
371             IO_id,HYDRO_COMM_WORLD,ierr)
373        ! The following loops are replaced by a hashtable-based algorithm
374        !        do i = 1, LLINKLEN
375        !           do k = 1, glinksl
376        !              if(LLINKID(i) .eq. gLinkId(k)) then
377        !                 LLINKIDINDX(i) = k
378        !                 goto 1001
379        !              endif
380        !           end do
381        ! 1001      continue
382        !     end do
384        block
385          type(hash_t) :: hash_table
386          integer(kind=int64) :: val,it
387          logical :: found
389          call hash_table%set_all_idx(LLINKID,LLINKLEN)
390          do it=1, glinksl
391             call hash_table%get(gLinkId(it), val, found)
392             if(found .eqv. .true.) then
393                llinkidindx(val) = it
394             end if
395          end do
396          call hash_table%clear()
397        end block
399        call mpp_land_sync()
400   end subroutine getLocalIndx
402   subroutine ReachLS_ini(glinksl,nlinksl,linklsS, linklsE)
403      implicit none
404      integer, intent(in) :: glinksl
405      integer, intent(out) :: nlinksl, linklsS, linklsE
406      integer :: i, ii, ierr
408 ! get my_id and numprocs
409      call MPI_COMM_RANK( HYDRO_COMM_WORLD, my_id, ierr )
410      call MPI_COMM_SIZE( HYDRO_COMM_WORLD, numprocs, ierr )
413      nlinksl = glinksl / numprocs
414      allocate(linkls_s(numprocs))
415      allocate(linkls_e(numprocs))
416      allocate(aLinksl(numprocs))
417      allocate(ToInd(numprocs))
419      ToInd = -1
421      linkls_s(1) = 1
422      linkls_e(1) = nlinksl
423      aLinksl = nlinksl
425      do i = 2, mod(glinksl, numprocs)+1
426          aLinksl(i) = aLinksl(i) + 1
427      end do
428      do i = 2, numprocs
429         linkls_s(i) = linkls_e(i-1)+1
430         linkls_e(i) = linkls_s(i) + aLinksl(i)-1
431      end do
433      nlinksl = aLinksl(my_id+1)
435      linklsS = linkls_s(my_id+1)
436      linklsE = linkls_e(my_id+1)
437      tmpnlinksl = aLinksl(my_id+1)
438      l_nlinksl = nlinksl
440      max_nlinksl = l_nlinksl
441      call mpp_land_max_int1(max_nlinksl)
443      gNlinksl = glinksl
444   end subroutine ReachLS_ini
446   subroutine MapGrid2ReachIni(in2d)
447      implicit none
448      integer, intent(in),dimension(:,:) :: in2d
449      integer :: ix, jx, i,j,n,ntotal, ierr
450      integer, dimension(numprocs) :: tmpS
452      allocate(sDataRec(numprocs))
453      allocate(rDataRec(numprocs))
455      ntotal = 0
456      sDataRec = 0
457      rDataRec = 0
458      ix = size(in2d,1)
459      jx = size(in2d,2)
460      do j = 1, jx
461         do i = 1, ix
462            if(in2d(i,j) .gt. 0) then
463               do n = 1, numprocs
464                   if((in2d(i,j) .ge. linkls_s(n)) .and. (in2d(i,j) .le. linkls_e(n)) ) then
465                               sDataRec(n) = sDataRec(n) + 1
466                   endif
467               end do
468            endif
469         enddo
470      enddo
472      do n = 1, numprocs
473          if(my_id .eq. n-1) then
474              tmpS = sDataRec
475          endif
476          call mpi_bcast(tmpS,numprocs,MPI_INTEGER,   &
477             n-1,HYDRO_COMM_WORLD,ierr)
478          rDataRec(n) = tmpS(n)
479      enddo
481   end subroutine MapGrid2ReachIni
484   subroutine ReachLS_decompReal(inV,outV)
485       implicit none
486       real,INTENT(in),dimension(:) :: inV
487       real,INTENT(out),dimension(:) :: outV
488       integer ::  i, ierr, tag
489       tag = 11
490       if(my_id .eq. io_id) then
491          do i = 1, numprocs
492             if(i-1 .eq. io_id) then
493                 if(alinksl(i) .gt. 0) then
494                    outV(1:(linkls_e(i)-linkls_s(i)+1) ) = inV(linkls_s(i):linkls_e(i))
495                 endif
496             else
497                 if(aLinksl(i) .gt. 0) then
498                     call mpi_send(inV(linkls_s(i):linkls_e(i)), &
499                         aLinksl(i), &
500                         MPI_REAL, i-1 ,tag,HYDRO_COMM_WORLD,ierr)
501                 endif
502             endif
503          end do
504       else
505          if(aLinksl(my_id+1) .gt. 0) then
506              call mpi_recv(outV(1:(linkls_e(my_id+1)-linkls_s(my_id+1)+1) ), &  !! this one has +1!
507               aLinksl(my_id+1),                                        &
508               MPI_REAL, io_id,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
509          endif
510       endif
511       call mpp_land_sync()
512   END subroutine ReachLS_decompReal
514   subroutine ReachLS_decompReal8(inV,outV)
515       implicit none
516       real*8,intent(in),dimension(:) :: inV
517       real*8,intent(out),dimension(:) :: outV
518       integer ::  i, ierr, tag
519       tag = 11
520       if(my_id .eq. io_id) then
521          do i = 1, numprocs
522             if(i-1 .eq. io_id) then
523                 if(alinksl(i) .gt. 0) then
524                    outV(1:(linkls_e(i)-linkls_s(i)+1) ) = inV(linkls_s(i):linkls_e(i))
525                 endif
526             else
527                 if(aLinksl(i) .gt. 0) then
528                     call mpi_send(inV(linkls_s(i):linkls_e(i)), &
529                         aLinksl(i), &
530                         MPI_REAL8, i-1 ,tag,HYDRO_COMM_WORLD,ierr)
531                 endif
532             endif
533          end do
534       else
535          if(aLinksl(my_id+1) .gt. 0) then
536              call mpi_recv(outV(1:(linkls_e(my_id+1)-linkls_s(my_id+1)+1) ), &  !! this one has +1!
537               aLinksl(my_id+1),                                        &
538               MPI_REAL8, io_id,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
539          endif
540       endif
541       call mpp_land_sync()
542       end subroutine ReachLS_decompReal8
544   subroutine ReachLS_decompInt(inV,outV)
545       implicit none
546       integer,INTENT(in),dimension(:) :: inV
547       integer,INTENT(out),dimension(:) :: outV
548       integer ::  i, ierr, tag
549       tag = 11
550       if(my_id .eq. io_id) then
551          do i = 1, numprocs
552             if(i-1 .eq. io_id) then
553                 if(alinksl(i) .gt. 0) then
554                     outV(1:linkls_e(i)-linkls_s(i)+1) = inV(linkls_s(i):linkls_e(i))
555                 endif
556             else
557                if(aLinksl(i) .gt. 0) then
558                   call mpi_send(inV(linkls_s(i):linkls_e(i)), &
559                       aLinksl(i),                &
560                       MPI_INTEGER, i-1,tag,HYDRO_COMM_WORLD,ierr)
561                endif
562             endif
563          end do
564       else
565           if(aLinksl(my_id+1) .gt. 0) then
566                call mpi_recv(outV(1:linkls_e(my_id+1)-linkls_s(my_id+1)+1), &
567                     alinksl(my_id+1),                           &
568                     MPI_INTEGER, io_id,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
569           endif
570      endif
572       call mpp_land_sync()
574   END subroutine ReachLS_decompInt
576   subroutine ReachLS_decompInt8(inV,outV)
577       implicit none
578       integer(kind=int64), INTENT(in),  dimension(:) :: inV
579       integer(kind=int64), INTENT(out), dimension(:) :: outV
580       integer ::  i, ierr, tag
581       tag = 11
582       if(my_id .eq. io_id) then
583           do i = 1, numprocs
584               if(i-1 .eq. io_id) then
585                   if(alinksl(i) .gt. 0) then
586                       outV(1:linkls_e(i)-linkls_s(i)+1) = inV(linkls_s(i):linkls_e(i))
587                   endif
588               else
589                   if(aLinksl(i) .gt. 0) then
590                       call mpi_send(inV(linkls_s(i):linkls_e(i)), &
591                               aLinksl(i),                &
592                               MPI_INTEGER8, i-1,tag,HYDRO_COMM_WORLD,ierr)
593                   endif
594               endif
595           end do
596       else
597           if(aLinksl(my_id+1) .gt. 0) then
598               call mpi_recv(outV(1:linkls_e(my_id+1)-linkls_s(my_id+1)+1), &
599                       alinksl(my_id+1),                           &
600                       MPI_INTEGER8, io_id,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
601           endif
602       endif
604       call mpp_land_sync()
606   END subroutine ReachLS_decompInt8
609   subroutine ReachLS_decompChar(inV,outV)
610      implicit none
611      character(len=*),intent(in), dimension(:) :: inV
612      character(len=*),intent(out),dimension(:) :: outV
613      integer ::  i, ierr, tag
614      integer :: strLen
615      strLen = len(inV(1))
616      tag = 11
617      if(my_id .eq. io_id) then
618         do i = 1, numprocs
619            if(i-1 .eq. io_id) then
620               if(alinksl(i) .gt. 0) then
621                  outV(1:(linkls_e(i)-linkls_s(i)+1)) = inV(linkls_s(i):linkls_e(i))
622               endif
623            else
624               if(aLinksl(i) .gt. 0) then
625                  ! The mpi_send takes what you give it and THEN treats each caracter as an array element.
626                  call mpi_send(inV(linkls_s(i):linkls_e(i)),       &
627                       strLen*aLinksl(i),                           &
628                       MPI_CHARACTER, i-1, tag, HYDRO_COMM_WORLD, ierr)
629               endif
630            endif
631         end do
632      else
633         if(aLinksl(my_id+1) .gt. 0) then
634            ! The mpi_recv treats each caracter as an array element.
635            call mpi_recv(outV(1 : (linkls_e(my_id+1)-linkls_s(my_id+1)+1) ), &  !jlm should have +1
636                 strLen*alinksl(my_id+1),                                              &
637                 MPI_CHARACTER, io_id, tag, HYDRO_COMM_WORLD, mpp_status,ierr          )
638         endif
639      endif
640      call mpp_land_sync()
641   end subroutine ReachLS_decompChar
644   subroutine ReachLS_wReal(inV,outV)
645       implicit none
646       real,INTENT(in),dimension(:) :: inV
647       real,INTENT(out),dimension(:) :: outV
648       integer :: i, ierr, tag, ss  , mm
649       outV = 0
650       if(my_id .eq. io_id) then
651          do i = 1, numprocs
652             tag = 12
653             if(i-1 .eq. io_id) then
654                 if(alinksl(i) .gt. 0) then
655                    outV(linkls_s(i):linkls_e(i)) = inV(1:linkls_e(i)-linkls_s(i)+1)
656                 endif
657             else
658                 if(aLinksl(i) .gt. 0) then
660                     call mpi_recv(outV(linkls_s(i):linkls_e(i)), &
661                          aLinksl(i),                            &
662                          MPI_REAL,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
663                 endif
664             endif
665          end do
666       else
667           if(aLinksl(my_id+1) .gt. 0) then
668                tag = 12
669                ss = size(inv,1)
670                call mpi_send(inV(1:aLinksl(my_id+1) ), &
671                       aLinksl(my_id+1),                      &
672                       MPI_REAL,io_id,tag,HYDRO_COMM_WORLD,ierr)
673           endif
674       endif
675       call mpp_land_sync()
676   END subroutine ReachLS_wReal
678   subroutine ReachLS_wReal8(inV,outV)
679       implicit none
680       real*8,intent(in),dimension(:)  :: inV
681       real*8,intent(out),dimension(:) :: outV
682       integer :: i, ierr, tag, ss  , mm
683       outV = 0
684       if(my_id .eq. io_id) then
685          do i = 1, numprocs
686             tag = 12
687             if(i-1 .eq. io_id) then
688                 if(alinksl(i) .gt. 0) then
689                    outV(linkls_s(i):linkls_e(i)) = inV(1:linkls_e(i)-linkls_s(i)+1)
690                 endif
691             else
692                 if(aLinksl(i) .gt. 0) then
694                     call mpi_recv(outV(linkls_s(i):linkls_e(i)), &
695                          aLinksl(i),                            &
696                          MPI_REAL8,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
697                 endif
698             endif
699          end do
700       else
701           if(aLinksl(my_id+1) .gt. 0) then
702                tag = 12
703                ss = size(inv,1)
704                call mpi_send(inV(1:aLinksl(my_id+1) ), &
705                       aLinksl(my_id+1),                      &
706                       MPI_REAL8,io_id,tag,HYDRO_COMM_WORLD,ierr)
707           endif
708       endif
709       call mpp_land_sync()
710   END subroutine ReachLS_wReal8
713   subroutine ReachLS_wInt(inV,outV)
714       implicit none
715       integer,INTENT(in),dimension(:) :: inV
716       integer,INTENT(out),dimension(:) :: outV
717       integer :: i, ierr, tag
718       outV = 0
719       if(my_id .eq. io_id) then
720          do i = 1, numprocs
721             if(i-1 .eq. io_id) then
722                 if(alinksl(i) .gt. 0) then
723                    outV(linkls_s(i):linkls_e(i)) = inV(1:linkls_e(i)-linkls_s(i)+1)
724                 endif
725             else
726                if(aLinksl(i) .gt. 0) then
727                   tag = 12
728                   call mpi_recv(outV(linkls_s(i):linkls_e(i)), &
729                        aLinksl(i),                             &
730                        MPI_INTEGER,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
731                endif
732             endif
733          end do
734       else
735            if(aLinksl(my_id+1) .gt. 0) then
736                 tag = 12
737                 call mpi_send(inV(1:aLinksl(my_id+1) ), &
738                       aLinksl(my_id+1),                      &
739                       MPI_INTEGER,io_id,tag,HYDRO_COMM_WORLD,ierr)
740            endif
741       endif
742       call mpp_land_sync()
743   END subroutine ReachLS_wInt
745   subroutine ReachLS_wInt8(inV,outV)
746       implicit none
747       integer(kind=int64), intent(in), dimension(:)  :: inV
748       integer(kind=int64), intent(out), dimension(:) :: outV
749       integer :: i, ierr, tag
750       outV = 0
751       if(my_id .eq. io_id) then
752           do i = 1, numprocs
753               if(i-1 .eq. io_id) then
754                   if(alinksl(i) .gt. 0) then
755                       outV(linkls_s(i):linkls_e(i)) = inV(1:linkls_e(i)-linkls_s(i)+1)
756                   endif
757               else
758                   if(aLinksl(i) .gt. 0) then
759                       tag = 12
760                       call mpi_recv(outV(linkls_s(i):linkls_e(i)), &
761                               aLinksl(i),                             &
762                               MPI_INTEGER8,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
763                   endif
764               endif
765           end do
766       else
767           if(aLinksl(my_id+1) .gt. 0) then
768               tag = 12
769               call mpi_send(inV(1:aLinksl(my_id+1) ), &
770                       aLinksl(my_id+1),                      &
771                       MPI_INTEGER8,io_id,tag,HYDRO_COMM_WORLD,ierr)
772           endif
773       endif
774       call mpp_land_sync()
775   END subroutine ReachLS_wInt8
777   subroutine ReachLS_wInt2(inV,outV,len,glen)
778       implicit none
779       integer  :: len, glen
780       integer,INTENT(in),dimension(len) :: inV
781       integer,INTENT(out),dimension(glen) :: outV
782       integer :: i, ierr, tag
783       outV = 0
784       if(my_id .eq. io_id) then
785          do i = 1, numprocs
786             if(i-1 .eq. io_id) then
787                 if(alinksl(i) .gt. 0) then
788                    outV(linkls_s(i):linkls_e(i)) = inV(1:linkls_e(i)-linkls_s(i)+1)
789                 endif
790             else
791                if(aLinksl(i) .gt. 0) then
792                   tag = 12
793                   call mpi_recv(outV(linkls_s(i):linkls_e(i)), &
794                        aLinksl(i),                             &
795                        MPI_INTEGER,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
796                endif
797             endif
798          end do
799       else
800            if(aLinksl(my_id+1) .gt. 0) then
801                 tag = 12
802                 call mpi_send(inV(1:aLinksl(my_id+1) ), &
803                       aLinksl(my_id+1),                      &
804                       MPI_INTEGER,io_id,tag,HYDRO_COMM_WORLD,ierr)
805            endif
806       endif
807       call mpp_land_sync()
808   END subroutine ReachLS_wInt2
810   subroutine ReachLS_wReal2(inV,outV,len,glen)
811       implicit none
812       integer :: len, glen
813       real,INTENT(in),dimension(len) :: inV
814       real,INTENT(out),dimension(glen) :: outV
815       integer :: i, ierr, tag
816       outV = 0
817       if(my_id .eq. io_id) then
818          do i = 1, numprocs
819             if(i-1 .eq. io_id) then
820                 if(alinksl(i) .gt. 0) then
821                    outV(linkls_s(i):linkls_e(i)) = inV(1:linkls_e(i)-linkls_s(i)+1)
822                 endif
823             else
824                 if(aLinksl(i) .gt. 0) then
825                     tag = 12
826                     call mpi_recv(outV(linkls_s(i):linkls_e(i)), &
827                          aLinksl(i),                            &
828                          MPI_REAL,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
829                 endif
830             endif
831          end do
832       else
833           if(aLinksl(my_id+1) .gt. 0) then
834                tag = 12
835                call mpi_send(inV(1:aLinksl(my_id+1) ), &
836                       aLinksl(my_id+1),                      &
837                       MPI_REAL,io_id,tag,HYDRO_COMM_WORLD,ierr)
838           endif
839       endif
840       call mpp_land_sync()
841   END subroutine ReachLS_wReal2
843   subroutine ReachLS_wChar(inV,outV)
844      implicit none
845      character(len=*), intent(in), dimension(:)  :: inV
846      character(len=*) ,intent(out),dimension(:) :: outV
847      integer :: i, ierr, tag
848      integer :: strLen
849      strLen = len(inV(1))
850      if(my_id .eq. io_id) then
851         do i = 1, numprocs
852            if(i-1 .eq. io_id) then
853               if(alinksl(i) .gt. 0) then
854                  outV(linkls_s(i):linkls_e(i)) = inV(1:linkls_e(i)-linkls_s(i)+1)
855               endif
856            else
857               if(aLinksl(i) .gt. 0) then
858                  tag = 12
859                  ! ? seems asymmetric with ReachLS_decompChar
860                  call mpi_recv(outV( linkls_s(i) : linkls_e(i) ), &
861 !                 call mpi_recv(outV( ((linkls_s(i)-1)+1) : (linkls_e(i)) ), &
862                       aLinksl(i),                                                  &
863                       MPI_CHARACTER, i-1, tag, HYDRO_COMM_WORLD, mpp_status, ierr           )
864               endif
865            endif
866         end do
867      else
868         if(aLinksl(my_id+1) .gt. 0) then
869            tag = 12
870            ! The mpi_send takes what you give it and THEN treats each caracter as an array element.
871            call mpi_send(inV(1:aLinksl(my_id+1)),              &
872                 aLinksl(my_id+1),                       &
873                 MPI_CHARACTER, io_id, tag, HYDRO_COMM_WORLD, ierr)
874         endif
875      endif
876      call mpp_land_sync()
877   end subroutine ReachLS_wChar
880   subroutine getFromInd(linkid,to,ind,indLen)
881       integer,dimension(:) :: linkid, to
882       integer, allocatable, dimension(:) ::ind
883       integer :: k, m, kk, mm,indLen
884       integer, dimension(gnlinksl) :: glinkid
885       call ReachLS_write_io(linkid,glinkid)
886       mm = size(to,1)
887       kk = 0
888       do k = 1, gnlinksl
889           do m = 1, mm
890              if(glinkid(k) .eq. to(m) ) then
891                  kk = kk +1
892                  goto 2001
893              endif
894           end do
895 2001      continue
896       end do
897       allocate(ind(kk))
898       kk = 0
899       do k = 1, gnlinksl
900           do m = 1, mm
901              if(glinkid(k) .eq. to(m) ) then
902                  kk = kk +1
903                  ind(kk) = glinkid(k)
904                  goto 2002
905              endif
906           end do
907 2002      continue
908       end do
909       indLen = kk
911   end subroutine getFromInd
913   subroutine getToInd(from,to,ind,indLen,gToNodeOut)
914       implicit none
916     integer(kind=int64),dimension(:) :: from, to
917     integer, allocatable, dimension(:) ::ind
918     integer(kind=int64), allocatable, dimension(:,:) :: gToNodeOut
919     integer :: k, m, kk, mm,indLen, i, ierr
920     integer(kind=int64), dimension(gnlinksl) :: gto
921     integer :: maxNum, num
923     call gBcastValue(to, gto)
925     !      mm = size(from,1)
926     mm = l_nlinksl
928     kk = 0
929     maxNum = 0
931     ! The following loops are replaced by a hashtable-based algorithm
932     ! do m = 1, mm
933     !    num = 0
934     !    do k = 1, gnlinksl
935     !       if(gto(k) .eq. from(m) ) then
936     !          kk = kk +1
937     !          num = num + 1
938     !       endif
939     !    end do
940     !    if(num .gt. maxNum) maxNum = num
941     ! end do
943     block
944       type(hash_t) :: hash_table
945       integer(kind=int64) :: val,it
946       integer(kind=int64), allocatable :: num_a(:)
947       logical :: found
949       allocate(num_a(mm))
950       num_a = 0
951       kk = 0
953       call hash_table%set_all_idx(from, mm)
954       do it=1, gnlinksl
955          call hash_table%get(gto(it), val, found)
956          if(found .eqv. .true.) then
957             kk = kk + 1
958             num_a(val) = num_a(val) + 1
959          end if
960       end do
961       maxNum = maxval(num_a)
964       allocate(ind(kk))
965       allocate(gToNodeOut(mm,maxNum+1))
966       gToNodeOut = -99
968       indLen = kk
970       kk = 0
971       num_a = 1
973       ! The following loops are replaced by a hashtable-based algorithm
974       ! do m = 1, mm
975       !    num = 1
976       !    do k = 1, gnlinksl
977       !        if(gto(k) .eq. from(m) ) then
978       !            kk = kk +1
979       !            !yw ind(kk) = gto(k)
980       !            ind(kk) = k
981       !            !! gToNodeOut(m,num+1) = gto(k)
982       !            gToNodeOut(m,num+1) = kk
983       !            gToNodeOut(m,1) = num
984       !            num = num + 1
985       !        endif
986       !     end do
987       ! end do
989       do it=1, gnlinksl
990          call hash_table%get(gto(it), val, found)
991          if(found .eqv. .true.) then
992             kk = kk + 1
993             ind(kk) = it
994             gToNodeOut(val,num_a(val)+1) = kk
995             gToNodeOut(val,1) = num_a(val)
996             num_a(val) = num_a(val) + 1
997          end if
998       end do
1000       deallocate(num_a)
1001       call hash_table%clear()
1003     end block
1005     ToInd(my_id+1) = kk
1006     do i = 0, numprocs - 1
1007        call mpi_bcast(ToInd(i+1),1,MPI_INTEGER8,   &
1008             i,HYDRO_COMM_WORLD,ierr)
1009     end do
1011   end subroutine getToInd
1013   subroutine com_decomp1dInt(inV,gsize,outV,lsize)
1014 !     output outV and lsize
1015       implicit none
1016       integer,INTENT(in),dimension(:) :: inV
1017       integer,allocatable,dimension(:) :: outV
1018       integer ::  i, ierr, tag, imod, ncomsize
1019       integer :: lsize, ssize,start, gsize, end
1020       tag = 19
1021       ncomsize = gsize/numprocs
1022       imod = mod(gsize,numprocs)
1025       if(my_id .eq. io_id) then
1026          start = 0
1027          end = 0
1028          do i = 1, numprocs
1029             if(i-1 .lt. imod) then
1030                   ssize = ncomsize + 1
1031             else
1032                   ssize = ncomsize
1033             endif
1035             start = end + 1
1036             end = start + ssize - 1
1038             if(i-1 .eq. io_id) then
1039                 if(ssize .gt. 0) then
1040                     allocate(outV(ssize) )
1041                     outV(1:ssize) = inV(1:ssize)
1042                     lsize = ssize
1043                 else
1044                     lsize = 0
1045                 endif
1046             else
1047                 if(ssize .gt. 0 ) then
1048                    call mpi_send(inV(start:start+ssize-1), ssize,       &
1049                       MPI_INTEGER, i-1,tag,HYDRO_COMM_WORLD,ierr)
1050                 endif
1051             endif
1052          end do
1053       else
1054               if(my_id .lt. imod) then
1055                    lsize = ncomsize + 1
1056               else
1057                    lsize = ncomsize
1058               endif
1059               if( lsize .gt. 0) then
1060                   allocate(outV(lsize) )
1061                   call mpi_recv(outV,lsize,                           &
1062                         MPI_INTEGER, io_id,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1063               endif
1064       endif
1065       call mpp_land_sync()
1068   END subroutine com_decomp1dInt
1070   subroutine com_decomp1dInt8(inV,gsize,outV,lsize)
1071       !     output outV and lsize
1072       implicit none
1073       integer(kind=int64),intent(in),dimension(:) :: inV
1074       integer(kind=int64),allocatable,dimension(:) :: outV
1075       integer ::  i, ierr, tag, imod, ncomsize
1076       integer :: lsize, ssize,start, gsize, end
1077       tag = 19
1078       ncomsize = gsize/numprocs
1079       imod = mod(gsize,numprocs)
1082       if(my_id .eq. io_id) then
1083           start = 0
1084           end = 0
1085           do i = 1, numprocs
1086               if(i-1 .lt. imod) then
1087                   ssize = ncomsize + 1
1088               else
1089                   ssize = ncomsize
1090               endif
1092               start = end + 1
1093               end = start + ssize - 1
1095               if(i-1 .eq. io_id) then
1096                   if(ssize .gt. 0) then
1097                       allocate(outV(ssize) )
1098                       outV(1:ssize) = inV(1:ssize)
1099                       lsize = ssize
1100                   else
1101                       lsize = 0
1102                   endif
1103               else
1104                   if(ssize .gt. 0 ) then
1105                       call mpi_send(inV(start:start+ssize-1), ssize,       &
1106                               MPI_INTEGER8, i-1,tag,HYDRO_COMM_WORLD,ierr)
1107                   endif
1108               endif
1109           end do
1110       else
1111           if(my_id .lt. imod) then
1112               lsize = ncomsize + 1
1113           else
1114               lsize = ncomsize
1115           endif
1116           if( lsize .gt. 0) then
1117               allocate(outV(lsize) )
1118               call mpi_recv(outV,lsize,                           &
1119                       MPI_INTEGER8, io_id,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1120           endif
1121       endif
1122       call mpp_land_sync()
1125   END subroutine com_decomp1dInt8
1127   subroutine com_write1dInt(inV,lsize,outV,gsize)
1128 !     output outV and lsize
1129       implicit none
1130       integer,INTENT(in),dimension(:) :: inV
1131       integer,dimension(:) :: outV
1132       integer ::  i, ierr, tag, imod, ncomsize
1133       integer :: lsize, rsize,start, gsize, end
1134       tag = 18
1135       ncomsize = gsize/numprocs
1136       imod = mod(gsize,numprocs)
1138       if(my_id .eq. io_id) then
1139             start = 0
1140             end = 0
1141          do i = 1, numprocs
1142             if(i-1 .lt. imod) then
1143                   rsize = ncomsize + 1
1144             else
1145                   rsize = ncomsize
1146             endif
1148             start = end + 1
1149             end = start + rsize - 1
1151             if(i-1 .eq. io_id) then
1152                 if(rsize .gt. 0) then
1153                     outV(1:rsize) = inV(1:rsize)
1154                 endif
1155             else
1156                 if(rsize .gt. 0 ) then
1157                   call mpi_recv(outV(start:start+rsize-1), rsize,          &
1158                         MPI_INTEGER, i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1159                 endif
1160             endif
1161          end do
1162       else
1163               if(my_id .lt. imod) then
1164                    lsize = ncomsize + 1
1165               else
1166                    lsize = ncomsize
1167               endif
1168               if( lsize .gt. 0) then
1169                    call mpi_send(inV, lsize,       &
1170                       MPI_INTEGER, io_id,tag,HYDRO_COMM_WORLD,ierr)
1171               endif
1172       endif
1174       call mpp_land_sync()
1176   END subroutine com_write1dInt
1178   subroutine com_write1dInt8(inV,lsize,outV,gsize)
1179       !     output outV and lsize
1180       implicit none
1181       integer(kind=int64), intent(in),dimension(:) :: inV
1182       integer(kind=int64), dimension(:) :: outV
1183       integer ::  i, ierr, tag, imod, ncomsize
1184       integer :: lsize, rsize,start, gsize, end
1185       tag = 18
1186       ncomsize = gsize/numprocs
1187       imod = mod(gsize,numprocs)
1189       if(my_id .eq. io_id) then
1190           start = 0
1191           end = 0
1192           do i = 1, numprocs
1193               if(i-1 .lt. imod) then
1194                   rsize = ncomsize + 1
1195               else
1196                   rsize = ncomsize
1197               endif
1199               start = end + 1
1200               end = start + rsize - 1
1202               if(i-1 .eq. io_id) then
1203                   if(rsize .gt. 0) then
1204                       outV(1:rsize) = inV(1:rsize)
1205                   endif
1206               else
1207                   if(rsize .gt. 0 ) then
1208                       call mpi_recv(outV(start:start+rsize-1), rsize,          &
1209                               MPI_INTEGER8, i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1210                   endif
1211               endif
1212           end do
1213       else
1214           if(my_id .lt. imod) then
1215               lsize = ncomsize + 1
1216           else
1217               lsize = ncomsize
1218           endif
1219           if( lsize .gt. 0) then
1220               call mpi_send(inV, lsize,       &
1221                       MPI_INTEGER8, io_id,tag,HYDRO_COMM_WORLD,ierr)
1222           endif
1223       endif
1225       call mpp_land_sync()
1227   END subroutine com_write1dInt8
1229   subroutine pack_decomp_int(g1bufid, ndata, nprocs_map, lnsizes, istart,bufid)
1230      implicit none
1231      integer :: ndata
1232      integer, dimension(:) :: g1bufid,  nprocs_map, lnsizes, bufid
1233      integer :: i,j,k, tag, ierr
1234      integer, allocatable,dimension(:) :: buf
1235      integer, dimension(:) :: istart
1236      integer, dimension(numprocs) :: count
1237      ! pack data
1240      if(my_id .eq. io_id) then
1241          allocate(buf(ndata))
1242          count = 0
1243          do i = 1, ndata
1244             k = nprocs_map(i)
1245             if( k .gt. 0) then
1246                buf(istart(k) + count(k)) = g1bufid(i)
1247                count(k) = count(k) + 1
1248             end if
1249          end do
1250 !         write(6,*) " count = ", count
1251 !         write(6,*) " istart = ", istart
1252 !         write(6,*) " lnsizes = ", lnsizes
1253       end if
1254       !finish packing
1256       call mpp_land_sync()
1257 !      call hydro_finish()
1259       if(my_id .ne. IO_id) then
1260           tag = 72
1261           if(lnsizes(my_id + 1) .gt. 0) then
1262              call mpi_recv(bufid,lnsizes(my_id + 1),&
1263                    MPI_INTEGER,io_id,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1264           endif
1265       else
1266           do i = 0, numprocs - 1
1267             if(i .ne. my_id) then
1268                tag = 72
1269                if(lnsizes(i+1) .gt. 0) then
1270                   call mpi_send(buf(istart(i+1):istart(i+1)+lnsizes(i+1)-1),  &
1271                       lnsizes(i+1),MPI_INTEGER,i, tag,HYDRO_COMM_WORLD,ierr)
1272                endif
1273             else
1274                 if(lnsizes(i+1) .gt. 0) then
1275                    bufid = buf(istart(i+1):istart(i+1)+lnsizes(i+1)-1)
1276                 endif
1277             end if
1278           end do
1279        end if
1280        if(my_id .eq. io_id) then
1281           if(allocated(buf)) deallocate(buf)
1282        endif
1283   end subroutine pack_decomp_int
1285   subroutine pack_decomp_int8(g1bufid, ndata, nprocs_map, lnsizes, istart,bufid)
1286       implicit none
1287       integer :: ndata
1288       integer, dimension(:) :: nprocs_map, lnsizes
1289       integer :: i,j,k, tag, ierr
1290       integer(kind=int64), allocatable,dimension(:) :: buf, bufid, g1bufid
1291       integer, dimension(:) :: istart
1292       integer, dimension(numprocs) :: count
1293       ! pack data
1296       if(my_id .eq. io_id) then
1297           allocate(buf(ndata))
1298           count = 0
1299           do i = 1, ndata
1300               k = nprocs_map(i)
1301               if( k .gt. 0) then
1302                   buf(istart(k) + count(k)) = g1bufid(i)
1303                   count(k) = count(k) + 1
1304               end if
1305           end do
1306           !         write(6,*) " count = ", count
1307           !         write(6,*) " istart = ", istart
1308           !         write(6,*) " lnsizes = ", lnsizes
1309       end if
1310       !finish packing
1312       call mpp_land_sync()
1313       !      call hydro_finish()
1315       if(my_id .ne. IO_id) then
1316           tag = 72
1317           if(lnsizes(my_id + 1) .gt. 0) then
1318               call mpi_recv(bufid,lnsizes(my_id + 1),&
1319                       MPI_INTEGER8,io_id,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1320           endif
1321       else
1322           do i = 0, numprocs - 1
1323               if(i .ne. my_id) then
1324                   tag = 72
1325                   if(lnsizes(i+1) .gt. 0) then
1326                       call mpi_send(buf(istart(i+1):istart(i+1)+lnsizes(i+1)-1),  &
1327                               lnsizes(i+1),MPI_INTEGER8,i, tag,HYDRO_COMM_WORLD,ierr)
1328                   endif
1329               else
1330                   if(lnsizes(i+1) .gt. 0) then
1331                       bufid = buf(istart(i+1):istart(i+1)+lnsizes(i+1)-1)
1332                   endif
1333               end if
1334           end do
1335       end if
1336       if(my_id .eq. io_id) then
1337           if(allocated(buf)) deallocate(buf)
1338       endif
1339   end subroutine pack_decomp_int8
1341   subroutine pack_decomp_real8(g1bufid, ndata, nprocs_map, lnsizes, istart,bufid)
1342      implicit none
1343      integer :: ndata
1344      real*8, dimension(:) :: g1bufid, bufid
1345      integer,dimension(:) ::  nprocs_map, lnsizes
1346      integer :: i,j,k, tag, ierr
1347      real*8, allocatable,dimension(:) :: buf
1348      integer, dimension(:) :: istart
1349      integer, dimension(numprocs) :: count
1350      ! pack data
1351      if(my_id .eq. io_id) then
1352          allocate(buf(ndata))
1353          count = 0
1354          do i = 1, ndata
1355             k = nprocs_map(i)
1356             if( k .gt. 0) then
1357               buf(istart(k) + count(k)) = g1bufid(i)
1358               count(k) = count(k) + 1
1359             endif
1360          end do
1361       end if
1362        call mpp_land_sync()
1363       if(my_id .ne. IO_id) then
1364           tag = 72
1365           if(lnsizes(my_id + 1) .gt. 0) then
1366              call mpi_recv(bufid,lnsizes(my_id + 1),&
1367                    MPI_DOUBLE_PRECISION,io_id,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1368           endif
1369       else
1370           do i = 0, numprocs - 1
1371             if(i .ne. my_id) then
1372                tag = 72
1373                if(lnsizes(i+1) .gt. 0) then
1374                   call mpi_send(buf(istart(i+1):istart(i+1)+lnsizes(i+1)-1),  &
1375                       lnsizes(i+1),MPI_DOUBLE_PRECISION,i, tag,HYDRO_COMM_WORLD,ierr)
1376                endif
1377             else
1378                 if(lnsizes(my_id + 1) .gt. 0) then
1379                    bufid = buf(istart(i + 1):istart(i+1)+lnsizes(i+1)-1)
1380                 endif
1381             end if
1382           end do
1383        end if
1384      if(my_id .eq. io_id) then
1385          if(allocated(buf))  deallocate(buf)
1386      endif
1387   end subroutine pack_decomp_real8
1389 ! this is used for nhdPlus with Lake.
1390 ! resolve the data from TO_NODE grids, and update back to NLINKSL grids.
1391   subroutine TONODE2RSL (ind,inVar,size,gNLINKSL,NLINKSL,ioVar,flag)
1392     implicit none
1393     integer,intent(in) :: size,gNLINKSL,NLINKSL  !
1394     integer,intent(in) , dimension(size) :: ind, inVar
1395     integer,intent(inout), dimension(nlinksl) :: ioVar
1396     integer, allocatable, dimension(:) :: gvar, buf, tmpInd
1397     integer :: i,j,k, tag, ierr, tmpSize, flag
1399     if(gNLINKSL .le. 0) return
1401     if(my_id .eq. io_id) then
1402        allocate(gvar(gNLINKSL))
1403     else
1404        allocate(gvar(1))
1405     endif
1406     call ReachLS_wInt(ioVar,gvar)
1408       if(my_id .eq. io_id) then
1409          do i = 1, numprocs
1410             if(i-1 .eq. io_id) then
1411                 do k = 1, size
1412                    if(inVar(k) .ne. flag) then
1413                       gvar(ind(k)) = inVar(k)
1414                    endif
1415                 end do
1416             else
1417                   tag = 82
1418                   call mpi_recv(tmpSize,1,MPI_INTEGER,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1419                   if(tmpSize .gt. 0) then
1420                       allocate(buf(tmpSize))
1421                       allocate(tmpInd(tmpSize))
1422                       tag = 83
1423                       call mpi_recv(tmpInd, tmpSize , &
1424                            MPI_INTEGER,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1425                       tag = 84
1426                       call mpi_recv(buf, tmpSize , &
1427                            MPI_INTEGER,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1428                       do k = 1, tmpSize
1429                          if(buf(k) .ne. flag) then
1430                              gvar(tmpInd(k)) = buf(k)
1431                          endif
1432                       end do
1433                       if(allocated(buf))  deallocate(buf)
1434                       if(allocated(tmpInd)) deallocate(tmpInd)
1435                   endif
1436             endif
1437          end do
1438       else
1439           tag = 82
1440           call mpi_send(size,1,MPI_INTEGER,io_id,tag,HYDRO_COMM_WORLD,ierr)
1441           if(size .gt. 0) then
1442              tag = 83
1443              call mpi_send(ind(1:size),size, &
1444                  MPI_INTEGER,io_id,tag,HYDRO_COMM_WORLD,ierr)
1445              tag = 84
1446              call mpi_send(inVar(1:size),size, &
1447                  MPI_INTEGER,io_id,tag,HYDRO_COMM_WORLD,ierr)
1448           endif
1449       endif
1450       call ReachLS_decomp(gvar, ioVar)
1451       if(allocated(gvar)) deallocate(gvar)
1452   end subroutine TONODE2RSL
1454   subroutine TONODE2RSL8 (ind,inVar,size,gNLINKSL,NLINKSL,ioVar,flag)
1455       implicit none
1456       integer, intent(in) :: size,gNLINKSL,NLINKSL  !
1457       integer, intent(in) , dimension(size) :: ind
1458       integer(kind=int64), intent(in) , dimension(size) ::inVar
1459       integer(kind=int64), intent(inout), dimension(nlinksl) :: ioVar
1460       integer, allocatable, dimension(:) :: tmpInd
1461       integer(kind=int64), allocatable, dimension(:) :: gvar, buf
1462       integer :: i,j,k, tag, ierr, tmpSize, flag
1464       if(gNLINKSL .le. 0) return
1466       if(my_id .eq. io_id) then
1467           allocate(gvar(gNLINKSL))
1468       else
1469           allocate(gvar(1))
1470       endif
1471       call ReachLS_wInt8(ioVar,gvar)
1473       if(my_id .eq. io_id) then
1474           do i = 1, numprocs
1475               if(i-1 .eq. io_id) then
1476                   do k = 1, size
1477                       if(inVar(k) .ne. flag) then
1478                           gvar(ind(k)) = inVar(k)
1479                       endif
1480                   end do
1481               else
1482                   tag = 82
1483                   call mpi_recv(tmpSize,1,MPI_INTEGER,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1484                   if(tmpSize .gt. 0) then
1485                       allocate(buf(tmpSize))
1486                       allocate(tmpInd(tmpSize))
1487                       tag = 83
1488                       call mpi_recv(tmpInd, tmpSize , &
1489                               MPI_INTEGER,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1490                       tag = 84
1491                       call mpi_recv(buf, tmpSize , &
1492                               MPI_INTEGER8,i-1,tag,HYDRO_COMM_WORLD,mpp_status,ierr)
1493                       do k = 1, tmpSize
1494                           if(buf(k) .ne. flag) then
1495                               gvar(tmpInd(k)) = buf(k)
1496                           endif
1497                       end do
1498                       if(allocated(buf))  deallocate(buf)
1499                       if(allocated(tmpInd)) deallocate(tmpInd)
1500                   endif
1501               endif
1502           end do
1503       else
1504           tag = 82
1505           call mpi_send(size,1,MPI_INTEGER,io_id,tag,HYDRO_COMM_WORLD,ierr)
1506           if(size .gt. 0) then
1507               tag = 83
1508               call mpi_send(ind(1:size),size, &
1509                       MPI_INTEGER,io_id,tag,HYDRO_COMM_WORLD,ierr)
1510               tag = 84
1511               call mpi_send(inVar(1:size),size, &
1512                       MPI_INTEGER8,io_id,tag,HYDRO_COMM_WORLD,ierr)
1513           endif
1514       endif
1515       call ReachLS_decomp(gvar, ioVar)
1516       if(allocated(gvar)) deallocate(gvar)
1517   end subroutine TONODE2RSL8
1519 END MODULE MODULE_mpp_ReachLS