1 #include "shm_connection.h"
// Size in bytes of the header described below: 4 + 4*4 + 2 = 22.
// The field list that follows gives the header layout in order.
5 #define SHM_HDRSIZE (4+4*4+2)
6 // 4 bytes: buffer size (each channel)
7 // 4 bytes: read pointer chan 0
8 // 4 bytes: write pointer chan 0
9 // 4 bytes: read pointer chan 1
10 // 4 bytes: write pointer chan 1
11 // 2 bytes: chan 0 refresh, chan 1 refresh
12 // data follows (2x buffer size)
// Win32 constructor: sets up the file-backed shared-memory connection.
//   whichChan    - selects this side's channel (stored as 0 or 1). When set, the backing
//                  file is opened with OPEN_EXISTING instead of OPEN_ALWAYS.
//   uniquestring - appended to the temp-file, mutex and event names to identify the link.
//   shmsize      - requested per-channel buffer size in bytes; rounded up to a power of
//                  two that is at least SHM_MINSIZE.
//   timeout_sec  - copied to m_timeout_sec.
//   extra_flags  - unused on win32.
// NOTE(review): several lines of this function are not visible in this listing; the
// comments below describe only the statements that are shown.
15 WDL_SHM_Connection::WDL_SHM_Connection(bool whichChan
,
16 const char *uniquestring
, // identify
17 int shmsize
, // bytes, whoever opens first decides
19 int extra_flags
// unused on win32
24 m_timeout_sec
=timeout_sec
;
// Start the receive timestamp 2 seconds in the future.
25 m_last_recvt
=time(NULL
)+2; // grace period
26 { // make shmsize the next power of two
// Doubles shmsize until it is at least SHM_MINSIZE and at least a
// (presumably a holds the requested size; its declaration is not visible here).
29 while (shmsize
< SHM_MINSIZE
|| shmsize
<a
) shmsize
*=2;
32 m_file
=INVALID_HANDLE_VALUE
;
35 m_lockmutex
=m_events
[0]=m_events
[1]=NULL
;
// Normalize the bool to exactly 0 or 1.
37 m_whichChan
=whichChan
? 1 : 0;
// Fetch the temp directory into buf (4 bytes of buf are held back), fall back to the
// C: root when nothing is returned, and make sure the path ends in a separator.
40 GetTempPath(sizeof(buf
)-4,buf
);
41 if (!buf
[0]) strcpy(buf
,"C:\\");
42 if (buf
[strlen(buf
)-1] != '/' && buf
[strlen(buf
)-1] != '\\') strcat(buf
,"\\");
// Backing file name gets WDL_SHM_<uniquestring>.tmp appended.
44 m_tempfn
.Append("WDL_SHM_");
45 m_tempfn
.Append(uniquestring
);
46 m_tempfn
.Append(".tmp");
// Base name for the named kernel objects. On WDL_SUPPORT_WIN9X builds the Global
// prefix is dropped when GetVersion() has its high bit set.
48 WDL_String
tmp("Global\\WDL_SHM_");
49 #ifdef WDL_SUPPORT_WIN9X
50 if (GetVersion()&0x80000000) tmp
.Set("WDL_SHM_");
52 tmp
.Append(uniquestring
);
// Length of the base name, recorded before any suffix is appended.
53 const size_t tmp_l
= strlen(tmp
.Get());
// Create a named mutex and, if that succeeded, block until it is acquired.
56 HANDLE mutex
= CreateMutex(NULL
,FALSE
,tmp
.Get());
58 if (mutex
) WaitForSingleObject(mutex
,INFINITE
);
// Per-channel lock mutex, named with a .l1 or .l0 suffix depending on whichChan.
61 tmp
.Append(whichChan
?".l1":".l0");
62 m_lockmutex
= CreateMutex(NULL
,FALSE
,tmp
.Get());
// Wait up to 100 ms for the lock mutex.
65 if (WaitForSingleObject(m_lockmutex
,100) == WAIT_OBJECT_0
)
67 DeleteFile(m_tempfn
.Get()); // this is designed to fail if another process has it locked
// whichChan side requires an existing file; the other side creates it if needed.
69 m_file
=CreateFile(m_tempfn
.Get(),GENERIC_READ
|GENERIC_WRITE
,
70 FILE_SHARE_READ
|FILE_SHARE_WRITE
,
71 NULL
,whichChan
? OPEN_EXISTING
: OPEN_ALWAYS
,FILE_ATTRIBUTE_TEMPORARY
,NULL
);
// NOTE(review): the branch this close belongs to is not visible in this listing.
75 CloseHandle(m_lockmutex
);
// True when the file is open but smaller than the header plus two minimum-size
// buffers, or when GetFileSize returned 0xFFFFFFFF.
81 if (m_file
!= INVALID_HANDLE_VALUE
&&
82 ((mapsize
=GetFileSize(m_file
,NULL
)) < SHM_HDRSIZE
+SHM_MINSIZE
*2 ||
83 mapsize
== 0xFFFFFFFF))
// buf is cleared and then written to the file in pieces of at most sizeof(buf) bytes.
86 memset(buf
,0,sizeof(buf
));
// Target file size: header plus one buffer per channel.
89 int sz
=shmsize
*2 + SHM_HDRSIZE
;
94 if (a
>sizeof(buf
))a
=sizeof(buf
);
95 WriteFile(m_file
,buf
,a
,&d
,NULL
);
// Create a read/write mapping of the file (maximum size 0,0) and map a view of it
// (offset 0, length 0).
101 if (m_file
!=INVALID_HANDLE_VALUE
)
102 m_filemap
=CreateFileMapping(m_file
,NULL
,PAGE_READWRITE
,0,0,NULL
);
106 m_mem
=(unsigned char *)MapViewOfFile(m_filemap
,FILE_MAP_WRITE
,0,0,0);
// Two named events created with manual-reset=false and initial-state=false; the second
// name is the first with its final character incremented by one.
110 m_events
[0]=CreateEvent(NULL
,false,false,tmp
.Get());
111 tmp
.Get()[strlen(tmp
.Get())-1]++;
112 m_events
[1]=CreateEvent(NULL
,false,false,tmp
.Get());
// Win32 destructor: releases what the constructor acquired, in the order shown:
// the mapped view, the file mapping, the file handle, the temp file on disk,
// both event handles, and finally the lock mutex.
// NOTE(review): any conditions guarding the last steps are not visible in this listing.
124 WDL_SHM_Connection::~WDL_SHM_Connection()
126 if (m_mem
) UnmapViewOfFile(m_mem
);
127 if (m_filemap
) CloseHandle(m_filemap
);
128 if (m_file
!= INVALID_HANDLE_VALUE
) CloseHandle(m_file
);
// Remove the backing temp file by path.
129 DeleteFile(m_tempfn
.Get());
131 if (m_events
[0]) CloseHandle(m_events
[0]);
132 if (m_events
[1]) CloseHandle(m_events
[1]);
// Give up ownership of the lock mutex before closing its handle.
135 ReleaseMutex(m_lockmutex
);
136 CloseHandle(m_lockmutex
);
// Win32 variant: always returns false, i.e. never requests a keep-alive message.
140 bool WDL_SHM_Connection::WantSendKeepAlive() { return false; }
// Win32 Run(): one service pass over the shared-memory ring buffers.
// Moves bytes from send_queue into one channel buffer and from the other channel
// buffer into recv_queue, maintaining the read/write pointers stored in the header.
// Visible failure returns (-1): no mapped memory, header buffer size below
// SHM_MINSIZE, or m_timeout_cnt having reached 4 when the timeout check fires.
// NOTE(review): other return paths are not visible in this listing.
142 int WDL_SHM_Connection::Run()
144 if (!m_mem
) return -1;
// The header is accessed as an array of ints; hdr[0] is the per-channel buffer size.
146 int *hdr
= (int *)m_mem
;
148 int shm_size
= hdr
[0];
150 // todo: check to see if we just opened, if so, have a grace period
151 if (shm_size
< SHM_MINSIZE
) return -1;
// Set this side's refresh byte (byte offset 4*5 plus our channel index).
153 m_mem
[4*5 + !!m_whichChan
] = 1;
154 if (m_timeout_sec
> 0)
// The other side's refresh byte being set counts as activity: record the time and
// clear the byte.
156 if (m_mem
[4*5 + !m_whichChan
])
159 m_last_recvt
=time(NULL
);
160 m_mem
[4*5 + !m_whichChan
]=0;
// No activity for longer than m_timeout_sec.
164 if (time(NULL
) > m_timeout_sec
+m_last_recvt
)
166 if (m_timeout_cnt
>= 4) return -1;
169 m_last_recvt
=time(NULL
);
// ---- send: write queued bytes into the buffer of channel index !m_whichChan ----
177 int send_avail
=send_queue
.Available();
180 int wc
= !m_whichChan
;
// Channel buffers follow the header back to back, shm_size bytes each.
181 unsigned char *data
=m_mem
+SHM_HDRSIZE
+shm_size
*wc
;
182 int rdptr
= hdr
[1 + wc
*2]; // hopefully atomic
183 int wrptr
= hdr
[1 + wc
*2+1];
// Free space is the buffer size minus the bytes written but not yet read.
184 int wrlen
= shm_size
- (wrptr
-rdptr
);
187 if (wrlen
> send_avail
) wrlen
=send_avail
;
188 if (wrlen
> shm_size
) wrlen
=shm_size
; // should never happen !
// Masking with shm_size-1 assumes shm_size is a power of two.
190 int idx
= wrptr
& (shm_size
-1);
// l is the part that fits before the end of the buffer; any remainder wraps to the start.
191 int l
= shm_size
- idx
;
192 if (l
> wrlen
) l
= wrlen
;
193 memcpy(data
+idx
,send_queue
.Get(),l
);
194 if (l
< wrlen
) memcpy(data
,(char*)send_queue
.Get() + l
,wrlen
-l
);
196 hdr
[1 + wc
*2+1] = wrptr
+ wrlen
; // advance write pointer, hopefully atomic
// Drop the bytes just copied from the send queue.
200 send_queue
.Advance(wrlen
);
201 send_queue
.Compact();
// ---- receive: read pending bytes from the buffer of channel index m_whichChan ----
208 int wc
= m_whichChan
;
209 unsigned char *data
=m_mem
+SHM_HDRSIZE
+shm_size
*wc
;
210 int rdptr
= hdr
[1 + wc
*2];
211 int wrptr
= hdr
[1 + wc
*2+1]; // hopefully atomic
// Bytes written but not yet read.
212 int rdlen
= wrptr
-rdptr
;
215 if (rdlen
> shm_size
) rdlen
=shm_size
; // should never happen !
217 int idx
= rdptr
& (shm_size
-1);
// Same two-piece handling as the send side: up to the buffer end, then from the start.
218 int l
= shm_size
- idx
;
219 if (l
> rdlen
) l
= rdlen
;
220 recv_queue
.Add(data
+idx
,l
);
221 if (l
< rdlen
) recv_queue
.Add(data
,rdlen
-l
);
223 hdr
[1 + wc
*2] = wrptr
; // hopefully atomic, bring read pointer up to write pointer
// Signal the event at index !m_whichChan (presumably to wake the other side; the
// condition guarding this is not visible here).
230 if (m_events
[!m_whichChan
]) SetEvent(m_events
[!m_whichChan
]);
242 #include <sys/socket.h>
244 #include <sys/types.h>
245 #include <sys/time.h>
246 #include <sys/errno.h>
247 #include <sys/fcntl.h>
249 #include <sys/file.h>
251 #include "swell/swell-internal.h"
// Signal handler with an empty body; registered for SIGPIPE by the constructor below.
// Presumably installed so SIGPIPE does not terminate the process - TODO confirm.
253 static void sigpipehandler(int sig
) { }
// POSIX constructor: sets up the Unix-domain-socket based connection.
//   whichChan    - stored in m_whichChan; the first side created must pass 0.
//   uniquestring - embedded in the socket path /tmp/WDL_SHM.<uniquestring>.tmp.
//   shmsize      - used as the socket buffer size after clamping (see below).
//   timeout_sec  - copied to m_timeout_sec.
//   extra_flags  - set 1 for lockfile use on master.
// NOTE(review): several lines of this function are not visible in this listing; the
// comments below describe only the statements that are shown.
256 WDL_SHM_Connection::WDL_SHM_Connection(bool whichChan
, // first created must be whichChan=0
257 const char *uniquestring
, // identify
260 int extra_flags
// set 1 for lockfile use on master
// Socket buffer size is shmsize clamped to the range 16384 .. 1024*1024 bytes.
263 m_sockbufsize
= shmsize
;
264 if (m_sockbufsize
<16384) m_sockbufsize
=16384;
265 else if (m_sockbufsize
>1024*1024) m_sockbufsize
=1024*1024;
// Read staging buffer: the socket buffer size, capped at 65536 bytes.
267 m_rdbufsize
= wdl_min(m_sockbufsize
,65536);
268 m_rdbuf
= (char *)malloc(m_rdbufsize
);
// Function-local static flag next to the SIGPIPE registration; presumably limits the
// registration to once per process (the guarding condition is not visible here).
270 static bool hasSigHandler
;
273 signal(SIGPIPE
,sigpipehandler
);
277 m_timeout_sec
=timeout_sec
;
// Start the receive timestamp 3 seconds in the future.
278 m_last_recvt
=time(NULL
)+3; // grace period
// First keep-alive becomes due one second from now.
279 m_next_keepalive
= time(NULL
)+1;
281 m_tempfn
.Set("/tmp/WDL_SHM.");
282 m_tempfn
.Append(uniquestring
);
283 m_tempfn
.Append(".tmp");
// The address block is allocated with the full path length on top of
// sizeof(sockaddr_un), so the strcpy into sun_path below stays inside the allocation.
286 m_sockaddr
=malloc(sizeof(struct sockaddr_un
) + strlen(m_tempfn
.Get()));
291 m_whichChan
= whichChan
;
293 struct sockaddr_un
*addr
= (struct sockaddr_un
*)m_sockaddr
;
294 addr
->sun_family
= AF_UNIX
;
295 strcpy(addr
->sun_path
,m_tempfn
.Get());
297 int l
= SUN_LEN(addr
)+1;
// Lock file is the socket path plus .lock; it is opened (created if missing) and locked
// exclusively without blocking. The constructor returns early if either step fails.
306 m_lockfn
.Set(m_tempfn
.Get());
307 m_lockfn
.Append(".lock");
308 m_lockhandle
= open(m_lockfn
.Get(),O_RDWR
|O_CREAT
,0666);
309 if (m_lockhandle
< 0) return; // error getting lockfile, fail
310 if (flock(m_lockhandle
,LOCK_NB
|LOCK_EX
) < 0)
314 return; // could not lock
318 if (m_listen_socket
<0) return;
// Checks the socket path on disk; what happens on failure is not visible here.
323 if (stat(addr
->sun_path
,&sbuf
))
// Outgoing socket: request send and receive buffers of m_sockbufsize bytes, connect
// to the socket path, then switch the descriptor to non-blocking mode.
328 int s
= socket(AF_UNIX
,SOCK_STREAM
,0);
331 int bsz
=m_sockbufsize
;
332 setsockopt(s
,SOL_SOCKET
,SO_SNDBUF
,(char *)&bsz
,sizeof(bsz
));
334 setsockopt(s
,SOL_SOCKET
,SO_RCVBUF
,(char *)&bsz
,sizeof(bsz
));
336 if (connect(s
,(struct sockaddr
*)addr
,SUN_LEN(addr
)))
342 fcntl(s
, F_SETFL
, fcntl(s
,F_GETFL
) | O_NONBLOCK
);
345 // clean up the filesystem, our connection has been made
346 unlink(m_tempfn
.Get());
// When either socket is valid, allocate a zeroed SWELL socket-event object that refers
// to it (the connected socket is preferred over the listening one) and publish it as
// m_waitevt.
// NOTE(review): se goes from malloc straight to memset with no NULL check.
350 if (m_socket
>=0 || m_listen_socket
>=0)
352 SWELL_InternalObjectHeader_SocketEvent
*se
= (SWELL_InternalObjectHeader_SocketEvent
*)malloc(sizeof(SWELL_InternalObjectHeader_SocketEvent
));
353 memset(se
,0,sizeof(SWELL_InternalObjectHeader_SocketEvent
));
354 se
->hdr
.type
= INTERNAL_OBJECT_EXTERNALSOCKET
;
356 se
->socket
[0]=m_socket
>=0? m_socket
: m_listen_socket
;
357 m_waitevt
= (HANDLE
)se
;
// POSIX destructor: closes any open sockets, removes the socket file when appropriate,
// frees the wait-event object, then unlocks and removes the lock file.
// NOTE(review): conditions guarding the later steps are not visible in this listing.
361 WDL_SHM_Connection::~WDL_SHM_Connection()
363 if (m_listen_socket
>=0 || m_socket
>=0)
365 if (m_socket
>=0) close(m_socket
);
366 if (m_listen_socket
>=0) close(m_listen_socket
);
368 // only delete the temp socket file if we are the master and successfully had something open
369 if (!m_whichChan
&& m_tempfn
.Get()[0]) unlink(m_tempfn
.Get());
372 free(m_waitevt
); // don't CloseHandle(), since it's just referencing our socket
// Drop the advisory lock, then remove the lock file.
378 flock(m_lockhandle
,LOCK_UN
);
380 unlink(m_lockfn
.Get());
// POSIX variant: returns true when send_queue.GetSize() is zero and the current time
// has reached m_next_keepalive.
384 bool WDL_SHM_Connection::WantSendKeepAlive()
386 return !send_queue
.GetSize() && time(NULL
) >= m_next_keepalive
;
// POSIX Run(): one service pass over the socket connection.
// Visible behavior: accepts a pending connection on the listening socket, reads
// incoming bytes into recv_queue, writes queued bytes from send_queue, and tracks
// timeout and keep-alive times.
// Visible return values: -1 on failure (no listening socket, timeout count limit
// reached, or connection failure on the whichChan side); sendcnt||recvcnt after a
// transfer pass; 0 or -1 on the final path depending on whether a listening socket
// is available.
// NOTE(review): several lines, including the branch structure and the abortClose label,
// are not visible in this listing.
390 int WDL_SHM_Connection::Run()
394 if (m_listen_socket
< 0) return -1;
// Accept one incoming connection on the listening socket.
396 struct sockaddr_un remote
={0,};
397 socklen_t t
= sizeof(struct sockaddr_un
);
398 int s
= accept(m_listen_socket
,(struct sockaddr
*)&remote
,&t
);
401 close(m_listen_socket
);
404 fcntl(s
, F_SETFL
, fcntl(s
,F_GETFL
) | O_NONBLOCK
); // nonblocking
// Request send and receive buffers of m_sockbufsize bytes on the accepted socket.
406 int bsz
=m_sockbufsize
;
407 setsockopt(s
,SOL_SOCKET
,SO_SNDBUF
,(char *)&bsz
,sizeof(bsz
));
409 setsockopt(s
,SOL_SOCKET
,SO_RCVBUF
,(char *)&bsz
,sizeof(bsz
));
413 SWELL_InternalObjectHeader_SocketEvent
*se
= (SWELL_InternalObjectHeader_SocketEvent
*)m_waitevt
;
// Timeout check (only when m_timeout_sec is positive); gives up once m_timeout_cnt
// has reached 2.
420 if (m_timeout_sec
>0 && time(NULL
) > m_timeout_sec
+m_last_recvt
)
422 if (m_timeout_cnt
>= 2) return -1;
424 m_last_recvt
=time(NULL
);
// Receive loop: stops once recv_queue holds 128*1024*1024 bytes or more.
435 while (recv_queue
.Available()<128*1024*1024)
// Each read is at most m_rdbufsize bytes, staged through m_rdbuf.
437 int n
=read(m_socket
,m_rdbuf
,m_rdbufsize
);
440 recv_queue
.Add(m_rdbuf
,n
);
// A read error other than EAGAIN aborts the connection.
444 else if (n
<0&&errno
!=EAGAIN
) goto abortClose
;
// Send loop: runs while send_queue still has bytes available.
447 while (send_queue
.Available()>0)
// Each write is capped at m_rdbufsize bytes.
449 int n
= send_queue
.Available();
450 if (n
> m_rdbufsize
) n
=m_rdbufsize
;
451 n
= write(m_socket
,send_queue
.Get(),n
);
456 send_queue
.Advance(n
);
// A write error other than EAGAIN aborts the connection.
458 else if (n
<0&&errno
!=EAGAIN
) goto abortClose
;
463 if (sendcnt
) send_queue
.Compact();
467 time_t now
= time(NULL
);
// Timeout check for the transfer phase; gives up once m_timeout_cnt has reached 3.
473 else if (now
> m_timeout_sec
+m_last_recvt
)
475 if (m_timeout_cnt
>= 3) return -1;
// When data was sent or is still queued, the next keep-alive is pushed out by
// (m_timeout_sec+1)/2 seconds (integer division).
480 if (sendcnt
||send_queue
.GetSize()) m_next_keepalive
= now
+ (m_timeout_sec
+1)/2;
// Nonzero when anything was sent or received in this pass.
483 return sendcnt
||recvcnt
;
// The whichChan side returns failure here.
486 if (m_whichChan
) return -1;
// Point the wait-event object at the listening socket.
493 SWELL_InternalObjectHeader_SocketEvent
*se
= (SWELL_InternalObjectHeader_SocketEvent
*)m_waitevt
;
494 se
->socket
[0]=m_listen_socket
;
499 return m_listen_socket
>= 0 ? 0 : -1;
502 void WDL_SHM_Connection::acquireListener()
504 // only ever called from whichChan==0
505 if (m_listen_socket
>=0) return; // no need to re-open
507 unlink(m_tempfn
.Get());
509 int s
= socket(AF_UNIX
,SOCK_STREAM
,0);
511 struct sockaddr_un
*addr
= (struct sockaddr_un
*)m_sockaddr
;
512 if (bind(s
,(struct sockaddr
*)addr
,SUN_LEN(addr
)) < 0 || listen(s
,1) < 0)
518 fcntl(s
, F_SETFL
, fcntl(s
,F_GETFL
) | O_NONBLOCK
);