1 /* $Id: linkgraphschedule.cpp 26347 2014-02-16 18:42:59Z fonsinchen $ */
4 * This file is part of OpenTTD.
5 * OpenTTD is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, version 2.
6 * OpenTTD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
7 * See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OpenTTD. If not, see <http://www.gnu.org/licenses/>.
10 /** @file linkgraphschedule.cpp Definition of link graph schedule used for cargo distribution. */
12 #include "../stdafx.h"
13 #include "linkgraphschedule.h"
17 #include "flowmapper.h"
18 #include "../command_func.h"
21 #include "../safeguards.h"
24 * Static instance of LinkGraphSchedule.
25 * Note: This instance is created on task start.
26 * Lazy creation on first usage results in a data race between the CDist threads.
28 /* static */ LinkGraphSchedule
LinkGraphSchedule::instance
;
31 * Start the next job(s) in the schedule.
33 * The cost estimate of a link graph job is C ~ N^2 log N, where
34 * N is the number of nodes in the job link graph.
35 * The cost estimate is summed for all running and scheduled jobs to form the total cost estimate T = sum C.
36 * The nominal cycle time (in recalc intervals) required to schedule all jobs is calculated as S = 1 + log_2 T.
37 * Hence the nominal duration of an individual job (in recalc intervals) is D = ceil(S * C / T)
38 * The cost budget for an individual call to this method is given by T / S.
40 * The purpose of this algorithm is so that overall responsiveness is not hindered by large numbers of small/cheap
41 * jobs which would previously need to be cycled through individually, but equally large/slow jobs have an extended
42 * duration in which to execute, to avoid unnecessary pauses.
/*
 * SpawnNext: create jobs for queued link graphs until a per-call cost budget is used up,
 * then hand the new jobs to LinkGraphJobGroup::ExecuteJobSet.
 * NOTE(review): this extract is missing lines (the declaration of `current`, the condition
 * inside the first loop, the update of `used_budget`, and closing braces), so the comments
 * below only describe the statements that are visible.
 */
44 void LinkGraphSchedule::SpawnNext()
/* Nothing to do when no link graph is queued. */
46 if (this->schedule
.empty()) return;
/* Graphs moved here are taken out of the schedule for this call and appended back to its end before the jobs are executed. */
48 GraphList schedule_to_back
;
/* T in the cost model: sum of the cost estimates of scheduled graphs and running jobs. */
49 uint64 total_cost
= 0;
/* The iterator is not advanced in the loop header; presumably it is advanced in the body because splice() below removes the current element — TODO confirm. */
50 for (auto iter
= this->schedule
.begin(); iter
!= this->schedule
.end();) {
53 const LinkGraph
*lg
= *current
;
/* NOTE(review): the condition choosing between the splice and the cost accumulation is not visible here. */
56 schedule_to_back
.splice(schedule_to_back
.end(), this->schedule
, current
);
58 total_cost
+= lg
->CalculateCostEstimate();
/* Jobs that are already running also count towards the total cost. */
61 for (auto &it
: this->running
) {
62 total_cost
+= it
->Graph().CalculateCostEstimate();
/* S = 1 + index of the highest set bit of T. */
64 uint scaling
= 1 + FindLastBit(total_cost
);
/* Budget for this call: T / S. */
65 uint64 cost_budget
= total_cost
/ scaling
;
66 uint64 used_budget
= 0;
/* Jobs created in this call, paired with their cost estimate. */
67 std::vector
<LinkGraphJobGroup::JobInfo
> jobs_to_execute
;
/* Take graphs from the front of the schedule while budget remains. */
68 while (used_budget
< cost_budget
&& !this->schedule
.empty()) {
69 LinkGraph
*lg
= this->schedule
.front();
/* Sanity check: the queued pointer must be the graph registered under its own index. */
70 assert(lg
== LinkGraph::Get(lg
->index
));
71 this->schedule
.pop_front();
72 uint64 cost
= lg
->CalculateCostEstimate();
74 if (LinkGraphJob::CanAllocateItem()) {
/* D = ceil(S * C / T): passed to the job constructor as its duration multiplier. */
75 uint duration_multiplier
= CeilDivT
<uint64_t>(scaling
* cost
, total_cost
);
76 std::unique_ptr
<LinkGraphJob
> job(new LinkGraphJob(*lg
, duration_multiplier
));
/* Record the raw pointer before ownership is moved into the running list. */
77 jobs_to_execute
.emplace_back(job
.get(), cost
);
/* Fast path: append when the list is empty or the new job does not join before the last running job. */
78 if (this->running
.empty() || job
->JoinDate() >= this->running
.back()->JoinDate()) {
79 this->running
.push_back(std::move(job
));
/* Otherwise insert before the first running job whose join date is later than the new job's. */
81 // find right place to insert
82 auto iter
= std::upper_bound(this->running
.begin(), this->running
.end(), job
->JoinDate(), [](Date a
, const std::unique_ptr
<LinkGraphJob
> &b
) {
83 return a
< b
->JoinDate();
85 this->running
.insert(iter
, std::move(job
));
/* Re-append the graphs that were set aside at the start of this call. */
92 this->schedule
.splice(this->schedule
.end(), schedule_to_back
);
/* Hand the newly created jobs over for execution. */
94 LinkGraphJobGroup::ExecuteJobSet(std::move(jobs_to_execute
));
98 * Check the running jobs for one that is due to be joined but has not completed yet.
/*
 * Walk the running jobs in list order and test each one with IsFinished() and IsJobCompleted().
 * NOTE(review): the return statements are not visible in this extract; judging by the function
 * name and the two inline comments, the first branch presumably reports "no join due" and the
 * second "join due with an unfinished job" — confirm against the full source.
 */
100 bool LinkGraphSchedule::IsJoinWithUnfinishedJobDue() const
102 for (JobList::const_iterator it
= this->running
.begin(); it
!= this->running
.end(); ++it
) {
/* IsFinished() false: this job is not yet due to be joined. */
103 if (!((*it
)->IsFinished())) {
104 /* job is not due to be joined yet */
/* IsJobCompleted() false: the job is due to be joined, but has not completed. */
107 if (!((*it
)->IsJobCompleted())) {
108 /* job is due to be joined, but is not completed */
116 * Join the finished jobs at the front of the running list, stopping at the first job that is not finished.
/*
 * Repeatedly take the job at the front of the running list and finalise it, returning as soon
 * as the front job reports that it is not finished.
 * NOTE(review): the end of the loop body is not visible in this extract; what happens to the
 * link graph after Unqueue() cannot be determined from here.
 */
118 void LinkGraphSchedule::JoinNext()
120 while (!(this->running
.empty())) {
/* Only the front job is examined; an unfinished front job ends the whole call. */
121 if (!this->running
.front()->IsFinished()) return;
/* Take ownership of the job before removing its (now empty) slot from the list. */
122 std::unique_ptr
<LinkGraphJob
> next
= std::move(this->running
.front());
123 this->running
.pop_front();
/* The link graph ID is read before the job is finalised. */
124 LinkGraphID id
= next
->LinkGraphIndex();
125 next
->FinaliseJob(); // joins the thread and finalises the job
/* A job joined here is expected not to have been aborted. */
126 assert(!next
->IsJobAborted());
/* The link graph may no longer exist; only touch the schedule if the ID is still valid. */
128 if (LinkGraph::IsValidID(id
)) {
129 LinkGraph
*lg
= LinkGraph::Get(id
);
130 this->Unqueue(lg
); // Unqueue to avoid double-queueing recycled IDs.
137 * Run all handlers for the given Job. This method is tailored to
139 * @param j Pointer to a link graph job.
141 /* static */ void LinkGraphSchedule::Run(void *j
)
143 LinkGraphJob
*job
= (LinkGraphJob
*)j
;
144 for (uint i
= 0; i
< lengthof(instance
.handlers
); ++i
) {
145 if (job
->IsJobAborted()) return;
146 instance
.handlers
[i
]->Run(*job
);
150 * Note that this it not guaranteed to be an atomic write and there are no memory barriers or other protections.
151 * Readers of this variable in another thread may see an out of date value.
152 * However this is OK as this will only happen just as a job is completing, and the real synchronisation is provided
153 * by the thread join operation. In the worst case the main thread will be paused for longer than strictly necessary before
155 * This is just a hint variable to avoid performing the join excessively early and blocking the main thread.
158 #if defined(__GNUC__) && (__cplusplus >= 201103L || defined(__STDCXX_VERSION__) || defined(__GXX_EXPERIMENTAL_CXX0X__) || defined(__GXX_EXPERIMENTAL_CPP0X__))
159 __atomic_store_n(&(job
->job_completed
), true, __ATOMIC_RELAXED
);
161 job
->job_completed
= true;
166 * Start all threads in the running list. This is only useful for save/load.
167 * Usually threads are started when the job is created.
169 void LinkGraphSchedule::SpawnAll()
171 std::vector
<LinkGraphJobGroup::JobInfo
> jobs_to_execute
;
172 for (JobList::iterator i
= this->running
.begin(); i
!= this->running
.end(); ++i
) {
173 jobs_to_execute
.emplace_back(i
->get());
175 LinkGraphJobGroup::ExecuteJobSet(std::move(jobs_to_execute
));
179 * Clear all link graphs and jobs from the schedule.
/*
 * Visit every running job of the static instance, then empty both the running list and the schedule.
 * NOTE(review): the body of the loop is not visible in this extract, so what is done to each
 * running job before the lists are cleared cannot be stated from here.
 */
181 /* static */ void LinkGraphSchedule::Clear()
183 for (JobList::iterator
i(instance
.running
.begin()); i
!= instance
.running
.end(); ++i
) {
/* Drop all running jobs, then all queued link graphs. */
186 instance
.running
.clear();
187 instance
.schedule
.clear();
191 * Shift all dates (join dates and edge annotations) of link graphs and link
192 * graph jobs by the number of days given.
193 * @param interval Number of days to be added or subtracted.
/*
 * Forward the given interval to ShiftDates() of every link graph and to ShiftJoinDate() of
 * every link graph job, using the FOR_ALL_* iteration macros.
 * NOTE(review): the declarations of `lg` and `lgj` are not visible in this extract.
 */
195 void LinkGraphSchedule::ShiftDates(int interval
)
/* Shift the dates stored in each link graph. */
198 FOR_ALL_LINK_GRAPHS(lg
) lg
->ShiftDates(interval
);
/* Shift the join date of each link graph job by the same amount. */
200 FOR_ALL_LINK_GRAPH_JOBS(lgj
) lgj
->ShiftJoinDate(interval
);
204 * Create a link graph schedule and initialize its handlers.
206 LinkGraphSchedule::LinkGraphSchedule()
208 this->handlers
[0].reset(new InitHandler
);
209 this->handlers
[1].reset(new DemandHandler
);
210 this->handlers
[2].reset(new MCFHandler
<MCF1stPass
>);
211 this->handlers
[3].reset(new FlowMapper(false));
212 this->handlers
[4].reset(new MCFHandler
<MCF2ndPass
>);
213 this->handlers
[5].reset(new FlowMapper(true));
217 * Delete a link graph schedule and its handlers.
219 LinkGraphSchedule::~LinkGraphSchedule()
224 LinkGraphJobGroup::LinkGraphJobGroup(constructor_token token
, std::vector
<LinkGraphJob
*> jobs
) :
225 jobs(std::move(jobs
)) { }
227 void LinkGraphJobGroup::SpawnThread() {
228 ThreadObject
*t
= nullptr;
231 * Spawn a thread if possible and run the link graph job in the thread. If
232 * that's not possible run the job right now in the current thread.
234 if (ThreadObject::New(&(LinkGraphJobGroup::Run
), this, &t
, "ottd:linkgraph")) {
235 this->thread
.reset(t
);
236 for (auto &it
: this->jobs
) {
237 it
->SetJobGroup(this->shared_from_this());
240 this->thread
.reset();
241 /* Of course this will hang a bit.
242 * On the other hand, if you want to play games which make this hang noticably
243 * on a platform without threads then you'll probably get other problems first.
245 * If someone comes and tells me that this hangs for him/her, I'll implement a
246 * smaller grained "Step" method for all handlers and add some more ticks where
247 * "Step" is called. No problem in principle. */
248 LinkGraphJobGroup::Run(this);
252 void LinkGraphJobGroup::JoinThread() {
253 if (!this->thread
|| this->joined_thread
) return;
254 this->thread
->Join();
255 this->joined_thread
= true;
259 * Run all jobs for the given LinkGraphJobGroup. This method is tailored to
261 * @param j Pointer to a LinkGraphJobGroup.
263 /* static */ void LinkGraphJobGroup::Run(void *group
)
265 LinkGraphJobGroup
*job_group
= (LinkGraphJobGroup
*)group
;
266 for (LinkGraphJob
*job
: job_group
->jobs
) {
267 LinkGraphSchedule::Run(job
);
/*
 * Sort the given jobs by cost estimate and pack them into buckets; each flushed bucket becomes a
 * LinkGraphJobGroup on which SpawnThread() is called.
 * NOTE(review): the end of the flush_bucket lambda and the end of this function are not visible
 * in this extract (presumably the bucket state is reset after a flush and a final flush
 * handles the last bucket) — confirm against the full source.
 */
271 /* static */ void LinkGraphJobGroup::ExecuteJobSet(std::vector
<JobInfo
> jobs
) {
/* Upper bound on the summed cost estimate of one bucket; a single job may still exceed it on its own. */
272 const uint thread_budget
= 200000;
/* Cheapest jobs first. */
274 std::sort(jobs
.begin(), jobs
.end(), [](const JobInfo
&a
, const JobInfo
&b
) {
275 return a
.cost_estimate
< b
.cost_estimate
;
/* Jobs collected for the group currently being built. */
278 std::vector
<LinkGraphJob
*> bucket
;
/* NOTE(review): bucket_cost is a uint; if cost_estimate is a wider type the sum below may truncate — confirm. */
279 uint bucket_cost
= 0;
/* Turn the current bucket into a job group and start it; a zero bucket_cost is treated as "nothing to flush". */
280 auto flush_bucket
= [&]() {
281 if (!bucket_cost
) return;
282 auto group
= std::make_shared
<LinkGraphJobGroup
>(constructor_token(), std::move(bucket
));
283 group
->SpawnThread();
288 for (JobInfo
&it
: jobs
) {
/* Flush first if adding this job would push a non-empty bucket over the budget. */
289 if (bucket_cost
&& (bucket_cost
+ it
.cost_estimate
> thread_budget
)) flush_bucket();
290 bucket
.push_back(it
.job
);
291 bucket_cost
+= it
.cost_estimate
;
296 LinkGraphJobGroup::JobInfo::JobInfo(LinkGraphJob
*job
) :
297 job(job
), cost_estimate(job
->Graph().CalculateCostEstimate()) { }
301 * Pause the game if on the next _date_fract tick, we would do a join with the next
302 * link graph job, but it is still running.
303 * If we previous paused, unpause if the job is now ready to be joined with
305 void StateGameLoop_LinkGraphPauseControl()
307 if (_pause_mode
& PM_PAUSED_LINK_GRAPH
) {
308 /* We are paused waiting on a job, check the job every tick */
309 if (!LinkGraphSchedule::instance
.IsJoinWithUnfinishedJobDue()) {
310 DoCommandP(0, PM_PAUSED_LINK_GRAPH
, 0, CMD_PAUSE
);
312 } else if (_pause_mode
== PM_UNPAUSED
&& _date_fract
== LinkGraphSchedule::SPAWN_JOIN_TICK
- 1) {
313 if (_date
% _settings_game
.linkgraph
.recalc_interval
== _settings_game
.linkgraph
.recalc_interval
/ 2) {
314 /* perform check one _date_fract tick before we would join */
315 if (LinkGraphSchedule::instance
.IsJoinWithUnfinishedJobDue()) {
316 DoCommandP(0, PM_PAUSED_LINK_GRAPH
, 1, CMD_PAUSE
);
323 * Spawn or join a link graph job or compress a link graph if any link graph is
/*
 * Per-tick driver that calls SpawnNext() or JoinNext() on the schedule instance at fixed
 * offsets within the recalculation interval.
 * NOTE(review): the `if` heads guarding the SpawnNext() calls and the end of the function are
 * not visible in this extract; only the visible statements are described below.
 */
326 void OnTick_LinkGraph()
/* Day length 1: the interval is measured in days and work only happens on the spawn/join tick. */
328 if (_settings_game
.economy
.daylength
== 1)
330 if (_date_fract
!= LinkGraphSchedule::SPAWN_JOIN_TICK
) return;
/* Position of the current date within the recalculation interval. */
331 Date offset
= _date
% _settings_game
.linkgraph
.recalc_interval
;
333 LinkGraphSchedule::instance
.SpawnNext();
/* Join half an interval after the spawn point. */
334 } else if (offset
== _settings_game
.linkgraph
.recalc_interval
/ 2) {
335 LinkGraphSchedule::instance
.JoinNext();
/* Other day lengths: the interval is converted to ticks, with a minimum of 2. */
/* NOTE(review): the max<int> result is stored in a uint16; large products would be narrowed — confirm the setting's range. */
340 uint16 interval_in_ticks
= max
<int>(2, (_settings_game
.linkgraph
.recalc_interval
* DEFAULT_DAY_TICKS
));
/* Position of the current tick within the tick-based interval. */
341 Date offset
= _date_fract
% interval_in_ticks
;
344 LinkGraphSchedule::instance
.SpawnNext();
/* Join half an interval (in ticks) after the spawn point. */
345 } else if (offset
== interval_in_ticks
/ 2) {
346 LinkGraphSchedule::instance
.JoinNext();