00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "postgres_fe.h"
00011
00012 #include "pg_upgrade.h"
00013
00014 #include <stdlib.h>
00015 #include <string.h>
00016 #include <sys/types.h>
00017 #include <sys/wait.h>
00018
00019 #ifdef WIN32
00020 #include <io.h>
00021 #endif
00022
/* Number of workers (child processes, or threads on Windows) currently running. */
static int	parallel_jobs;

#ifdef WIN32

/*
 * Argument bundle for a worker thread running win32_exec_prog().
 * _beginthreadex() passes a single pointer to the thread function, so
 * everything the thread needs is gathered into one struct.
 */
typedef struct
{
	char		log_file[MAXPGPATH];
	char		opt_log_file[MAXPGPATH];
	char		cmd[MAX_STRING];
} exec_thread_arg;

/* Argument bundle for a worker thread running win32_transfer_all_new_dbs(). */
typedef struct
{
	DbInfoArr  *old_db_arr;
	DbInfoArr  *new_db_arr;
	char		old_pgdata[MAXPGPATH];
	char		new_pgdata[MAXPGPATH];
	char		old_tablespace[MAXPGPATH];
} transfer_thread_arg;

/* Thread bodies; each unpacks its argument struct and does the real work. */
DWORD		win32_exec_prog(exec_thread_arg *args);
DWORD		win32_transfer_all_new_dbs(transfer_thread_arg *args);

/*
 * Handles of the running worker threads.  The first parallel_jobs entries
 * are the live ones: reap_child() hands the array to
 * WaitForMultipleObjects() and, when a thread finishes, moves the last live
 * handle into the vacated slot.
 */
HANDLE	   *thread_handles;

/* Per-slot argument structs, one array for each kind of job. */
exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/*
 * Whichever of the two argument arrays is currently in use, viewed as an
 * array of untyped pointers so reap_child() can reorder its slots without
 * knowing the job type.
 */
void	  **cur_thread_args;

#endif
00057
00058
00059
00060
00061
00062
00063
00064 void
00065 parallel_exec_prog(const char *log_file, const char *opt_log_file,
00066 const char *fmt,...)
00067 {
00068 va_list args;
00069 char cmd[MAX_STRING];
00070 #ifndef WIN32
00071 pid_t child;
00072 #else
00073 HANDLE child;
00074 exec_thread_arg *new_arg;
00075 #endif
00076
00077 va_start(args, fmt);
00078 vsnprintf(cmd, sizeof(cmd), fmt, args);
00079 va_end(args);
00080
00081 if (user_opts.jobs <= 1)
00082
00083 exec_prog(log_file, opt_log_file, true, "%s", cmd);
00084 else
00085 {
00086
00087 #ifdef WIN32
00088 cur_thread_args = (void **)exec_thread_args;
00089 #endif
00090
00091 while (reap_child(false) == true)
00092 ;
00093
00094
00095 if (parallel_jobs >= user_opts.jobs)
00096 reap_child(true);
00097
00098
00099 parallel_jobs++;
00100
00101
00102 fflush(NULL);
00103
00104 #ifndef WIN32
00105 child = fork();
00106 if (child == 0)
00107
00108 _exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd));
00109 else if (child < 0)
00110
00111 pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno));
00112 #else
00113 if (thread_handles == NULL)
00114 {
00115 int i;
00116
00117 thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
00118 exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
00119
00120
00121
00122
00123
00124
00125 for (i = 0; i < user_opts.jobs; i++)
00126 exec_thread_args[i] = pg_malloc(sizeof(exec_thread_arg));
00127 }
00128
00129
00130 new_arg = exec_thread_args[parallel_jobs-1];
00131
00132
00133 strcpy(new_arg->log_file, log_file);
00134 strcpy(new_arg->opt_log_file, opt_log_file);
00135 strcpy(new_arg->cmd, cmd);
00136
00137 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
00138 new_arg, 0, NULL);
00139 if (child == 0)
00140 pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno));
00141
00142 thread_handles[parallel_jobs-1] = child;
00143 #endif
00144 }
00145
00146 return;
00147 }
00148
00149
00150 #ifdef WIN32
00151 DWORD
00152 win32_exec_prog(exec_thread_arg *args)
00153 {
00154 int ret;
00155
00156 ret = !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd);
00157
00158
00159 return ret;
00160 }
00161 #endif
00162
00163
00164
00165
00166
00167
00168
00169
00170 void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
00171 char *old_pgdata, char *new_pgdata,
00172 char *old_tablespace)
00173 {
00174 #ifndef WIN32
00175 pid_t child;
00176 #else
00177 HANDLE child;
00178 transfer_thread_arg *new_arg;
00179 #endif
00180
00181 if (user_opts.jobs <= 1)
00182
00183 transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
00184 else
00185 {
00186
00187 #ifdef WIN32
00188 cur_thread_args = (void **)transfer_thread_args;
00189 #endif
00190
00191 while (reap_child(false) == true)
00192 ;
00193
00194
00195 if (parallel_jobs >= user_opts.jobs)
00196 reap_child(true);
00197
00198
00199 parallel_jobs++;
00200
00201
00202 fflush(NULL);
00203
00204 #ifndef WIN32
00205 child = fork();
00206 if (child == 0)
00207 {
00208 transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
00209 old_tablespace);
00210
00211
00212 _exit(0);
00213 }
00214 else if (child < 0)
00215
00216 pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno));
00217 #else
00218 if (thread_handles == NULL)
00219 {
00220 int i;
00221
00222 thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
00223 transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
00224
00225
00226
00227
00228
00229
00230 for (i = 0; i < user_opts.jobs; i++)
00231 transfer_thread_args[i] = pg_malloc(sizeof(transfer_thread_arg));
00232 }
00233
00234
00235 new_arg = transfer_thread_args[parallel_jobs-1];
00236
00237
00238 new_arg->old_db_arr = old_db_arr;
00239 new_arg->new_db_arr = new_db_arr;
00240 strcpy(new_arg->old_pgdata, old_pgdata);
00241 strcpy(new_arg->new_pgdata, new_pgdata);
00242 strcpy(new_arg->old_tablespace, old_tablespace);
00243
00244 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
00245 new_arg, 0, NULL);
00246 if (child == 0)
00247 pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno));
00248
00249 thread_handles[parallel_jobs-1] = child;
00250 #endif
00251 }
00252
00253 return;
00254 }
00255
00256
00257 #ifdef WIN32
00258 DWORD
00259 win32_transfer_all_new_dbs(transfer_thread_arg *args)
00260 {
00261 transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
00262 args->new_pgdata, args->old_tablespace);
00263
00264
00265 return 0;
00266 }
00267 #endif
00268
00269
00270
00271
00272
00273 bool
00274 reap_child(bool wait_for_child)
00275 {
00276 #ifndef WIN32
00277 int work_status;
00278 int ret;
00279 #else
00280 int thread_num;
00281 DWORD res;
00282 #endif
00283
00284 if (user_opts.jobs <= 1 || parallel_jobs == 0)
00285 return false;
00286
00287 #ifndef WIN32
00288 ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
00289
00290
00291 if (ret <= 0 || !WIFEXITED(work_status))
00292 return false;
00293
00294 if (WEXITSTATUS(work_status) != 0)
00295 pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno));
00296
00297 #else
00298
00299 thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
00300 false, wait_for_child ? INFINITE : 0);
00301
00302 if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
00303 return false;
00304
00305
00306 thread_num -= WAIT_OBJECT_0;
00307
00308
00309 GetExitCodeThread(thread_handles[thread_num], &res);
00310 if (res != 0)
00311 pg_log(PG_FATAL, "child worker exited abnormally: %s\n", strerror(errno));
00312
00313
00314 CloseHandle(thread_handles[thread_num]);
00315
00316
00317 if (thread_num != parallel_jobs - 1)
00318 {
00319 void *tmp_args;
00320
00321 thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
00322
00323
00324
00325
00326
00327
00328
00329 tmp_args = cur_thread_args[thread_num];
00330 cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
00331 cur_thread_args[parallel_jobs - 1] = tmp_args;
00332 }
00333 #endif
00334
00335
00336 parallel_jobs--;
00337
00338 return true;
00339 }