-
大小: 9.38KB　文件類型: .zip　金幣: 1　下載: 0 次　發布日期: 2021-02-22
- 標簽: threadpool, thread, Read, AD
資源簡介
threadpool
【核心代碼】
/* * Copyright (c) 2016, Mathias Brossard <mathias@brossard.org>. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /** * @file threadpool.c * @brief Threadpool implementation file */ #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include "threadpool.h" typedef enum { immediate_shutdown = 1, graceful_shutdown = 2 } threadpool_shutdown_t; /** * @struct threadpool_task * @brief the work struct * * @var function Pointer to the function that will perform the task. * @var argument Argument to be passed to the function. */ typedef struct { void (*function)(void *); void *argument; } threadpool_task_t; /** * @struct threadpool * @brief The threadpool struct * * @var notify Condition variable to notify worker threads. 
* @var threads Array containing worker threads ID. * @var thread_count Number of threads * @var queue Array containing the task queue. * @var queue_size Size of the task queue. * @var head Index of the first element. * @var tail Index of the next element. * @var count Number of pending tasks * @var shutdown Flag indicating if the pool is shutting down * @var started Number of started threads */ struct threadpool_t { pthread_mutex_t lock; pthread_cond_t notify; pthread_t *threads; threadpool_task_t *queue; int thread_count; int queue_size; int head; int tail; int count; int shutdown; int started; }; /** * @function void *threadpool_thread(void *threadpool) * @brief the worker thread * @param threadpool the pool which own the thread */ static void *threadpool_thread(void *threadpool); int threadpool_free(threadpool_t *pool); threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) { threadpool_t *pool; int i; (void) flags; if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) { return NULL; } if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { goto err; } /* Initialize */ pool->thread_count = 0; pool->queue_size = queue_size; pool->head = pool->tail = pool->count = 0; pool->shutdown = pool->started = 0; /* Allocate thread and task queue */ pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); pool->queue = (threadpool_task_t *)malloc (sizeof(threadpool_task_t) * queue_size); /* Initialize mutex and conditional variable first */ if((pthread_mutex_init(&(pool->lock), NULL) != 0) || (pthread_cond_init(&(pool->notify), NULL) != 0) || (pool->threads == NULL) || (pool->queue == NULL)) { goto err; } /* Start worker threads */ for(i = 0; i < thread_count; i ) { if(pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool) != 0) { threadpool_destroy(pool, 0); return NULL; } pool->thread_count ; pool->started ; } return pool; err: if(pool) { threadpool_free(pool); } 
return NULL; } int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument, int flags) { int err = 0; int next; (void) flags; if(pool == NULL || function == NULL) { return threadpool_invalid; } if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } next = (pool->tail 1) % pool->queue_size; do { /* Are we full ? */ if(pool->count == pool->queue_size) { err = threadpool_queue_full; break; } /* Are we shutting down ? */ if(pool->shutdown) { err = threadpool_shutdown; break; } /* Add task to queue */ pool->queue[pool->tail].function = function; pool->queue[pool->tail].argument = argument; pool->tail = next; pool->count = 1; /* pthread_cond_broadcast */ if(pthread_cond_signal(&(pool->notify)) != 0) { err = threadpool_lock_failure; break; } } while(0); if(pthread_mutex_unlock(&pool->lock) != 0) { err = threadpool_lock_failure; } return err; } int threadpool_destroy(threadpool_t *pool, int flags) { int i, err = 0; if(pool == NULL) { return threadpool_invalid; } if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } do { /* Already shutting down */ if(pool->shutdown) { err = threadpool_shutdown; break; } pool->shutdown = (flags & threadpool_graceful) ? graceful_shutdown : immediate_shutdown; /* Wake up all worker threads */ if((pthread_cond_broadcast(&(pool->notify)) != 0) || (pthread_mutex_unlock(&(pool->lock)) != 0)) { err = threadpool_lock_failure; break; } /* Join all worker thread */ for(i = 0; i < pool->thread_count; i ) { if(pthread_join(pool->threads[i], NULL) != 0) { err = threadpool_thread_failure; } } } while(0); /* Only if everything went well do we deallocate the pool */ if(!err) { threadpool_free(pool); } return err; } int threadpool_free(threadpool_t *pool) { if(pool == NULL || pool->started > 0) { return -1; } /* Did we manage to allocate ? 
*/ if(pool->threads) { free(pool->threads); free(pool->queue); /* Because we allocate pool->threads after initializing the mutex and condition variable, we're sure they're initialized. Let's lock the mutex just in case. */ pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_cond_destroy(&(pool->notify)); } free(pool); return 0; } static void *threadpool_thread(void *threadpool) { threadpool_t *pool = (threadpool_t *)threadpool; threadpool_task_t task; for(;;) { /* Lock must be taken to wait on conditional variable */ pthread_mutex_lock(&(pool->lock)); /* Wait on condition variable, check for spurious wakeups. When returning from pthread_cond_wait(), we own the lock. */ while((pool->count == 0) && (!pool->shutdown)) { pthread_cond_wait(&(pool->notify), &(pool->lock)); } if((pool->shutdown == immediate_shutdown) || ((pool->shutdown == graceful_shutdown) && (pool->count == 0))) { break; } /* Grab our task */ task.function = pool->queue[pool->head].function; task.argument = pool->queue[pool->head].argument; pool->head = (pool->head 1) % pool->queue_size; pool->count -= 1; /* Unlock */ pthread_mutex_unlock(&(pool->lock)); /* Get to work */ (*(task.function))(task.argument); } pool->started--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); return(NULL); }
代碼片段和文件信息
/*
 * Copyright (c) 2016, Mathias Brossard <mathias@brossard.org>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLA
 屬性            大小     日期    時間   名稱
----------- ---------  ---------- -----  ----
     目錄           0  2016-10-04 18:11  threadpool-master\
     文件          16  2016-10-04 18:11  threadpool-master\.gitignore
     文件         808  2016-10-04 18:11  threadpool-master\.travis.yml
     文件        1310  2016-10-04 18:11  threadpool-master\LICENSE
     文件         937  2016-10-04 18:11  threadpool-master\Makefile
     文件         896  2016-10-04 18:11  threadpool-master\README.md
     文件         246  2016-10-04 18:11  threadpool-master\package.json
     目錄           0  2016-10-04 18:11  threadpool-master\src\
     文件        8491  2016-10-04 18:11  threadpool-master\src\threadpool.c
     文件        3368  2016-10-04 18:11  threadpool-master\src\threadpool.h
     目錄           0  2016-10-04 18:11  threadpool-master\tests\
     文件        1459  2016-10-04 18:11  threadpool-master\tests\heavy.c
     文件        1033  2016-10-04 18:11  threadpool-master\tests\shutdown.c
     文件         964  2016-10-04 18:11  threadpool-master\tests\thrdtest.c
- 上一篇:計算電器所消耗的電能.cpp
- 下一篇:茶壺的光照觀察組(c++代碼)
評論
共有 條評論