2 * @brief Class representing worker process.
4 /* Copyright (C) 2005-2023 Olly Betts
5 * Copyright (C) 2019 Bruno Baruffaldi
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation; either version 2 of the
10 * License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
26 #include "worker_comms.h"
32 #include "safefcntl.h"
33 #include "safeunistd.h"
34 #include "safesyssocket.h"
35 #include "safesyswait.h"
36 #include "safesysexits.h"
39 #include "closefrom.h"
43 #include "pkglibbindir.h"
47 # include "safewindows.h"
// Process-wide flag: set to true the first time start_worker_subprocess()
// calls signal(SIGPIPE, SIG_IGN), so that call is only made once.
52 bool Worker::ignoring_sigpipe
= false;
55 Worker::start_worker_subprocess()
57 static bool keep_stderr
= (getenv("XAPIAN_OMEGA_DEBUG_WORKERS") != nullptr);
59 if (error_prefix
.empty()) {
60 // The first time we're called, set error_prefix to the leafname of
61 // filter_module followed by ": ", and qualify filter_module if it's
63 auto slash
= filter_module
.rfind('/');
64 if (slash
== string::npos
) {
65 error_prefix
= std::move(filter_module
);
66 // Look for unqualified filters in pkglibbindir.
67 filter_module
= get_pkglibbindir();
69 filter_module
+= error_prefix
;
71 error_prefix
.assign(filter_module
, slash
+ 1);
76 #if defined HAVE_SOCKETPAIR && defined HAVE_FORK
78 if (socketpair(AF_UNIX
, SOCK_STREAM
, PF_UNSPEC
, fds
) < 0) {
79 error
= string("socketpair failed: ") + strerror(errno
);
88 // Connect pipe to fd 3. Don't use stdin and stdout in case the
89 // filter library tries to read from stdin or write debug or progress
93 // Make sure we don't hang on to open files which may get deleted but
94 // not have their disk space released until we exit.
97 // Connect stdin, stdout and (conditionally) stderr to /dev/null.
98 int devnull
= open("/dev/null", O_RDWR
);
103 if (devnull
> 3) close(devnull
);
105 // FIXME: For filters which support a file descriptor as input, we
106 // could open the file here, and pass the file descriptor across the
107 // socket to the worker process using sendmsg(). Then the worker
108 // process could be chroot()-ed to a sandbox directory, which means
109 // we're reasonably protected from security bugs in the filter.
111 #ifdef HAVE_SETRLIMIT
112 #if defined RLIMIT_AS || defined RLIMIT_VMEM || defined RLIMIT_DATA
113 // Set a memory limit if it is possible
114 long mem
= get_free_physical_memory();
116 struct rlimit ram_limit
= {
117 static_cast<rlim_t
>(mem
),
121 // Limit size of the process's virtual memory
122 // (address space) in bytes.
123 setrlimit(RLIMIT_AS
, &ram_limit
);
124 #elif defined RLIMIT_VMEM
125 // Limit size of a process's mapped address space in bytes.
126 setrlimit(RLIMIT_VMEM
, &ram_limit
);
128 // Limit size of the process's data segment.
129 setrlimit(RLIMIT_DATA
, &ram_limit
);
134 // Replacing the current process image with a new process image
135 const char* mod
= filter_module
.c_str();
136 execl(mod
, mod
, static_cast<void*>(NULL
));
141 int fork_errno
= errno
;
145 error
= string("fork failed: ") + strerror(fork_errno
);
149 sockt
= fdopen(fds
[0], "r+");
151 if (!ignoring_sigpipe
) {
152 ignoring_sigpipe
= true;
153 signal(SIGPIPE
, SIG_IGN
);
157 #elif defined __WIN32__
158 LARGE_INTEGER counter
;
159 // QueryPerformanceCounter() will always succeed on XP and later
160 // and gives us a counter which increments each CPU clock cycle
161 // on modern hardware (Pentium or newer).
162 QueryPerformanceCounter(&counter
);
164 snprintf(pipename
, sizeof(pipename
),
165 "\\\\.\\pipe\\xapian-omega-worker-%lx-%lx_%" PRIx64
,
166 static_cast<unsigned long>(GetCurrentProcessId()),
167 static_cast<unsigned long>(GetCurrentThreadId()),
168 static_cast<unsigned long long>(counter
.QuadPart
));
169 pipename
[sizeof(pipename
) - 1] = '\0';
170 // Create a pipe so we can read stdout from the child process.
171 HANDLE hPipe
= CreateNamedPipe(pipename
,
172 PIPE_ACCESS_DUPLEX
|FILE_FLAG_OVERLAPPED
,
174 1, 4096, 4096, NMPWAIT_USE_DEFAULT_WAIT
,
177 if (hPipe
== INVALID_HANDLE_VALUE
) {
178 error
= "CreateNamedPipe failed: " + str(GetLastError());
182 HANDLE hClient
= CreateFile(pipename
,
183 GENERIC_READ
|GENERIC_WRITE
, 0, NULL
,
185 FILE_FLAG_OVERLAPPED
, NULL
);
187 if (hClient
== INVALID_HANDLE_VALUE
) {
188 error
= "CreateFile failed: " + str(GetLastError());
192 if (!ConnectNamedPipe(hPipe
, NULL
) &&
193 GetLastError() != ERROR_PIPE_CONNECTED
) {
194 error
= "ConnectNamedPipe failed: " + str(GetLastError());
198 // Set the appropriate handles to be inherited by the child process.
199 SetHandleInformation(hClient
, HANDLE_FLAG_INHERIT
, 1);
201 // Create the child process.
202 PROCESS_INFORMATION procinfo
;
203 memset(&procinfo
, 0, sizeof(PROCESS_INFORMATION
));
205 STARTUPINFO startupinfo
;
206 memset(&startupinfo
, 0, sizeof(STARTUPINFO
));
207 startupinfo
.cb
= sizeof(STARTUPINFO
);
208 // FIXME: Is NULL the way to say "/dev/null"?
209 // It's what GetStdHandle() is documented to return if "an application does
210 // not have associated standard handles"...
211 startupinfo
.hStdError
= keep_stderr
? GetStdHandle(STD_ERROR_HANDLE
) : NULL
;
212 startupinfo
.hStdOutput
= hClient
;
213 startupinfo
.hStdInput
= hClient
;
214 startupinfo
.dwFlags
|= STARTF_USESTDHANDLES
;
216 string cmdline
{filter_module
};
217 // For some reason Windows wants a modifiable command line so we
218 // pass `&cmdline[0]` rather than `cmdline.c_str()`.
219 BOOL ok
= CreateProcess(filter_module
.c_str(), &cmdline
[0],
221 &startupinfo
, &procinfo
);
223 if (GetLastError() == ERROR_FILE_NOT_FOUND
) {
224 error
= error_prefix
+ "failed to run helper";
225 filter_module
= string();
228 error
= "CreateProcess failed: " + str(GetLastError());
232 CloseHandle(hClient
);
233 CloseHandle(procinfo
.hThread
);
234 child
= procinfo
.hProcess
;
236 sockt
= _fdopen(_open_osfhandle(intptr_t(hPipe
), O_RDWR
|O_BINARY
), "r+");
240 # error Omega needs porting to this platform
245 Worker::extract(const std::string
& filename
,
246 const std::string
& mimetype
,
249 std::string
& keywords
,
254 std::string
& message_id
,
258 if (filter_module
.empty()) {
259 error
= error_prefix
+ "hard failed earlier in the current run";
265 // Check if the worker process is still alive - if it is, waitpid()
266 // with WNOHANG returns 0.
268 if (waitpid(child
, &status
, WNOHANG
) != 0) {
272 #elif defined __WIN32__
273 // Check if the worker process is still alive by trying to wait for it
274 // with a timeout of 0ms.
275 if (WaitForSingleObject(child
, 0) != WAIT_TIMEOUT
) {
280 # error Omega needs porting to this platform
284 int r
= start_worker_subprocess();
285 if (r
!= 0) return r
;
288 string attachment_filename
;
289 // Send a filename and wait for the reply.
290 if (write_string(sockt
, filename
) && write_string(sockt
, mimetype
)) {
292 int field_code
= getc(sockt
);
294 switch (field_code
) {
295 case FIELD_PAGE_COUNT
: {
297 if (!read_unsigned(sockt
, u_pages
)) {
300 pages
= int(u_pages
);
303 case FIELD_CREATED_DATE
: {
305 if (!read_unsigned(sockt
, u_created
)) {
308 created
= time_t(long(u_created
));
332 case FIELD_MESSAGE_ID
:
336 if (!read_string(sockt
, error
)) goto comms_error
;
337 // Fields shouldn't be empty but the protocol allows them to be.
339 error
= error_prefix
+ "Couldn't extract text";
341 error
.insert(0, error_prefix
);
347 case FIELD_ATTACHMENT
:
348 value
= &attachment_filename
;
351 error
= error_prefix
+ "Unknown field code ";
352 error
+= str(field_code
);
355 if (value
->empty()) {
356 // First instance of this field for this document.
357 if (!read_string(sockt
, *value
)) break;
359 // Repeat instance of this field.
361 if (!read_string(sockt
, s
)) break;
365 if (field_code
== FIELD_ATTACHMENT
) {
366 // FIXME: Do something more useful with attachment_filename!
367 attachment_filename
.clear();
373 error
= error_prefix
;
374 // Check if the worker process already died, so we can report if it
375 // was killed by a segmentation fault, etc.
378 int result
= waitpid(child
, &status
, WNOHANG
);
379 int waitpid_errno
= errno
;
385 // The worker is still alive, so terminate it.
386 kill(child
, SIGTERM
);
387 waitpid(child
, &status
, 0);
389 } else if (result
< 0) {
390 // waitpid() failed with an error.
392 error
+= strerror(waitpid_errno
);
393 } else if (WIFEXITED(status
)) {
394 int rc
= WEXITSTATUS(status
);
396 case OMEGA_EX_SOCKET_READ_ERROR
:
397 error
+= "subprocess failed to read data from us";
399 case OMEGA_EX_SOCKET_WRITE_ERROR
:
400 error
+= "subprocess failed to write data to us";
403 error
+= "timed out";
406 error
+= "failed to initialise";
407 filter_module
= string();
410 error
+= "failed to run helper";
411 filter_module
= string();
414 error
+= "exited with status ";
419 error
+= "killed by signal ";
420 error
+= str(WTERMSIG(status
));
422 #elif defined __WIN32__
423 DWORD result
= WaitForSingleObject(child
, 0);
428 if (result
== WAIT_TIMEOUT
) {
429 // The worker is still alive, so terminate it. We need to specify an
430 // exit code here, 255 is an arbitrary choice.
431 TerminateProcess(child
, 255);
432 WaitForSingleObject(child
, INFINITE
);
435 } else if (result
== WAIT_FAILED
) {
436 // WaitForSingleObject() failed in an unexpected way.
438 error
+= str(GetLastError());
442 while (GetExitCodeProcess(child
, &rc
) && rc
== STILL_ACTIVE
) {
446 case OMEGA_EX_SOCKET_READ_ERROR
:
447 error
+= "subprocess failed to read data from us";
449 case OMEGA_EX_SOCKET_WRITE_ERROR
:
450 error
+= "subprocess failed to write data to us";
453 error
+= "timed out";
456 error
+= "failed to initialise";
457 filter_module
= string();
460 error
+= "failed to run helper";
461 filter_module
= string();
464 error
+= "exited with status ";
470 # error Omega needs porting to this platform