@@ -34,9 +34,10 @@ typedef struct {
3434pthread_t id;
3535uint num;
3636pthread_mutex_t data_mutex;
37+ pthread_cond_t avail_cond;
3738pthread_cond_t data_cond;
3839pthread_cond_t done_cond;
39- my_bool data_avail;
40+ pthread_t data_avail;
4041my_bool cancelled;
4142const char *from;
4243size_t from_len;
@@ -195,9 +196,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
195196threads = comp_ctxt->threads ;
196197nthreads = comp_ctxt->nthreads ;
197198
199+ const pthread_t self = pthread_self ();
200+
198201ptr = (const char *) buf;
199202while (len > 0 ) {
200- uint max_thread;
203+ bool wait = nthreads == 1 ;
204+ retry:
205+ bool submitted = false ;
201206
202207/* Send data to worker threads for compression */
203208for (i = 0 ; i < nthreads; i++) {
@@ -206,30 +211,54 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
206211thd = threads + i;
207212
208213pthread_mutex_lock (&thd->data_mutex );
214+ if (thd->data_avail == pthread_t (~0UL )) {
215+ } else if (!wait) {
216+ skip:
217+ pthread_mutex_unlock (&thd->data_mutex );
218+ continue ;
219+ } else {
220+ for (;;) {
221+ pthread_cond_wait (&thd->avail_cond ,
222+ &thd->data_mutex );
223+ if (thd->data_avail
224+ == pthread_t (~0UL )) {
225+ break ;
226+ }
227+ goto skip;
228+ }
229+ }
209230
210231chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
211232COMPRESS_CHUNK_SIZE : len;
212233thd->from = ptr;
213234thd->from_len = chunk_len;
214235
215- thd->data_avail = TRUE ;
236+ thd->data_avail = self ;
216237pthread_cond_signal (&thd->data_cond );
217238pthread_mutex_unlock (&thd->data_mutex );
218239
240+ submitted = true ;
219241len -= chunk_len;
220242if (len == 0 ) {
221243break ;
222244}
223245ptr += chunk_len;
224246}
225247
226- max_thread = (i < nthreads) ? i : nthreads - 1 ;
248+ if (!submitted) {
249+ wait = true ;
250+ goto retry;
251+ }
227252
228- /* Reap and stream the compressed data */
229- for (i = 0 ; i <= max_thread; i++) {
253+ for (i = 0 ; i < nthreads; i++) {
230254thd = threads + i;
231255
232256pthread_mutex_lock (&thd->data_mutex );
257+ if (thd->data_avail != self) {
258+ pthread_mutex_unlock (&thd->data_mutex );
259+ continue ;
260+ }
261+
233262while (!thd->to_len ) {
234263pthread_cond_wait (&thd->done_cond ,
235264 &thd->data_mutex );
@@ -247,6 +276,8 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
247276}
248277
249278thd->to_len = 0 ;
279+ thd->data_avail = pthread_t (~0UL );
280+ pthread_cond_signal (&thd->avail_cond );
250281pthread_mutex_unlock (&thd->data_mutex );
251282
252283if (fail) {
@@ -334,6 +365,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd)
334365
335366pthread_join (thd->id , NULL );
336367
368+ pthread_cond_destroy (&thd->avail_cond );
337369pthread_cond_destroy (&thd->data_cond );
338370pthread_cond_destroy (&thd->done_cond );
339371pthread_mutex_destroy (&thd->data_mutex );
@@ -364,11 +396,14 @@ create_worker_threads(uint n)
364396
365397/* Initialize and data mutex and condition var */
366398if (pthread_mutex_init (&thd->data_mutex , NULL ) ||
399+ pthread_cond_init (&thd->avail_cond , NULL ) ||
367400 pthread_cond_init (&thd->data_cond , NULL ) ||
368401 pthread_cond_init (&thd->done_cond , NULL )) {
369402goto err;
370403}
371404
405+ thd->data_avail = pthread_t (~0UL );
406+
372407if (pthread_create (&thd->id , NULL , compress_worker_thread_func,
373408 thd)) {
374409msg (" compress: pthread_create() failed: "
@@ -410,13 +445,13 @@ compress_worker_thread_func(void *arg)
410445pthread_mutex_lock (&thd->data_mutex );
411446
412447while (1 ) {
413- while (!thd->data_avail && !thd->cancelled ) {
448+ while (!thd->cancelled
449+ && (thd->to_len || thd->data_avail == pthread_t (~0UL ))) {
414450pthread_cond_wait (&thd->data_cond , &thd->data_mutex );
415451}
416452
417453if (thd->cancelled )
418454break ;
419- thd->data_avail = FALSE ;
420455thd->to_len = qlz_compress (thd->from , thd->to , thd->from_len ,
421456 &thd->state );
422457
0 commit comments