Skip to content
Snippets Groups Projects
Commit b1e98538 authored by Brad King's avatar Brad King
Browse files

ENH: UNIX implementation of process pipeline.

parent cce4320a
No related branches found
No related tags found
No related merge requests found
......@@ -46,23 +46,35 @@ timeout for program even when it closes stdout and stderr.
#include <signal.h> /* sigaction */
/* The number of pipes for the child's output. The standard stdout
and stderr pipes are the first two. One more pipe is used for the
child to report errors to the parent before the real process is
invoked. */
and stderr pipes are the first two. One more pipe is used to
detect when the child process has terminated. The third pipe is
not given to the child process, so it cannot close it until it
terminates. */
#define KWSYSPE_PIPE_COUNT 3
#define KWSYSPE_PIPE_STDOUT 0
#define KWSYSPE_PIPE_STDERR 1
#define KWSYSPE_PIPE_ERROR 2
#define KWSYSPE_PIPE_TERM 2
/* The maximum amount to read from a pipe at a time. */
#define KWSYSPE_PIPE_BUFFER_SIZE 1024
typedef struct timeval kwsysProcessTime;
typedef struct kwsysProcessCreateInformation_s
{
int stdin;
int stdout;
int stderr;
int term;
int error[2];
} kwsysProcessCreateInformation;
/*--------------------------------------------------------------------------*/
static void kwsysProcessInitialize(kwsysProcess* cp);
static int kwsysProcessInitialize(kwsysProcess* cp);
static void kwsysProcessCleanup(kwsysProcess* cp, int error);
static void kwsysProcessCleanupDescriptor(int* pfd);
static int kwsysProcessCreate(kwsysProcess* cp, int index,
kwsysProcessCreateInformation* si, int* readEnd);
static int kwsysProcessGetTimeoutTime(kwsysProcess* cp, double* userTimeout,
kwsysProcessTime* timeoutTime);
static int kwsysProcessGetTimeoutLeft(kwsysProcessTime* timeoutTime,
......@@ -73,7 +85,7 @@ static kwsysProcessTime kwsysProcessTimeFromDouble(double d);
static int kwsysProcessTimeLess(kwsysProcessTime in1, kwsysProcessTime in2);
static kwsysProcessTime kwsysProcessTimeAdd(kwsysProcessTime in1, kwsysProcessTime in2);
static kwsysProcessTime kwsysProcessTimeSubtract(kwsysProcessTime in1, kwsysProcessTime in2);
static void kwsysProcessChildErrorExit(kwsysProcess* cp);
static void kwsysProcessChildErrorExit(int errorPipe);
static void kwsysProcessRestoreDefaultSignalHandlers();
/*--------------------------------------------------------------------------*/
......@@ -87,17 +99,14 @@ struct kwsysProcess_s
/* Descriptors for the read ends of the child's output pipes. */
int PipeReadEnds[KWSYSPE_PIPE_COUNT];
/* Descriptors for the write ends of the child's output pipes. */
int PipeWriteEnds[KWSYSPE_PIPE_COUNT];
/* Buffer for pipe data. */
char PipeBuffer[KWSYSPE_PIPE_BUFFER_SIZE];
/* Process ID returned by the fork. */
pid_t ForkPID;
/* Process IDs returned by the calls to fork. */
pid_t* ForkPIDs;
/* Flag for whether the child reported an error. */
int ChildError;
/* Flag for whether the children were terminated by a faild select. */
int SelectError;
/* The timeout length. */
double Timeout;
......@@ -141,7 +150,9 @@ struct kwsysProcess_s
/* Buffer for error message in case of failure. */
char ErrorMessage[KWSYSPE_PIPE_BUFFER_SIZE+1];
int ErrorMessageLength;
/* The exit codes of each child process in the pipeline. */
int* CommandExitCodes;
};
/*--------------------------------------------------------------------------*/
......@@ -170,6 +181,10 @@ void kwsysProcess_Delete(kwsysProcess* cp)
/* Free memory. */
kwsysProcess_SetCommand(cp, 0);
kwsysProcess_SetWorkingDirectory(cp, 0);
if(cp->CommandExitCodes)
{
free(cp->CommandExitCodes);
}
free(cp);
}
......@@ -359,6 +374,7 @@ void kwsysProcess_Execute(kwsysProcess* cp)
{
int i;
struct sigaction newSigChldAction;
kwsysProcessCreateInformation si = {-1, -1, -1, -1, {-1, -1}};
/* Do not execute a second copy simultaneously. */
if(cp->State == kwsysProcess_State_Executing)
......@@ -367,7 +383,12 @@ void kwsysProcess_Execute(kwsysProcess* cp)
}
/* Initialize the control structure for a new process. */
kwsysProcessInitialize(cp);
if(!kwsysProcessInitialize(cp))
{
strcpy(cp->ErrorMessage, "Out of memory");
cp->State = kwsysProcess_State_Error;
return;
}
/* We want no special handling of SIGCHLD. Repeat call until it is
not interrupted. */
......@@ -376,29 +397,37 @@ void kwsysProcess_Execute(kwsysProcess* cp)
while((sigaction(SIGCHLD, &newSigChldAction, &cp->OldSigChldAction) < 0) &&
(errno == EINTR));
/* Create pipes for subprocess output. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
/* Setup the stderr and termination pipes to be shared by all processes. */
for(i=KWSYSPE_PIPE_STDERR; i < KWSYSPE_PIPE_COUNT; ++i)
{
int p[2];
/* Create the pipe. */
int p[2];
if(pipe(p) < 0)
{
kwsysProcessCleanup(cp, 1);
return;
}
/* Store the pipe. */
cp->PipeReadEnds[i] = p[0];
if(i == KWSYSPE_PIPE_STDERR)
{
si.stderr = p[1];
}
else
{
si.term = p[1];
}
/* Set close-on-exec flag on the pipe's ends. */
if((fcntl(p[0], F_SETFD, FD_CLOEXEC) < 0) ||
(fcntl(p[1], F_SETFD, FD_CLOEXEC) < 0))
{
kwsysProcessCleanup(cp, 1);
kwsysProcessCleanupDescriptor(&si.stderr);
kwsysProcessCleanupDescriptor(&si.term);
return;
}
/* Store the pipe. */
cp->PipeReadEnds[i] = p[0];
cp->PipeWriteEnds[i] = p[1];
}
/* The timeout period starts now. */
......@@ -406,62 +435,37 @@ void kwsysProcess_Execute(kwsysProcess* cp)
cp->TimeoutTime.tv_sec = -1;
cp->TimeoutTime.tv_usec = -1;
/* Fork off a child process. */
cp->ForkPID = fork();
if(cp->ForkPID < 0)
{
kwsysProcessCleanup(cp, 1);
return;
}
/* If this is the child process, run the real process. */
if(cp->ForkPID == 0)
/* Create the pipeline of processes. */
{
int readEnd = 0;
for(i=0; i < cp->NumberOfCommands; ++i)
{
/* We used to close stdin, but some programs do not like being run
without stdin. Just use whatever stdin the parent program is
using. */
/*close(0);*/
/* Setup the stdout/stderr pipes. */
dup2(cp->PipeWriteEnds[KWSYSPE_PIPE_STDOUT], 1);
dup2(cp->PipeWriteEnds[KWSYSPE_PIPE_STDERR], 2);
/* Clear the close-on-exec flag for stdout, stderr, and the child
error report pipe. All other pipe handles will be closed when
exec succeeds. */
fcntl(1, F_SETFD, 0);
fcntl(2, F_SETFD, 0);
fcntl(cp->PipeWriteEnds[KWSYSPE_PIPE_ERROR], F_SETFD, 0);
/* Restore all default signal handlers. */
kwsysProcessRestoreDefaultSignalHandlers();
/* Change to the working directory specified, if any. */
if(cp->WorkingDirectory)
if(!kwsysProcessCreate(cp, i, &si, &readEnd))
{
/* Some platforms specify that the chdir call may be
interrupted. Repeat the call until it finishes. */
int r;
while(((r = chdir(cp->WorkingDirectory)) < 0) && (errno == EINTR));
if(r < 0)
kwsysProcessCleanup(cp, 1);
/* Release resources that may have been allocated for this
process before an error occurred. */
kwsysProcessCleanupDescriptor(&readEnd);
if(i > 0)
{
/* Failure. Report error to parent and terminate. */
kwsysProcessChildErrorExit(cp);
kwsysProcessCleanupDescriptor(&si.stdin);
}
kwsysProcessCleanupDescriptor(&si.stdout);
kwsysProcessCleanupDescriptor(&si.stderr);
kwsysProcessCleanupDescriptor(&si.term);
kwsysProcessCleanupDescriptor(&si.error[0]);
kwsysProcessCleanupDescriptor(&si.error[1]);
return;
}
/* Execute the real process. If successful, this does not return. */
execvp(cp->Command[0], cp->Command);
/* Failure. Report error to parent and terminate. */
kwsysProcessChildErrorExit(cp);
}
/* Save a handle to the output pipe for the last process. */
cp->PipeReadEnds[KWSYSPE_PIPE_STDOUT] = readEnd;
}
/* The parent process does not need the pipe write ends. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{
kwsysProcessCleanupDescriptor(&cp->PipeWriteEnds[i]);
}
/* The parent process does not need the output pipe write ends. */
kwsysProcessCleanupDescriptor(&si.stderr);
kwsysProcessCleanupDescriptor(&si.term);
/* All the pipes are now open. */
cp->PipesLeft = KWSYSPE_PIPE_COUNT;
......@@ -518,22 +522,9 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, int pipes, char** data,
if(n > 0)
{
/* We have data on this pipe. */
if(i == KWSYSPE_PIPE_ERROR)
if(i == KWSYSPE_PIPE_TERM)
{
/* This is data on the special error reporting pipe. The
child process failed to execute the program. */
cp->ChildError = 1;
if(n > KWSYSPE_PIPE_BUFFER_SIZE - cp->ErrorMessageLength)
{
n = KWSYSPE_PIPE_BUFFER_SIZE - cp->ErrorMessageLength;
}
if(n > 0)
{
memcpy(cp->ErrorMessage+cp->ErrorMessageLength,
cp->PipeBuffer, n);
cp->ErrorMessageLength += n;
cp->ErrorMessage[cp->ErrorMessageLength] = 0;
}
/* This is data on the special termination pipe. Ignore it. */
}
else if(pipes & (1 << i))
{
......@@ -618,10 +609,10 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, int pipes, char** data,
pipe buffer. */
strncpy(cp->ErrorMessage, strerror(errno), KWSYSPE_PIPE_BUFFER_SIZE);
/* Kill the child now. */
/* Kill the children now. */
kwsysProcess_Kill(cp);
cp->Killed = 0;
cp->ChildError = 1;
cp->SelectError = 1;
cp->PipesLeft = 0;
}
}
......@@ -656,7 +647,7 @@ int kwsysProcess_WaitForData(kwsysProcess* cp, int pipes, char** data,
}
else
{
/* The process timeout has expired. Kill the child now. */
/* The process timeout has expired. Kill the children now. */
kwsysProcess_Kill(cp);
cp->Killed = 0;
cp->TimeoutExpired = 1;
......@@ -693,19 +684,36 @@ int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout)
}
}
/* Wait for the child to terminate. The process should have already
exited because KWSYSPE_PIPE_ERROR has been closed by this point.
Repeat the call until it is not interrupted. */
while(((result = waitpid(cp->ForkPID, &status, 0)) < 0) && (errno == EINTR));
if(result <= 0)
/* Wait for each child to terminate. The process should have
already exited because KWSYSPE_PIPE_TERM has been closed by this
point. Repeat the call until it is not interrupted. */
{
int i;
for(i=0; i < cp->NumberOfCommands; ++i)
{
while(((result = waitpid(cp->ForkPIDs[i],
&cp->CommandExitCodes[i], 0)) < 0) &&
(errno == EINTR));
if(result <= 0 && cp->State != kwsysProcess_State_Error)
{
/* Unexpected error. Report the first time this happens. */
strncpy(cp->ErrorMessage, strerror(errno), KWSYSPE_PIPE_BUFFER_SIZE);
cp->State = kwsysProcess_State_Error;
}
}
}
/* Check if there was an error in one of the waitpid calls. */
if(cp->State == kwsysProcess_State_Error)
{
/* Unexpected error. */
kwsysProcessCleanup(cp, 1);
/* The error message is already in its buffer. Tell
kwsysProcessCleanup to not create it. */
kwsysProcessCleanup(cp, 0);
return 1;
}
/* Check whether the child reported an error invoking the process. */
if(cp->ChildError)
if(cp->SelectError)
{
/* The error message is already in its buffer. Tell
kwsysProcessCleanup to not create it. */
......@@ -714,6 +722,9 @@ int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout)
return 1;
}
/* Use the status of the last process in the pipeline. */
status = cp->CommandExitCodes[cp->NumberOfCommands-1];
/* Determine the outcome. */
if(cp->Killed)
{
......@@ -773,29 +784,35 @@ int kwsysProcess_WaitForExit(kwsysProcess* cp, double* userTimeout)
/*--------------------------------------------------------------------------*/
void kwsysProcess_Kill(kwsysProcess* cp)
{
int i;
/* Make sure we are executing a process. */
if(cp->State != kwsysProcess_State_Executing)
{
return;
}
/* Kill the child. */
/* Kill the children. */
cp->Killed = 1;
kill(cp->ForkPID, SIGKILL);
for(i=0; i < cp->NumberOfCommands; ++i)
{
if(cp->ForkPIDs[i])
{
kill(cp->ForkPIDs[i], SIGKILL);
}
}
}
/*--------------------------------------------------------------------------*/
/* Initialize a process control structure for kwsysProcess_Execute. */
static void kwsysProcessInitialize(kwsysProcess* cp)
static int kwsysProcessInitialize(kwsysProcess* cp)
{
int i;
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{
cp->PipeReadEnds[i] = -1;
cp->PipeWriteEnds[i] = -1;
}
cp->ForkPID = -1;
cp->ChildError = 0;
cp->SelectError = 0;
cp->StartTime.tv_sec = -1;
cp->StartTime.tv_usec = -1;
cp->TimeoutTime.tv_sec = -1;
......@@ -809,7 +826,30 @@ static void kwsysProcessInitialize(kwsysProcess* cp)
cp->ExitCode = 1;
cp->ExitValue = 1;
cp->ErrorMessage[0] = 0;
cp->ErrorMessageLength = 0;
if(cp->ForkPIDs)
{
free(cp->ForkPIDs);
}
cp->ForkPIDs = (pid_t*)malloc(sizeof(pid_t)*cp->NumberOfCommands);
if(!cp->ForkPIDs)
{
return 0;
}
memset(cp->ForkPIDs, 0, sizeof(pid_t)*cp->NumberOfCommands);
if(cp->CommandExitCodes)
{
free(cp->CommandExitCodes);
}
cp->CommandExitCodes = (int*)malloc(sizeof(int)*cp->NumberOfCommands);
if(!cp->CommandExitCodes)
{
return 0;
}
memset(cp->CommandExitCodes, 0, sizeof(int)*cp->NumberOfCommands);
return 1;
}
/*--------------------------------------------------------------------------*/
......@@ -819,22 +859,46 @@ static void kwsysProcessCleanup(kwsysProcess* cp, int error)
{
int i;
/* If cleaning up due to an error, report the error message. */
if(error)
{
strncpy(cp->ErrorMessage, strerror(errno), KWSYSPE_PIPE_BUFFER_SIZE);
/* We are cleaning up due to an error. Report the error message
if one has not been provided already. */
if(cp->ErrorMessage[0] == 0)
{
strncpy(cp->ErrorMessage, strerror(errno), KWSYSPE_PIPE_BUFFER_SIZE);
}
/* Set the error state. */
cp->State = kwsysProcess_State_Error;
/* Kill any children already started. */
if(cp->ForkPIDs)
{
for(i=0; i < cp->NumberOfCommands; ++i)
{
if(cp->ForkPIDs[i])
{
kill(cp->ForkPIDs[i], SIGKILL);
}
}
}
}
/* Restore the SIGCHLD handler. */
while((sigaction(SIGCHLD, &cp->OldSigChldAction, 0) < 0) &&
(errno == EINTR));
/* Free memory. */
if(cp->ForkPIDs)
{
free(cp->ForkPIDs);
cp->ForkPIDs = 0;
}
/* Close pipe handles. */
for(i=0; i < KWSYSPE_PIPE_COUNT; ++i)
{
kwsysProcessCleanupDescriptor(&cp->PipeReadEnds[i]);
kwsysProcessCleanupDescriptor(&cp->PipeWriteEnds[i]);
}
}
......@@ -851,6 +915,137 @@ static void kwsysProcessCleanupDescriptor(int* pfd)
}
}
/*--------------------------------------------------------------------------*/
int kwsysProcessCreate(kwsysProcess* cp, int index,
kwsysProcessCreateInformation* si, int* readEnd)
{
/* Setup the process's stdin. */
if(index > 0)
{
si->stdin = *readEnd;
*readEnd = 0;
}
else
{
si->stdin = 0;
}
/* Setup the process's stdout. */
{
/* Create the pipe. */
int p[2];
if(pipe(p) < 0)
{
return 0;
}
*readEnd = p[0];
si->stdout = p[1];
/* Set close-on-exec flag on the pipe's ends. */
if((fcntl(p[0], F_SETFD, FD_CLOEXEC) < 0) ||
(fcntl(p[1], F_SETFD, FD_CLOEXEC) < 0))
{
return 0;
}
}
/* Create the error reporting pipe. */
if(pipe(si->error) < 0)
{
return 0;
}
/* Set close-on-exec flag on the error pipe's write end. */
if(fcntl(si->error[1], F_SETFD, FD_CLOEXEC) < 0)
{
return 0;
}
/* Fork off a child process. */
cp->ForkPIDs[index] = fork();
if(cp->ForkPIDs[index] < 0)
{
return 0;
}
if(cp->ForkPIDs[index] == 0)
{
/* Close the read end of the error reporting pipe. */
close(si->error[0]);
/* Setup the stdin, stdout, and stderr pipes. */
if(index > 0)
{
dup2(si->stdin, 0);
}
dup2(si->stdout, 1);
dup2(si->stderr, 2);
/* Clear the close-on-exec flag for stdin, stdout, and stderr.
Also clear it for the termination pipe. All other pipe handles
will be closed when exec succeeds. */
fcntl(0, F_SETFD, 0);
fcntl(1, F_SETFD, 0);
fcntl(2, F_SETFD, 0);
fcntl(si->term, F_SETFD, 0);
/* Restore all default signal handlers. */
kwsysProcessRestoreDefaultSignalHandlers();
/* Change to the working directory specified, if any. */
if(cp->WorkingDirectory)
{
/* Some platforms specify that the chdir call may be
interrupted. Repeat the call until it finishes. */
int r;
while(((r = chdir(cp->WorkingDirectory)) < 0) && (errno == EINTR));
if(r < 0)
{
/* Failure. Report error to parent and terminate. */
kwsysProcessChildErrorExit(si->error[1]);
}
}
/* Execute the real process. If successful, this does not return. */
execvp(cp->Commands[index][0], cp->Commands[index]);
/* Failure. Report error to parent and terminate. */
kwsysProcessChildErrorExit(si->error[1]);
}
/* Close the write end of the error pipe. */
close(si->error[1]);
si->error[1] = 0;
/* Block until the child's exec call succeeds and closes the error
pipe or writes data to the pipe to report an error. */
{
int n;
/* Keep trying to read until the operation is not interrupted. */
while(((n = read(si->error[0], cp->ErrorMessage,
KWSYSPE_PIPE_BUFFER_SIZE)) < 0) && (errno == EINTR));
close(si->error[0]);
si->error[0] = 0;
if(n > 0)
{
/* The child failed to execute the process. */
return 0;
}
}
/* Successfully created this child process. */
if(index > 0)
{
/* The parent process does not need the input pipe read end. */
kwsysProcessCleanupDescriptor(&si->stdin);
}
/* The parent process does not need the output pipe write ends. */
kwsysProcessCleanupDescriptor(&si->stdout);
return 1;
}
/*--------------------------------------------------------------------------*/
/* Get the time at which either the process or user timeout will
expire. Returns 1 if the user timeout is first, and 0 otherwise. */
......@@ -975,14 +1170,14 @@ static kwsysProcessTime kwsysProcessTimeSubtract(kwsysProcessTime in1, kwsysProc
/* When the child process encounters an error before its program is
invoked, this is called to report the error to the parent and
exit. */
static void kwsysProcessChildErrorExit(kwsysProcess* cp)
static void kwsysProcessChildErrorExit(int errorPipe)
{
/* Construct the error message. */
char buffer[KWSYSPE_PIPE_BUFFER_SIZE];
strncpy(buffer, strerror(errno), KWSYSPE_PIPE_BUFFER_SIZE);
/* Report the error to the parent through the special pipe. */
write(cp->PipeWriteEnds[KWSYSPE_PIPE_ERROR], buffer, strlen(buffer));
write(errorPipe, buffer, strlen(buffer));
/* Terminate without cleanup. */
_exit(1);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment