Skip to content

Commit 04988d8

Browse files
committed
MDEV-33627 refactor threading in mariadb-import
Use threadpool, instead of one-thread-and-connection-per-table
1 parent c483c5c commit 04988d8

File tree

2 files changed

+42
-126
lines changed

2 files changed

+42
-126
lines changed

client/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c connectio
5757
TARGET_LINK_LIBRARIES(mariadb-dump ${CLIENT_LIB})
5858

5959
MYSQL_ADD_EXECUTABLE(mariadb-import mysqlimport.cc)
60-
TARGET_LINK_LIBRARIES(mariadb-import ${CLIENT_LIB})
60+
target_include_directories(mariadb-import PRIVATE ${CMAKE_SOURCE_DIR}/tpool)
61+
target_link_libraries(mariadb-import PRIVATE tpool ${CLIENT_LIB})
6162

6263
MYSQL_ADD_EXECUTABLE(mariadb-upgrade mysql_upgrade.c COMPONENT Server)
6364
TARGET_LINK_LIBRARIES(mariadb-upgrade ${CLIENT_LIB})

client/mysqlimport.cc

Lines changed: 40 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,8 @@
3636

3737
#include <welcome_copyright_notice.h> /* ORACLE_WELCOME_COPYRIGHT_NOTICE */
3838

39-
40-
/* Global Thread counter */
41-
uint counter= 0;
42-
pthread_mutex_t init_mutex;
43-
pthread_mutex_t counter_mutex;
44-
pthread_cond_t count_threshhold;
39+
#include <tpool.h>
40+
tpool::thread_pool *thread_pool;
4541

4642
static void db_error_with_table(MYSQL *mysql, char *table);
4743
static void db_error(MYSQL *mysql);
@@ -445,13 +441,10 @@ static MYSQL *db_connect(char *host, char *database,
445441
fprintf(stdout, "Connecting to %s\n", host ? host : "localhost");
446442
if (opt_use_threads && !lock_tables)
447443
{
448-
pthread_mutex_lock(&init_mutex);
449444
if (!(mysql= mysql_init(NULL)))
450445
{
451-
pthread_mutex_unlock(&init_mutex);
452446
return 0;
453447
}
454-
pthread_mutex_unlock(&init_mutex);
455448
}
456449
else
457450
if (!(mysql= mysql_init(NULL)))
@@ -496,6 +489,8 @@ static MYSQL *db_connect(char *host, char *database,
496489
if (ignore_foreign_keys)
497490
mysql_query(mysql, "set foreign_key_checks= 0;");
498491

492+
if (mysql_query(mysql, "/*!40101 set @@character_set_database=binary */;"))
493+
db_error(mysql);
499494
return mysql;
500495
}
501496

@@ -514,14 +509,10 @@ static void safe_exit(int error, MYSQL *mysql)
514509
if (error && ignore_errors)
515510
return;
516511

517-
/* in multi-threaded mode protect from concurrent safe_exit's */
518-
if (counter)
519-
pthread_mutex_lock(&counter_mutex);
520-
521512
if (mysql)
522513
mysql_close(mysql);
523514

524-
if (counter)
515+
if (thread_pool)
525516
{
526517
/* dirty exit. some threads are running,
527518
memory is not freed, openssl not deinitialized */
@@ -603,49 +594,42 @@ static char *field_escape(char *to,const char *from,uint length)
603594
return to;
604595
}
605596

606-
int exitcode= 0;
607-
608-
pthread_handler_t worker_thread(void *arg)
597+
std::atomic<int> exitcode;
598+
void set_exitcode(int code)
609599
{
610-
int error;
611-
char *raw_table_name= (char *)arg;
612-
MYSQL *mysql= 0;
600+
int expected= 0;
601+
exitcode.compare_exchange_strong(expected,code);
602+
}
613603

614-
if (mysql_thread_init())
615-
goto error;
616-
617-
if (!(mysql= db_connect(current_host,current_db,current_user,opt_password)))
618-
{
619-
goto error;
620-
}
604+
thread_local MYSQL *thread_local_mysql;
621605

622-
if (mysql_query(mysql, "/*!40101 set @@character_set_database=binary */;"))
623-
{
624-
db_error(mysql); /* We shall continue here, if --force was given */
625-
goto error;
626-
}
627606

607+
void load_single_table(void *arg)
608+
{
609+
int error;
610+
char *raw_table_name= (char *)arg;
611+
MYSQL *mysql= thread_local_mysql;
628612
/*
629613
We are not currently catching the error here.
630614
*/
631615
if((error= write_to_table(raw_table_name, mysql)))
632-
if (exitcode == 0)
633-
exitcode= error;
634-
635-
error:
636-
if (mysql)
637-
db_disconnect(current_host, mysql);
616+
set_exitcode(error);
617+
}
638618

639-
pthread_mutex_lock(&counter_mutex);
640-
counter--;
641-
pthread_cond_signal(&count_threshhold);
642-
pthread_mutex_unlock(&counter_mutex);
619+
static void tpool_thread_init(void)
620+
{
621+
mysql_thread_init();
622+
thread_local_mysql= db_connect(current_host,current_db,current_user,opt_password);
623+
}
624+
static void tpool_thread_exit(void)
625+
{
626+
if (thread_local_mysql)
627+
db_disconnect(current_host,thread_local_mysql);
643628
mysql_thread_end();
644-
pthread_exit(0);
645-
return 0;
646629
}
647630

648631

632+
#include <vector>
649633
int main(int argc, char **argv)
650634
{
651635
int error=0;
@@ -668,102 +652,33 @@ int main(int argc, char **argv)
668652

669653
if (opt_use_threads && !lock_tables)
670654
{
671-
char **save_argv;
672-
uint worker_thread_count= 0, table_count= 0, i= 0;
673-
pthread_t *worker_threads; /* Thread descriptor */
674-
pthread_attr_t attr; /* Thread attributes */
675-
pthread_attr_init(&attr);
676-
pthread_attr_setdetachstate(&attr,
677-
PTHREAD_CREATE_JOINABLE);
678-
679-
pthread_mutex_init(&init_mutex, NULL);
680-
pthread_mutex_init(&counter_mutex, NULL);
681-
pthread_cond_init(&count_threshhold, NULL);
682-
683-
/* Count the number of tables. This number denotes the total number
684-
of threads spawn.
685-
*/
686-
save_argv= argv;
687-
for (table_count= 0; *argv != NULL; argv++)
688-
table_count++;
689-
argv= save_argv;
690-
691-
if (!(worker_threads= (pthread_t*) my_malloc(PSI_NOT_INSTRUMENTED,
692-
table_count * sizeof(*worker_threads), MYF(0))))
693-
return -2;
694-
695-
for (; *argv != NULL; argv++) /* Loop through tables */
696-
{
697-
pthread_mutex_lock(&counter_mutex);
698-
while (counter == opt_use_threads)
699-
{
700-
struct timespec abstime;
701-
702-
set_timespec(abstime, 3);
703-
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
704-
}
705-
/* Before exiting the lock we set ourselves up for the next thread */
706-
counter++;
707-
pthread_mutex_unlock(&counter_mutex);
708-
/* now create the thread */
709-
if (pthread_create(&worker_threads[worker_thread_count], &attr,
710-
worker_thread, (void *)*argv) != 0)
711-
{
712-
pthread_mutex_lock(&counter_mutex);
713-
counter--;
714-
pthread_mutex_unlock(&counter_mutex);
715-
fprintf(stderr,"%s: Could not create thread\n", my_progname);
716-
continue;
717-
}
718-
worker_thread_count++;
719-
}
655+
thread_pool= tpool::create_thread_pool_generic(opt_use_threads,opt_use_threads);
656+
thread_pool->set_thread_callbacks(tpool_thread_init,tpool_thread_exit);
720657

721-
/*
722-
We loop until we know that all children have cleaned up.
723-
*/
724-
pthread_mutex_lock(&counter_mutex);
725-
while (counter)
726-
{
727-
struct timespec abstime;
728-
729-
set_timespec(abstime, 3);
730-
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
731-
}
732-
pthread_mutex_unlock(&counter_mutex);
733-
pthread_mutex_destroy(&init_mutex);
734-
pthread_mutex_destroy(&counter_mutex);
735-
pthread_cond_destroy(&count_threshhold);
736-
pthread_attr_destroy(&attr);
658+
std::vector<tpool::task> all_tasks;
659+
for (int i=0; argv[i]; i++)
660+
all_tasks.push_back(tpool::task(load_single_table, argv[i]));
737661

738-
for(i= 0; i < worker_thread_count; i++)
739-
{
740-
if (pthread_join(worker_threads[i], NULL))
741-
fprintf(stderr,"%s: Could not join worker thread.\n", my_progname);
742-
}
662+
for (auto &t: all_tasks)
663+
thread_pool->submit_task(&t);
743664

744-
my_free(worker_threads);
665+
delete thread_pool;
666+
thread_pool= nullptr;
745667
}
746668
else
747669
{
748-
MYSQL *mysql= 0;
749-
if (!(mysql= db_connect(current_host,current_db,current_user,opt_password)))
670+
MYSQL *mysql= db_connect(current_host,current_db,current_user,opt_password);
671+
if (!mysql)
750672
{
751673
free_defaults(argv_to_free);
752674
return(1); /* purecov: dead code */
753675
}
754676

755-
if (mysql_query(mysql, "/*!40101 set @@character_set_database=binary */;"))
756-
{
757-
db_error(mysql); /* We shall continue here, if --force was given */
758-
return(1);
759-
}
760-
761677
if (lock_tables)
762678
lock_table(mysql, argc, argv);
763679
for (; *argv != NULL; argv++)
764680
if ((error= write_to_table(*argv, mysql)))
765-
if (exitcode == 0)
766-
exitcode= error;
681+
set_exitcode(error);
767682
db_disconnect(current_host, mysql);
768683
}
769684
safe_exit(0, 0);

0 commit comments

Comments
 (0)