Automatic date update in version.in
[binutils-gdb.git] / gprofng / src / ipcio.cc
blob57f2617c29357f65dac8ac5bfad48393e2fc77d9
1 /* Copyright (C) 2021 Free Software Foundation, Inc.
2 Contributed by Oracle.
4 This file is part of GNU Binutils.
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
9 any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, 51 Franklin Street - Fifth Floor, Boston,
19 MA 02110-1301, USA. */
21 #include "config.h"
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <signal.h>
25 #include <unistd.h>
26 #include <iostream>
27 #include <iomanip>
28 #include <sstream>
29 #include <queue>
30 #include "vec.h"
31 #include "util.h"
32 #include "ipcio.h"
33 #include "DbeThread.h"
34 #include "Experiment.h"
/* Conditional tracing helpers.  Each expands to a single expression that
   calls the corresponding log function only when ipc_flags is non-zero.
   The expression form (instead of a bare `if (ipc_flags) fn') means a
   following `else' in the caller can never bind to a hidden `if' inside
   the macro.  Use as a statement:  ipc_trace ("fmt", args);  */
#define ipc_trace           (!ipc_flags) ? (void) 0 : (void) ipc_default_log
#define ipc_request_trace   (!ipc_flags) ? (void) 0 : (void) ipc_request_log
#define ipc_response_trace  (!ipc_flags) ? (void) 0 : (void) ipc_response_log
40 using namespace std;
42 // IPC implementation
43 static const int L_PROGRESS = 0;
44 static const int L_INTEGER = 1;
45 static const int L_BOOLEAN = 2;
46 static const int L_LONG = 3;
47 static const int L_STRING = 4;
48 static const int L_DOUBLE = 5;
49 static const int L_ARRAY = 6;
50 static const int L_OBJECT = 7;
51 static const int L_CHAR = 8;
53 int currentRequestID;
54 int currentChannelID;
55 static long maxSize;
57 extern int cancellableChannelID;
58 extern int error_flag;
59 extern int ipc_delay_microsec;
60 extern FILE *responseLogFileP;
62 IPCresponse *IPCresponseGlobal;
64 BufferPool *responseBufferPool;
66 IPCrequest::IPCrequest (int sz, int reqID, int chID)
68 size = sz;
69 requestID = reqID;
70 channelID = chID;
71 status = INITIALIZED;
72 idx = 0;
73 buf = (char *) malloc (size);
74 cancelImmediate = false;
77 IPCrequest::~IPCrequest ()
79 free (buf);
82 void
83 IPCrequest::read (void)
85 for (int i = 0; i < size; i++)
87 int c = getc (stdin);
88 ipc_request_trace (TRACE_LVL_4, " IPCrequest:getc(stdin): %02x\n", c);
89 buf[i] = c;
93 IPCrequestStatus
94 IPCrequest::getStatus (void)
96 return status;
99 void
100 IPCrequest::setStatus (IPCrequestStatus newStatus)
102 status = newStatus;
105 static int
106 readByte (IPCrequest* req)
108 int c;
109 int val = 0;
110 for (int i = 0; i < 2; i++)
112 if (req == NULL)
114 c = getc (stdin);
115 ipc_request_trace (TRACE_LVL_4, " readByte:getc(stdin): %02x\n", c);
117 else
118 c = req->rgetc ();
119 switch (c)
121 case '0': case '1': case '2': case '3':
122 case '4': case '5': case '6': case '7':
123 case '8': case '9':
124 val = val * 16 + c - '0';
125 break;
126 case 'a': case 'b': case 'c': case 'd': case 'e': case 'f':
127 val = val * 16 + c - 'a' + 10;
128 break;
129 case EOF:
130 val = EOF;
131 break;
132 default:
133 fprintf (stderr, "readByte: Unknown byte: %d\n", c);
134 break;
137 return val;
140 static int
141 readIVal (IPCrequest *req)
143 int val = readByte (req);
144 for (int i = 0; i < 3; i++)
145 val = val * 256 + readByte (req);
146 ipc_trace (" readIVal: %d\n", val);
147 return val;
150 static String
151 readSVal (IPCrequest *req)
153 int len = readIVal (req);
154 if (len == -1)
156 ipc_trace (" readSVal: <NULL>\n");
157 return NULL;
159 char *str = (char *) malloc (len + 1);
160 char *s = str;
161 *s = (char) 0;
162 while (len--)
163 *s++ = req->rgetc ();
164 *s = (char) 0;
165 ipc_trace (" readSVal: '%s'\n", str);
166 return str;
169 static long long
170 readLVal (IPCrequest *req)
172 long long val = readByte (req);
173 for (int i = 0; i < 7; i++)
174 val = val * 256 + readByte (req);
175 ipc_trace (" readLVal: %lld\n", val);
176 return val;
179 static bool
180 readBVal (IPCrequest *req)
182 int val = readByte (req);
183 ipc_trace (" readBVal: %s\n", val == 0 ? "true" : "false");
184 return val != 0;
187 static char
188 readCVal (IPCrequest *req)
190 int val = readByte (req);
191 ipc_trace (" readCVal: %d\n", val);
192 return (char) val;
195 static double
196 readDVal (IPCrequest *req)
198 String s = readSVal (req);
199 double d = atof (s);
200 free (s);
201 return d;
204 static Object
205 readAVal (IPCrequest *req)
207 bool twoD = false;
208 int type = readByte (req);
209 if (type == L_ARRAY)
211 twoD = true;
212 type = readByte (req);
214 ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type);
216 int len = readIVal (req);
217 if (len == -1)
218 return NULL;
219 switch (type)
221 case L_INTEGER:
222 if (twoD)
224 Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len);
225 for (int i = 0; i < len; i++)
226 array->store (i, (Vector<int>*)readAVal (req));
227 return array;
229 else
231 Vector<int> *array = new Vector<int>(len);
232 for (int i = 0; i < len; i++)
233 array->store (i, readIVal (req));
234 return array;
236 //break;
237 case L_LONG:
238 if (twoD)
240 Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len);
241 for (int i = 0; i < len; i++)
242 array->store (i, (Vector<long long>*)readAVal (req));
243 return array;
245 else
247 Vector<long long> *array = new Vector<long long>(len);
248 for (int i = 0; i < len; i++)
249 array->store (i, readLVal (req));
250 return array;
252 //break;
253 case L_DOUBLE:
254 if (twoD)
256 Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len);
257 for (int i = 0; i < len; i++)
258 array->store (i, (Vector<double>*)readAVal (req));
259 return array;
261 else
263 Vector<double> *array = new Vector<double>(len);
264 for (int i = 0; i < len; i++)
265 array->store (i, readDVal (req));
266 return array;
268 //break;
269 case L_BOOLEAN:
270 if (twoD)
272 Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len);
273 for (int i = 0; i < len; i++)
274 array->store (i, (Vector<bool>*)readAVal (req));
275 return array;
277 else
279 Vector<bool> *array = new Vector<bool>(len);
280 for (int i = 0; i < len; i++)
281 array->store (i, readBVal (req));
282 return array;
284 //break;
285 case L_CHAR:
286 if (twoD)
288 Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len);
289 for (int i = 0; i < len; i++)
290 array->store (i, (Vector<char>*)readAVal (req));
291 return array;
293 else
295 Vector<char> *array = new Vector<char>(len);
296 for (int i = 0; i < len; i++)
297 array->store (i, readCVal (req));
298 return array;
300 //break;
301 case L_STRING:
302 if (twoD)
304 Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len);
305 for (int i = 0; i < len; i++)
306 array->store (i, (Vector<String>*)readAVal (req));
307 return array;
309 else
311 Vector<String> *array = new Vector<String>(len);
312 for (int i = 0; i < len; i++)
313 array->store (i, readSVal (req));
314 return array;
316 //break;
317 case L_OBJECT:
318 if (twoD)
320 Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len);
321 for (int i = 0; i < len; i++)
322 array->store (i, (Vector<Object>*)readAVal (req));
323 return array;
325 else
327 Vector<Object> *array = new Vector<Object>(len);
328 for (int i = 0; i < len; i++)
329 array->store (i, readAVal (req));
330 return array;
332 //break;
333 default:
334 fprintf (stderr, "readAVal: Unknown code: %d\n", type);
335 break;
337 return NULL;
340 static int iVal;
341 static bool bVal;
342 static long long lVal;
343 static String sVal;
344 static double dVal;
345 static Object aVal;
347 static void
348 readResult (int type, IPCrequest *req)
350 int tVal = readByte (req);
351 switch (tVal)
353 case L_INTEGER:
354 iVal = readIVal (req);
355 break;
356 case L_LONG:
357 lVal = readLVal (req);
358 break;
359 case L_BOOLEAN:
360 bVal = readBVal (req);
361 break;
362 case L_DOUBLE:
363 dVal = readDVal (req);
364 break;
365 case L_STRING:
366 sVal = readSVal (req);
367 break;
368 case L_ARRAY:
369 aVal = readAVal (req);
370 break;
371 case EOF:
372 fprintf (stderr, "EOF read in readResult\n");
373 sVal = NULL;
374 return;
375 default:
376 fprintf (stderr, "Unknown code: %d\n", tVal);
377 abort ();
379 if (type != tVal)
381 fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type);
382 abort ();
387 readInt (IPCrequest *req)
389 readResult (L_INTEGER, req);
390 return iVal;
393 String
394 readString (IPCrequest *req)
396 readResult (L_STRING, req);
397 return sVal;
400 long long
401 readLong (IPCrequest *req)
403 readResult (L_LONG, req);
404 return lVal;
407 double
408 readDouble (IPCrequest *req)
410 readResult (L_DOUBLE, req);
411 return dVal;
414 bool
415 readBoolean (IPCrequest *req)
417 readResult (L_BOOLEAN, req);
418 return bVal;
421 DbeObj
422 readObject (IPCrequest *req)
424 readResult (L_LONG, req);
425 return (DbeObj) lVal;
428 Object
429 readArray (IPCrequest *req)
431 readResult (L_ARRAY, req);
432 return aVal;
435 // Write
436 IPCresponse::IPCresponse (int sz)
438 requestID = -1;
439 channelID = -1;
440 responseType = -1;
441 responseStatus = RESPONSE_STATUS_SUCCESS;
442 sb = new StringBuilder (sz);
443 next = NULL;
446 IPCresponse::~IPCresponse ()
448 delete sb;
451 void
452 IPCresponse::reset ()
454 requestID = -1;
455 channelID = -1;
456 responseType = -1;
457 responseStatus = RESPONSE_STATUS_SUCCESS;
458 sb->setLength (0);
461 void
462 IPCresponse::sendByte (int b)
464 ipc_trace ("sendByte: %02x %d\n", b, b);
465 sb->appendf ("%02x", b);
468 void
469 IPCresponse::sendIVal (int i)
471 ipc_trace ("sendIVal: %08x %d\n", i, i);
472 sb->appendf ("%08x", i);
475 void
476 IPCresponse::sendLVal (long long l)
478 ipc_trace ("sendLVal: %016llx %lld\n", l, l);
479 sb->appendf ("%016llx", l);
482 void
483 IPCresponse::sendSVal (const char *s)
485 if (s == NULL)
487 sendIVal (-1);
488 return;
490 sendIVal ((int) strlen (s));
491 ipc_trace ("sendSVal: %s\n", s);
492 sb->appendf ("%s", s);
495 void
496 IPCresponse::sendBVal (bool b)
498 sendByte (b ? 1 : 0);
501 void
502 IPCresponse::sendCVal (char c)
504 sendByte (c);
507 void
508 IPCresponse::sendDVal (double d)
510 char str[32];
511 snprintf (str, sizeof (str), "%.12f", d);
512 sendSVal (str);
515 void
516 IPCresponse::sendAVal (void *ptr)
518 if (ptr == NULL)
520 sendByte (L_INTEGER);
521 sendIVal (-1);
522 return;
525 VecType type = ((Vector<void*>*)ptr)->type ();
526 switch (type)
528 case VEC_INTEGER:
530 sendByte (L_INTEGER);
531 Vector<int> *array = (Vector<int>*)ptr;
532 sendIVal (array->size ());
533 for (int i = 0; i < array->size (); i++)
534 sendIVal (array->fetch (i));
535 break;
537 case VEC_BOOL:
539 sendByte (L_BOOLEAN);
540 Vector<bool> *array = (Vector<bool>*)ptr;
541 sendIVal (array->size ());
542 for (int i = 0; i < array->size (); i++)
543 sendBVal (array->fetch (i));
544 break;
546 case VEC_CHAR:
548 sendByte (L_CHAR);
549 Vector<char> *array = (Vector<char>*)ptr;
550 sendIVal (array->size ());
551 for (int i = 0; i < array->size (); i++)
552 sendCVal (array->fetch (i));
553 break;
555 case VEC_LLONG:
557 sendByte (L_LONG);
558 Vector<long long> *array = (Vector<long long>*)ptr;
559 sendIVal (array->size ());
560 for (int i = 0; i < array->size (); i++)
561 sendLVal (array->fetch (i));
562 break;
564 case VEC_DOUBLE:
566 sendByte (L_DOUBLE);
567 Vector<double> *array = (Vector<double>*)ptr;
568 sendIVal (array->size ());
569 for (int i = 0; i < array->size (); i++)
570 sendDVal (array->fetch (i));
571 break;
573 case VEC_STRING:
575 sendByte (L_STRING);
576 Vector<String> *array = (Vector<String>*)ptr;
577 sendIVal (array->size ());
578 for (int i = 0; i < array->size (); i++)
579 sendSVal (array->fetch (i));
580 break;
582 case VEC_STRINGARR:
584 sendByte (L_ARRAY);
585 sendByte (L_STRING);
586 Vector<void*> *array = (Vector<void*>*)ptr;
587 sendIVal (array->size ());
588 for (int i = 0; i < array->size (); i++)
589 sendAVal (array->fetch (i));
590 break;
592 case VEC_INTARR:
594 sendByte (L_ARRAY);
595 sendByte (L_INTEGER);
596 Vector<void*> *array = (Vector<void*>*)ptr;
597 sendIVal (array->size ());
598 for (int i = 0; i < array->size (); i++)
599 sendAVal (array->fetch (i));
600 break;
602 case VEC_LLONGARR:
604 sendByte (L_ARRAY);
605 sendByte (L_LONG);
606 Vector<void*> *array = (Vector<void*>*)ptr;
607 sendIVal (array->size ());
608 for (int i = 0; i < array->size (); i++)
609 sendAVal (array->fetch (i));
610 break;
612 case VEC_VOIDARR:
614 sendByte (L_OBJECT);
615 Vector<void*> *array = (Vector<void*>*)ptr;
616 sendIVal (array->size ());
617 for (int i = 0; i < array->size (); i++)
618 sendAVal (array->fetch (i));
619 break;
621 default:
622 fprintf (stderr, "sendAVal: Unknown type: %d\n", type);
623 abort ();
627 static void
628 writeResponseHeader (int requestID, int responseType, int responseStatus, int nBytes)
630 if (responseType == RESPONSE_TYPE_HANDSHAKE)
631 nBytes = IPC_VERSION_NUMBER;
632 int use_write = 2;
633 ipc_response_trace (TRACE_LVL_1, "ResponseHeaderBegin----- %x ---- %x ----- %x -----%x -------\n", requestID, responseType, responseStatus, nBytes);
634 if (use_write)
636 char buf[23];
637 if (use_write == 1)
639 int i = 0;
640 snprintf (buf + i, 3, "%2x", HEADER_MARKER);
641 i += 2;
642 snprintf (buf + i, 9, "%8x", requestID);
643 i += 8;
644 snprintf (buf + i, 3, "%2x", responseType);
645 i += 2;
646 snprintf (buf + i, 3, "%2x", responseStatus);
647 i += 2;
648 snprintf (buf + i, 9, "%8x", nBytes);
650 else
651 snprintf (buf, 23, "%02x%08x%02x%02x%08x", HEADER_MARKER, requestID,
652 responseType, responseStatus, nBytes);
653 buf[22] = 0;
654 write (1, buf, 22);
656 else
658 cout << setfill ('0') << setw (2) << hex << HEADER_MARKER;
659 cout << setfill ('0') << setw (8) << hex << requestID;
660 cout << setfill ('0') << setw (2) << hex << responseType;
661 cout << setfill ('0') << setw (2) << hex << responseStatus;
662 cout << setfill ('0') << setw (8) << hex << nBytes;
663 cout.flush ();
665 ipc_response_trace (TRACE_LVL_1, "----------------------------ResponseHeaderEnd\n");
666 if (nBytes > maxSize)
668 maxSize = nBytes;
669 ipc_trace ("New maxsize %ld\n", maxSize);
673 bool
674 cancelNeeded (int chID)
676 if (chID == cancellableChannelID && chID == cancelRequestedChannelID)
677 return true;
678 else
679 return false;
682 static void
683 writeResponseWithHeader (int requestID, int channelID, int responseType,
684 int responseStatus, IPCresponse* os)
686 if (cancelNeeded (channelID))
688 responseStatus = RESPONSE_STATUS_CANCELLED;
689 ipc_trace ("CANCELLING %d %d\n", requestID, channelID);
690 // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here
692 os->setRequestID (requestID);
693 os->setChannelID (channelID);
694 os->setResponseType (responseType);
695 os->setResponseStatus (responseStatus);
696 os->print ();
697 os->reset ();
698 responseBufferPool->recycle (os);
701 void
702 writeAckFast (int requestID)
704 writeResponseHeader (requestID, RESPONSE_TYPE_ACK, RESPONSE_STATUS_SUCCESS, 0);
707 void
708 writeAck (int requestID, int channelID)
710 #if DEBUG
711 char *s = getenv (NTXT ("SP_NO_IPC_ACK"));
712 #else /* ^DEBUG */
713 char *s = NULL;
714 #endif /* ^DEBUG */
715 if (s)
717 int i = requestID;
718 int j = channelID;
719 ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j);
721 else
723 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
724 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK,
725 RESPONSE_STATUS_SUCCESS, OUTS);
729 void
730 writeHandshake (int requestID, int channelID)
732 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
733 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS);
734 // writeResponseHeader(requestID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, IPC_VERSION_NUMBER);
737 void
738 writeResponseGeneric (int responseStatus, int requestID, int channelID)
740 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
741 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS);
744 BufferPool::BufferPool ()
746 pthread_mutex_init (&p_mutex, NULL);
747 smallBuf = NULL;
748 largeBuf = NULL;
751 BufferPool::~BufferPool ()
753 for (IPCresponse *p = smallBuf; p;)
755 IPCresponse *tmp = p;
756 p = tmp->next;
757 delete tmp;
759 for (IPCresponse *p = largeBuf; p;)
761 IPCresponse *tmp = p;
762 p = tmp->next;
763 delete tmp;
767 IPCresponse*
768 BufferPool::getNewResponse (int size)
770 pthread_mutex_lock (&p_mutex);
771 if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE)
772 size = BUFFER_SIZE_LARGE;
773 IPCresponse *newResponse = NULL;
774 if (size >= BUFFER_SIZE_LARGE)
776 if (largeBuf)
778 newResponse = largeBuf;
779 largeBuf = largeBuf->next;
782 else if (smallBuf)
784 newResponse = smallBuf;
785 smallBuf = smallBuf->next;
787 if (newResponse)
788 newResponse->reset ();
789 else
791 newResponse = new IPCresponse (size);
792 ipc_trace ("GETNEWBUFFER %d\n", size);
794 pthread_mutex_unlock (&p_mutex);
795 return newResponse;
798 void
799 BufferPool::recycle (IPCresponse *respB)
801 pthread_mutex_lock (&p_mutex);
802 if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE)
804 respB->next = largeBuf;
805 largeBuf = respB;
807 else
809 respB->next = smallBuf;
810 smallBuf = respB;
812 pthread_mutex_unlock (&p_mutex);
815 void
816 writeArray (void *ptr, IPCrequest* req)
818 if (req->getStatus () == CANCELLED_IMMEDIATE)
819 return;
820 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
821 OUTS->sendByte (L_ARRAY);
822 OUTS->sendAVal (ptr);
823 writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
824 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
827 void
828 writeString (const char *s, IPCrequest* req)
830 if (req->getStatus () == CANCELLED_IMMEDIATE)
831 return;
832 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
833 OUTS->sendByte (L_STRING);
834 OUTS->sendSVal (s);
835 writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
836 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
839 void
840 writeObject (DbeObj obj, IPCrequest* req)
842 writeLong ((long long) obj, req);
845 void
846 writeBoolean (bool b, IPCrequest* req)
848 if (req->getStatus () == CANCELLED_IMMEDIATE)
849 return;
850 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
851 OUTS->sendByte (L_BOOLEAN);
852 OUTS->sendBVal (b);
853 writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
854 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
857 void
858 writeInt (int i, IPCrequest* req)
860 if (req->getStatus () == CANCELLED_IMMEDIATE)
861 return;
862 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
863 OUTS->sendByte (L_INTEGER);
864 OUTS->sendIVal (i);
865 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
868 void
869 writeChar (char c, IPCrequest* req)
871 if (req->getStatus () == CANCELLED_IMMEDIATE)
872 return;
873 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
874 OUTS->sendByte (L_CHAR);
875 OUTS->sendCVal (c);
876 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
879 void
880 writeLong (long long l, IPCrequest* req)
882 if (req->getStatus () == CANCELLED_IMMEDIATE)
883 return;
884 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
885 OUTS->sendByte (L_LONG);
886 OUTS->sendLVal (l);
887 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
890 void
891 writeDouble (double d, IPCrequest* req)
893 if (req->getStatus () == CANCELLED_IMMEDIATE) return;
894 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
895 OUTS->sendByte (L_DOUBLE);
896 OUTS->sendDVal (d);
897 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
901 setProgress (int percentage, const char *proc_str)
903 if (cancelNeeded (currentChannelID))
905 // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException();
906 // throw (e1);
907 return 1;
909 if (NULL == proc_str)
910 return 1;
911 int size = strlen (proc_str) + 100; // 100 bytes for additional data
912 int bs = BUFFER_SIZE_MEDIUM;
913 if (size > BUFFER_SIZE_MEDIUM)
915 if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen
916 bs = BUFFER_SIZE_LARGE;
918 IPCresponse *OUTS = responseBufferPool->getNewResponse (bs);
919 OUTS->sendByte (L_PROGRESS);
920 OUTS->sendIVal (percentage);
921 OUTS->sendSVal (proc_str);
922 writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS);
923 return 0;
926 void
927 IPCresponse::print (void)
929 if (ipc_delay_microsec)
930 usleep (ipc_delay_microsec);
931 int stringSize = sb->length ();
932 writeResponseHeader (requestID, responseType, responseStatus, stringSize);
933 if (stringSize > 0)
935 char *s = sb->toString ();
936 hrtime_t start_time = gethrtime ();
937 int use_write = 1;
938 if (use_write)
939 write (1, s, stringSize); // write(1, sb->toString(), stringSize);
940 else
942 cout << s;
943 cout.flush ();
945 hrtime_t end_time = gethrtime ();
946 unsigned long long time_stamp = end_time - start_time;
947 ipc_response_log (TRACE_LVL_3, "ReqID %x flush time %llu nanosec \n", requestID, time_stamp);
948 free (s);
952 void
953 setCancelRequestedCh (int chID)
955 cancelRequestedChannelID = chID;
958 void
959 readRequestHeader ()
961 int marker = readByte (NULL);
962 if (marker != HEADER_MARKER)
964 fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker);
965 error_flag = 1;
966 return;
968 else
969 ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n");
970 int requestID = readIVal (NULL);
971 int requestType = readByte (NULL);
972 int channelID = readIVal (NULL);
973 int nBytes = readIVal (NULL);
974 if (requestType == REQUEST_TYPE_HANDSHAKE)
976 // write the ack directly to the wire, not through the response queue
977 // writeAckFast(requestID);
978 writeAck (requestID, channelID);
979 maxSize = 0;
980 writeHandshake (requestID, channelID);
981 ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
983 else if (requestType == REQUEST_TYPE_CANCEL)
985 writeAck (requestID, channelID);
986 ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH: %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
987 if (channelID == cancellableChannelID)
989 // we have worked on at least one request belonging to this channel
990 writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID);
991 setCancelRequestedCh (channelID);
992 ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID);
993 if (channelID == currentChannelID)
994 // request for this channel is currently in progress
995 ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION");
996 // ssp_post_cond(waitingToFinish);
998 else
1000 // FIXME:
1001 // it is possible that a request for this channel is on the requestQ
1002 // or has been submitted to the work group queue but is waiting for a thread to pick it up
1003 writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID);
1004 setCancelRequestedCh (channelID);
1005 ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID);
1008 else
1010 writeAck (requestID, channelID);
1011 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
1012 IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID);
1013 nreq->read ();
1014 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID);
1015 if (cancelNeeded (channelID))
1017 ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID);
1018 writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID);
1019 delete nreq;
1020 return;
1022 DbeQueue *q = new DbeQueue (ipc_doWork, nreq);
1023 ipcThreadPool->put_queue (q);