我正在尝试编写一个创建两个线程的程序:“前端”和“后端”线程。我想创建一个“后端”线程来迭代和计算斐波那契序列中的术语对,然后将它们放入数组中,并创建一个“前端”线程,该线程将在每次迭代时打印出阵列对。
“前端”线程-用于在每次迭代中显示“后端”线程操作的结果
“后端”线程-用于计算和设置数组
即。[5,8],并且在迭代后它将包含[13,21]
我正在努力在线程中实现Fibonacci序列部分,并且取得了以下进展:
#include#include #include #include int fib; void *front_end(void *ptr); void *back_end(void *ptr); int main() { pthread_t thread1, thread2; int arr[2] = {5,8}; const int *ptrtoarr; ptrtoarr=arr; int create1, create2; int *s=(int *)(ptrtoarr); printf("%d \n", *s); ptrtoarr++; s = (int *)(ptrtoarr); printf("%d \n", *s); ptrtoarr--; create1 = pthread_create(&thread1, NULL, back_end, &arr); if(create1) { fprintf(stderr,"Error - pthread_create() return code: %d\n",create1); exit(EXIT_FAILURE); } pthread_join(thread1, NULL); //pthread_join(thread2, NULL); } // front-end thread to be callback for each back-end iteration void *front_end(void *ptr) { int *sum = ptr; int i, upper = atoi(ptr); if (upper > 0) { for (i=0; i 0) { int pre1 = 0; int current; //calc fib numbers..... if(fib == 1){ printf("") } } }
有人可以指导我逐步解决这个问题吗?
您的骨骼需要工作。
假设以下内容:
unsigned n = ...; // How many to generate. unsigned n_ready = 2; // How many are ready to print. unsigned *fibs = malloc(sizeof(unsigned)*n); fibs[0] = 0; fibs[1] = 1;
作为后端工作人员的核心,您将拥有
for (unsigned i=2; i作为前端工作人员的核心,您将拥有
for (unsigned i=0; i= n_ready) /* Nothing */; printf("%u\n", fibs[i]); }
问题1
如果线程试图在另一个变量写入变量时尝试读取该变量,则会遇到问题。两个或多个线程同时读取同一变量是可以的。
通过两个线程所使用的变量是
n
,的元素fib[]
和n_ready
。
n
:
两个线程均未更改,因此我们无需控制对其的访问。
fib[i]
fori >= n_ready
:
仅由后端工作人员访问,因此我们无需控制对这些对象的访问。
fib[i]
fori < n_ready
:
仅由前端工作人员访问,因此我们无需控制对这些访问。
n_ready
:
后端工作人员可以随时设置n_ready
,而前端工作可以随时尝试读取n_ready
,因此我们确实需要控制对的访问n_ready
。
互斥锁通常用于确保一次只有一个线程正在访问资源(例如,变量,变量组,文件句柄等)。
我们的后端工人变成
for (unsigned i=2; i我们的前端工作者成为
for (unsigned i=0; i= n_ready) { // Allow other thread to gain the lock. pthread_mutex_unlock(&mutex); // We need to access n_ready. pthread_mutex_lock(&mutex); } // The mutex only protects n_ready // --nothing is going to change fib[i]-- // so we can release it now rather than later. pthread_mutex_unlock(&mutex); printf("%u\n", fibs[i]); }
问题二
您有一个忙碌的循环。通常,这很糟糕,因为这意味着您的线程正在使用100%的等待时间。(在这种情况下,由于
i >= n_ready
可能已经成立,因此这实际上是一个不错的策略。但是让我们忽略它。)一个线程可以休眠,直到另一个线程使用条件vars发出信号为止。我们的后端工人变成
for (unsigned i=2; i我们的前端工作者成为
for (unsigned i=0; i= n_ready) pthread_cond_wait(&cond, &mutex); // The mutex only protects n_ready // --nothing is going to change fib[i]-- // so we can release it now rather than later. pthread_mutex_unlock(&mutex); printf("%u\n", fibs[i]); } 始终调用
pthread_cond_wait
锁定的互斥锁。互斥量在调用时将解锁,并在返回之前将其锁定。这允许其他线程获取互斥体以进行更改n_ready
。
完整的代码:
#include#include #include #include #define UNUSED(x) (void)(x) // To control access to n_ready. static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; static unsigned n_ready = 0; // How many are ready to print. static unsigned n; // How many to generate. static unsigned *fibs = NULL; static void *back_worker(void *unused) { UNUSED(unused); fibs[0] = 0; fibs[1] = 1; // We need to access n_ready. pthread_mutex_lock(&mutex); n_ready = 2; // Wake up the other thread if it's blocked. pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); for (unsigned i=2; i = n_ready) pthread_cond_wait(&cond, &mutex); // The mutex only protects n_ready // --nothing is going to change fib[i]-- // so we can release it now rather than later. pthread_mutex_unlock(&mutex); printf("%u\n", fibs[i]); } return NULL; } int main(void) { n = 20; // How many to generate. fibs = malloc(sizeof(unsigned) * n); pthread_t back_thread; if (errno = pthread_create(&back_thread, NULL, back_worker, NULL)) { perror(NULL); exit(1); } pthread_t front_thread; if (errno = pthread_create(&front_thread, NULL, front_worker, NULL)) { perror(NULL); exit(1); } pthread_join(back_thread, NULL); pthread_join(front_thread, NULL); pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); free(fibs); return 0; } 输出:
$ gcc -Wall -Wextra -pedantic a.c -o a -lpthread && a 0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181
建议进行上述练习
创建一个工作池,以打印出放入队列的数字。输出不需要按顺序排列。
worker函数已经为您编写。您不能更改
main
或worker
功能。我什至为您创建了队列。您只需让它通过修改线程安全的Queue_enqueue
,Queue_dequeue
和Queue_done
功能。这些是您可能会更改的唯一功能。#include#include #include #include #include #include #include #define NUM_WORKERS 4 #define QUEUE_SIZE 10 #define NUM_ITEMS 40 typedef struct { pthread_mutex_t mutex; pthread_cond_t cond; int done; int empty; int full; size_t max; size_t next_insert; size_t next_read; unsigned *buf; } Queue; static void Queue_init(Queue* q, size_t max) { pthread_mutex_init(&(q->mutex), NULL); pthread_cond_init(&(q->cond), NULL); q->done = 0; q->empty = 1; q->full = 0; q->max = max; q->next_insert = 0; q->next_read = 0; q->buf = malloc(sizeof(unsigned)*max); } static void Queue_destroy(Queue *q) { free(q->buf); pthread_cond_destroy(&(q->cond)); pthread_mutex_destroy(&(q->mutex)); } static void Queue_done(Queue *q) { q->done = 1; } // Returns the oldest item from the queue (via a parameter) and returns 1. // If the queue is empty and done, returns 0. // If the queue is empty and not done, waits until that changes. static int Queue_dequeue(Queue *q, unsigned *i) { while (q->empty && !q->done) { } if (q->empty) { // We are completely done. return 0; } else { *i = q->buf[ q->next_read ]; q->next_read = ( q->next_read + 1 ) % q->max; q->empty = q->next_read == q->next_insert; q->full = 0; return 1; } } // Adds the argument to the queue. // If the queue is full, waits until that changes. static void Queue_enqueue(Queue *q, unsigned i) { while (q->full && !q->done) { } if (q->done) { fprintf(stderr, "Error: Attempted to add item to \"done\" queue.\n"); return; } q->buf[q->next_insert] = i; q->next_insert = ( q->next_insert + 1 ) % q->max; q->empty = 0; q->full = q->next_insert == q->next_read; } static int msleep(long msec) { struct timespec ts; int res; if (msec < 0) { errno = EINVAL; return -1; } ts.tv_sec = msec / 1000; ts.tv_nsec = (msec % 1000) * 1000000; do { res = nanosleep(&ts, &ts); } while (res && errno == EINTR); return res; } // Protects access to stdout. static pthread_mutex_t stdout_mutex; static Queue q; static void *worker(void *worker_id_) { uintptr_t worker_id = (uintptr_t)worker_id_; unsigned int seed = worker_id; // Whatever. unsigned i; while (Queue_dequeue(&q, &i)) { pthread_mutex_lock(&stdout_mutex); printf("[%" PRIuPTR "] Dequeued %u\n", worker_id, i); pthread_mutex_unlock(&stdout_mutex); // msleep( rand_r(&seed) % 1000 + 1000 ); // Simulate a 1 to 2s load. pthread_mutex_lock(&stdout_mutex); printf("[%" PRIuPTR "] Finished processing %u\n", worker_id, i); pthread_mutex_unlock(&stdout_mutex); } return NULL; } int main(void) { Queue_init(&q, QUEUE_SIZE); pthread_t workers[NUM_WORKERS]; for (uintptr_t i=0; i 如果您对此有疑问,请随时发布该问题的链接,作为对此答案的评论。
我一生从未如此感恩,这是这一切运作方式的绝佳解释!非常感谢您如此清晰,有条理地指导我完成这些工作,看到将它们全部整合到代码中是很有意义的,我不能强调它对于像我这样的学习者有多么宝贵!