Fix tg_termpos1 for 64-bit termpos
[xapian.git] / xapian-applications / omega / worker.cc
blobf7e10640875b07d9c74fd1bfbc52288da46b1675
1 /** @file
2 * @brief Class representing worker process.
3 */
4 /* Copyright (C) 2005-2023 Olly Betts
5 * Copyright (C) 2019 Bruno Baruffaldi
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation; either version 2 of the
10 * License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
20 * USA
23 #include <config.h>
25 #include "worker.h"
26 #include "worker_comms.h"
28 #include <cinttypes>
29 #include <csignal>
30 #include <cstring>
31 #include <cerrno>
32 #include "safefcntl.h"
33 #include "safeunistd.h"
34 #include "safesyssocket.h"
35 #include "safesyswait.h"
36 #include "safesysexits.h"
37 #include <utility>
39 #include "closefrom.h"
40 #include "freemem.h"
41 #include "handler.h"
42 #include "parseint.h"
43 #include "pkglibbindir.h"
44 #include "str.h"
46 #ifdef __WIN32__
47 # include "safewindows.h"
48 #endif
50 using namespace std;
// Static flag recording that SIGPIPE has already been set to be ignored;
// it is set the first time a worker subprocess is started successfully via
// socketpair()/fork() so that signal() is only called once.
bool Worker::ignoring_sigpipe = false;
54 int
55 Worker::start_worker_subprocess()
57 static bool keep_stderr = (getenv("XAPIAN_OMEGA_DEBUG_WORKERS") != nullptr);
59 if (error_prefix.empty()) {
60 // The first time we're called, set error_prefix to the leafname of
61 // filter_module followed by ": ", and qualify filter_module if it's
62 // just a leafname.
63 auto slash = filter_module.rfind('/');
64 if (slash == string::npos) {
65 error_prefix = std::move(filter_module);
66 // Look for unqualified filters in pkglibbindir.
67 filter_module = get_pkglibbindir();
68 filter_module += '/';
69 filter_module += error_prefix;
70 } else {
71 error_prefix.assign(filter_module, slash + 1);
73 error_prefix += ": ";
76 #if defined HAVE_SOCKETPAIR && defined HAVE_FORK
77 int fds[2];
78 if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, fds) < 0) {
79 error = string("socketpair failed: ") + strerror(errno);
80 return 1;
83 child = fork();
84 if (child == 0) {
85 // Child process.
86 close(fds[0]);
88 // Connect pipe to fd 3. Don't use stdin and stdout in case the
89 // filter library tries to read from stdin or write debug or progress
90 // info to stdout.
91 dup2(fds[1], 3);
93 // Make sure we don't hang on to open files which may get deleted but
94 // not have their disk space released until we exit.
95 closefrom(4);
97 // Connect stdin, stdout and (conditionally) stderr to /dev/null.
98 int devnull = open("/dev/null", O_RDWR);
99 dup2(devnull, 0);
100 dup2(devnull, 1);
101 if (!keep_stderr)
102 dup2(devnull, 2);
103 if (devnull > 3) close(devnull);
105 // FIXME: For filters which support a file descriptor as input, we
106 // could open the file here, and pass the file descriptor across the
107 // socket to the worker process using sendmsg(). Then the worker
108 // process could be chroot()-ed to a sandbox directory, which means
109 // we're reasonably protected from security bugs in the filter.
111 #ifdef HAVE_SETRLIMIT
112 #if defined RLIMIT_AS || defined RLIMIT_VMEM || defined RLIMIT_DATA
113 // Set a memory limit if it is possible
114 long mem = get_free_physical_memory();
115 if (mem > 0) {
116 struct rlimit ram_limit = {
117 static_cast<rlim_t>(mem),
118 RLIM_INFINITY
120 #ifdef RLIMIT_AS
121 // Limit size of the process's virtual memory
122 // (address space) in bytes.
123 setrlimit(RLIMIT_AS, &ram_limit);
124 #elif defined RLIMIT_VMEM
125 // Limit size of a process's mapped address space in bytes.
126 setrlimit(RLIMIT_VMEM, &ram_limit);
127 #else
128 // Limit size of the process's data segment.
129 setrlimit(RLIMIT_DATA, &ram_limit);
130 #endif
132 #endif
133 #endif
134 // Replacing the current process image with a new process image
135 const char* mod = filter_module.c_str();
136 execl(mod, mod, static_cast<void*>(NULL));
137 _exit(EX_OSERR);
140 // Main process
141 int fork_errno = errno;
142 close(fds[1]);
143 if (child == -1) {
144 close(fds[0]);
145 error = string("fork failed: ") + strerror(fork_errno);
146 return 1;
149 sockt = fdopen(fds[0], "r+");
151 if (!ignoring_sigpipe) {
152 ignoring_sigpipe = true;
153 signal(SIGPIPE, SIG_IGN);
156 return 0;
157 #elif defined __WIN32__
158 LARGE_INTEGER counter;
159 // QueryPerformanceCounter() will always succeed on XP and later
160 // and gives us a counter which increments each CPU clock cycle
161 // on modern hardware (Pentium or newer).
162 QueryPerformanceCounter(&counter);
163 char pipename[256];
164 snprintf(pipename, sizeof(pipename),
165 "\\\\.\\pipe\\xapian-omega-worker-%lx-%lx_%" PRIx64,
166 static_cast<unsigned long>(GetCurrentProcessId()),
167 static_cast<unsigned long>(GetCurrentThreadId()),
168 static_cast<unsigned long long>(counter.QuadPart));
169 pipename[sizeof(pipename) - 1] = '\0';
170 // Create a pipe so we can read stdout from the child process.
171 HANDLE hPipe = CreateNamedPipe(pipename,
172 PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED,
174 1, 4096, 4096, NMPWAIT_USE_DEFAULT_WAIT,
175 NULL);
177 if (hPipe == INVALID_HANDLE_VALUE) {
178 error = "CreateNamedPipe failed: " + str(GetLastError());
179 return 1;
182 HANDLE hClient = CreateFile(pipename,
183 GENERIC_READ|GENERIC_WRITE, 0, NULL,
184 OPEN_EXISTING,
185 FILE_FLAG_OVERLAPPED, NULL);
187 if (hClient == INVALID_HANDLE_VALUE) {
188 error = "CreateFile failed: " + str(GetLastError());
189 return 1;
192 if (!ConnectNamedPipe(hPipe, NULL) &&
193 GetLastError() != ERROR_PIPE_CONNECTED) {
194 error = "ConnectNamedPipe failed: " + str(GetLastError());
195 return 1;
198 // Set the appropriate handles to be inherited by the child process.
199 SetHandleInformation(hClient, HANDLE_FLAG_INHERIT, 1);
201 // Create the child process.
202 PROCESS_INFORMATION procinfo;
203 memset(&procinfo, 0, sizeof(PROCESS_INFORMATION));
205 STARTUPINFO startupinfo;
206 memset(&startupinfo, 0, sizeof(STARTUPINFO));
207 startupinfo.cb = sizeof(STARTUPINFO);
208 // FIXME: Is NULL the way to say "/dev/null"?
209 // It's what GetStdHandle() is documented to return if "an application does
210 // not have associated standard handles"...
211 startupinfo.hStdError = keep_stderr ? GetStdHandle(STD_ERROR_HANDLE) : NULL;
212 startupinfo.hStdOutput = hClient;
213 startupinfo.hStdInput = hClient;
214 startupinfo.dwFlags |= STARTF_USESTDHANDLES;
216 string cmdline{filter_module};
217 // For some reason Windows wants a modifiable command line so we
218 // pass `&cmdline[0]` rather than `cmdline.c_str()`.
219 BOOL ok = CreateProcess(filter_module.c_str(), &cmdline[0],
220 0, 0, TRUE, 0, 0, 0,
221 &startupinfo, &procinfo);
222 if (!ok) {
223 if (GetLastError() == ERROR_FILE_NOT_FOUND) {
224 error = error_prefix + "failed to run helper";
225 filter_module = string();
226 return -1;
228 error = "CreateProcess failed: " + str(GetLastError());
229 return 1;
232 CloseHandle(hClient);
233 CloseHandle(procinfo.hThread);
234 child = procinfo.hProcess;
236 sockt = _fdopen(_open_osfhandle(intptr_t(hPipe), O_RDWR|O_BINARY), "r+");
238 return 0;
239 #else
240 # error Omega needs porting to this platform
241 #endif
245 Worker::extract(const std::string& filename,
246 const std::string& mimetype,
247 std::string& dump,
248 std::string& title,
249 std::string& keywords,
250 std::string& author,
251 std::string& to,
252 std::string& cc,
253 std::string& bcc,
254 std::string& message_id,
255 int& pages,
256 time_t& created)
258 if (filter_module.empty()) {
259 error = error_prefix + "hard failed earlier in the current run";
260 return -1;
263 if (sockt) {
264 #ifdef HAVE_WAITPID
265 // Check if the worker process is still alive - if it is, waitpid()
266 // with WNOHANG returns 0.
267 int status;
268 if (waitpid(child, &status, WNOHANG) != 0) {
269 fclose(sockt);
270 sockt = NULL;
272 #elif defined __WIN32__
273 // Check if the worker process is still alive by trying to wait for it
274 // with a timeout of 0ms.
275 if (WaitForSingleObject(child, 0) != WAIT_TIMEOUT) {
276 fclose(sockt);
277 sockt = NULL;
279 #else
280 # error Omega needs porting to this platform
281 #endif
283 if (!sockt) {
284 int r = start_worker_subprocess();
285 if (r != 0) return r;
288 string attachment_filename;
289 // Send a filename and wait for the reply.
290 if (write_string(sockt, filename) && write_string(sockt, mimetype)) {
291 while (true) {
292 int field_code = getc(sockt);
293 string* value;
294 switch (field_code) {
295 case FIELD_PAGE_COUNT: {
296 unsigned u_pages;
297 if (!read_unsigned(sockt, u_pages)) {
298 goto comms_error;
300 pages = int(u_pages);
301 continue;
303 case FIELD_CREATED_DATE: {
304 unsigned u_created;
305 if (!read_unsigned(sockt, u_created)) {
306 goto comms_error;
308 created = time_t(long(u_created));
309 continue;
311 case FIELD_BODY:
312 value = &dump;
313 break;
314 case FIELD_TITLE:
315 value = &title;
316 break;
317 case FIELD_KEYWORDS:
318 value = &keywords;
319 break;
320 case FIELD_AUTHOR:
321 value = &author;
322 break;
323 case FIELD_TO:
324 value = &to;
325 break;
326 case FIELD_CC:
327 value = &cc;
328 break;
329 case FIELD_BCC:
330 value = &bcc;
331 break;
332 case FIELD_MESSAGE_ID:
333 value = &message_id;
334 break;
335 case FIELD_ERROR:
336 if (!read_string(sockt, error)) goto comms_error;
337 // Fields shouldn't be empty but the protocol allows them to be.
338 if (error.empty())
339 error = error_prefix + "Couldn't extract text";
340 else
341 error.insert(0, error_prefix);
342 return 1;
343 case FIELD_END:
344 return 0;
345 case EOF:
346 goto comms_error;
347 case FIELD_ATTACHMENT:
348 value = &attachment_filename;
349 break;
350 default:
351 error = error_prefix + "Unknown field code ";
352 error += str(field_code);
353 return 1;
355 if (value->empty()) {
356 // First instance of this field for this document.
357 if (!read_string(sockt, *value)) break;
358 } else {
359 // Repeat instance of this field.
360 string s;
361 if (!read_string(sockt, s)) break;
362 *value += ' ';
363 *value += s;
365 if (field_code == FIELD_ATTACHMENT) {
366 // FIXME: Do something more useful with attachment_filename!
367 attachment_filename.clear();
372 comms_error:
373 error = error_prefix;
374 // Check if the worker process already died, so we can report if it
375 // was killed by a segmentation fault, etc.
376 #ifdef HAVE_WAITPID
377 int status;
378 int result = waitpid(child, &status, WNOHANG);
379 int waitpid_errno = errno;
381 fclose(sockt);
382 sockt = NULL;
384 if (result == 0) {
385 // The worker is still alive, so terminate it.
386 kill(child, SIGTERM);
387 waitpid(child, &status, 0);
388 error += "failed";
389 } else if (result < 0) {
390 // waitpid() failed with an error.
391 error += "failed: ";
392 error += strerror(waitpid_errno);
393 } else if (WIFEXITED(status)) {
394 int rc = WEXITSTATUS(status);
395 switch (rc) {
396 case OMEGA_EX_SOCKET_READ_ERROR:
397 error += "subprocess failed to read data from us";
398 break;
399 case OMEGA_EX_SOCKET_WRITE_ERROR:
400 error += "subprocess failed to write data to us";
401 break;
402 case EX_TEMPFAIL:
403 error += "timed out";
404 break;
405 case EX_UNAVAILABLE:
406 error += "failed to initialise";
407 filter_module = string();
408 return -1;
409 case EX_OSERR:
410 error += "failed to run helper";
411 filter_module = string();
412 return -1;
413 default:
414 error += "exited with status ";
415 error += str(rc);
416 break;
418 } else {
419 error += "killed by signal ";
420 error += str(WTERMSIG(status));
422 #elif defined __WIN32__
423 DWORD result = WaitForSingleObject(child, 0);
425 fclose(sockt);
426 sockt = NULL;
428 if (result == WAIT_TIMEOUT) {
429 // The worker is still alive, so terminate it. We need to specify an
430 // exit code here, 255 is an arbitrary choice.
431 TerminateProcess(child, 255);
432 WaitForSingleObject(child, INFINITE);
433 error += "failed";
434 CloseHandle(child);
435 } else if (result == WAIT_FAILED) {
436 // WaitForSingleObject() failed in an unexpected way.
437 error += "failed: ";
438 error += str(GetLastError());
439 CloseHandle(child);
440 } else {
441 DWORD rc;
442 while (GetExitCodeProcess(child, &rc) && rc == STILL_ACTIVE) {
443 Sleep(100);
445 switch (rc) {
446 case OMEGA_EX_SOCKET_READ_ERROR:
447 error += "subprocess failed to read data from us";
448 break;
449 case OMEGA_EX_SOCKET_WRITE_ERROR:
450 error += "subprocess failed to write data to us";
451 break;
452 case EX_TEMPFAIL:
453 error += "timed out";
454 break;
455 case EX_UNAVAILABLE:
456 error += "failed to initialise";
457 filter_module = string();
458 return -1;
459 case EX_OSERR:
460 error += "failed to run helper";
461 filter_module = string();
462 return -1;
463 default:
464 error += "exited with status ";
465 error += str(rc);
466 break;
469 #else
470 # error Omega needs porting to this platform
471 #endif
472 return 1;