11/* *****************************************************
22Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
3- Copyright (c) 2022, MariaDB Corporation.
43
54Compressing datasink implementation for XtraBackup.
65
@@ -33,8 +32,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
3332typedef struct {
3433pthread_t id;
3534uint num;
35+ pthread_mutex_t ctrl_mutex;
36+ pthread_cond_t ctrl_cond;
3637pthread_mutex_t data_mutex;
3738pthread_cond_t data_cond;
39+ my_bool started;
3840my_bool data_avail;
3941my_bool cancelled;
4042const char *from;
@@ -206,13 +208,14 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
206208
207209thd = threads + i;
208210
209- pthread_mutex_lock (&thd->data_mutex );
211+ pthread_mutex_lock (&thd->ctrl_mutex );
210212
211213chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
212214COMPRESS_CHUNK_SIZE : len;
213215thd->from = ptr;
214216thd->from_len = chunk_len;
215217
218+ pthread_mutex_lock (&thd->data_mutex );
216219thd->data_avail = TRUE ;
217220pthread_cond_signal (&thd->data_cond );
218221pthread_mutex_unlock (&thd->data_mutex );
@@ -256,6 +259,7 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
256259 " failed." );
257260return 1 ;
258261}
262+ pthread_mutex_unlock (&threads[i].ctrl_mutex );
259263}
260264}
261265
@@ -325,23 +329,6 @@ write_uint64_le(ds_file_t *file, ulonglong n)
325329return ds_write (file, tmp, sizeof (tmp));
326330}
327331
328- static
329- void
330- destroy_worker_thread (comp_thread_ctxt_t *thd)
331- {
332- pthread_mutex_lock (&thd->data_mutex );
333- thd->cancelled = TRUE ;
334- pthread_cond_signal (&thd->data_cond );
335- pthread_mutex_unlock (&thd->data_mutex );
336-
337- pthread_join (thd->id , NULL );
338-
339- pthread_cond_destroy (&thd->data_cond );
340- pthread_mutex_destroy (&thd->data_mutex );
341-
342- my_free (thd->to );
343- }
344-
345332static
346333comp_thread_ctxt_t *
347334create_worker_threads (uint n)
@@ -356,32 +343,54 @@ create_worker_threads(uint n)
356343comp_thread_ctxt_t *thd = threads + i;
357344
358345thd->num = i + 1 ;
346+ thd->started = FALSE ;
359347thd->cancelled = FALSE ;
360348thd->data_avail = FALSE ;
361349
362350thd->to = (char *) my_malloc (COMPRESS_CHUNK_SIZE +
363351 MY_QLZ_COMPRESS_OVERHEAD,
364352 MYF (MY_FAE));
365353
354+ /* Initialize the control mutex and condition var */
355+ if (pthread_mutex_init (&thd->ctrl_mutex , NULL ) ||
356+ pthread_cond_init (&thd->ctrl_cond , NULL )) {
357+ goto err;
358+ }
359+
366360/* Initialize and data mutex and condition var */
367361if (pthread_mutex_init (&thd->data_mutex , NULL ) ||
368362 pthread_cond_init (&thd->data_cond , NULL )) {
369363goto err;
370364}
371365
366+ pthread_mutex_lock (&thd->ctrl_mutex );
367+
372368if (pthread_create (&thd->id , NULL , compress_worker_thread_func,
373369 thd)) {
374370msg (" compress: pthread_create() failed: "
375371 " errno = %d" , errno);
372+ pthread_mutex_unlock (&thd->ctrl_mutex );
376373goto err;
377374}
378375}
379376
377+ /* Wait for the threads to start */
378+ for (i = 0 ; i < n; i++) {
379+ comp_thread_ctxt_t *thd = threads + i;
380+
381+ while (thd->started == FALSE )
382+ pthread_cond_wait (&thd->ctrl_cond , &thd->ctrl_mutex );
383+ pthread_mutex_unlock (&thd->ctrl_mutex );
384+ }
385+
380386return threads;
381387
382388err:
383- for (; i; i--) {
384- destroy_worker_thread (threads + i);
389+ while (i > 0 ) {
390+ comp_thread_ctxt_t *thd;
391+ i--;
392+ thd = threads + i;
393+ pthread_mutex_unlock (&thd->ctrl_mutex );
385394}
386395
387396my_free (threads);
@@ -395,7 +404,21 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
395404uint i;
396405
397406for (i = 0 ; i < n; i++) {
398- destroy_worker_thread (threads + i);
407+ comp_thread_ctxt_t *thd = threads + i;
408+
409+ pthread_mutex_lock (&thd->data_mutex );
410+ threads[i].cancelled = TRUE ;
411+ pthread_cond_signal (&thd->data_cond );
412+ pthread_mutex_unlock (&thd->data_mutex );
413+
414+ pthread_join (thd->id , NULL );
415+
416+ pthread_cond_destroy (&thd->data_cond );
417+ pthread_mutex_destroy (&thd->data_mutex );
418+ pthread_cond_destroy (&thd->ctrl_cond );
419+ pthread_mutex_destroy (&thd->ctrl_mutex );
420+
421+ my_free (thd->to );
399422}
400423
401424my_free (threads);
@@ -407,9 +430,19 @@ compress_worker_thread_func(void *arg)
407430{
408431comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
409432
433+ pthread_mutex_lock (&thd->ctrl_mutex );
434+
410435pthread_mutex_lock (&thd->data_mutex );
411436
437+ thd->started = TRUE ;
438+ pthread_cond_signal (&thd->ctrl_cond );
439+
440+ pthread_mutex_unlock (&thd->ctrl_mutex );
441+
412442while (1 ) {
443+ thd->data_avail = FALSE ;
444+ pthread_cond_signal (&thd->data_cond );
445+
413446while (!thd->data_avail && !thd->cancelled ) {
414447pthread_cond_wait (&thd->data_cond , &thd->data_mutex );
415448}
0 commit comments