SUBROUTINE P8D(MYPE,NUMPES,N,K,T_PARAM,T,A,D,B,TIMES,AA,Z, $ ALLTIMES) c cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc c CVS Info c $Date: 2005/01/10 21:55:51 $ c $Revision: 1.2 $ c $RCSfile: p8dp.f,v $ c $Name: rel_5 $ c c Benchmark #8 -- Dynamic Program c Small dense matrices c Parallel Version c c PARAMETERS c c Provided by the calling routine: c MYPE = PE ID c NUMPES= Number of processors c A = K long array of N by N matrices c D = T long array of integers between 1 and K used to c select the appropriate A matrix c N = The size of the A matrices c K = The number of A matrices c T = The part of D divisible by 3*NUMPES minus the residual steps c T_PARAM= The length of D (includes residual steps) c TIMES = Related to the formation of A c AA = With TIMES related to the formation of A c c RETURNED by this routine: c B = T+1 long array containing the best path c Z = The probability of this best path c cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc c c Basic Algorithm c c The basic algorithm is given in the problem. c c There are a number of ideas to improve this algorithm. First, the c floating point numbers involved tend to go out of bounds. This is c solved by noting that the i which maximizes c Y[t](j) = max Y[t-1](i)*A[k](i,j) c also maximizes c Log (Y[t](j)) = max Log (Y[t-1](i)) + Log (A[k](i,j)) c (The matrices are non-negative.) Thus, all work will be done in the c log domain. Zeros in the matrices, if any, will be replaced with c very small values. c c The second is the idea that we can pre-compute all possible products c of pairs or even trios of matrices, and then double or even triple c step through the Y vectors. Even if we do not wish to compute c all possible trios of matrices, because of the nature of the matrices, c they are the result of multiplying a single matrix by the K different c vectors in array TIMES, we can still precompute A[D(t)]*A[D(t+1)]*A c and triple step with work proportional to N*N + N. c c The third idea is to note that the Y vectors are not output. Thus, c we need to keep only the last and current copies of Y at any time. c If memory becomes a problem, we can also pack the P array, but this c will not be done here. c c The fourth idea (and the difference between this version and the c version in p8d.f, results in increased work and memory requirements, c but also increased parallelism. Note that we can compute c A[D(t)]*A[D(t+1)]*A[D(t+2]*...*A[D(t+j)] c and then from Y[t-1] obtain Y[t+j] in one step. However, the c work required to do this is N times greater (the work to compute c A[D(t)]*A[D(t+1)] is order N^3, where the work to compute c Y[t-1]*A[D(t)] is order N^2.) However, we can divide the range c T into numproc sub-regions, and compute in parallel the product c of the A's for each sub-region. In a final, relatively cheap c but sequential, section of work, Y[T] can be computed in order c N^2 * numproc operations. The computation of B(t) can be computed c in parallel also. c cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc C--MPI defines include 'mpif.h' C--CVS variable declaration TYPE CVS sequence character( 160 ) string integer stringend END TYPE CVS C--CVS initilaize variables TYPE( CVS ),save :: CVS_INFO = $ CVS("BMARKGRP $Date: 2005/01/10 21:55:51 $ $Revision: 1.2 $" // $ "$RCSfile: p8dp.f,v $ $Name: rel_5 $", 0) C--The maximum size of the matrices and vectors PARAMETER (MN=30) C--The maximum number of matrices PARAMETER (MK=50) C--The maximum length of the D stream PARAMETER (MT = 100 000) C--The number of processors being used INTEGER NUMPES,MYPE C--Name for the first processor INTEGER IONODE common /MPISTUFF/IONODE C--The size of the matrices and vectors INTEGER N C--The number of matrices INTEGER K C--The K NxN matrices REAL A(N,N,K) C--T_PARAM & T = number of total steps; T3 = num steps per sub region INTEGER T_PARAM,T, T3 C--the vector specifying which of the K matrices to use in each step INTEGER D(T_PARAM) C--The vectors formed after each step REAL Y(MN,0:1) C--The "best" path through the steps; The local "best" path INTEGER B(0:T_PARAM) INTEGER LOCB(0:T_PARAM) C--The vectors of which element of the matrix formed the maximum INTEGER P(MN,MT) C--The lookup table of precomputed pseudo products of all permutations of C--the K matrices taken in groups of two or three REAL TAB(MN,MN,MK,MK) C--A table of the best path through each precomputed matrix INTEGER TABP(MN,MN,MK,MK,2) C--A table of positions in each sub-region currently being computed, C--and the value in D around this position INTEGER,POINTER:: IPOS(:), K1(:), K2(:), K3(:), JPOS(:) C--The partial tables of products of A over each sub-region, for C--parallelism. (Plays the role of Y in the MATRIX MATRIX products) REAL,POINTER:: ALLREG(:,:,:) C--Local REG on each of the nodes REAL,POINTER:: LOCREG(:,:,:) C--Local PATHS on each node INTEGER,POINTER::LOCPATHS(:,:,:) C--Error INTEGER IERR,IBUF24,ONE C--Array sizes for MPI calls INTEGER REGSIZE,PATHSSIZE,YSIZE,PSIZE,BSIZE C--Base matrix to form the K actual matrices REAL TIMES(N,K) C--Second base matrix to form the K actual matrices REAL AA(N,N) C--The user-supplied minimum allowed floating point number PARAMETER (FPMIN=1.17549435e-38) C--Used to time the various subportions of this program REAL CPUTIME,WALL,X0,Y0 REAL ALLTIMES(14) allocate(ALLREG(N,N,NUMPES),STAT=IERR) allocate(LOCPATHS(N,N,(T/(3*NUMPES))),STAT=IERR) allocate(LOCREG(N,N,0:1),STAT=IERR) allocate(IPOS(NUMPES),STAT=IERR) allocate(JPOS(NUMPES),STAT=IERR) allocate(K1(NUMPES),STAT=IERR) allocate(K2(NUMPES),STAT=IERR) allocate(K3(NUMPES),STAT=IERR) C--Check the input parameters to see that there is sufficient space IF(MN .LT. N) THEN PRINT 5,MN,N 5 FORMAT('Insufficient space in working arrays for P8D.',/, $ 'Size available from parameter MN = ',I5,/, $ 'Needed = ',I6) STOP ENDIF IF(MK .LT. K) THEN PRINT 10,MK,K 10 FORMAT('Insufficient space in working arrays for P8D.',/, $ 'Size available from parameter MK = ',I5,/, $ 'Needed = ',I6) STOP ENDIF IF(MT .LT. T) THEN PRINT 15,MT,T 15 FORMAT('Insufficient space in working arrays for P8D.',/, $ 'Size available from parameter MT = ',I10,/, $ 'Needed = ',I10) STOP ENDIF C@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ C NOTE: The code for timings and region identification are C provided for internal instrumentation only. To use, just C uncomment the code. Do not include these times in the final C report. C@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ C--Convert matrices to logs and generate tables for three-stepping. X0 = CPUTIME() Y0 = WALL() CALL G8D(A,AA,TAB,TABP,TIMES,N,K,T,MN,MK,FPMIN) ALLTIMES(5) = ALLTIMES(5) + (CPUTIME() - X0) ALLTIMES(6) = ALLTIMES(6) + (WALL() -Y0) C--T3 is the length of each sub-region T3 = T/(3*NUMPES) IF((T .LE. (3*K*K*N)) .OR. (T3 .LT. 20)) THEN PRINT 23, T, 3*K*K*N, 60*NUMPES 23 FORMAT('T = ',I7,' is probably too small for this approach.',/, $ 'It should be at least the larger of ',I7,' and ',I7) STOP ENDIF C--Start timing the Parallel Matrix Matrix operations X0 = CPUTIME() Y0 = WALL() C--compute correct index into arrays for my processor MYPE1=MYPE+1 C--Number of sequential steps left over. I1 = T_PARAM - T C--Generate a vector of start points for each sub-region IPOS(1) = I1+1 JPOS(1) = IPOS(1) Do ip = 2, NUMPES IPOS(ip) = T + I1 + 1 - (1+NUMPES - ip)*3*T3 JPOS(ip) = IPOS(ip) Enddo ! ip C--Generate Initial REG Values K1(MYPE1) = D( IPOS(MYPE1)) K2(MYPE1) = D(1+IPOS(MYPE1)) K3(MYPE1) = D(2+IPOS(MYPE1)) IPOS(MYPE1) = IPOS(MYPE1) + 3 K1ip = K1(MYPE1) K2ip = K2(MYPE1) K3ip = K3(MYPE1) Do I = 1,N Do J = 1,N LOCREG(I,J,0) = TAB(J,I,K1ip,K2ip) + TIMES(J,K3ip) Enddo ! J Enddo ! I IBUF = 1 IBUF2 = 0 C--Compute product of matrices in each sub-region Do It = 2, T3 ip = MYPE1 K1(ip) = D( IPOS(ip)) K2(ip) = D(1+IPOS(ip)) K3(ip) = D(2+IPOS(ip)) IPOS(ip) = IPOS(ip) + 3 K1ip = K1(ip) K2ip = K2(ip) K3ip = K3(ip) Do I = 1,N Do J = 1,N LOCREG(J,I,IBUF) = LOCREG(J,1,IBUF2) + TAB(I,1,K1ip,K2ip) LOCPATHS(J,I,It)=1 Do M = 2,N TEMP = LOCREG(J,M,IBUF2) + TAB(I,M,K1ip,K2ip) If ( TEMP.GT.LOCREG(J,I,IBUF) ) Then LOCREG(J,I,IBUF) = TEMP LOCPATHS(J,I,It) = M Endif Enddo ! Maximum LOCREG(J,I,IBUF) = LOCREG(J,I,IBUF) + TIMES(I,K3ip) Enddo ! J Enddo ! I IBUF2 = IBUF IBUF = 1 - IBUF Enddo ! It = 2, T3 ALLTIMES(7) = ALLTIMES(7) + (CPUTIME() - X0) ALLTIMES(8) = ALLTIMES(8) + (WALL() -Y0) C--Send LOCREG back to IONODE for processing C--compiled with autodbl so all reals are twice the size X0 = CPUTIME() Y0 = WALL() c-nobarr CALL MPI_BARRIER(MPI_COMM_WORLD,IERR) REGSIZE = N*N*1 CALL MPI_ALLGATHER(LOCREG(1,1,IBUF2),REGSIZE,MPI_REAL8, x ALLREG,REGSIZE,MPI_REAL8, x MPI_COMM_WORLD,IERR) C--Broadcast ALLREG to all nodes c-nobarr CALL MPI_BARRIER(MPI_COMM_WORLD,IERR) c REGSIZE = REGSIZE*NUMPES c CALL MPI_BCAST(ALLREG,REGSIZE,MPI_REAL8,IONODE, c x MPI_COMM_WORLD,IERR) c-nobarr CALL MPI_BARRIER(MPI_COMM_WORLD,IERR) c Check time and publish results for matrix products c and the gather/broadcasts ALLTIMES(9) = ALLTIMES(9) + (CPUTIME() - X0) ALLTIMES(10) = ALLTIMES(10) + (WALL() - Y0) c PRINT 97,I1,3*T3,NUMPES c 97 FORMAT('Single step Region = ',I7,/, c $ 'Parallel Sub-region = ',I7,/, c $ 'Vector Length = ',I7) C--Now start the sequential portion C--Initialize the initial Y array X0 = CPUTIME() Y0 = WALL() Do I = 1,N Y(I,0) = 0.0 Enddo ! I JBUF = 1 JBUF2 = 0 If (I1 .NE. 0) Then Do It =1,I1 Do J = 1,N Y(J,JBUF) = Y(1,JBUF2) + A(1,J,D(It)) P(J,It) = 1 Enddo ! J Do J = 2,N Do M = 1,N TEMP = Y(J,JBUF2) + A(J,M,D(It)) If (TEMP .GT. Y(M,JBUF) ) Then Y(M,JBUF) = TEMP P(M, It) = J ENDIF Enddo ! Maximum Enddo ! J JBUF2 = JBUF JBUF = 1 - JBUF Enddo ! residual single steps Endif ! residual single steps C--Giant step over each sub-region Do ip = 1,NUMPES Do J = 1,N Y(J,JBUF) = Y(1,JBUF2) + ALLREG(1,J,ip) P(J,I1+ip) = 1 Enddo ! J Do J = 2,N Do M = 1,N TEMP = Y(J,JBUF2) + ALLREG(J,M,ip) IF(TEMP .GT. Y(M,JBUF)) THEN Y(M,JBUF) = TEMP P(M,I1+ip) = J ENDIF Enddo ! Maximum Enddo ! J C--Update buffer pointers JBUF2 = JBUF JBUF = 1 - JBUF Enddo ! ip ALLTIMES(11) = ALLTIMES(11) + (CPUTIME() - X0) ALLTIMES(12) = ALLTIMES(12) + (WALL() -Y0) C--Now find the end of the best path C--fragment and parallelize this part X0 = CPUTIME() Y0 = WALL() C--initialize B for debug Do It = 0,T_PARAM LOCB(It) = -1 Enddo C--Find the max value Z = Y(1,JBUF2) IJ = 1 Do M = 2,N If (Y(M,JBUF2) .GT. Z) Then Z = Y(M,JBUF2) IJ = M ENDIF Enddo ! Maximum LOCB(T_PARAM) = IJ C--Giant step over each parallel sub-region Do ip = NUMPES,1,-1 IJ = P(IJ,I1+ip) LOCB(JPOS(ip)-1) = IJ Enddo ! ip C--Step over initial sequential region If (I1 .NE. 0) Then Do It = I1,1,-1 LOCB(It-1) = P(LOCB(It),It) Enddo ! It Endif C--Now step through each parallel region ip = MYPE1 K1ip = LOCB(JPOS(ip) - 1) K2ip = LOCB(JPOS(ip) + T3*3 - 1) JTEMP = JPOS(ip) - 1 ip = MYPE1 Do It = T3-1,1,-1 LOCB(3*It + JTEMP) = LOCPATHS(K1ip,K2ip,It+1) K2ip = LOCB(3*It + JTEMP) Enddo ! It C--Now gather the B results back on the IONODE c-nobarr CALL MPI_BARRIER(MPI_COMM_WORLD,IERR) C--Move the residual steps onto B first IF(MYPE .EQ. IONODE) THEN Do I=0,I1-1 B(I) = LOCB(I) Enddo ! single steps ENDIF C--Gather from LOCB to B BSIZE = 3*T3 CALL MPI_GATHER(LOCB(JPOS(ip)-1),BSIZE,MPI_INTEGER8, x B(JPOS(ip)-1) ,BSIZE,MPI_INTEGER8, x IONODE,MPI_COMM_WORLD,IERR) C--Empty slots are filled in backwards on B C--make sure last value is correct B(T_PARAM) = LOCB(T_PARAM) C--fill in slots on the IONODE IF ( MYPE .EQ. IONODE ) Then Do It = T_PARAM-2,I1+1,-3 J = B(It-1) I = B(It+2) KK1 = D(It) KK2 = D(It+1) B(It+1) = TABP(I,J,KK1,KK2,2) B(It ) = TABP(I,J,KK1,KK2,1) Enddo ! It Endif ! MYPE = IONODE ALLTIMES(13) = ALLTIMES(13) + (CPUTIME() - X0) ALLTIMES(14) = ALLTIMES(14) + (WALL() -Y0) deallocate(ALLREG) deallocate(LOCPATHS) deallocate(LOCREG) deallocate(IPOS) deallocate(JPOS) deallocate(K1) deallocate(K2) deallocate(K3) RETURN END SUBROUTINE G8D(A,AA,TAB,TABP,TIMES,N,K,T,MN,MK,FPMIN) c cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc c c Subroutine to take the log of the A matrices, and to c generate the tables to be used in the 3-stepping. c cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc c c The maximum size of the matrices and vectors PARAMETER (MM=30) REAL A(N,N,K) REAL AA(N,N) REAL TAB(MN,MN,MK,MK) INTEGER TABP(MN,MN,MK,MK,2) REAL TIMES(N,K) REAL VTEMP(MM,MM) INTEGER PTEMP(MM,MM) INTEGER T C--Some checking to make sure that arrays are large enough. IF(MM .LT. N) THEN PRINT 5,MM,N 5 FORMAT('Insufficient space in working arrays for G8D.',/, $ 'Size available from parameter MM = ',I5,/, $ 'Needed = ',I6) STOP ENDIF C--Scale all the matrices, and the array TIMES DO 30 J = 1,K DO 20 I = 1,N IF(TIMES(I,J) .GT. FPMIN) THEN TIMES(I,J) = ALOG(TIMES(I,J)) ELSE TIMES(I,J) = ALOG(FPMIN) ENDIF DO 10 II = 1,N IF(A(II,I,J) .GT. FPMIN) THEN A(II,I,J) = ALOG(A(II,I,J)) ELSE A(II,I,J) = ALOG(FPMIN) ENDIF 10 CONTINUE 20 CONTINUE 30 CONTINUE DO 50 I=1,N DO 40 J=1,N IF(AA(I,J) .GT. FPMIN) THEN AA(I,J) = ALOG(AA(I,J)) ELSE AA(I,J) = ALOG(FPMIN) ENDIF 40 CONTINUE 50 CONTINUE C--If N, K and T warrant, multiply together all K*K permutations of the C--K input matrices. IF(T .LE. (3*K*K*N)) RETURN DO 190 I = 1,K DO 180 J = 1,K DO 130 KK = 1,N DO 100 L = 1,N VTEMP(KK,L) = A(KK,1,I) + A(1,L,J) PTEMP(KK,L) = 1 100 CONTINUE DO 120 M = 2,N DO 110 L = 1,N TMP = A(KK,M,I)+A(M,L,J) IF(TMP .GT. VTEMP(KK,L)) THEN VTEMP(KK,L) = TMP PTEMP(KK,L) = M ENDIF 110 CONTINUE 120 CONTINUE 130 CONTINUE C--Multiply all K*K matrices in the table by the first A matrix. Then, C--to step by three, one just has to multiply by the appropriate matrix C--in the table using the first two steps as an index, and then using C--the multiplication factors of the third step to readjust the columns C--of the output. DO 170 KK = 1,N DO 140 L = 1,N TAB(L,KK,I,J) = VTEMP(KK,1) + AA(1,L) TABP(L,KK,I,J,1) = PTEMP(KK,1) TABP(L,KK,I,J,2) = 1 140 CONTINUE DO 160 M = 2,N DO 150 L = 1,N TEMP = VTEMP(KK,M) + AA(M,L) IF(TEMP .GT. TAB(L,KK,I,J)) THEN TAB(L,KK,I,J) = TEMP TABP(L,KK,I,J,1) = PTEMP(KK,M) TABP(L,KK,I,J,2) = M ENDIF 150 CONTINUE 160 CONTINUE 170 CONTINUE 180 CONTINUE 190 CONTINUE RETURN END