// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// BOINC is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC.  If not, see <http://www.gnu.org/licenses/>.

#include "cpp.h"

#ifdef _WIN32
#include "boinc_win.h"
#else
#include "config.h"
#endif

#include "util.h"

#include "client_msgs.h"
#include "client_state.h"
#include "project.h"
#include "result.h"
#include "scheduler_op.h"

#include "work_fetch.h"

#if 0
#define DEBUG(x) x
#else
#define DEBUG(X)
#endif

using std::vector;

RSC_WORK_FETCH rsc_work_fetch[MAX_RSC];

WORK_FETCH work_fetch;

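// Is work fetch for this resource type banned for this project?
// Checks the per-resource flags set by user preferences, the config file,
// the project itself (no app versions), and the account manager.
//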
47
static inline bool dont_fetch(PROJECT* p, int rsc_type) {
48
    if (p->no_rsc_pref[rsc_type]) return true;
49
    if (p->no_rsc_config[rsc_type]) return true;
50
    if (p->no_rsc_apps[rsc_type]) return true;
51
    if (p->no_rsc_ams[rsc_type]) return true;
52
53
54
    return false;
}

// if the configuration file disallows the use of a GPU type
// for a project, set a flag to that effect
//
void set_no_rsc_config() {
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT& p = *gstate.projects[i];
        for (int j=1; j<coprocs.n_rsc; j++) {
            bool allowed[MAX_COPROC_INSTANCES];
            memset(allowed, 0, sizeof(allowed));
            COPROC& c = coprocs.coprocs[j];
            for (int k=0; k<c.count; k++) {
                allowed[c.device_nums[k]] = true;
            }
            for (unsigned int k=0; k<config.exclude_gpus.size(); k++) {
                EXCLUDE_GPU& e = config.exclude_gpus[k];
                if (strcmp(e.url.c_str(), p.master_url)) continue;
                if (!e.type.empty() && strcmp(e.type.c_str(), c.type)) continue;
                if (!e.appname.empty()) continue;
                if (e.device_num < 0) {
                    memset(allowed, 0, sizeof(allowed));
                    break;
                }
                allowed[e.device_num] = false;
            }
            p.no_rsc_config[j] = true;
            for (int k=0; k<c.count; k++) {
                if (allowed[c.device_nums[k]]) {
                    p.no_rsc_config[j] = false;
                    break;
                }
            }
        }
    }
}

// does the (NCI) project have a job that's running or uploading?
// (don't request another job from NCI project if so)
//
static bool has_a_job_in_progress(PROJECT* p) {
    for (unsigned int j=0; j<gstate.results.size(); j++) {
        RESULT* rp = gstate.results[j];
        if (rp->project != p) continue;
        if (rp->state() < RESULT_FILES_UPLOADED) {
            return true;
        }
    }
    return false;
}

inline bool has_coproc_app(PROJECT* p, int rsc_type) {
    unsigned int i;
    for (i=0; i<gstate.app_versions.size(); i++) {
        APP_VERSION* avp = gstate.app_versions[i];
        if (avp->project != p) continue;
        if (avp->gpu_usage.rsc_type == rsc_type) return true;
    }
    return false;
}

///////////////  RSC_PROJECT_WORK_FETCH  ///////////////

bool RSC_PROJECT_WORK_FETCH::compute_may_have_work(PROJECT* p, int rsc_type) {
    if (dont_fetch(p, rsc_type)) return false;
    if (p->rsc_defer_sched[rsc_type]) return false;
    return (backoff_time < gstate.now);
}

void RSC_PROJECT_WORK_FETCH::rr_init(PROJECT* p, int rsc_type) {
    may_have_work = compute_may_have_work(p, rsc_type);
    fetchable_share = 0;
    n_runnable_jobs = 0;
    sim_nused = 0;
    nused_total = 0;
    deadlines_missed = 0;
}

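// Back off work fetch for this (project, resource type) pair after a
// request that returned no jobs: the interval doubles each time, capped at
// WF_MAX_BACKOFF_INTERVAL, and the actual delay is jittered by a random
// factor in [0.5, 1.5) so that many clients don't retry in lockstep.
//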
void RSC_PROJECT_WORK_FETCH::resource_backoff(PROJECT* p, const char* name) {
    if (backoff_interval) {
        backoff_interval *= 2;
        if (backoff_interval > WF_MAX_BACKOFF_INTERVAL) backoff_interval = WF_MAX_BACKOFF_INTERVAL;
    } else {
        backoff_interval = WF_MIN_BACKOFF_INTERVAL;
    }
    double x = (.5 + drand())*backoff_interval;
    backoff_time = gstate.now + x;
    if (log_flags.work_fetch_debug) {
        msg_printf(p, MSG_INFO,
            "[work_fetch] backing off %s %.0f sec", name, x
        );
    }
}

///////////////  RSC_WORK_FETCH  ///////////////

RSC_PROJECT_WORK_FETCH& RSC_WORK_FETCH::project_state(PROJECT* p) {
    return p->rsc_pwf[rsc_type];
}

void RSC_WORK_FETCH::rr_init() {
    shortfall = 0;
    nidle_now = 0;
    sim_nused = 0;
    total_fetchable_share = 0;
    deadline_missed_instances = 0;
    saturated_time = 0;
    busy_time_estimator.reset();
    sim_used_instances = 0;
}

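// Called from the round-robin simulation for each simulated time interval
// [sim_now, sim_now+dt): accumulate idle instance-seconds (shortfall)
// up to the end of the work buffer, and note how long all instances
// remain saturated.
//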
void RSC_WORK_FETCH::update_stats(double sim_now, double dt, double buf_end) {
    double idle = ninstances - sim_nused;
    if (idle > 1e-6 && sim_now < buf_end) {
        double dt2;
        if (sim_now + dt > buf_end) {
            dt2 = buf_end - sim_now;
        } else {
            dt2 = dt;
        }
        shortfall += idle*dt2;
    }
    if (idle < 1e-6) {
        saturated_time = sim_now + dt - gstate.now;
    }
}

void RSC_WORK_FETCH::update_busy_time(double dur, double nused) {
    busy_time_estimator.update(dur, nused);
}

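// A duration correction factor far outside its normal range (here, below
// 0.02 or above 80) means the project's completion-time estimates
// can't be trusted.
//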
static bool wacky_dcf(PROJECT* p) {
    if (p->dont_use_dcf) return false;
    double dcf = p->duration_correction_factor;
    return (dcf < 0.02 || dcf > 80.0);
}

// request this project's share of shortfall and instances.
// don't request anything if project is backed off.
//
void RSC_WORK_FETCH::set_request(PROJECT* p) {

    // if backup project, fetch 1 job per idle instance
    //
    if (p->resource_share == 0) {
        req_instances = nidle_now;
        req_secs = 1;
        return;
    }
    if (config.fetch_minimal_work) {
        req_instances = ninstances;
        req_secs = 1;
        return;
    }
    RSC_PROJECT_WORK_FETCH& w = project_state(p);
    double non_excl_inst = ninstances - w.ncoprocs_excluded;
    if (shortfall) {
        if (wacky_dcf(p)) {
            // if project's DCF is too big or small,
            // its completion time estimates are useless; just ask for 1 second
            //
            req_secs = 1;
        } else {
            req_secs = shortfall;
            if (w.ncoprocs_excluded) {
                req_secs *= non_excl_inst/ninstances;
            }
        }
    }

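    // Ask for this project's share of instances:
    // its fetchable share of the devices, clipped to the non-excluded
    // instances and reduced by what the simulation already has it using,
    // but never less than the number of currently idle instances.
    //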
    double instance_share = ninstances*w.fetchable_share;
    if (instance_share > non_excl_inst) {
        instance_share = non_excl_inst;
    }
    instance_share -= w.nused_total;
    req_instances = std::max(nidle_now, instance_share);

    if (log_flags.work_fetch_debug) {
        msg_printf(p, MSG_INFO,
            "[work_fetch] set_request() for %s: ninst %d nused_total %f nidle_now %f fetch share %f req_inst %f req_secs %f",
            rsc_name(rsc_type), ninstances, w.nused_total, nidle_now,
            w.fetchable_share, req_instances, req_secs
        );
    }
    if (req_instances && !req_secs) {
        req_secs = 1;
    }
}

// We're fetching work because some instances are starved because
// of exclusions.
// See how many N of these instances are not excluded for this project.
// Ask for N instances and for N*work_buf_min seconds.
//
void RSC_WORK_FETCH::set_request_excluded(PROJECT* p) {
    RSC_PROJECT_WORK_FETCH& pwf = project_state(p);

    int inst_mask = sim_excluded_instances & pwf.non_excluded_instances;
    int n = 0;
    for (int i=0; i<ninstances; i++) {
        if ((1<<i) & inst_mask) {
            n++;
        }
    }
    DEBUG(msg_printf(p, MSG_INFO, "set_request_excluded() %d %d %d", sim_excluded_instances, pwf.non_excluded_instances, n));
    req_instances = n;
    if (p->resource_share == 0 || config.fetch_minimal_work) {
        req_secs = 1;
    } else {
        req_secs = n*gstate.work_buf_total();
    }
}

void RSC_WORK_FETCH::print_state(const char* name) {
    msg_printf(0, MSG_INFO, "[work_fetch] --- state for %s ---", name);
    msg_printf(0, MSG_INFO,
        "[work_fetch] shortfall %.2f nidle %.2f saturated %.2f busy %.2f",
        shortfall, nidle_now, saturated_time,
        busy_time_estimator.get_busy_time()
    );
    //msg_printf(0, MSG_INFO, "[work_fetch] sim used inst %d sim excl inst %d", sim_used_instances, sim_excluded_instances);
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        char buf[256];
        PROJECT* p = gstate.projects[i];
        if (p->non_cpu_intensive) continue;
        RSC_PROJECT_WORK_FETCH& pwf = project_state(p);
        bool no_rsc_pref = p->no_rsc_pref[rsc_type];
        bool no_rsc_config = p->no_rsc_config[rsc_type];
        bool no_rsc_apps = p->no_rsc_apps[rsc_type];
        bool no_rsc_ams = p->no_rsc_ams[rsc_type];
        double bt = pwf.backoff_time>gstate.now?pwf.backoff_time-gstate.now:0;
        if (bt) {
            sprintf(buf, " (resource backoff: %.2f, inc %.2f)",
                bt, pwf.backoff_interval
            );
        } else {
            strcpy(buf, "");
        }
        msg_printf(p, MSG_INFO,
            "[work_fetch] fetch share %.3f%s%s%s%s%s",
            pwf.fetchable_share,
            buf,
            no_rsc_pref?" (blocked by prefs)":"",
            no_rsc_apps?" (no apps)":"",
            no_rsc_ams?" (blocked by account manager)":"",
            no_rsc_config?" (blocked by configuration file)":""
        );
    }
}

void RSC_WORK_FETCH::clear_request() {
    req_secs = 0;
    req_instances = 0;
}

///////////////  PROJECT_WORK_FETCH  ///////////////

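// Return a CANT_FETCH_WORK_* code saying why we can't ask this project
// for work right now, or 0 if work fetch is allowed.
//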
int PROJECT_WORK_FETCH::compute_cant_fetch_work_reason(PROJECT* p) {
    if (p->non_cpu_intensive) return CANT_FETCH_WORK_NON_CPU_INTENSIVE;
    if (p->suspended_via_gui) return CANT_FETCH_WORK_SUSPENDED_VIA_GUI;
    if (p->master_url_fetch_pending) return CANT_FETCH_WORK_MASTER_URL_FETCH_PENDING;
    if (p->dont_request_more_work) return CANT_FETCH_WORK_DONT_REQUEST_MORE_WORK;
    if (p->some_download_stalled()) return CANT_FETCH_WORK_DOWNLOAD_STALLED;
    if (p->some_result_suspended()) return CANT_FETCH_WORK_RESULT_SUSPENDED;
    if (p->too_many_uploading_results) return CANT_FETCH_WORK_TOO_MANY_UPLOADS;

    // this goes last
    //
    if (p->min_rpc_time > gstate.now) return CANT_FETCH_WORK_MIN_RPC_TIME;
    return 0;
}

void PROJECT_WORK_FETCH::reset(PROJECT* p) {
    for (int i=0; i<coprocs.n_rsc; i++) {
        p->rsc_pwf[i].reset();
    }
}

///////////////  WORK_FETCH  ///////////////

// mark the projects from which we can fetch work
//
void WORK_FETCH::compute_cant_fetch_work_reason() {
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        p->pwf.cant_fetch_work_reason = p->pwf.compute_cant_fetch_work_reason(p);
    }
}

void WORK_FETCH::rr_init() {
    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].rr_init();
    }
    compute_cant_fetch_work_reason();
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        p->pwf.n_runnable_jobs = 0;
        for (int j=0; j<coprocs.n_rsc; j++) {
            p->rsc_pwf[j].rr_init(p, j);
        }
    }
}

#if 0
// if the given project is highest-priority among the projects
// eligible for the resource, set request fields
//
void RSC_WORK_FETCH::supplement(PROJECT* pp) {
    double x = pp->sched_priority;
    for (unsigned i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        if (p == pp) continue;
        if (p->pwf.cant_fetch_work_reason) continue;
        if (!project_state(p).may_have_work) continue;
        RSC_PROJECT_WORK_FETCH& rpwf = project_state(p);
        if (rpwf.anon_skip) continue;
        if (p->sched_priority > x) {
            if (log_flags.work_fetch_debug) {
                msg_printf(pp, MSG_INFO,
                    "[work_fetch]: not requesting work for %s: %s has higher priority",
                    rsc_name(rsc_type), p->get_project_name()
                );
            }
            return;
        }
    }
    // didn't find a better project; ask for work
    //
    set_request(pp);
}

// we're going to ask the given project for work of the given type.
// (or -1 if none)
// Set requests for this type and perhaps other types
//
void WORK_FETCH::set_all_requests_hyst(PROJECT* p, int rsc_type) {
    for (int i=0; i<coprocs.n_rsc; i++) {
        if (i == rsc_type) {
            rsc_work_fetch[i].set_request(p);
        } else {
            // don't fetch work for a resource if the buffer is above max
            //
            if (rsc_work_fetch[i].saturated_time > gstate.work_buf_total()) {
                continue;
            }

            // don't fetch work if backup project and no idle instances
            //
            if (p->resource_share==0 && rsc_work_fetch[i].nidle_now==0) {
                continue;
            }

            if (i>0 && !gpus_usable) {
                continue;
            }
            rsc_work_fetch[i].supplement(p);
        }
    }
}

void WORK_FETCH::set_all_requests(PROJECT* p) {
    for (int i=0; i<coprocs.n_rsc; i++) {
        if (i==0 || gpus_usable) {
            rsc_work_fetch[i].set_request(p);
        }
    }
}
#endif

void WORK_FETCH::print_state() {
    msg_printf(0, MSG_INFO, "[work_fetch] ------- start work fetch state -------");
    msg_printf(0, MSG_INFO, "[work_fetch] target work buffer: %.2f + %.2f sec",
        gstate.work_buf_min(), gstate.work_buf_additional()
    );
    msg_printf(0, MSG_INFO, "[work_fetch] --- project states ---");
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        char buf[256];
        if (p->pwf.cant_fetch_work_reason) {
            sprintf(buf, "can't req work: %s",
                cant_fetch_work_string(p->pwf.cant_fetch_work_reason)
            );
        } else {
            strcpy(buf, "can req work");
        }
        if (p->min_rpc_time > gstate.now) {
            char buf2[256];
            sprintf(buf2, " (backoff: %.2f sec)", p->min_rpc_time - gstate.now);
            strcat(buf, buf2);
        }
        msg_printf(p, MSG_INFO, "[work_fetch] REC %.3f prio %.6f %s",
            p->pwf.rec,
            p->sched_priority,
            buf
        );
    }
    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].print_state(rsc_name(i));
    }
    msg_printf(0, MSG_INFO, "[work_fetch] ------- end work fetch state -------");
}

void WORK_FETCH::clear_request() {
    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].clear_request();
    }
}

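// did the most recently formed request ask for any work?
//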
bool WORK_FETCH::requested_work() {
    for (int i=0; i<coprocs.n_rsc; i++) {
        if (rsc_work_fetch[i].req_secs) return true;
    }
    return false;
}

// We're going to contact this project for reasons other than work fetch
// (e.g., to report completed results, or at user request).
// Decide if we should "piggyback" a work fetch request.
//
void WORK_FETCH::piggyback_work_request(PROJECT* p) {
    DEBUG(msg_printf(p, MSG_INFO, "piggyback_work_request()");)
    clear_request();
    if (config.fetch_minimal_work && gstate.had_or_requested_work) return;
    if (p->non_cpu_intensive) {
        if (!has_a_job_in_progress(p) && !p->dont_request_more_work) {
            rsc_work_fetch[0].req_secs = 1;
        }
        return;
    }

    setup();

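    // A piggybacked request is OK if nothing blocks work fetch for this
    // project, or if the only blocker is the RPC backoff
    // (we're contacting the project anyway).
    //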
    switch (p->pwf.cant_fetch_work_reason) {
    case 0:
    case CANT_FETCH_WORK_MIN_RPC_TIME:
        break;
    default:
        return;
    }

    // if project was updated from manager and config says so,
    // fetch work for a resource even if there are higher-prio projects
    // able to fetch it
    //
    bool check_higher_priority_projects = true;
    if (p->sched_rpc_pending && config.fetch_on_update) {
        check_higher_priority_projects = false;
    }

    // For each resource, scan projects in decreasing priority,
    // seeing if there's one that's higher-priority than this
    // able to fetch work for the resource.
    // If not, and the resource needs topping off, do so
    //
    for (int i=0; i<coprocs.n_rsc; i++) {
        DEBUG(msg_printf(p, MSG_INFO, "piggyback: resource %s", rsc_name(i));)
        if (i && !gpus_usable) continue;
        RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
        if (!rwf.can_fetch(p)) {
            DEBUG(msg_printf(p, MSG_INFO, "piggyback: can't fetch %s", rsc_name(i));)
            continue;
        }
        bool buffer_low = (rwf.saturated_time < gstate.work_buf_total());
        if (!buffer_low && !rwf.uses_starved_excluded_instances(p)) {
            DEBUG(msg_printf(p, MSG_INFO, "piggyback: don't need %s", rsc_name(i));)
            continue;
        }
        if (check_higher_priority_projects) {
            PROJECT* p2 = NULL;
            for (unsigned int j=0; j<gstate.projects.size(); j++) {
                p2 = gstate.projects[j];
                if (p2 == p) break;
                if (p2->pwf.cant_fetch_work_reason) {
                    DEBUG(msg_printf(p, MSG_INFO, "piggyback: %s can't fetch work", p2->project_name);)
                    continue;
                }
                if (rwf.can_fetch(p2) && !rwf.backed_off(p2)) {
                    DEBUG(msg_printf(p, MSG_INFO, "piggyback: better proj %s", p2->project_name);)
                    break;
                }
            }
            if (p != p2) continue;
        }
        DEBUG(msg_printf(p, MSG_INFO, "piggyback: requesting %s", rsc_name(i));)
        if (buffer_low) {
            rwf.set_request(p);
        } else {
            rwf.set_request_excluded(p);
        }
    }
    if (!requested_work()) {
        p->pwf.cant_fetch_work_reason = CANT_FETCH_WORK_DONT_NEED;
    }
}

// see if there's a fetchable non-CPU-intensive project without work
//
PROJECT* WORK_FETCH::non_cpu_intensive_project_needing_work() {
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        if (!p->non_cpu_intensive) continue;
        if (!p->can_request_work()) continue;
        if (p->rsc_pwf[0].backoff_time > gstate.now) continue;
        if (has_a_job_in_progress(p)) continue;
        clear_request();
        rsc_work_fetch[0].req_secs = 1;
        return p;
    }
    return 0;
}

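// comparison function for sorting projects by decreasing scheduling priority
//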
static bool higher_priority(PROJECT *p1, PROJECT *p2) {
    return (p1->sched_priority > p2->sched_priority);
}

// check resource-level backoff
//
bool RSC_WORK_FETCH::backed_off(PROJECT* p) {
    if (project_state(p).backoff_time > gstate.now) {
        DEBUG(msg_printf(p, MSG_INFO, "skip: backoff");)
        return true;
    }
    return false;
}

// a variety of checks for whether we should ask this project
// for work of this type
//
bool RSC_WORK_FETCH::can_fetch(PROJECT *p) {
    // see whether work fetch for this resource is banned
    // by prefs, config, project, or acct mgr
    //
    if (dont_fetch(p, rsc_type)) {
        DEBUG(msg_printf(p, MSG_INFO, "skip: dont_fetch");)
        return false;
    }

    RSC_PROJECT_WORK_FETCH& rpwf = project_state(p);

    // if project has zero resource share,
    // only fetch work if a device is idle
    //
    if (p->resource_share == 0 && nidle_now == 0) {
        DEBUG(msg_printf(p, MSG_INFO, "skip: zero share");)
        return false;
    }

    // if project has excluded GPUs of this type,
    // we need to avoid fetching work just because there's an idle instance
    // or a shortfall;
    // fetching work might not alleviate either of these,
    // and we'd end up fetching unbounded work.
    // At the same time, we want to respect work buf params if possible.
    //
    // Current policy:
    // don't fetch work if remaining time of this project's jobs
    // exceeds work_buf_min * (#usable instances / #instances)
    //
    // TODO: THIS IS FAIRLY CRUDE. Making it smarter would require
    // computing shortfall etc. on a per-project basis
    //
    int nexcl = rpwf.ncoprocs_excluded;
    if (rsc_type && nexcl) {
        int n_not_excluded = ninstances - nexcl;
        if (rpwf.n_runnable_jobs >= n_not_excluded
            && rpwf.queue_est > (gstate.work_buf_min() * n_not_excluded)/ninstances
        ) {
            DEBUG(msg_printf(p, MSG_INFO, "skip: too much work");)
            return false;
        }
    }

    if (rpwf.anon_skip) {
        DEBUG(msg_printf(p, MSG_INFO, "skip: anon");)
        return false;
    }
    return true;
}

// return true if there is exclusion starvation
// and this project can use the starved instances
//
bool RSC_WORK_FETCH::uses_starved_excluded_instances(PROJECT* p) {
    RSC_PROJECT_WORK_FETCH& rpwf = project_state(p);
    if (!sim_excluded_instances) return false;
    if ((sim_excluded_instances & rpwf.non_excluded_instances) == 0) {
        DEBUG(msg_printf(p, MSG_INFO, "skip: excl");)
        return false;
    }
    return true;
}

// setup for choose_project() and piggyback()
//
void WORK_FETCH::setup() {
    gstate.compute_nuploading_results();

    rr_simulation();
    compute_shares();
    project_priority_init(true);
    clear_request();

    // Decrement the priority of projects that have work queued.
    // Specifically, subtract
    // (FLOPs queued for P)/(FLOPs of max queue)
    // which will generally be between 0 and 1.
    // This is a little arbitrary but I can't think of anything better.
    //
    double max_queued_flops = gstate.work_buf_total()*total_peak_flops();
    for (unsigned int i=0; i<gstate.results.size(); i++) {
        RESULT* rp = gstate.results[i];
        PROJECT* p = rp->project;
        p->sched_priority -= rp->estimated_flops_remaining()/max_queued_flops;
    }

    // don't request work from projects w/ > 1000 runnable jobs
    //
    for (unsigned int i=0; i<gstate.projects.size(); i++) {
        PROJECT* p = gstate.projects[i];
        if (p->pwf.n_runnable_jobs > 1000 && !p->pwf.cant_fetch_work_reason) {
            p->pwf.cant_fetch_work_reason = CANT_FETCH_WORK_TOO_MANY_RUNNABLE;
        }
    }

    std::sort(
        gstate.projects.begin(),
        gstate.projects.end(),
        higher_priority
    );
    if (log_flags.work_fetch_debug) {
        print_state();
    }
}

// Choose a project to fetch work from,
// and set the request fields of resource objects.
// Set p->sched_rpc_pending; if you decide not to request work
// from the project, you must clear this.
//
PROJECT* WORK_FETCH::choose_project() {
    PROJECT* p;

    if (log_flags.work_fetch_debug) {
        msg_printf(0, MSG_INFO, "[work_fetch] entering choose_project()");
    }

    p = non_cpu_intensive_project_needing_work();
    if (p) return p;

    setup();

    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].found_project = NULL;
    }

    // scan projects in order of decreasing priority
    //
    bool found = false;
    for (unsigned int j=0; j<gstate.projects.size(); j++) {
        p = gstate.projects[j];
        DEBUG(msg_printf(p, MSG_INFO, "scanning");)
        if (p->pwf.cant_fetch_work_reason) {
            DEBUG(msg_printf(p, MSG_INFO, "skip: cfwr %d", p->pwf.cant_fetch_work_reason);)
            continue;
        }

        // For each resource type:
        // - See if we can ask this project for work of that type;
        //   if so set a flag so that lower-priority projects
        //   won't request it
        // - If so, see if work is needed for this type;
        //   if so, set "found_project" flag
        //
        int rsc_index = -1;
        for (int i=0; i<coprocs.n_rsc; i++) {
            if (i && !gpus_usable) continue;
            RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
            if (rwf.can_fetch(p) && !rwf.backed_off(p)) {
                if (!rwf.found_project) {
                    rwf.found_project = p;
                }
                DEBUG(msg_printf(p, MSG_INFO, "can fetch %s", rsc_name(i));)
            } else {
                DEBUG(msg_printf(p, MSG_INFO, "can't fetch %s", rsc_name(i));)
                continue;
            }
            bool buffer_low = (rwf.saturated_time < gstate.work_buf_min());
            if (buffer_low || rwf.uses_starved_excluded_instances(p)) {
                DEBUG(msg_printf(p, MSG_INFO, "%s needs work", rsc_name(i));)
                rsc_index = i;
                break;
            }
        }

        // If rsc_index is nonzero, it's a resource that this project
        // can ask for work, and which needs work.
        // And this is the highest-priority project having this property.
        // Request work from this resource,
        // and any others for which this is the highest-priority project
        // able to request work
        //
        if (rsc_index >= 0) {
            bool any_request = false;
            for (int i=0; i<coprocs.n_rsc; i++) {
                if (i && !gpus_usable) continue;
                RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
                bool buffer_low;
                DEBUG(msg_printf(p, MSG_INFO, "checking %s", rsc_name(i));)
                if (i == rsc_index) {
                    buffer_low = (rwf.saturated_time < gstate.work_buf_min());
                } else {
                    if (rwf.found_project && rwf.found_project != p) {
                        DEBUG(msg_printf(p, MSG_INFO, "%s not high prio proj", rsc_name(i));)
                        continue;
                    }
                    buffer_low = (rwf.saturated_time < gstate.work_buf_total());
                    if (!buffer_low && !rwf.uses_starved_excluded_instances(p)) {
                        DEBUG(msg_printf(p, MSG_INFO, "%s don't need", rsc_name(i));)
                        continue;
                    }
                    if (!rwf.can_fetch(p)) {
                        DEBUG(msg_printf(p, MSG_INFO, "%s can't fetch", rsc_name(i));)
                        continue;
                    }
                }
                if (buffer_low) {
                    rwf.set_request(p);
                    DEBUG(msg_printf(p, MSG_INFO, "%s set_request: %f", rsc_name(i), rwf.req_secs);)
                } else {
                    rwf.set_request_excluded(p);
                    DEBUG(msg_printf(p, MSG_INFO, "%s set_request_excluded: %f", rsc_name(i), rwf.req_secs);)
                }
                if (rwf.req_secs > 0) {
                    any_request = true;
                }
            }
            if (any_request) {
                found = true;
                break;
            }
        }
    }

    if (found) {
        p->sched_rpc_pending = RPC_REASON_NEED_WORK;
    } else {
        if (log_flags.work_fetch_debug) {
            msg_printf(0, MSG_INFO, "[work_fetch] No project chosen for work fetch");
        }
        p = NULL;
    }

    return p;
}

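// accumulate the device-seconds this task used during the last dt,
// per project and overall, for REC accounting:
// CPU instance-seconds always, GPU instance-seconds if the app version
// uses a GPU.
//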
void WORK_FETCH::accumulate_inst_sec(ACTIVE_TASK* atp, double dt) {
    APP_VERSION* avp = atp->result->avp;
    PROJECT* p = atp->result->project;
    double x = dt*avp->avg_ncpus;
    p->rsc_pwf[0].secs_this_rec_interval += x;
    rsc_work_fetch[0].secs_this_rec_interval += x;
    int rt = avp->gpu_usage.rsc_type;
    if (rt) {
        x = dt*avp->gpu_usage.usage;
        p->rsc_pwf[rt].secs_this_rec_interval += x;
        rsc_work_fetch[rt].secs_this_rec_interval += x;
    }
}

// find total and per-project resource shares for each resource
//
void WORK_FETCH::compute_shares() {
    unsigned int i;
    PROJECT* p;
    for (i=0; i<gstate.projects.size(); i++) {
        p = gstate.projects[i];
        if (p->non_cpu_intensive) continue;
        if (p->pwf.cant_fetch_work_reason) continue;
        for (int j=0; j<coprocs.n_rsc; j++) {
            if (p->rsc_pwf[j].may_have_work) {
                rsc_work_fetch[j].total_fetchable_share += p->resource_share;
            }
        }
    }
    for (i=0; i<gstate.projects.size(); i++) {
        p = gstate.projects[i];
        if (p->non_cpu_intensive) continue;
        if (p->pwf.cant_fetch_work_reason) continue;
        for (int j=0; j<coprocs.n_rsc; j++) {
            if (p->rsc_pwf[j].may_have_work) {
                p->rsc_pwf[j].fetchable_share = rsc_work_fetch[j].total_fetchable_share?p->resource_share/rsc_work_fetch[j].total_fetchable_share:1;
            }
        }
    }
}

void WORK_FETCH::request_string(char* buf) {
    char buf2[256];
    sprintf(buf,
        "[work_fetch] request: CPU (%.2f sec, %.2f inst)",
        rsc_work_fetch[0].req_secs, rsc_work_fetch[0].req_instances
    );
    for (int i=1; i<coprocs.n_rsc; i++) {
        sprintf(buf2, " %s (%.2f sec, %.2f inst)",
            rsc_name(i), rsc_work_fetch[i].req_secs, rsc_work_fetch[i].req_instances
        );
        strcat(buf, buf2);
    }
}

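// write the work-request portion of a scheduler RPC:
// the overall requested seconds, the CPU request fields,
// and an estimated delay derived from the CPU's busy-time estimator.
//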
void WORK_FETCH::write_request(FILE* f, PROJECT* p) {
    double work_req = rsc_work_fetch[0].req_secs;

    // if project is anonymous platform, set the overall work req
    // to the max of the requests of resource types for which we have versions.
    // Otherwise projects with old schedulers won't send us work.
    // THIS CAN BE REMOVED AT SOME POINT
    //
    if (p->anonymous_platform) {
        for (int i=1; i<coprocs.n_rsc; i++) {
            if (has_coproc_app(p, i)) {
                if (rsc_work_fetch[i].req_secs > work_req) {
                    work_req = rsc_work_fetch[i].req_secs;
                }
            }
        }
    }
    fprintf(f,
        "    <work_req_seconds>%f</work_req_seconds>\n"
        "    <cpu_req_secs>%f</cpu_req_secs>\n"
        "    <cpu_req_instances>%f</cpu_req_instances>\n"
        "    <estimated_delay>%f</estimated_delay>\n",
        work_req,
        rsc_work_fetch[0].req_secs,
        rsc_work_fetch[0].req_instances,
        rsc_work_fetch[0].req_secs?rsc_work_fetch[0].busy_time_estimator.get_busy_time():0
    );
    if (log_flags.work_fetch_debug) {
        char buf[256];
        request_string(buf);
        msg_printf(p, MSG_INFO, "%s", buf);
    }
}

// we just got a scheduler reply with the given jobs; update backoffs
//
void WORK_FETCH::handle_reply(
    PROJECT* p, SCHEDULER_REPLY*, vector<RESULT*> new_results
) {
    bool got_work[MAX_RSC];
    bool requested_work_rsc[MAX_RSC];
    for (int i=0; i<coprocs.n_rsc; i++) {
        got_work[i] = false;
        requested_work_rsc[i] = (rsc_work_fetch[i].req_secs > 0);
    }
    for (unsigned int i=0; i<new_results.size(); i++) {
        RESULT* rp = new_results[i];
        got_work[rp->avp->gpu_usage.rsc_type] = true;
    }

    for (int i=0; i<coprocs.n_rsc; i++) {
        // back off on a resource type if
        // - we asked for jobs
        // - we didn't get any
        // - we're not currently backed off for that type
        //   (i.e. don't back off because of a piggyback request)
        // - the RPC was done for a reason that is automatic
        //   and potentially frequent
        //
        if (requested_work_rsc[i] && !got_work[i]) {
            if (p->rsc_pwf[i].backoff_time < gstate.now) {
                switch (p->sched_rpc_pending) {
                case RPC_REASON_RESULTS_DUE:
                case RPC_REASON_NEED_WORK:
                case RPC_REASON_TRICKLE_UP:
                    p->rsc_pwf[i].resource_backoff(p, rsc_name(i));
                }
            }
        }
        // if we did get jobs, clear backoff
        //
        if (got_work[i]) {
            p->rsc_pwf[i].clear_backoff();
        }
    }
}

// set up for initial RPC.
// arrange to always get one job, even if we don't need it or can't handle it.
// (this is probably what user wants)
//
void WORK_FETCH::set_initial_work_request(PROJECT* p) {
    for (int i=0; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].req_secs = 1;
        if (i) {
            RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
            if (rwf.ninstances ==  p->rsc_pwf[i].ncoprocs_excluded) {
                rsc_work_fetch[i].req_secs = 0;
            }
        }
        rsc_work_fetch[i].req_instances = 0;
        rsc_work_fetch[i].busy_time_estimator.reset();
    }
}

// called once, at client startup
//
void WORK_FETCH::init() {
    rsc_work_fetch[0].init(0, gstate.ncpus, 1);
    double cpu_flops = gstate.host_info.p_fpops;

    // use 20% as a rough estimate of GPU efficiency

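    // The third init() argument is apparently the resource's speed relative
    // to the CPU: (instance count) * 0.2 * (per-instance peak FLOPS) / (CPU FLOPS).
    //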
    for (int i=1; i<coprocs.n_rsc; i++) {
        rsc_work_fetch[i].init(
            i, coprocs.coprocs[i].count,
            coprocs.coprocs[i].count*0.2*coprocs.coprocs[i].peak_flops/cpu_flops
        );
    }