Header And Logo

PostgreSQL
| The world's most advanced open source database.

parallel.c

Go to the documentation of this file.
00001 /*
00002  *  parallel.c
00003  *
00004  *  multi-process support
00005  *
00006  *  Copyright (c) 2010-2013, PostgreSQL Global Development Group
00007  *  contrib/pg_upgrade/parallel.c
00008  */
00009 
00010 #include "postgres_fe.h"
00011 
00012 #include "pg_upgrade.h"
00013 
00014 #include <stdlib.h>
00015 #include <string.h>
00016 #include <sys/types.h>
00017 #include <sys/wait.h>
00018 
00019 #ifdef WIN32
00020 #include <io.h>
00021 #endif
00022 
/* Number of workers (child processes or threads) started but not yet reaped. */
static int parallel_jobs;

#ifdef WIN32

/* Everything a worker thread needs in order to call exec_prog(). */
typedef struct
{
    char log_file[MAXPGPATH];
    char opt_log_file[MAXPGPATH];
    char cmd[MAX_STRING];
} exec_thread_arg;

/* Everything a worker thread needs in order to call transfer_all_new_dbs(). */
typedef struct
{
    DbInfoArr *old_db_arr;
    DbInfoArr *new_db_arr;
    char old_pgdata[MAXPGPATH];
    char new_pgdata[MAXPGPATH];
    char old_tablespace[MAXPGPATH];
} transfer_thread_arg;

/* Thread entry points; each receives one of the argument structs above. */
DWORD win32_exec_prog(exec_thread_arg *args);
DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args);

/*
 *  Handles of all active threads.  The first parallel_jobs entries are kept
 *  contiguous (no gaps/zeros) so the array can be handed directly to
 *  WaitForMultipleObjects().  The argument structs live in separate
 *  arrays, indexed in parallel with this one, for the same reason.
 */
HANDLE *thread_handles;

exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/*
 *  Whichever of the two argument arrays is currently in use, so that
 *  reap_child() can shuffle slots without knowing the element type.
 */
void **cur_thread_args;

#endif
00057 
00058 /*
00059  *  parallel_exec_prog
00060  *
00061  *  This has the same API as exec_prog, except it does parallel execution,
00062  *  and therefore must throw errors and doesn't return an error status.
00063  */
00064 void
00065 parallel_exec_prog(const char *log_file, const char *opt_log_file,
00066                    const char *fmt,...)
00067 {
00068     va_list     args;
00069     char        cmd[MAX_STRING];
00070 #ifndef WIN32
00071     pid_t       child;
00072 #else
00073     HANDLE      child;
00074     exec_thread_arg *new_arg;
00075 #endif
00076 
00077     va_start(args, fmt);
00078     vsnprintf(cmd, sizeof(cmd), fmt, args);
00079     va_end(args);
00080 
00081     if (user_opts.jobs <= 1)
00082         /* throw_error must be true to allow jobs */
00083         exec_prog(log_file, opt_log_file, true, "%s", cmd);
00084     else
00085     {
00086         /* parallel */
00087 #ifdef WIN32
00088         cur_thread_args = (void **)exec_thread_args;
00089 #endif  
00090         /* harvest any dead children */
00091         while (reap_child(false) == true)
00092             ;
00093 
00094         /* must we wait for a dead child? */
00095         if (parallel_jobs >= user_opts.jobs)
00096             reap_child(true);
00097             
00098         /* set this before we start the job */
00099         parallel_jobs++;
00100     
00101         /* Ensure stdio state is quiesced before forking */
00102         fflush(NULL);
00103 
00104 #ifndef WIN32
00105         child = fork();
00106         if (child == 0)
00107             /* use _exit to skip atexit() functions */
00108             _exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd));
00109         else if (child < 0)
00110             /* fork failed */
00111             pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno));
00112 #else
00113         if (thread_handles == NULL)
00114         {
00115             int i;
00116 
00117             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
00118             exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
00119 
00120             /*
00121              *  For safety and performance, we keep the args allocated during
00122              *  the entire life of the process, and we don't free the args
00123              *  in a thread different from the one that allocated it.
00124              */
00125             for (i = 0; i < user_opts.jobs; i++)
00126                 exec_thread_args[i] = pg_malloc(sizeof(exec_thread_arg));
00127         }
00128 
00129         /* use first empty array element */
00130         new_arg = exec_thread_args[parallel_jobs-1];
00131 
00132         /* Can only pass one pointer into the function, so use a struct */
00133         strcpy(new_arg->log_file, log_file);
00134         strcpy(new_arg->opt_log_file, opt_log_file);
00135         strcpy(new_arg->cmd, cmd);
00136 
00137         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
00138                         new_arg, 0, NULL);
00139         if (child == 0)
00140             pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno));
00141 
00142         thread_handles[parallel_jobs-1] = child;
00143 #endif
00144     }
00145 
00146     return;
00147 }
00148 
00149 
00150 #ifdef WIN32
00151 DWORD
00152 win32_exec_prog(exec_thread_arg *args)
00153 {
00154     int ret;
00155 
00156     ret = !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd);
00157 
00158     /* terminates thread */
00159     return ret;
00160 }
00161 #endif
00162 
00163 
00164 /*
00165  *  parallel_transfer_all_new_dbs
00166  *
00167  *  This has the same API as transfer_all_new_dbs, except it does parallel execution
00168  *  by transfering multiple tablespaces in parallel
00169  */
00170 void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
00171                                    char *old_pgdata, char *new_pgdata,
00172                                    char *old_tablespace)
00173 {
00174 #ifndef WIN32
00175     pid_t       child;
00176 #else
00177     HANDLE      child;
00178     transfer_thread_arg *new_arg;
00179 #endif
00180 
00181     if (user_opts.jobs <= 1)
00182         /* throw_error must be true to allow jobs */
00183         transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
00184     else
00185     {
00186         /* parallel */
00187 #ifdef WIN32
00188         cur_thread_args = (void **)transfer_thread_args;
00189 #endif
00190         /* harvest any dead children */
00191         while (reap_child(false) == true)
00192             ;
00193 
00194         /* must we wait for a dead child? */
00195         if (parallel_jobs >= user_opts.jobs)
00196             reap_child(true);
00197             
00198         /* set this before we start the job */
00199         parallel_jobs++;
00200     
00201         /* Ensure stdio state is quiesced before forking */
00202         fflush(NULL);
00203 
00204 #ifndef WIN32
00205         child = fork();
00206         if (child == 0)
00207         {
00208             transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
00209                                  old_tablespace);
00210             /* if we take another exit path, it will be non-zero */
00211             /* use _exit to skip atexit() functions */
00212             _exit(0);
00213         }
00214         else if (child < 0)
00215             /* fork failed */
00216             pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno));
00217 #else
00218         if (thread_handles == NULL)
00219         {
00220             int i;
00221 
00222             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
00223             transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
00224 
00225             /*
00226              *  For safety and performance, we keep the args allocated during
00227              *  the entire life of the process, and we don't free the args
00228              *  in a thread different from the one that allocated it.
00229              */
00230             for (i = 0; i < user_opts.jobs; i++)
00231                 transfer_thread_args[i] = pg_malloc(sizeof(transfer_thread_arg));
00232         }
00233 
00234         /* use first empty array element */
00235         new_arg = transfer_thread_args[parallel_jobs-1];
00236 
00237         /* Can only pass one pointer into the function, so use a struct */
00238         new_arg->old_db_arr = old_db_arr;
00239         new_arg->new_db_arr = new_db_arr;
00240         strcpy(new_arg->old_pgdata, old_pgdata);
00241         strcpy(new_arg->new_pgdata, new_pgdata);
00242         strcpy(new_arg->old_tablespace, old_tablespace);
00243 
00244         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
00245                         new_arg, 0, NULL);
00246         if (child == 0)
00247             pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno));
00248 
00249         thread_handles[parallel_jobs-1] = child;
00250 #endif
00251     }
00252 
00253     return;
00254 }
00255 
00256 
00257 #ifdef WIN32
00258 DWORD
00259 win32_transfer_all_new_dbs(transfer_thread_arg *args)
00260 {
00261     transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
00262                          args->new_pgdata, args->old_tablespace);
00263 
00264     /* terminates thread */
00265     return 0;
00266 }
00267 #endif
00268 
00269 
00270 /*
00271  *  collect status from a completed worker child
00272  */
00273 bool
00274 reap_child(bool wait_for_child)
00275 {
00276 #ifndef WIN32
00277     int work_status;
00278     int ret;
00279 #else
00280     int             thread_num;
00281     DWORD           res;
00282 #endif
00283 
00284     if (user_opts.jobs <= 1 || parallel_jobs == 0)
00285         return false;
00286 
00287 #ifndef WIN32
00288     ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
00289 
00290     /* no children or, for WNOHANG, no dead children */
00291     if (ret <= 0 || !WIFEXITED(work_status))
00292         return false;
00293 
00294     if (WEXITSTATUS(work_status) != 0)
00295         pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno));
00296 
00297 #else
00298     /* wait for one to finish */
00299     thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
00300                     false, wait_for_child ? INFINITE : 0);
00301 
00302     if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
00303         return false;
00304 
00305     /* compute thread index in active_threads */
00306     thread_num -= WAIT_OBJECT_0;
00307     
00308     /* get the result */
00309     GetExitCodeThread(thread_handles[thread_num], &res);
00310     if (res != 0)
00311         pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno));
00312 
00313     /* dispose of handle to stop leaks */
00314     CloseHandle(thread_handles[thread_num]);
00315 
00316     /*  Move last slot into dead child's position */
00317     if (thread_num != parallel_jobs - 1)
00318     {
00319         void *tmp_args;
00320     
00321         thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
00322 
00323         /*
00324          *  We must swap the arg struct pointers because the thread we
00325          *  just moved is active, and we must make sure it is not
00326          *  reused by the next created thread.  Instead, the new thread
00327          *  will use the arg struct of the thread that just died.
00328          */
00329         tmp_args = cur_thread_args[thread_num];
00330         cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
00331         cur_thread_args[parallel_jobs - 1] = tmp_args;
00332     }
00333 #endif
00334 
00335     /* do this after job has been removed */
00336     parallel_jobs--;
00337 
00338     return true;
00339 }