Merge pull request #110 from tesselode/fixes
[wdl/wdl-ol.git] / WDL / shm_connection.cpp
blob78fce7af750782ea1510619916ba366ba91fd22c
1 #include "shm_connection.h"
3 #ifdef _WIN32
4 #define SHM_MINSIZE 64
5 #define SHM_HDRSIZE (4+4*4+2)
6 // 4 bytes: buffer size (each channel)
7 // 4 bytes: read pointer chan 0
8 // 4 bytes: write pointer chan 0
9 // 4 bytes: read pointer chan 1
10 // 4 bytes: write pointer chan 1
11 // 2 bytes: chan 0 refresh, chan 1 refresh
12 // data follows (2x buffer size)
15 WDL_SHM_Connection::WDL_SHM_Connection(bool whichChan,
16 const char *uniquestring, // identify
17 int shmsize, // bytes, whoever opens first decides
18 int timeout_sec,
19 int extra_flags // unused on win32
23 m_timeout_cnt=0;
24 m_timeout_sec=timeout_sec;
25 m_last_recvt=time(NULL)+2; // grace period
26 { // make shmsize the next power of two
27 int a = shmsize;
28 shmsize=2;
29 while (shmsize < SHM_MINSIZE || shmsize<a) shmsize*=2;
32 m_file=INVALID_HANDLE_VALUE;
33 m_filemap=NULL;
34 m_mem=NULL;
35 m_lockmutex=m_events[0]=m_events[1]=NULL;
37 m_whichChan=whichChan ? 1 : 0;
39 char buf[512];
40 GetTempPath(sizeof(buf)-4,buf);
41 if (!buf[0]) strcpy(buf,"C:\\");
42 if (buf[strlen(buf)-1] != '/' && buf[strlen(buf)-1] != '\\') strcat(buf,"\\");
43 m_tempfn.Set(buf);
44 m_tempfn.Append("WDL_SHM_");
45 m_tempfn.Append(uniquestring);
46 m_tempfn.Append(".tmp");
48 WDL_String tmp("Global\\WDL_SHM_");
49 #ifdef WDL_SUPPORT_WIN9X
50 if (GetVersion()&0x80000000) tmp.Set("WDL_SHM_");
51 #endif
52 tmp.Append(uniquestring);
53 const size_t tmp_l = strlen(tmp.Get());
55 tmp.Append(".m");
56 HANDLE mutex = CreateMutex(NULL,FALSE,tmp.Get());
58 if (mutex) WaitForSingleObject(mutex,INFINITE);
60 tmp.Get()[tmp_l]=0;
61 tmp.Append(whichChan?".l1":".l0");
62 m_lockmutex = CreateMutex(NULL,FALSE,tmp.Get());
63 if (m_lockmutex)
65 if (WaitForSingleObject(m_lockmutex,100) == WAIT_OBJECT_0)
67 DeleteFile(m_tempfn.Get()); // this is designed to fail if another process has it locked
69 m_file=CreateFile(m_tempfn.Get(),GENERIC_READ|GENERIC_WRITE,
70 FILE_SHARE_READ|FILE_SHARE_WRITE ,
71 NULL,whichChan ? OPEN_EXISTING : OPEN_ALWAYS,FILE_ATTRIBUTE_TEMPORARY,NULL);
73 else
75 CloseHandle(m_lockmutex);
76 m_lockmutex=0;
80 int mapsize;
81 if (m_file != INVALID_HANDLE_VALUE &&
82 ((mapsize=GetFileSize(m_file,NULL)) < SHM_HDRSIZE+SHM_MINSIZE*2 ||
83 mapsize == 0xFFFFFFFF))
85 char buf[4096];
86 memset(buf,0,sizeof(buf));
87 *(int *)buf=shmsize;
89 int sz=shmsize*2 + SHM_HDRSIZE;
90 while (sz>0)
92 DWORD d;
93 int a = sz;
94 if (a>sizeof(buf))a=sizeof(buf);
95 WriteFile(m_file,buf,a,&d,NULL);
96 sz-=a;
97 *(int *)buf = 0;
101 if (m_file!=INVALID_HANDLE_VALUE)
102 m_filemap=CreateFileMapping(m_file,NULL,PAGE_READWRITE,0,0,NULL);
104 if (m_filemap)
106 m_mem=(unsigned char *)MapViewOfFile(m_filemap,FILE_MAP_WRITE,0,0,0);
108 tmp.Get()[tmp_l]=0;
109 tmp.Append(".1");
110 m_events[0]=CreateEvent(NULL,false,false,tmp.Get());
111 tmp.Get()[strlen(tmp.Get())-1]++;
112 m_events[1]=CreateEvent(NULL,false,false,tmp.Get());
115 if (mutex)
117 ReleaseMutex(mutex);
118 CloseHandle(mutex);
124 WDL_SHM_Connection::~WDL_SHM_Connection()
126 if (m_mem) UnmapViewOfFile(m_mem);
127 if (m_filemap) CloseHandle(m_filemap);
128 if (m_file != INVALID_HANDLE_VALUE) CloseHandle(m_file);
129 DeleteFile(m_tempfn.Get());
131 if (m_events[0]) CloseHandle(m_events[0]);
132 if (m_events[1]) CloseHandle(m_events[1]);
133 if (m_lockmutex)
135 ReleaseMutex(m_lockmutex);
136 CloseHandle(m_lockmutex);
140 bool WDL_SHM_Connection::WantSendKeepAlive() { return false; }
142 int WDL_SHM_Connection::Run()
144 if (!m_mem) return -1;
146 int *hdr = (int *)m_mem;
148 int shm_size = hdr[0];
150 // todo: check to see if we just opened, if so, have a grace period
151 if (shm_size < SHM_MINSIZE) return -1;
153 m_mem[4*5 + !!m_whichChan] = 1;
154 if (m_timeout_sec > 0)
156 if (m_mem[4*5 + !m_whichChan])
158 m_timeout_cnt=0;
159 m_last_recvt=time(NULL);
160 m_mem[4*5 + !m_whichChan]=0;
162 else
164 if (time(NULL) > m_timeout_sec+m_last_recvt)
166 if (m_timeout_cnt >= 4) return -1;
168 m_timeout_cnt++;
169 m_last_recvt=time(NULL);
174 int didStuff=0;
176 // process writes
177 int send_avail=send_queue.Available();
178 if (send_avail>0)
180 int wc = !m_whichChan;
181 unsigned char *data=m_mem+SHM_HDRSIZE+shm_size*wc;
182 int rdptr = hdr[1 + wc*2]; // hopefully atomic
183 int wrptr = hdr[1 + wc*2+1];
184 int wrlen = shm_size - (wrptr-rdptr);
185 if (wrlen > 0)
187 if (wrlen > send_avail) wrlen=send_avail;
188 if (wrlen > shm_size) wrlen=shm_size; // should never happen !
190 int idx = wrptr & (shm_size-1);
191 int l = shm_size - idx;
192 if (l > wrlen) l = wrlen;
193 memcpy(data+idx,send_queue.Get(),l);
194 if (l < wrlen) memcpy(data,(char*)send_queue.Get() + l,wrlen-l);
196 hdr[1 + wc*2+1] = wrptr + wrlen; // advance write pointer, hopefluly atomic
198 didStuff|=1;
200 send_queue.Advance(wrlen);
201 send_queue.Compact();
206 // process reads
208 int wc = m_whichChan;
209 unsigned char *data=m_mem+SHM_HDRSIZE+shm_size*wc;
210 int rdptr = hdr[1 + wc*2];
211 int wrptr = hdr[1 + wc*2+1]; // hopefully atomic
212 int rdlen = wrptr-rdptr;
213 if (rdlen > 0)
215 if (rdlen > shm_size) rdlen=shm_size; // should never happen !
217 int idx = rdptr & (shm_size-1);
218 int l = shm_size - idx;
219 if (l > rdlen) l = rdlen;
220 recv_queue.Add(data+idx,l);
221 if (l < rdlen) recv_queue.Add(data,rdlen-l);
223 hdr[1 + wc*2] = wrptr; // hopefully atomic, bring read pointer up to write pointer
224 didStuff|=2;
228 if (didStuff)
230 if (m_events[!m_whichChan]) SetEvent(m_events[!m_whichChan]);
231 return 1;
235 return 0;
239 #else
241 #include <signal.h>
242 #include <sys/socket.h>
243 #include <sys/un.h>
244 #include <sys/types.h>
245 #include <sys/time.h>
246 #include <sys/errno.h>
247 #include <sys/fcntl.h>
248 #ifndef __APPLE__
249 #include <sys/file.h>
250 #endif
251 #include "swell/swell-internal.h"
// Deliberately empty SIGPIPE handler; the socket constructor installs it via signal().
static void sigpipehandler(int /*sig*/)
{
}
255 // socket version
256 WDL_SHM_Connection::WDL_SHM_Connection(bool whichChan, // first created must be whichChan=0
257 const char *uniquestring, // identify
258 int shmsize,
259 int timeout_sec,
260 int extra_flags // set 1 for lockfile use on master
263 m_sockbufsize = shmsize;
264 if (m_sockbufsize<16384) m_sockbufsize=16384;
265 else if (m_sockbufsize>1024*1024) m_sockbufsize=1024*1024;
267 m_rdbufsize = wdl_min(m_sockbufsize,65536);
268 m_rdbuf = (char *)malloc(m_rdbufsize);
270 static bool hasSigHandler;
271 if (!hasSigHandler)
273 signal(SIGPIPE,sigpipehandler);
274 hasSigHandler=true;
276 m_timeout_cnt=0;
277 m_timeout_sec=timeout_sec;
278 m_last_recvt=time(NULL)+3; // grace period
279 m_next_keepalive = time(NULL)+1;
281 m_tempfn.Set("/tmp/WDL_SHM.");
282 m_tempfn.Append(uniquestring);
283 m_tempfn.Append(".tmp");
286 m_sockaddr=malloc(sizeof(struct sockaddr_un) + strlen(m_tempfn.Get()));
287 m_lockhandle=-1;
288 m_listen_socket=-1;
289 m_socket=-1;
290 m_waitevt=0;
291 m_whichChan = whichChan;
293 struct sockaddr_un *addr = (struct sockaddr_un *)m_sockaddr;
294 addr->sun_family = AF_UNIX;
295 strcpy(addr->sun_path,m_tempfn.Get());
296 #ifdef __APPLE__
297 int l = SUN_LEN(addr)+1;
298 if (l>255)l=255;
299 addr->sun_len=l;
300 #endif
302 if (!whichChan)
304 if (extra_flags & 1)
306 m_lockfn.Set(m_tempfn.Get());
307 m_lockfn.Append(".lock");
308 m_lockhandle = open(m_lockfn.Get(),O_RDWR|O_CREAT,0666);
309 if (m_lockhandle < 0) return; // error getting lockfile, fail
310 if (flock(m_lockhandle,LOCK_NB|LOCK_EX) < 0)
312 close(m_lockhandle);
313 m_lockhandle=-1;
314 return; // could not lock
317 acquireListener();
318 if (m_listen_socket<0) return;
320 else
322 struct stat sbuf;
323 if (stat(addr->sun_path,&sbuf))
325 return; // fail
328 int s = socket(AF_UNIX,SOCK_STREAM,0);
329 if (s<0) return;
331 int bsz=m_sockbufsize;
332 setsockopt(s,SOL_SOCKET,SO_SNDBUF,(char *)&bsz,sizeof(bsz));
333 bsz=m_sockbufsize;
334 setsockopt(s,SOL_SOCKET,SO_RCVBUF,(char *)&bsz,sizeof(bsz));
336 if (connect(s,(struct sockaddr*)addr,SUN_LEN(addr)))
338 close(s);
340 else
342 fcntl(s, F_SETFL, fcntl(s,F_GETFL) | O_NONBLOCK);
343 m_socket=s;
345 // clean up the filesystem, our connection has been made
346 unlink(m_tempfn.Get());
350 if (m_socket>=0 || m_listen_socket>=0)
352 SWELL_InternalObjectHeader_SocketEvent *se = (SWELL_InternalObjectHeader_SocketEvent*)malloc(sizeof(SWELL_InternalObjectHeader_SocketEvent));
353 memset(se,0,sizeof(SWELL_InternalObjectHeader_SocketEvent));
354 se->hdr.type = INTERNAL_OBJECT_EXTERNALSOCKET;
355 se->hdr.count = 1;
356 se->socket[0]=m_socket>=0? m_socket : m_listen_socket;
357 m_waitevt = (HANDLE)se;
361 WDL_SHM_Connection::~WDL_SHM_Connection()
363 if (m_listen_socket>=0 || m_socket>=0)
365 if (m_socket>=0) close(m_socket);
366 if (m_listen_socket>=0) close(m_listen_socket);
368 // only delete temp socket file if the master and successfully had something open
369 if (!m_whichChan && m_tempfn.Get()[0]) unlink(m_tempfn.Get());
372 free(m_waitevt); // don't CloseHandle(), since it's just referencing our socket
373 free(m_sockaddr);
374 free(m_rdbuf);
376 if (m_lockhandle>=0)
378 flock(m_lockhandle,LOCK_UN);
379 close(m_lockhandle);
380 unlink(m_lockfn.Get());
384 bool WDL_SHM_Connection::WantSendKeepAlive()
386 return !send_queue.GetSize() && time(NULL) >= m_next_keepalive;
390 int WDL_SHM_Connection::Run()
392 if (m_socket < 0)
394 if (m_listen_socket < 0) return -1;
396 struct sockaddr_un remote={0,};
397 socklen_t t = sizeof(struct sockaddr_un);
398 int s = accept(m_listen_socket,(struct sockaddr *)&remote,&t);
399 if (s>=0)
401 close(m_listen_socket);
402 m_listen_socket=-1;
404 fcntl(s, F_SETFL, fcntl(s,F_GETFL) | O_NONBLOCK); // nonblocking
406 int bsz=m_sockbufsize;
407 setsockopt(s,SOL_SOCKET,SO_SNDBUF,(char *)&bsz,sizeof(bsz));
408 bsz=m_sockbufsize;
409 setsockopt(s,SOL_SOCKET,SO_RCVBUF,(char *)&bsz,sizeof(bsz));
411 if (m_waitevt)
413 SWELL_InternalObjectHeader_SocketEvent *se = (SWELL_InternalObjectHeader_SocketEvent*)m_waitevt;
414 se->socket[0]=s;
416 m_socket=s;
418 else
420 if (m_timeout_sec>0 && time(NULL) > m_timeout_sec+m_last_recvt)
422 if (m_timeout_cnt >= 2) return -1;
423 m_timeout_cnt++;
424 m_last_recvt=time(NULL);
426 return 0;
430 bool sendcnt=false;
431 bool recvcnt=false;
432 for (;;)
434 bool hadAct=false;
435 while (recv_queue.Available()<128*1024*1024)
437 int n=read(m_socket,m_rdbuf,m_rdbufsize);
438 if (n>0)
440 recv_queue.Add(m_rdbuf,n);
441 hadAct=true;
442 recvcnt=true;
444 else if (n<0&&errno!=EAGAIN) goto abortClose;
445 else break;
447 while (send_queue.Available()>0)
449 int n = send_queue.Available();
450 if (n > m_rdbufsize) n=m_rdbufsize;
451 n = write(m_socket,send_queue.Get(),n);
452 if (n > 0)
454 hadAct=true;
455 sendcnt=true;
456 send_queue.Advance(n);
458 else if (n<0&&errno!=EAGAIN) goto abortClose;
459 else break;
461 if (!hadAct) break;
463 if (sendcnt) send_queue.Compact();
465 if (m_timeout_sec>0)
467 time_t now = time(NULL);
468 if (recvcnt)
470 m_last_recvt=now;
471 m_timeout_cnt=0;
473 else if (now > m_timeout_sec+m_last_recvt)
475 if (m_timeout_cnt >= 3) return -1;
476 m_timeout_cnt++;
477 m_last_recvt=now;
480 if (sendcnt||send_queue.GetSize()) m_next_keepalive = now + (m_timeout_sec+1)/2;
483 return sendcnt||recvcnt;
485 abortClose:
486 if (m_whichChan) return -1;
488 acquireListener();
489 recv_queue.Clear();
490 send_queue.Clear();
491 if (m_waitevt)
493 SWELL_InternalObjectHeader_SocketEvent *se = (SWELL_InternalObjectHeader_SocketEvent*)m_waitevt;
494 se->socket[0]=m_listen_socket;
496 close(m_socket);
497 m_socket=-1;
499 return m_listen_socket >= 0 ? 0 : -1;
502 void WDL_SHM_Connection::acquireListener()
504 // only ever called from whichChan==0
505 if (m_listen_socket>=0) return; // no need to re-open
507 unlink(m_tempfn.Get());
509 int s = socket(AF_UNIX,SOCK_STREAM,0);
510 if (s<0) return;
511 struct sockaddr_un *addr = (struct sockaddr_un *)m_sockaddr;
512 if (bind(s,(struct sockaddr*)addr,SUN_LEN(addr)) < 0 || listen(s,1) < 0)
514 close(s);
516 else
518 fcntl(s, F_SETFL, fcntl(s,F_GETFL) | O_NONBLOCK);
519 m_listen_socket = s;
522 #endif