diff --git a/matlab/parallel/masterParallel.m b/matlab/parallel/masterParallel.m index 4fbe69a8c..42417a2a0 100644 --- a/matlab/parallel/masterParallel.m +++ b/matlab/parallel/masterParallel.m @@ -107,6 +107,18 @@ if nargin>8 && initialize==1 return end +% Determine the total number of available CPUs, and the number of threads +% to run on each CPU. + +[nCPU, totCPU, nBlockPerCPU, totSlaves] = distributeJobs(Parallel, fBlock, nBlock); + +parallel_recover = 0; + +if isfield(Parallel_info,'parallel_recover') && Parallel_info.parallel_recover + parallel_recover = 1; +end + +if parallel_recover ==0 if isfield(Parallel_info,'local_files') if isempty(NamFileInput) NamFileInput=Parallel_info.local_files; @@ -147,9 +159,9 @@ if isHybridMatlabOctave || isoctave end end -if Strategy==1 - totCPU=0; -end +% if Strategy==1 +% totCPU=0; +% end % Determine my hostname and my working directory. @@ -179,11 +191,6 @@ switch Strategy closeSlave(Parallel,PRCDir,-1); end - -% Determine the total number of available CPUs, and the number of threads -% to run on each CPU. - -[nCPU, totCPU, nBlockPerCPU, totSlaves] = distributeJobs(Parallel, fBlock, nBlock); for j=1:totSlaves PRCDirSnapshot{j}={}; end @@ -768,8 +775,13 @@ while (ForEver) end end - - +else + + for j=1:totSlaves + PRCDirSnapshot{j}={}; + end + flag_CloseAllSlaves = 0; +end % Load and format remote output. iscrash = 0; PRCDirSnapshot=dynareParallelGetNewFiles(PRCDir,Parallel(1:totSlaves),PRCDirSnapshot); @@ -902,4 +914,4 @@ switch Strategy end end end -end \ No newline at end of file +end diff --git a/matlab/posterior_sampler.m b/matlab/posterior_sampler.m index 62e8ccfe6..3fa23b3b7 100644 --- a/matlab/posterior_sampler.m +++ b/matlab/posterior_sampler.m @@ -117,7 +117,7 @@ end % User doesn't want to use parallel computing, or wants to compute a % single chain compute sequentially. -if isnumeric(options_.parallel) || (nblck-fblck)==0 +if isnumeric(options_.parallel) || (~isempty(fblck) && (nblck-fblck)==0) fout = posterior_sampler_core(localVars, fblck, nblck, 0); record = fout.record; % Parallel in Local or remote machine. @@ -142,6 +142,11 @@ else end % from where to get back results % NamFileOutput(1,:) = {[M_.dname,'/metropolis/'],'*.*'}; + if options_.mh_recover && isempty(fblck) + % here we just need to retrieve the output of the completed remote jobs + fblck=1; + options_.parallel_info.parallel_recover = 1; + end [fout, nBlockPerCPU, totCPU] = masterParallel(options_.parallel, fblck, nblck,NamFileInput,'posterior_sampler_core', localVars, globalVars, options_.parallel_info); for j=1:totCPU offset = sum(nBlockPerCPU(1:j-1))+fblck-1; @@ -151,7 +156,7 @@ else record.FunctionEvalPerIteration(offset+1:sum(nBlockPerCPU(1:j)))=fout(j).record.FunctionEvalPerIteration(offset+1:sum(nBlockPerCPU(1:j))); record.LastSeeds(offset+1:sum(nBlockPerCPU(1:j)))=fout(j).record.LastSeeds(offset+1:sum(nBlockPerCPU(1:j))); end - + options_.parallel_info.parallel_recover = 0; end irun = fout(1).irun; diff --git a/matlab/posterior_sampler_core.m b/matlab/posterior_sampler_core.m index b753391b4..65aa05ea6 100644 --- a/matlab/posterior_sampler_core.m +++ b/matlab/posterior_sampler_core.m @@ -137,14 +137,37 @@ for curr_block = fblck:nblck % If the state set by master is incompatible with the slave, we only reseed set_dynare_seed(options_.DynareRandomStreams.seed+curr_block); end + mh_recover_flag=0; if (options_.load_mh_file~=0) && (fline(curr_block)>1) && OpenOldFile(curr_block) %load previous draws and likelihood load([BaseName '_mh' int2str(NewFile(curr_block)) '_blck' int2str(curr_block) '.mat']) x2 = [x2;zeros(InitSizeArray(curr_block)-fline(curr_block)+1,npar)]; logpo2 = [logpo2;zeros(InitSizeArray(curr_block)-fline(curr_block)+1,1)]; OpenOldFile(curr_block) = 0; else - x2 = zeros(InitSizeArray(curr_block),npar); - logpo2 = zeros(InitSizeArray(curr_block),1); + if options_.mh_recover && exist([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat'],'file')==2 + load([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat']); + draw_iter = size(neval_this_chain,2)+1; + draw_index_current_file = draw_iter; + feval_this_chain = sum(sum(neval_this_chain)); + feval_this_file = sum(sum(neval_this_chain)); + if feval_this_chain>draw_iter-1 + % non Metropolis type of sampler + accepted_draws_this_chain = draw_iter-1; + accepted_draws_this_file = draw_iter-1; + else + accepted_draws_this_chain = 0; + accepted_draws_this_file = 0; + end + mh_recover_flag=1; + set_dynare_random_generator_state(LastSeeds.(['file' int2str(NewFile(curr_block))]).Unifor, LastSeeds.(['file' int2str(NewFile(curr_block))]).Normal); + last_draw(curr_block,:)=x2(draw_iter-1,:); + last_posterior(curr_block)=logpo2(draw_iter-1); + + else + + x2 = zeros(InitSizeArray(curr_block),npar); + logpo2 = zeros(InitSizeArray(curr_block),1); + end end %Prepare waiting bars if whoiam @@ -158,13 +181,15 @@ for curr_block = fblck:nblck hh = dyn_waitbar(0,[bar_title ' (' int2str(curr_block) '/' int2str(options_.mh_nblck) ')...']); set(hh,'Name',bar_title); end - accepted_draws_this_chain = 0; - accepted_draws_this_file = 0; - feval_this_chain = 0; - feval_this_file = 0; - draw_index_current_file = fline(curr_block); %get location of first draw in current block - draw_iter = 1; - + if mh_recover_flag==0 + accepted_draws_this_chain = 0; + accepted_draws_this_file = 0; + feval_this_chain = 0; + feval_this_file = 0; + draw_iter = 1; + draw_index_current_file = fline(curr_block); %get location of first draw in current block + end + sampler_options.curr_block = curr_block; while draw_iter <= nruns(curr_block) [par, logpost, accepted, neval] = posterior_sampler_iteration(TargetFun, last_draw(curr_block,:), last_posterior(curr_block), sampler_options,dataset_,dataset_info,options_,M_,estim_params_,bayestopt_,mh_bounds,oo_); @@ -173,6 +198,7 @@ for curr_block = fblck:nblck last_draw(curr_block,:) = par; logpo2(draw_index_current_file) = logpost; last_posterior(curr_block) = logpost; + neval_this_chain(:, draw_iter) = neval; feval_this_chain = feval_this_chain + sum(neval); feval_this_file = feval_this_file + sum(neval); accepted_draws_this_chain = accepted_draws_this_chain + accepted; @@ -187,7 +213,7 @@ for curr_block = fblck:nblck end if save_tmp_file [LastSeeds.(['file' int2str(NewFile(curr_block))]).Unifor, LastSeeds.(['file' int2str(NewFile(curr_block))]).Normal] = get_dynare_random_generator_state(); - save([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat'],'x2','logpo2','LastSeeds'); + save([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat'],'x2','logpo2','LastSeeds','neval_this_chain','accepted_draws_this_chain','accepted_draws_this_file','feval_this_chain','feval_this_file'); end end if (draw_index_current_file == InitSizeArray(curr_block)) || (draw_iter == nruns(curr_block)) % Now I save the simulations, either because the current file is full or the chain is done @@ -195,7 +221,7 @@ for curr_block = fblck:nblck if save_tmp_file delete([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat']); end - save([BaseName '_mh' int2str(NewFile(curr_block)) '_blck' int2str(curr_block) '.mat'],'x2','logpo2','LastSeeds'); + save([BaseName '_mh' int2str(NewFile(curr_block)) '_blck' int2str(curr_block) '.mat'],'x2','logpo2','LastSeeds','accepted_draws_this_chain','accepted_draws_this_file','feval_this_chain','feval_this_file'); fidlog = fopen([MetropolisFolder '/metropolis.log'],'a'); fprintf(fidlog,['\n']); fprintf(fidlog,['%% Mh' int2str(NewFile(curr_block)) 'Blck' int2str(curr_block) ' (' datestr(now,0) ')\n']); @@ -239,10 +265,12 @@ for curr_block = fblck:nblck draw_iter=draw_iter+1; draw_index_current_file = draw_index_current_file + 1; end% End of the simulations for one mh-block. - record.AcceptanceRatio(curr_block) = accepted_draws_this_chain/(draw_iter-1); - record.FunctionEvalPerIteration(curr_block) = feval_this_chain/(draw_iter-1); dyn_waitbar_close(hh); - [record.LastSeeds(curr_block).Unifor, record.LastSeeds(curr_block).Normal] = get_dynare_random_generator_state(); + if nruns(curr_block) + record.AcceptanceRatio(curr_block) = accepted_draws_this_chain/(draw_iter-1); + record.FunctionEvalPerIteration(curr_block) = feval_this_chain/(draw_iter-1); + [record.LastSeeds(curr_block).Unifor, record.LastSeeds(curr_block).Normal] = get_dynare_random_generator_state(); + end OutputFileName(block_iter,:) = {[MetropolisFolder,filesep], [ModelName '_mh*_blck' int2str(curr_block) '.mat']}; end% End of the loop over the mh-blocks. @@ -250,4 +278,4 @@ end% End of the loop over the mh-blocks. myoutput.record = record; myoutput.irun = draw_index_current_file; myoutput.NewFile = NewFile; -myoutput.OutputFileName = OutputFileName; \ No newline at end of file +myoutput.OutputFileName = OutputFileName; diff --git a/matlab/posterior_sampler_initialization.m b/matlab/posterior_sampler_initialization.m index 364bf2b20..df580e2b3 100644 --- a/matlab/posterior_sampler_initialization.m +++ b/matlab/posterior_sampler_initialization.m @@ -376,28 +376,30 @@ elseif options_.mh_recover ExpectedNumberOfMhFiles = ExpectedNumberOfMhFilesPerBlock*NumberOfBlocks; % How many mh files do we actually have ? AllMhFiles = dir([BaseName '_mh*_blck*.mat']); - TotalNumberOfMhFiles = length(AllMhFiles); + TotalNumberOfMhFiles = length(AllMhFiles)-length(dir([BaseName '_mh_tmp*_blck*.mat'])); % Quit if no crashed mcmc chain can be found as there are as many files as expected - if (TotalNumberOfMhFiles==ExpectedNumberOfMhFiles) - disp('Estimation::mcmc: It appears that you don''t need to use the mh_recover option!') - disp(' You have to edit the mod file and remove the mh_recover option') - disp(' in the estimation command') - error('Estimation::mcmc: mh_recover option not required!') + if (TotalNumberOfMhFiles==ExpectedNumberOfMhFiles) + if isnumeric(options_.parallel) + disp('Estimation::mcmc: It appears that you don''t need to use the mh_recover option!') + disp(' You have to edit the mod file and remove the mh_recover option') + disp(' in the estimation command') + error('Estimation::mcmc: mh_recover option not required!') + end end % 2. Something needs to be done; find out what % Count the number of saved mh files per block. NumberOfMhFilesPerBlock = zeros(NumberOfBlocks,1); for b = 1:NumberOfBlocks BlckMhFiles = dir([BaseName '_mh*_blck' int2str(b) '.mat']); - NumberOfMhFilesPerBlock(b) = length(BlckMhFiles); + NumberOfMhFilesPerBlock(b) = length(BlckMhFiles)-length(dir([BaseName '_mh_tmp*_blck' int2str(b) '.mat'])); end % Find FirstBlock (First block), an integer targeting the crashed mcmc chain. FirstBlock = 1; %initialize + FBlock = zeros(NumberOfBlocks,1); while FirstBlock <= NumberOfBlocks if NumberOfMhFilesPerBlock(FirstBlock) < ExpectedNumberOfMhFilesPerBlock disp(['Estimation::mcmc: Chain ' int2str(FirstBlock) ' is not complete!']) - break - % The mh_recover session will start from chain FirstBlock. + FBlock(FirstBlock)=1; else disp(['Estimation::mcmc: Chain ' int2str(FirstBlock) ' is complete!']) end @@ -406,51 +408,84 @@ elseif options_.mh_recover %% 3. Overwrite default settings for % How many mh-files are saved in this block? - NumberOfSavedMhFilesInTheCrashedBlck = NumberOfMhFilesPerBlock(FirstBlock); - ExistingDrawsInLastMCFile=0; %initialize: no MCMC draws of current MCMC are in file from last run - % Check whether last present file is a file included in the last MCMC run - if ~LastFileFullIndicator - if NumberOfSavedMhFilesInTheCrashedBlck==NewFile(FirstBlock) %only that last file exists, but no files from current MCMC - loaded_results=load([BaseName '_mh' int2str(NewFile(FirstBlock)) '_blck' int2str(FirstBlock) '.mat']); - %check whether that last file was filled - if size(loaded_results.x2,1)==MAX_nruns %file is full - NewFile(FirstBlock)=NewFile(FirstBlock)+1; %set first file to be created to next one - FirstLine(FirstBlock) = 1; %use first line of next file - ExistingDrawsInLastMCFile=MAX_nruns-record.MhDraws(end-1,3); - else - ExistingDrawsInLastMCFile=0; + ExistingDrawsInLastMCFile=zeros(NumberOfBlocks,1); %initialize: no MCMC draws of current MCMC are in file from last run + % Check whether last present file is a file included in the last MCMC run + + update_record=0; + for k=1:NumberOfBlocks + FirstBlock = k; + if FBlock(k) + NumberOfSavedMhFilesInTheCrashedBlck=NumberOfMhFilesPerBlock(k); + if ~LastFileFullIndicator + if NumberOfSavedMhFilesInTheCrashedBlck==NewFile(FirstBlock) %only that last file exists, but no files from current MCMC + loaded_results=load([BaseName '_mh' int2str(NewFile(FirstBlock)) '_blck' int2str(FirstBlock) '.mat']); + %check whether that last file was filled + if size(loaded_results.x2,1)==MAX_nruns %file is full + NewFile(FirstBlock)=NewFile(FirstBlock)+1; %set first file to be created to next one + FirstLine(FirstBlock) = 1; %use first line of next file + ExistingDrawsInLastMCFile(FirstBlock)=MAX_nruns-record.MhDraws(end-1,3); + else + ExistingDrawsInLastMCFile(FirstBlock)=0; + end + end + elseif LastFileFullIndicator + ExistingDrawsInLastMCFile(FirstBlock)=0; + if NumberOfSavedMhFilesInTheCrashedBlck==NewFile(FirstBlock) %only the last file exists, but no files from current MCMC + NewFile(FirstBlock)=NewFile(FirstBlock)+1; %set first file to be created to next one + end + end + % % Correct the number of saved mh files if the crashed Metropolis was not the first session (so + % % that NumberOfSavedMhFilesInTheCrashedBlck is the number of saved mh files in the crashed chain + % % of the current session). + % if OldMhExists + % NumberOfSavedMhFilesInTheCrashedBlck = NumberOfSavedMhFilesInTheCrashedBlck - LastFileNumberInThePreviousMh; + % end + % NumberOfSavedMhFiles = NumberOfSavedMhFilesInTheCrashedBlck+LastFileNumberInThePreviousMh; + + % Correct initial conditions. + if NumberOfSavedMhFilesInTheCrashedBlck>0 && NumberOfSavedMhFilesInTheCrashedBlck