3636
3737#include < welcome_copyright_notice.h> /* ORACLE_WELCOME_COPYRIGHT_NOTICE */
3838
39-
40- /* Global Thread counter */
41- uint counter= 0 ;
42- pthread_mutex_t init_mutex;
43- pthread_mutex_t counter_mutex;
44- pthread_cond_t count_threshhold;
39+ #include < tpool.h>
40+ tpool::thread_pool *thread_pool;
4541
4642static void db_error_with_table (MYSQL *mysql, char *table);
4743static void db_error (MYSQL *mysql);
@@ -445,13 +441,10 @@ static MYSQL *db_connect(char *host, char *database,
445441 fprintf (stdout, " Connecting to %s\n " , host ? host : " localhost" );
446442 if (opt_use_threads && !lock_tables)
447443 {
448- pthread_mutex_lock (&init_mutex);
449444 if (!(mysql= mysql_init (NULL )))
450445 {
451- pthread_mutex_unlock (&init_mutex);
452446 return 0 ;
453447 }
454- pthread_mutex_unlock (&init_mutex);
455448 }
456449 else
457450 if (!(mysql= mysql_init (NULL )))
@@ -496,6 +489,8 @@ static MYSQL *db_connect(char *host, char *database,
496489 if (ignore_foreign_keys)
497490 mysql_query (mysql, " set foreign_key_checks= 0;" );
498491
492+ if (mysql_query (mysql, " /*!40101 set @@character_set_database=binary */;" ))
493+ db_error (mysql);
499494 return mysql;
500495}
501496
@@ -514,14 +509,10 @@ static void safe_exit(int error, MYSQL *mysql)
514509 if (error && ignore_errors)
515510 return ;
516511
517- /* in multi-threaded mode protect from concurrent safe_exit's */
518- if (counter)
519- pthread_mutex_lock (&counter_mutex);
520-
521512 if (mysql)
522513 mysql_close (mysql);
523514
524- if (counter )
515+ if (thread_pool )
525516 {
526517 /* dirty exit. some threads are running,
527518 memory is not freed, openssl not deinitialized */
@@ -603,49 +594,42 @@ static char *field_escape(char *to,const char *from,uint length)
603594 return to;
604595}
605596
606- int exitcode= 0 ;
607-
608- pthread_handler_t worker_thread (void *arg)
597+ std::atomic<int > exitcode;
598+ void set_exitcode (int code)
609599{
610- int error ;
611- char *raw_table_name= ( char *)arg ;
612- MYSQL *mysql= 0 ;
600+ int expected= 0 ;
601+ exitcode. compare_exchange_strong (expected,code) ;
602+ }
613603
614- if (mysql_thread_init ())
615- goto error;
616-
617- if (!(mysql= db_connect (current_host,current_db,current_user,opt_password)))
618- {
619- goto error;
620- }
604+ thread_local MYSQL *thread_local_mysql;
621605
622- if (mysql_query (mysql, " /*!40101 set @@character_set_database=binary */;" ))
623- {
624- db_error (mysql); /* We shall continue here, if --force was given */
625- goto error;
626- }
627606
607+ void load_single_table (void *arg)
608+ {
609+ int error;
610+ char *raw_table_name= (char *)arg;
611+ MYSQL *mysql= thread_local_mysql;
628612 /*
629613 We are not currently catching the error here.
630614 */
631615 if ((error= write_to_table (raw_table_name, mysql)))
632- if (exitcode == 0 )
633- exitcode= error;
634-
635- error:
636- if (mysql)
637- db_disconnect (current_host, mysql);
616+ set_exitcode (error);
617+ }
638618
639- pthread_mutex_lock (&counter_mutex);
640- counter--;
641- pthread_cond_signal (&count_threshhold);
642- pthread_mutex_unlock (&counter_mutex);
619+ static void tpool_thread_init (void )
620+ {
621+ mysql_thread_init ();
622+ thread_local_mysql= db_connect (current_host,current_db,current_user,opt_password);
623+ }
624+ static void tpool_thread_exit (void )
625+ {
626+ if (thread_local_mysql)
627+ db_disconnect (current_host,thread_local_mysql);
643628 mysql_thread_end ();
644- pthread_exit (0 );
645- return 0 ;
646629}
647630
648631
632+ #include < vector>
649633int main (int argc, char **argv)
650634{
651635 int error=0 ;
@@ -668,102 +652,33 @@ int main(int argc, char **argv)
668652
669653 if (opt_use_threads && !lock_tables)
670654 {
671- char **save_argv;
672- uint worker_thread_count= 0 , table_count= 0 , i= 0 ;
673- pthread_t *worker_threads; /* Thread descriptor */
674- pthread_attr_t attr; /* Thread attributes */
675- pthread_attr_init (&attr);
676- pthread_attr_setdetachstate (&attr,
677- PTHREAD_CREATE_JOINABLE);
678-
679- pthread_mutex_init (&init_mutex, NULL );
680- pthread_mutex_init (&counter_mutex, NULL );
681- pthread_cond_init (&count_threshhold, NULL );
682-
683- /* Count the number of tables. This number denotes the total number
684- of threads spawn.
685- */
686- save_argv= argv;
687- for (table_count= 0 ; *argv != NULL ; argv++)
688- table_count++;
689- argv= save_argv;
690-
691- if (!(worker_threads= (pthread_t *) my_malloc (PSI_NOT_INSTRUMENTED,
692- table_count * sizeof (*worker_threads), MYF (0 ))))
693- return -2 ;
694-
695- for (; *argv != NULL ; argv++) /* Loop through tables */
696- {
697- pthread_mutex_lock (&counter_mutex);
698- while (counter == opt_use_threads)
699- {
700- struct timespec abstime;
701-
702- set_timespec (abstime, 3 );
703- pthread_cond_timedwait (&count_threshhold, &counter_mutex, &abstime);
704- }
705- /* Before exiting the lock we set ourselves up for the next thread */
706- counter++;
707- pthread_mutex_unlock (&counter_mutex);
708- /* now create the thread */
709- if (pthread_create (&worker_threads[worker_thread_count], &attr,
710- worker_thread, (void *)*argv) != 0 )
711- {
712- pthread_mutex_lock (&counter_mutex);
713- counter--;
714- pthread_mutex_unlock (&counter_mutex);
715- fprintf (stderr," %s: Could not create thread\n " , my_progname);
716- continue ;
717- }
718- worker_thread_count++;
719- }
655+ thread_pool= tpool::create_thread_pool_generic (opt_use_threads,opt_use_threads);
656+ thread_pool->set_thread_callbacks (tpool_thread_init,tpool_thread_exit);
720657
721- /*
722- We loop until we know that all children have cleaned up.
723- */
724- pthread_mutex_lock (&counter_mutex);
725- while (counter)
726- {
727- struct timespec abstime;
728-
729- set_timespec (abstime, 3 );
730- pthread_cond_timedwait (&count_threshhold, &counter_mutex, &abstime);
731- }
732- pthread_mutex_unlock (&counter_mutex);
733- pthread_mutex_destroy (&init_mutex);
734- pthread_mutex_destroy (&counter_mutex);
735- pthread_cond_destroy (&count_threshhold);
736- pthread_attr_destroy (&attr);
658+ std::vector<tpool::task> all_tasks;
659+ for (int i=0 ; argv[i]; i++)
660+ all_tasks.push_back (tpool::task (load_single_table, argv[i]));
737661
738- for (i= 0 ; i < worker_thread_count; i++)
739- {
740- if (pthread_join (worker_threads[i], NULL ))
741- fprintf (stderr," %s: Could not join worker thread.\n " , my_progname);
742- }
662+ for (auto &t: all_tasks)
663+ thread_pool->submit_task (&t);
743664
744- my_free (worker_threads);
665+ delete thread_pool;
666+ thread_pool= nullptr ;
745667 }
746668 else
747669 {
748- MYSQL *mysql= 0 ;
749- if (!( mysql= db_connect (current_host,current_db,current_user,opt_password)) )
670+ MYSQL *mysql= db_connect (current_host,current_db,current_user,opt_password) ;
671+ if (!mysql)
750672 {
751673 free_defaults (argv_to_free);
752674 return (1 ); /* purecov: dead code */
753675 }
754676
755- if (mysql_query (mysql, " /*!40101 set @@character_set_database=binary */;" ))
756- {
757- db_error (mysql); /* We shall continue here, if --force was given */
758- return (1 );
759- }
760-
761677 if (lock_tables)
762678 lock_table (mysql, argc, argv);
763679 for (; *argv != NULL ; argv++)
764680 if ((error= write_to_table (*argv, mysql)))
765- if (exitcode == 0 )
766- exitcode= error;
681+ set_exitcode (error);
767682 db_disconnect (current_host, mysql);
768683 }
769684 safe_exit (0 , 0 );
0 commit comments