Concurrency with Thread Pools - Chad Salinas ::: Data Scientist
Life and times of Chad Salinas
Chad Salinas, golf caddy, data scientist, chad rPubs, recovering chess addict, daddy caddy
1182
post-template-default,single,single-post,postid-1182,single-format-standard,qode-listing-1.0.1,qode-social-login-1.0,qode-news-1.0,qode-quick-links-1.0,qode-restaurant-1.0,ajax_fade,page_not_loaded,,qode-title-hidden,qode_grid_1300,qode-theme-ver-12.0.1,qode-theme-bridge,bridge,wpb-js-composer js-comp-ver-5.4.2,vc_responsive

Concurrency with Thread Pools

Thread Pool

static set<string> sunets;
static map<short, int> counts;  // initially empty
static mutex m;   // // initially unlocked

int countProcesses(short num, const set<string>& sunets);

int main() {
  readInSunets();  // singly threaded
  compileCountsMap();
  // threading done
  // crawl over the built-up map to get the id of least loaded machine
 return 0;
}
static void compileCountsMap() {
    thread threads[30];  // 30 machines to poll
    for(size_t i = 0; i < 30; i++) {
        threads[i] = thread([i]() {
            int count = countProcesses(i + 1, sunets);
            if( count > = 0) {
               lock.guard <mutex> lg(m);
               counts[i + 1] = counts;
           }
     });
t.join();
}
The above has the problem of not being scalable.  For some small number of machines, think short, the solution is fine.  However, if we need to poll a million machines or a number greater than the allowable number of simultaneous threads, then we are going to have to think about a thread pool.  This brings the semaphore and its permission slip semantics back in to the solution.
So, rewriting compileCountsMap() from above, we have:
static void compileCountsMap() {
    thread threads[300];  // to represent machines > than allowable threads
    semaphore permits(12);   // only allow 12 simultaneous threads
    for(size_t i = 0; i < 300; i++) {
        permits.wait();
        threads[i] = thread(countProcesses, i + 1, ref(permits));  // ref() b/c can’t pass semaphore by value
    }
    for(threads& t: threads) t.join();
}
// need to pass around the master count of permision slips
static void countProcesses(short num, semaphore& permits) {   // has to be void ret.
    int count = countCS110Processes(num, sunets);
    if( count > = 0) {
        lock.guard <mutex> lg(m);
        counts[num] = counts;
    }
    permits.signal(on_thread_exit);  // schedules the semaphore on thread destruction
}

No Comments

Sorry, the comment form is closed at this time.