diff --git a/dynare++/integ/cc/quadrature.hh b/dynare++/integ/cc/quadrature.hh index ae10ea809..83a751498 100644 --- a/dynare++/integ/cc/quadrature.hh +++ b/dynare++/integ/cc/quadrature.hh @@ -121,7 +121,7 @@ public: smarter. */ void - operator()() override + operator()(std::mutex &mut) override { _Tpit beg = quad.begin(ti, tn, level); _Tpit end = quad.begin(ti+1, tn, level); @@ -138,7 +138,7 @@ public: } { - sthread::synchro syn(&outvec, "IntegrationWorker"); + std::unique_lock lk{mut}; outvec.add(1.0, tmpall); } } diff --git a/dynare++/kord/decision_rule.cc b/dynare++/kord/decision_rule.cc index 522760560..fc5dbce2e 100644 --- a/dynare++/kord/decision_rule.cc +++ b/dynare++/kord/decision_rule.cc @@ -480,12 +480,12 @@ IRFResults::writeMat(mat_t *fd, const char *prefix) const } void -SimulationWorker::operator()() +SimulationWorker::operator()(std::mutex &mut) { auto *esr = new ExplicitShockRealization(sr, np); TwoDMatrix *m = dr.simulate(em, np, st, *esr); { - sthread::synchro syn(&res, "simulation"); + std::unique_lock lk{mut}; res.addDataSet(m, esr); } } @@ -494,7 +494,7 @@ SimulationWorker::operator()() corresponding control, add the impulse, and simulate. 
*/ void -SimulationIRFWorker::operator()() +SimulationIRFWorker::operator()(std::mutex &mut) { auto *esr = new ExplicitShockRealization(res.control.getShocks(idata)); @@ -504,13 +504,13 @@ SimulationIRFWorker::operator()() TwoDMatrix *m = dr.simulate(em, np, st, *esr); m->add(-1.0, res.control.getData(idata)); { - sthread::synchro syn(&res, "simulation"); + std::unique_lock lk{mut}; res.addDataSet(m, esr); } } void -RTSimulationWorker::operator()() +RTSimulationWorker::operator()(std::mutex &mut) { NormalConj nc(res.nc.getDim()); const PartitionY &ypart = dr.getYPart(); @@ -546,7 +546,7 @@ RTSimulationWorker::operator()() nc.update(y); } { - sthread::synchro syn(&res, "rtsimulation"); + std::unique_lock lk{mut}; res.nc.update(nc); if (res.num_per-ip > 0) { diff --git a/dynare++/kord/decision_rule.hh b/dynare++/kord/decision_rule.hh index 917fa9fb9..ef7210354 100644 --- a/dynare++/kord/decision_rule.hh +++ b/dynare++/kord/decision_rule.hh @@ -925,7 +925,7 @@ public: : res(sim_res), dr(dec_rule), em(emet), np(num_per), st(start), sr(shock_r) { } - void operator()() override; + void operator()(std::mutex &mut) override; }; /* This worker simulates a given impulse |imp| to a given shock @@ -951,7 +951,7 @@ public: idata(id), ishock(ishck), imp(impulse) { } - void operator()() override; + void operator()(std::mutex &mut) override; }; /* This class does the real time simulation job for @@ -977,7 +977,7 @@ public: : res(sim_res), dr(dec_rule), em(emet), np(num_per), ystart(start), sr(shock_r) { } - void operator()() override; + void operator()(std::mutex &mut) override; }; /* This class generates draws from Gaussian distribution with zero mean diff --git a/dynare++/tl/cc/stack_container.cc b/dynare++/tl/cc/stack_container.cc index a79c5bfa9..311f94b05 100644 --- a/dynare++/tl/cc/stack_container.cc +++ b/dynare++/tl/cc/stack_container.cc @@ -54,12 +54,12 @@ FoldedStackContainer::multAndAdd(int dim, const FGSContainer &c, FGSTensor &out) code@>|. 
*/ void -WorkerFoldMAADense::operator()() +WorkerFoldMAADense::operator()(std::mutex &mut) { Permutation iden(dense_cont.num()); IntSequence coor(sym, iden.getMap()); const FGSTensor *g = dense_cont.get(sym); - cont.multAndAddStacks(coor, *g, out, &out); + cont.multAndAddStacks(coor, *g, out, mut); } WorkerFoldMAADense::WorkerFoldMAADense(const FoldedStackContainer &container, @@ -94,7 +94,7 @@ FoldedStackContainer::multAndAddSparse1(const FSSparseTensor &t, vertically narrow out accordingly. */ void -WorkerFoldMAASparse1::operator()() +WorkerFoldMAASparse1::operator()(std::mutex &mut) { const EquivalenceSet &eset = ebundle.get(out.dimen()); const PermutationSet &pset = tls.pbundle->get(t.dimen()); @@ -121,7 +121,7 @@ WorkerFoldMAASparse1::operator()() { FPSTensor fps(out.getDims(), it, slice, kp); { - sthread::synchro syn(&out, "WorkerUnfoldMAASparse1"); + std::unique_lock lk{mut}; fps.addTo(out); } } @@ -168,7 +168,7 @@ FoldedStackContainer::multAndAddSparse2(const FSSparseTensor &t, rows. */ void -WorkerFoldMAASparse2::operator()() +WorkerFoldMAASparse2::operator()(std::mutex &mut) { GSSparseTensor slice(t, cont.getStackSizes(), coor, TensorDimens(cont.getStackSizes(), coor)); @@ -181,10 +181,10 @@ WorkerFoldMAASparse2::operator()() int r2 = slice.getLastNonZeroRow(); FGSTensor dense_slice1(r1, r2-r1+1, dense_slice); FGSTensor out1(r1, r2-r1+1, out); - cont.multAndAddStacks(coor, dense_slice1, out1, &out); + cont.multAndAddStacks(coor, dense_slice1, out1, mut); } else - cont.multAndAddStacks(coor, slice, out, &out); + cont.multAndAddStacks(coor, slice, out, mut); } } @@ -257,12 +257,12 @@ FoldedStackContainer::multAndAddSparse4(const FSSparseTensor &t, FGSTensor &out) |multAndAddStacks|. 
*/ void -WorkerFoldMAASparse4::operator()() +WorkerFoldMAASparse4::operator()(std::mutex &mut) { GSSparseTensor slice(t, cont.getStackSizes(), coor, TensorDimens(cont.getStackSizes(), coor)); if (slice.getNumNonZero()) - cont.multAndAddStacks(coor, slice, out, &out); + cont.multAndAddStacks(coor, slice, out, mut); } WorkerFoldMAASparse4::WorkerFoldMAASparse4(const FoldedStackContainer &container, @@ -284,7 +284,7 @@ WorkerFoldMAASparse4::WorkerFoldMAASparse4(const FoldedStackContainer &container void FoldedStackContainer::multAndAddStacks(const IntSequence &coor, const FGSTensor &g, - FGSTensor &out, const void *ad) const + FGSTensor &out, std::mutex &mut) const { const EquivalenceSet &eset = ebundle.get(out.dimen()); @@ -310,7 +310,7 @@ FoldedStackContainer::multAndAddStacks(const IntSequence &coor, kp.optimizeOrder(); FPSTensor fps(out.getDims(), it, sort_per, ug, kp); { - sthread::synchro syn(ad, "multAndAddStacks"); + std::unique_lock lk{mut}; fps.addTo(out); } } @@ -329,7 +329,7 @@ FoldedStackContainer::multAndAddStacks(const IntSequence &coor, void FoldedStackContainer::multAndAddStacks(const IntSequence &coor, const GSSparseTensor &g, - FGSTensor &out, const void *ad) const + FGSTensor &out, std::mutex &mut) const { const EquivalenceSet &eset = ebundle.get(out.dimen()); UFSTensor dummy_u(0, numStacks(), g.dimen()); @@ -351,7 +351,7 @@ FoldedStackContainer::multAndAddStacks(const IntSequence &coor, KronProdStack<FGSTensor> kp(sp, coor); FPSTensor fps(out.getDims(), it, sort_per, g, kp); { - sthread::synchro syn(ad, "multAndAddStacks"); + std::unique_lock lk{mut}; fps.addTo(out); } } @@ -404,12 +404,12 @@ UnfoldedStackContainer::multAndAdd(int dim, const UGSContainer &c, } void -WorkerUnfoldMAADense::operator()() +WorkerUnfoldMAADense::operator()(std::mutex &mut) { Permutation iden(dense_cont.num()); IntSequence coor(sym, iden.getMap()); const UGSTensor *g = dense_cont.get(sym); - cont.multAndAddStacks(coor, *g, out, &out); + cont.multAndAddStacks(coor, *g, out, mut); } 
WorkerUnfoldMAADense::WorkerUnfoldMAADense(const UnfoldedStackContainer &container, @@ -476,7 +476,7 @@ UnfoldedStackContainer::multAndAddSparse1(const FSSparseTensor &t, todo: vertically narrow slice and out according to the fill in t. */ void -WorkerUnfoldMAASparse1::operator()() +WorkerUnfoldMAASparse1::operator()(std::mutex &mut) { const EquivalenceSet &eset = ebundle.get(out.dimen()); const PermutationSet &pset = tls.pbundle->get(t.dimen()); @@ -503,7 +503,7 @@ WorkerUnfoldMAASparse1::operator()() { UPSTensor ups(out.getDims(), it, slice, kp); { - sthread::synchro syn(&out, "WorkerUnfoldMAASparse1"); + std::unique_lock lk{mut}; ups.addTo(out); } } @@ -561,7 +561,7 @@ UnfoldedStackContainer::multAndAddSparse2(const FSSparseTensor &t, |@<|WorkerFoldMAASparse2::operator()()| code@>|. */ void -WorkerUnfoldMAASparse2::operator()() +WorkerUnfoldMAASparse2::operator()(std::mutex &mut) { GSSparseTensor slice(t, cont.getStackSizes(), coor, TensorDimens(cont.getStackSizes(), coor)); @@ -574,7 +574,7 @@ WorkerUnfoldMAASparse2::operator()() UGSTensor dense_slice1(r1, r2-r1+1, dense_slice); UGSTensor out1(r1, r2-r1+1, out); - cont.multAndAddStacks(coor, dense_slice1, out1, &out); + cont.multAndAddStacks(coor, dense_slice1, out1, mut); } } @@ -604,7 +604,7 @@ WorkerUnfoldMAASparse2::WorkerUnfoldMAASparse2(const UnfoldedStackContainer &con void UnfoldedStackContainer::multAndAddStacks(const IntSequence &fi, const UGSTensor &g, - UGSTensor &out, const void *ad) const + UGSTensor &out, std::mutex &mut) const { const EquivalenceSet &eset = ebundle.get(out.dimen()); @@ -629,7 +629,7 @@ UnfoldedStackContainer::multAndAddStacks(const IntSequence &fi, kp.optimizeOrder(); UPSTensor ups(out.getDims(), it, sort_per, g, kp); { - sthread::synchro syn(ad, "multAndAddStacks"); + std::unique_lock lk{mut}; ups.addTo(out); } } diff --git a/dynare++/tl/cc/stack_container.hh b/dynare++/tl/cc/stack_container.hh index 218623bb6..37606a082 100644 --- a/dynare++/tl/cc/stack_container.hh +++ 
b/dynare++/tl/cc/stack_container.hh @@ -295,9 +295,9 @@ protected: void multAndAddSparse3(const FSSparseTensor &t, FGSTensor &out) const; void multAndAddSparse4(const FSSparseTensor &t, FGSTensor &out) const; void multAndAddStacks(const IntSequence &fi, const FGSTensor &g, - FGSTensor &out, const void *ad) const; + FGSTensor &out, std::mutex &mut) const; void multAndAddStacks(const IntSequence &fi, const GSSparseTensor &g, - FGSTensor &out, const void *ad) const; + FGSTensor &out, std::mutex &mut) const; }; class WorkerUnfoldMAADense; @@ -323,7 +323,7 @@ protected: void multAndAddSparse1(const FSSparseTensor &t, UGSTensor &out) const; void multAndAddSparse2(const FSSparseTensor &t, UGSTensor &out) const; void multAndAddStacks(const IntSequence &fi, const UGSTensor &g, - UGSTensor &out, const void *ad) const; + UGSTensor &out, std::mutex &mut) const; }; /* Here is the specialization of the |StackContainer|. We implement @@ -656,7 +656,7 @@ public: const Symmetry &s, const FGSContainer &dcontainer, FGSTensor &outten); - void operator()() override; + void operator()(std::mutex &mut) override; }; class WorkerFoldMAASparse1 : public sthread::detach_thread @@ -670,7 +670,7 @@ public: WorkerFoldMAASparse1(const FoldedStackContainer &container, const FSSparseTensor &ten, FGSTensor &outten, const IntSequence &c); - void operator()() override; + void operator()(std::mutex &mut) override; }; class WorkerFoldMAASparse2 : public sthread::detach_thread @@ -683,7 +683,7 @@ public: WorkerFoldMAASparse2(const FoldedStackContainer &container, const FSSparseTensor &ten, FGSTensor &outten, const IntSequence &c); - void operator()() override; + void operator()(std::mutex &mut) override; }; class WorkerFoldMAASparse4 : public sthread::detach_thread @@ -696,7 +696,7 @@ public: WorkerFoldMAASparse4(const FoldedStackContainer &container, const FSSparseTensor &ten, FGSTensor &outten, const IntSequence &c); - void operator()() override; + void operator()(std::mutex &mut) override; }; class 
WorkerUnfoldMAADense : public sthread::detach_thread @@ -710,7 +710,7 @@ public: const Symmetry &s, const UGSContainer &dcontainer, UGSTensor &outten); - void operator()() override; + void operator()(std::mutex &mut) override; }; class WorkerUnfoldMAASparse1 : public sthread::detach_thread @@ -724,7 +724,7 @@ public: WorkerUnfoldMAASparse1(const UnfoldedStackContainer &container, const FSSparseTensor &ten, UGSTensor &outten, const IntSequence &c); - void operator()() override; + void operator()(std::mutex &mut) override; }; class WorkerUnfoldMAASparse2 : public sthread::detach_thread @@ -737,7 +737,7 @@ public: WorkerUnfoldMAASparse2(const UnfoldedStackContainer &container, const FSSparseTensor &ten, UGSTensor &outten, const IntSequence &c); - void operator()() override; + void operator()(std::mutex &mut) override; }; #endif diff --git a/dynare++/tl/cc/sthread.cc b/dynare++/tl/cc/sthread.cc index c3db848a0..5c5c7963d 100644 --- a/dynare++/tl/cc/sthread.cc +++ b/dynare++/tl/cc/sthread.cc @@ -9,37 +9,6 @@ namespace sthread uniprocessor machine with hyper-threading */ int detach_thread_group::max_parallel_threads = 2; - /* The constructor acquires the mutex in the map. First it tries to - get an exclusive access to the map. Then it increases a number of - references of the mutex (if it does not exists, it inserts it). Then - unlocks the map, and finally tries to lock the mutex of the map. */ - synchro::synchro(const void *c, std::string id) - : caller{c}, iden{std::move(id)} - { - mutmap.lock_map(); - if (!mutmap.check(caller, iden)) - mutmap.insert(caller, iden); - mutmap.get(caller, iden).second++; - mutmap.unlock_map(); - mutmap.get(caller, iden).first.lock(); - } - - /* The destructor first locks the map. Then releases the lock, - and decreases a number of references. If it is zero, it removes the - mutex. 
*/ - synchro::~synchro() - { - mutmap.lock_map(); - if (mutmap.check(caller, iden)) - { - mutmap.get(caller, iden).first.unlock(); - mutmap.get(caller, iden).second--; - if (mutmap.get(caller, iden).second == 0) - mutmap.remove(caller, iden); - } - mutmap.unlock_map(); - } - /* We cycle through all threads in the group, and in each cycle we wait for the change in the |counter|. If the counter indicates less than maximum parallel threads running, then a new thread is run, and the @@ -49,15 +18,15 @@ namespace sthread void detach_thread_group::run() { - std::unique_lock lk{m}; + std::unique_lock lk{mut_cv}; auto it = tlist.begin(); while (it != tlist.end()) { counter++; std::thread th{[&, it] { // The "it" variable is captured by value, because otherwise the iterator may move - (*it)->operator()(); - std::unique_lock lk2{m}; + (*it)->operator()(mut_threads); + std::unique_lock lk2{mut_cv}; counter--; std::notify_all_at_thread_exit(cv, std::move(lk2)); }}; diff --git a/dynare++/tl/cc/sthread.hh b/dynare++/tl/cc/sthread.hh index bf979e7ec..84608ccd9 100644 --- a/dynare++/tl/cc/sthread.hh +++ b/dynare++/tl/cc/sthread.hh @@ -15,15 +15,8 @@ are not joined, they are synchronized by means of a counter counting running threads. A change of the counter is checked by waiting on an associated condition. The number of maximum parallel - threads can be controlled. See below. - \li |synchro| object locks a piece of code to be executed only serially - for a given data and specified entry-point. It locks the code until it - is destructed. So, the typical use is to create the |synchro| object - on the stack of a function which is to be synchronized. The - synchronization can be subjected to specific data (then a pointer can - be passed to |synchro|'s constructor), and can be subjected to - specific entry-point (then |std::string| is passed to the - constructor). + threads can be controlled. See below. 
The group also provides a mutex to be + shared between the workers for their own synchronization purposes. \endunorderedlist The number of maximum parallel threads is controlled via a static @@ -42,104 +35,6 @@ namespace sthread { - using mmkey = std::pair<const void *, std::string>; - - /* Here we define a map of mutexes keyed by a pair of address, and a - string. A purpose of the map of mutexes is that, if synchronizing, we - need to publish mutexes locking some piece of codes (characterized by - the string) accessing the data (characterized by the pointer). So, if - any thread needs to pass a |synchro| object, it creates its own with - the same address and string, and must look to some public storage to - unlock the mutex. If the |synchro| object is created for the first - time, the mutex is created and inserted to the map. We count the - references to the mutex (number of waiting threads) to know, when it - is save to remove the mutex from the map. This is the only purpose of - counting the references. Recall, that the mutex is keyed by an address - of the data, and without removing, the number of mutexes would only - grow. - - The map itself needs its own mutex to avoid concurrent insertions and - deletions. */ - - struct ltmmkey - { - bool - operator()(const mmkey &k1, const mmkey &k2) const - { - return k1.first < k2.first - || (k1.first == k2.first && k1.second < k2.second); - } - }; - - using mutex_int_map = std::map<mmkey, std::pair<std::mutex, int>, ltmmkey>; - - class mutex_map : public mutex_int_map - { - using mmval = std::pair<std::mutex, int>; - std::mutex m; - public: - mutex_map() = default; - void - insert(const void *c, std::string id) - { - // We cannot use emplace(), because std::mutex is neither copyable nor moveable - operator[](mmkey{c, std::move(id)}).second = 0; - } - bool - check(const void *c, std::string id) const - { - return find(mmkey{c, std::move(id)}) != end(); - } - /* This returns the pair of mutex and count reference number. 
*/ - mmval & - get(const void *c, std::string id) - { - return operator[](mmkey{c, std::move(id)}); - } - - /* This removes unconditionally the mutex from the map regardless its - number of references. The only user of this class should be |synchro| - class, it implementation must not remove referenced mutex. */ - void - remove(const void *c, std::string id) - { - auto it = find(mmkey{c, std::string{id}}); - if (it != end()) - erase(it); - } - void - lock_map() - { - m.lock(); - } - void - unlock_map() - { - m.unlock(); - } - - }; - - - // The global map used by the synchro class - static mutex_map mutmap; - - /* This is the |synchro| class. The constructor of this class tries to - lock a mutex for a particular address (identification of data) and - string (identification of entry-point). If the mutex is already - locked, it waits until it is unlocked and then returns. The destructor - releases the lock. The typical use is to construct the object on the - stacked of the code being synchronized. */ - class synchro - { - private: - const void *caller; - const std::string iden; - public: - synchro(const void *c, std::string id); - ~synchro(); - }; - /* The detached thread is the same as joinable |thread|. We only re-implement |run| method to call |thread_traits::detach_run|, and add a method which installs a counter. The counter is increased and @@ -149,7 +44,7 @@ namespace sthread { public: virtual ~detach_thread() = default; - virtual void operator()() = 0; + virtual void operator()(std::mutex &mut) = 0; }; /* The detach thread group is (by interface) the same as @@ -159,9 +54,10 @@ namespace sthread class detach_thread_group { std::vector<std::unique_ptr<detach_thread>> tlist; - std::mutex m; // For the condition variable and the counter + std::mutex mut_cv; // For the condition variable and the counter std::condition_variable cv; int counter{0}; + std::mutex mut_threads; // Passed to the workers and shared between them public: static int max_parallel_threads;