1) Several bug fixes for making the remote threads work properly under the Always/Open mode;

2) small cosmetic changes;
time-shift
Marco Ratto 2010-04-14 17:19:21 +02:00
parent f743f0375d
commit 0e98d4266c
1 changed files with 104 additions and 79 deletions

View File

@ -47,36 +47,40 @@ function [fOutVar,nBlockPerCPU, totCPU] = masterParallelMan(Parallel,fBlock,nBlo
% %
% You should have received a copy of the GNU General Public License % You should have received a copy of the GNU General Public License
% along with Dynare. If not, see <http://www.gnu.org/licenses/>. % along with Dynare. If not, see <http://www.gnu.org/licenses/>.
% Delete the traces (if any exist) of the last section's computations.
persistent initialize persistent initialize
if isempty(initialize), if isempty(initialize),
mydelete(['P_slave_*End.txt']); mydelete(['P_slave_*End.txt']);
mydelete(['slaveParallel_input*.mat']); mydelete(['slaveParallel_input*.mat']);
initialize = 0; initialize = 0;
pause(1), pause(1),
end end
totCPU=0; totCPU=0;
% Determine my hostname and my working directory % Determine my hostname and my working directory
DyMo=pwd; DyMo=pwd;
fInputVar.DyMo=DyMo; fInputVar.DyMo=DyMo;
if isunix || (~matlab_ver_less_than('7.4') && ismac) , if isunix || (~matlab_ver_less_than('7.4') && ismac) ,
% [tempo, MasterName]=system(['ifconfig | grep ''inet addr:''| grep -v ''127.0.0.1'' | cut -d: -f2 | awk ''{ print $1}''']); % [tempo, MasterName]=system(['ifconfig | grep ''inet addr:''| grep -v ''127.0.0.1'' | cut -d: -f2 | awk ''{ print $1}''']);
[tempo, MasterName]=system('hostname --fqdn'); [tempo, MasterName]=system('hostname --fqdn');
else else
[tempo, MasterName]=system('hostname'); [tempo, MasterName]=system('hostname');
end end
MasterName=deblank(MasterName); MasterName=deblank(MasterName);
fInputVar.MasterName = MasterName; fInputVar.MasterName = MasterName;
% Save input data for use by the slaves % Save input data for use by the slaves
if exist('fGlobalVar'), if exist('fGlobalVar'),
save(['temp_input.mat'],'fInputVar','fGlobalVar') save(['temp_input.mat'],'fInputVar','fGlobalVar')
else else
save(['temp_input.mat'],'fInputVar') save(['temp_input.mat'],'fInputVar')
end end
save(['temp_input.mat'],'Parallel','-append') save(['temp_input.mat'],'Parallel','-append')
% Determine the total number of available CPUs, and the number of threads to run on each CPU % Determine the total number of available CPUs, and the number of threads to run on each CPU
for j=1:length(Parallel), for j=1:length(Parallel),
@ -131,54 +135,54 @@ for j=1:totCPU,
end end
pause(1); % wait for possibly local alive CPU to start the new job or close by internal criteria pause(1); % wait for possibly local alive CPU to start the new job or close by internal criteria
newInstance = 0; newInstance = 0;
if isempty( dir(['P_slave_',int2str(j),'End.txt'])); % check if j CPU is already alive if isempty( dir(['P_slave_',int2str(j),'End.txt'])); % check if j CPU is already alive
fid1=fopen(['P_slave_',int2str(j),'End.txt'],'w+'); fid1=fopen(['P_slave_',int2str(j),'End.txt'],'w+');
fclose(fid1); fclose(fid1);
newInstance = 1; newInstance = 1;
storeGlobalVars( ['slaveParallel_input',int2str(j)]); storeGlobalVars( ['slaveParallel_input',int2str(j)]);
save( ['slaveParallel_input',int2str(j)],'Parallel','-append'); save( ['slaveParallel_input',int2str(j)],'Parallel','-append');
% prepare global vars for Slave % prepare global vars for Slave
end end
if Parallel(indPC).Local == 1 & newInstance, % run on the local machine if Parallel(indPC).Local == 1 & newInstance, % run on the local machine
if isunix || (~matlab_ver_less_than('7.4') && ismac), if isunix || (~matlab_ver_less_than('7.4') && ismac),
if exist('OCTAVE_VERSION') if exist('OCTAVE_VERSION')
% command1=['octave --eval fParallel\(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',\''',fname,'\''\) &']; % command1=['octave --eval fParallel\(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',\''',fname,'\''\) &'];
command1=['octave --eval slaveParallel\(',int2str(j),',',int2str(indPC),'\) &']; command1=['octave --eval slaveParallel\(',int2str(j),',',int2str(indPC),'\) &'];
else else
% command1=['matlab -nosplash -nodesktop -minimize -r fParallel\(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',\''',fname,'\''\) &']; % command1=['matlab -nosplash -nodesktop -minimize -r fParallel\(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',\''',fname,'\''\) &'];
command1=['matlab -nosplash -nodesktop -minimize -r slaveParallel\(',int2str(j),',',int2str(indPC),'\) &']; command1=['matlab -nosplash -nodesktop -minimize -r slaveParallel\(',int2str(j),',',int2str(indPC),'\) &'];
end end
else else
if exist('OCTAVE_VERSION') if exist('OCTAVE_VERSION')
% command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)),' -low octave --eval fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')']; % command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)),' -low octave --eval fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')'];
command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)),' -low octave --eval slaveParallel(',int2str(j),',',int2str(indPC),')']; command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)),' -low octave --eval slaveParallel(',int2str(j),',',int2str(indPC),')'];
else else
% command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)),' -low matlab -nosplash -nodesktop -minimize -r fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')']; % command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)),' -low matlab -nosplash -nodesktop -minimize -r fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')'];
command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)),' -low matlab -nosplash -nodesktop -minimize -r slaveParallel(',int2str(j),',',int2str(indPC),')']; command1=['start /B psexec -W ',DyMo, ' -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)),' -low matlab -nosplash -nodesktop -minimize -r slaveParallel(',int2str(j),',',int2str(indPC),')'];
end end
end end
elseif Parallel(indPC).Local==0, elseif Parallel(indPC).Local==0,
if isunix || (~matlab_ver_less_than('7.4') && ismac), if isunix || (~matlab_ver_less_than('7.4') && ismac),
% [tempo, RemoteName]=system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' "ifconfig | grep \''inet addr:\''| grep -v \''127.0.0.1\'' | cut -d: -f2 | awk \''{ print $1}\''"']); % [tempo, RemoteName]=system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' "ifconfig | grep \''inet addr:\''| grep -v \''127.0.0.1\'' | cut -d: -f2 | awk \''{ print $1}\''"']);
[tempo, RemoteName]=system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' "hostname --fqdn"']); [tempo, RemoteName]=system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' "hostname --fqdn"']);
RemoteName=RemoteName(1:end-1); RemoteName=RemoteName(1:end-1);
RemoteFolder = Parallel(indPC).RemoteFolder; RemoteFolder = Parallel(indPC).RemoteFolder;
else else
RemoteName = Parallel(indPC).PcName; RemoteName = Parallel(indPC).PcName;
RemoteFolder = [Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder]; RemoteFolder = [Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder];
end end
remoteFlag=1; remoteFlag=1;
if strcmpi(RemoteName,MasterName), if strcmpi(RemoteName,MasterName),
if ~copyfile(['P_',fname,'_',int2str(j),'End.txt'],RemoteFolder), if ~copyfile(['P_',fname,'_',int2str(j),'End.txt'],RemoteFolder),
remoteFlag=0; remoteFlag=0;
end end
end end
if remoteFlag, if remoteFlag,
if j==nCPU0+1, if (j==nCPU0+1) & newInstance, % clean remote folder
if newInstance, % clean remote folder
if isunix || (~matlab_ver_less_than('7.4') && ismac), if isunix || (~matlab_ver_less_than('7.4') && ismac),
system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' rm -fr ',Parallel(indPC).RemoteFolder,'/*']); system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' rm -fr ',Parallel(indPC).RemoteFolder,'/*']);
else else
@ -191,60 +195,62 @@ for j=1:totCPU,
end end
end end
end end
end
if isunix || (~matlab_ver_less_than('7.4') && ismac),
% system(['scp ',fname,'_input.mat ',Parallel(indPC).user,'@',Parallel(indPC).PcName,':',Parallel(indPC).RemoteFolder]);
for jfil=1:size(NamFileInput,1)
if ~isempty(NamFileInput{jfil,1})
system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' mkdir -p ',Parallel(indPC).RemoteFolder,'/',NamFileInput{jfil,1}])
end
system(['scp ',NamFileInput{jfil,1},NamFileInput{jfil,2},' ',Parallel(indPC).user,'@',Parallel(indPC).PcName,':',Parallel(indPC).RemoteFolder,'/',NamFileInput{jfil,1}]);
end end
system(['scp slaveJob',int2str(j),'.mat ',Parallel(indPC).user,'@',Parallel(indPC).PcName,':',Parallel(indPC).RemoteFolder]);
if isunix || (~matlab_ver_less_than('7.4') && ismac), if newInstance,
system(['scp ',fname,'_input.mat ',Parallel(indPC).user,'@',Parallel(indPC).PcName,':',Parallel(indPC).RemoteFolder]); system(['scp slaveParallel_input',int2str(j),'.mat ',Parallel(indPC).user,'@',Parallel(indPC).PcName,':',Parallel(indPC).RemoteFolder]);
for jfil=1:size(NamFileInput,1) end
if ~isempty(NamFileInput{jfil,1}) else
system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' mkdir -p ',Parallel(indPC).RemoteFolder,'/',NamFileInput{jfil,1}]) % copyfile([fname,'_input.mat'], ['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder])
end for jfil=1:size(NamFileInput,1)
system(['scp ',NamFileInput{jfil,1},NamFileInput{jfil,2},' ',Parallel(indPC).user,'@',Parallel(indPC).PcName,':',Parallel(indPC).RemoteFolder,'/',NamFileInput{jfil,1}]); if ~isempty(NamFileInput{jfil,1})
end mkdir(['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder,'\',NamFileInput{jfil,1}]);
system(['scp slaveJob',int2str(j),'.mat ',Parallel(indPC).user,'@',Parallel(indPC).PcName,':',Parallel(indPC).RemoteFolder]);
if newInstance,
system(['scp slaveParallel_input',int2str(j),'.mat ',Parallel(indPC).user,'@',Parallel(indPC).PcName,':',Parallel(indPC).RemoteFolder]);
end
else
copyfile([fname,'_input.mat'], ['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder]);
for jfil=1:size(NamFileInput,1)
copyfile([NamFileInput{jfil,1},NamFileInput{jfil,2}],['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder,'\',NamFileInput{jfil,1}])
end
copyfile(['slaveJob',int2str(j),'.mat'], ['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder]);
if newInstance,
copyfile(['slaveParallel_input',int2str(j),'.mat'], ['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder]);
end end
copyfile([NamFileInput{jfil,1},NamFileInput{jfil,2}],['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder,'\',NamFileInput{jfil,1}])
end
copyfile(['slaveJob',int2str(j),'.mat'], ['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder]);
if newInstance,
copyfile(['slaveParallel_input',int2str(j),'.mat'], ['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder]);
end end
end end
end end
if newInstance, if newInstance,
if isunix || (~matlab_ver_less_than('7.4') && ismac), if isunix || (~matlab_ver_less_than('7.4') && ismac),
if exist('OCTAVE_VERSION'), if exist('OCTAVE_VERSION'),
command1=['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' "cd ',Parallel(indPC).RemoteFolder, '; octave --eval fParallel\(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',\''',fname,'\''\);" &']; command1=['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' "cd ',Parallel(indPC).RemoteFolder, '; octave --eval slaveParallel\(',int2str(j),',',int2str(indPC),'\);" &'];
else
command1=['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' "cd ',Parallel(indPC).RemoteFolder, '; matlab -nosplash -nodesktop -minimize -r slaveParallel\(',int2str(j),',',int2str(indPC),'\);" &'];
end
else else
command1=['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' "cd ',Parallel(indPC).RemoteFolder, '; matlab -nosplash -nodesktop -minimize -r fParallel\(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',\''',fname,'\''\);" &']; if ~strcmp(Parallel(indPC).PcName,MasterName), % run on a remote machine
end if exist('OCTAVE_VERSION'),
else command1=['start /B psexec \\',Parallel(indPC).PcName,' -e -u ',Parallel(indPC).user,' -p ',Parallel(indPC).passwd,' -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder,'\ -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)), ...
if ~strcmp(Parallel(indPC).PcName,MasterName), % run on a remote machine ' -low octave --eval slaveParallel(',int2str(j),',',int2str(indPC),')'];
if exist('OCTAVE_VERSION'), else
command1=['start /B psexec \\',Parallel(indPC).PcName,' -e -u ',Parallel(indPC).user,' -p ',Parallel(indPC).passwd,' -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder,'\ -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)), ... command1=['start /B psexec \\',Parallel(indPC).PcName,' -e -u ',Parallel(indPC).user,' -p ',Parallel(indPC).passwd,' -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder,'\ -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)), ...
' -low octave --eval fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')']; ' -low matlab -nosplash -nodesktop -minimize -r slaveParallel(',int2str(j),',',int2str(indPC),')'];
else end
command1=['start /B psexec \\',Parallel(indPC).PcName,' -e -u ',Parallel(indPC).user,' -p ',Parallel(indPC).passwd,' -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder,'\ -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)), ... else % run on the local machine via the network
' -low matlab -nosplash -nodesktop -minimize -r fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')']; if exist('OCTAVE_VERSION'),
end command1=['start /B psexec \\',Parallel(indPC).PcName,' -e -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder,'\ -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)), ...
else % run on the local machine via the network ' -low octave --eval slaveParallel(',int2str(j),',',int2str(indPC),')'];
if exist('OCTAVE_VERSION'), else
command1=['start /B psexec \\',Parallel(indPC).PcName,' -e -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder,'\ -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)), ... command1=['start /B psexec \\',Parallel(indPC).PcName,' -e -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder,'\ -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)), ...
' -low octave --eval fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')']; ' -low matlab -nosplash -nodesktop -minimize -r slaveParallel(',int2str(j),',',int2str(indPC),')'];
else end
command1=['start /B psexec \\',Parallel(indPC).PcName,' -e -W ',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteFolder,'\ -a ',int2str(Parallel(indPC).NumCPU(j-nCPU0)), ...
' -low matlab -nosplash -nodesktop -minimize -r fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')'];
end end
end end
end end
end
end end
fprintf(fid,'%s\n',command1); fprintf(fid,'%s\n',command1);
end end
@ -268,25 +274,28 @@ if exist('OCTAVE_VERSION'),
printf('\n'); printf('\n');
else else
hfigstatus = figure('name',['Parallel ',fname],... hfigstatus = figure('name',['Parallel ',fname],...
'MenuBar', 'none', ... 'MenuBar', 'none', ...
'NumberTitle','off'); 'NumberTitle','off');
vspace = 0.1; vspace = 0.1;
ncol = ceil(totCPU/10); ncol = ceil(totCPU/10);
hspace = 0.9/ncol; hspace = 0.9/ncol;
for j=1:totCPU, for j=1:totCPU,
jrow = mod(j-1,10)+1; jrow = mod(j-1,10)+1;
jcol = ceil(j/10); jcol = ceil(j/10);
hstatus(j) = axes('position',[0.05/ncol+(jcol-1)/ncol 0.92-vspace*(jrow-1) 0.9/ncol 0.03], ... hstatus(j) = axes('position',[0.05/ncol+(jcol-1)/ncol 0.92-vspace*(jrow-1) 0.9/ncol 0.03], ...
'box','on','xtick',[],'ytick',[],'xlim',[0 1],'ylim',[0 1]); 'box','on','xtick',[],'ytick',[],'xlim',[0 1],'ylim',[0 1]);
end end
cumBlockPerCPU = cumsum(nBlockPerCPU); cumBlockPerCPU = cumsum(nBlockPerCPU);
end end
pcerdone = NaN(1,totCPU); pcerdone = NaN(1,totCPU);
while (1) while (1)
waitbarString = ''; waitbarString = '';
statusString = ''; statusString = '';
pause(1) pause(1)
stax = dir(['comp_status_',fname,'*.mat']);
stax = dir(['comp_status_',fname,'*.mat']);
for j=1:length(stax), for j=1:length(stax),
try try
@ -295,8 +304,8 @@ while (1)
if exist('OCTAVE_VERSION'), if exist('OCTAVE_VERSION'),
statusString = [statusString, int2str(j), ' %3.f%% done! ']; statusString = [statusString, int2str(j), ' %3.f%% done! '];
else else
status_String{j} = waitbarString; status_String{j} = waitbarString;
status_Title{j} = waitbarTitle; status_Title{j} = waitbarTitle;
idCPU(j) = njob; idCPU(j) = njob;
end end
if prtfrc==1, delete(stax(j).name), end if prtfrc==1, delete(stax(j).name), end
@ -307,19 +316,35 @@ while (1)
if exist('OCTAVE_VERSION'), if exist('OCTAVE_VERSION'),
printf([statusString,'\r'], 100 .* pcerdone); printf([statusString,'\r'], 100 .* pcerdone);
else else
figure(hfigstatus), figure(hfigstatus),
for j=1:length(stax), try
axes(hstatus(idCPU(j))), for j=1:length(stax)
hpat = findobj(hstatus(idCPU(j)),'Type','patch');
if ~isempty(hpat), axes(hstatus(idCPU(j))),
set(hpat,'XData',[0 0 pcerdone(j) pcerdone(j)]) hpat = findobj(hstatus(idCPU(j)),'Type','patch');
else
patch([0 0 pcerdone(j) pcerdone(j)],[0 1 1 0],'r','EdgeColor','r')
if ~isempty(hpat),
set(hpat,'XData',[0 0 pcerdone(j) pcerdone(j)])
else
patch([0 0 pcerdone(j) pcerdone(j)],[0 1 1 0],'r','EdgeColor','r')
end
title([status_Title{j},' - ',status_String{j}]);
end end
title([status_Title{j},' - ',status_String{j}]); catch
E='ERRORE in while cycle masterParallelMann!'
j
if j>1
j=j-1
end
j
end end
end end
if isempty(dir(['P_',fname,'_*End.txt'])) if isempty(dir(['P_',fname,'_*End.txt']))
mydelete(['comp_status_',fname,'*.mat']) mydelete(['comp_status_',fname,'*.mat'])
if ~exist('OCTAVE_VERSION'), if ~exist('OCTAVE_VERSION'),
close(hfigstatus), close(hfigstatus),