backend/net: try to prevent hangup on connection/read failure
[chiroptera.git] / receiver.d
blobb88eda1f809d216021b6b33c01bc50c45af54796
1 /* E-Mail Client
2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3 of the License ONLY.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // mail/nntp receiver thread
18 module receiver is aliced;
19 private:
21 //version = debug_filter_helper;
22 //version = debug_updater;
24 import std.concurrency;
26 import iv.cmdcon;
27 import iv.dynstring;
28 import iv.strex;
29 import iv.sq3;
30 import iv.timer : DurTimer = Timer;
31 import iv.utfutil;
32 import iv.vfs.io;
34 import iv.egra;
36 import chibackend;
37 import chibackend.net;
39 import chievents;
42 // ////////////////////////////////////////////////////////////////////////// //
/**
 * Filter-engine callback object for a single message.
 *
 * The owner fills in `account`, `tag` (default destination) and `message`
 * (raw message text) and then passes the object to the filter interpreter.
 * Filters report back through the overridden methods; the outcome is left in
 * `tag`, `actFlags` and `matched`.
 */
class RealFilterHelper : FilterHelper {
public:
  enum {
    // only one of these can be set
    ActFlagDelete = 1u<<0,
    ActFlagPurge = 1u<<1,

    // only one of these can be set
    ActFlagSpam = 1u<<2,
    ActFlagHam = 1u<<3,

    ActFlagRead = 1u<<4,
    ActFlagStop = 1u<<5,
  }

public:
  DynStr account;
  DynStr tag; // destination tag
  uint actFlags; // see above
  DynStr message; // raw message text (headers + body)
  bool matched; // set by `filterMatched()`; the caller resets it before each filter

public:
  ~this () nothrow @nogc { account.clear(); tag.clear(); message.clear(); }

  final @property bool isDelete () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagDelete); }
  final @property bool isPurge () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagPurge); }
  final @property bool isSpam () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagSpam); }
  final @property bool isHam () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagHam); }
  final @property bool isRead () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagRead); }
  final @property bool isStop () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagStop); }

  // returns the header part of `message` (everything before the headers end)
  private final auto headerBlock () {
    auto data = message.getData;
    return data[0..findHeadersEnd(data)];
  }

  // called if a filter was matched
  override void filterMatched () {
    matched = true;
  }

  override DynStr getAccount () {
    return account;
  }

  // returns the value of the given header; `exists` tells if the header is present at all
  override DynStr getHeaderField (const(char)[] header, out bool exists) {
    auto value = findHeaderField(headerBlock, header);
    exists = (value !is null);
    version(debug_filter_helper) writeln("...getHeaderField(", header, "): exists=", exists, "; res=", value);
    return DynStr(value);
  }

  override DynStr getFromName () {
    auto value = findHeaderField(headerBlock, "From").extractName;
    version(debug_filter_helper) writeln("...getFromName: res=", value);
    return DynStr(value);
  }

  // never returns an empty address: falls back to "nobody@nowhere"
  override DynStr getFromMail () {
    auto value = findHeaderField(headerBlock, "From").extractMail;
    if (value.length == 0) value = "nobody@nowhere";
    version(debug_filter_helper) writeln("...getFromMail: res=", value);
    return DynStr(value);
  }

  override DynStr getToName () {
    auto value = findHeaderField(headerBlock, "To").extractName;
    version(debug_filter_helper) writeln("...getToName: res=", value);
    return DynStr(value);
  }

  // never returns an empty address: falls back to "nobody@nowhere"
  override DynStr getToMail () {
    auto value = findHeaderField(headerBlock, "To").extractMail;
    if (value.length == 0) value = "nobody@nowhere";
    version(debug_filter_helper) writeln("...getToMail: res=", value);
    return DynStr(value);
  }

  // returns the decoded subject with the reply prefix removed; `exists` tells if the header is present
  override DynStr getSubj (out bool exists) {
    auto value = findHeaderField(headerBlock, "Subject");
    exists = (value !is null);
    if (exists) value = value.decodeSubj.subjRemoveRe;
    return DynStr(value);
  }

  /**
   * Runs an external filter program on the message.
   *
   * The message is written to a temporary ".eml" file in "/tmp", and the
   * program is started as `command <tempfile>` with "/tmp" as its working
   * directory. The first line of its stdout is the result; a leading '-' is
   * stripped from it. The temporary file is removed afterwards.
   *
   * Returns: the action line, or an empty string on any error (the error is
   * logged, never propagated).
   */
  override DynStr exec (const(char)[] command) {
    /*version(debug_filter_helper)*/ writeln("...exec: <", command, ">");
    //return DynStr("nothing");
    import std.stdio : File;
    import std.process;
    try {
      // write article to file
      import std.uuid;
      UUID id = randomUUID();
      DynStr buf;
      void deleteTempFile () {
        if (buf.length) try { import std.file : remove; remove(buf.getData); } catch (Exception e) {}
      }
      scope(exit) deleteTempFile();
      buf.reserve(2+16*2+42);
      buf ~= "/tmp/_temp_";
      foreach (immutable ubyte b; id.data[]) {
        buf ~= "0123456789abcdef"[b>>4];
        buf ~= "0123456789abcdef"[b&0x0f];
      }
      buf ~= ".eml";

      auto fo = VFile(buf.getData, "w");
      fo.rawWriteExact(message.getData);
      fo.close();

      //!conwriteln("EXEC filter '", command, "'... (", buf.getData, ")");
      // stderr is deliberately NOT piped: nobody reads it here, and an unread
      // pipe blocks the child (and then our `wait()`) once it fills up
      auto pid = pipeProcess([command, /*"-D",*/ buf.getData], Redirect.stdin|Redirect.stdout, null, Config.none, "/tmp");
      // always reap the child, even if reading its output throws;
      // registered after `deleteTempFile`, so it runs before the file is removed
      scope(exit) { try { pid.pid.wait(); } catch (Exception e) {} }
      // we never feed the filter; give it EOF instead of letting it wait on stdin forever
      pid.stdin.close();
      string action = pid.stdout.readln.xstrip;
      // drain the rest of stdout, so a chatty filter cannot block on a full pipe
      foreach (chunk; pid.stdout.byChunk(4096)) {}
      bool doStop = (action.length && action[0] == '-');
      if (doStop) action = action[1..$].xstrip;
      //!conwriteln("EXEC filter '", command, "' action: ", action, " (", doStop, ")");
      return DynStr(action);
    } catch (Exception e) {
      conwriteln("EXEC filter error: ", e.msg);
    }
    return DynStr();
  }

  // sets the destination tag
  override void move (const(char)[] dest) {
    version(debug_filter_helper) writeln("...move: <", dest, ">");
    tag = dest;
  }

  /**
   * Records a filter action in `actFlags`.
   *
   * Delete/SoftDelete and Spam/Ham are mutually exclusive pairs: the last one
   * set wins. Note that `Action.Delete` sets the *purge* flag, and
   * `Action.SoftDelete` sets the *delete* flag.
   *
   * Throws: FilterSyntaxException for any other action.
   */
  override void performAction (Action action) {
    version(debug_filter_helper) writeln("...performAction: ", action);
    switch (action) {
      case Action.Delete:
        actFlags &= ~(ActFlagDelete|ActFlagPurge);
        actFlags |= ActFlagPurge;
        break;
      case Action.SoftDelete:
        actFlags &= ~(ActFlagDelete|ActFlagPurge);
        actFlags |= ActFlagDelete;
        break;
      case Action.Spam:
        actFlags &= ~(ActFlagSpam|ActFlagHam);
        actFlags |= ActFlagSpam;
        break;
      case Action.Ham:
        actFlags &= ~(ActFlagSpam|ActFlagHam);
        actFlags |= ActFlagHam;
        break;
      case Action.Read:
        actFlags |= ActFlagRead;
        break;
      case Action.Stop:
        actFlags |= ActFlagStop;
        break;
      default:
        {
          import std.conv : to;
          throw new FilterSyntaxException("action "~action.to!string~" should not end up in the handler");
        }
    }
  }

  /**
   * Simple substring matcher.
   *
   * `pat` is a literal string, optionally anchored with a leading '^'
   * (match at the start) and/or a trailing '$' (match at the end); there are
   * no other metacharacters. An empty pattern matches only as "^$" against an
   * empty string.
   */
  override bool match (const(char)[] pat, const(char)[] str, bool casesens) {
    version(debug_filter_helper) writeln("...match: casesens=", casesens, "; pat=<", pat, ">; str=<", str, ">");
    immutable bool bol = (pat.length && pat[0] == '^');
    if (bol) pat = pat[1..$];
    immutable bool eol = (pat.length && pat[$-1] == '$');
    if (eol) pat = pat[0..$-1];
    version(debug_filter_helper) writeln("...match: bol=", bol, "; eol=", eol, "; pat=<", pat, ">");
    if (pat.length == 0) return (bol && eol ? str.length == 0 : false);
    if (str.length < pat.length) return false;
    // anchors are implemented by cutting `str` down to the only place where the match may happen
    if (bol && eol) { if (str.length != pat.length) return false; }
    else if (bol) str = str[0..pat.length];
    else if (eol) str = str[str.length-pat.length..$];
    if (casesens) {
      return (str.indexOf(pat) >= 0);
    } else {
      // case-insensitive search: try every starting position
      while (str.length >= pat.length) {
        if (str.startsWithCI(pat)) {
          //writeln("...match: HIT! str=<", str, ">");
          return true;
        }
        str = str[1..$];
        //writeln("...match: skip; str=<", str, ">; pat=<", pat, ">");
      }
      //writeln("...match: FAIL!");
      return false;
    }
  }

  // debug helper: prints the collected action flags and the destination tag
  void writeResult() () const {
    if (isDelete) write(" softdelete");
    if (isPurge) write(" purge");
    if (isSpam) write(" spam");
    if (isHam) write(" ham");
    if (isRead) write(" read");
    write("; dest tag: ", tag.getData);
  }
}
250 // ////////////////////////////////////////////////////////////////////////// //
// Process-wide (`__gshared`, i.e. not thread-local) receiver state.
// NOTE(review): judging by the names only, `updatesDisabled` mirrors the
// DisableUpdates/EnableUpdates control commands, and `rcDisabled`/`rcStarted`
// describe the receiver thread; the code using them is not in this chunk -- confirm.
251 __gshared bool updatesDisabled = false;
252 __gshared bool rcDisabled = false;
253 __gshared bool rcStarted = false;
// thread id of the control thread; it is not assigned anywhere in this chunk
254 __gshared Tid controlThreadId;
// Reply codes of the control thread; only `Quit` is defined.
// NOTE(review): the sender of this reply is not visible in this chunk.
257 enum ControlReply {
258 Quit,
/**
 * Message for the control thread.
 *
 * `accid` is meaningful only for `CheckDone` and `CheckError` (a checker
 * thread reports the account it was working on); it is zero otherwise.
 */
struct ControlCommand {
  enum Kind {
    Ping,
    ForceUpdateAll,
    Quit,

    CheckDone,
    CheckError,

    DisableUpdates,
    EnableUpdates,
  }

  Kind type;
  // for CheckDone or CheckError
  uint accid;

  // a command always has a kind
  @disable this ();

  // command without an account
  this (Kind atype) nothrow @safe @nogc {
    this(atype, 0);
  }

  // command about the given account
  this (Kind atype, uint aid) nothrow @safe @nogc {
    this.type = atype;
    this.accid = aid;
  }
}
// Message received by `checkerThread`: tells it which account to check.
283 struct CheckCommand {
// id of the account; `checkerThread` reports CheckError for 0
284 uint accid;
// Lazily prepared statement for the "Conf" database: selects the settings
// row of one account (bound as `:accid`). Used by `checkerThread`.
288 static stmtAccInfo = LazyStatement!"Conf"(`
289 SELECT
290 accid AS accid
291 , checktime AS checktime
292 , nosendauth AS nosendauth
293 , debuglog AS debuglog
294 , nntplastindex AS nntplastindex
295 , name AS name
296 , recvserver AS recvserver
297 , sendserver AS sendserver
298 , user AS user
299 , pass AS pass
300 , inbox AS inbox
301 , nntpgroup AS nntpgroup
302 , email AS email
303 FROM accounts
304 WHERE accid=:accid
305 LIMIT 1
306 ;`);
// Lazily prepared statement for the "Conf" database: inserts the `lastcheck`
// value for an account, or overwrites it if the account already has a row.
// `checkerThread` binds `:lastcheck` to "current tick count + check interval".
309 static stmtSetCheckTime = LazyStatement!"Conf"(`
310 INSERT INTO checktimes(accid,lastcheck) VALUES(:accid,:lastcheck)
311 ON CONFLICT(accid)
312 DO UPDATE SET lastcheck=:lastcheck
313 ;`);
316 //==========================================================================
318 // forEachTag
320 // return `false` from delegate to stop
322 //==========================================================================
// Calls `dg` for each non-empty item of the '|'-separated `tags` list, in order.
// Iteration stops as soon as `dg` returns `false`. Does nothing if `dg` is null.
void forEachTag (const(char)[] tags, bool delegate (const(char)[] tag) dg) {
  if (dg is null) return;
  for (auto rest = tags; rest.length != 0; ) {
    const(char)[] item;
    immutable sep = rest.indexOf('|');
    if (sep < 0) {
      // the last item: no more separators
      item = rest;
      rest = rest[$..$];
    } else {
      item = rest[0..sep];
      rest = rest[sep+1..$];
    }
    // empty items ("a||b") are silently skipped
    if (item.length != 0 && !dg(item)) return;
  }
}
337 //==========================================================================
339 // extractAccount
341 //==========================================================================
// Returns the account name from the '|'-separated `tags` list, i.e. the text
// after the "account:" prefix of the first item that starts with it.
// Returns an empty string if there is no such item.
DynStr extractAccount (const(char)[] tags) {
  enum Prefix = "account:";
  size_t pos = 0;
  while (pos < tags.length) {
    auto rel = tags[pos..$].indexOf(Prefix);
    if (rel < 0) break;
    immutable size_t stp = pos+cast(size_t)rel;
    // the prefix counts only at the very start of an item
    if (stp == 0 || tags[stp-1] == '|') {
      auto name = tags[stp+Prefix.length..$];
      auto end = name.indexOf('|');
      if (end >= 0) name = name[0..end];
      return DynStr(name);
    }
    // found inside some other item (like "xaccount:foo"): continue after it,
    // otherwise the same position is found again and the loop never ends
    pos = stp+1;
  }
  return DynStr();
}
356 //==========================================================================
358 // extractFirstFolder
360 // can return empty string
362 //==========================================================================
// Returns the first item of the '|'-separated `tags` list that starts with '/'.
// The result is empty if there is no such item.
DynStr extractFirstFolder (const(char)[] tags) {
  DynStr folder;
  forEachTag(tags, delegate bool (const(char)[] item) {
    if (item[0] == '/') {
      folder = item;
      return false; // found it, no need to look further
    }
    return true; // not a folder, keep looking
  });
  return folder;
}
374 //==========================================================================
376 // removeFirstFolder
378 // can return empty tags string
380 //==========================================================================
// Returns `tags` rebuilt as a '|'-separated list without its first item that
// starts with '/'; all other items (later folders included) are kept in order.
// The result may be empty.
DynStr removeFirstFolder (const(char)[] tags) {
  DynStr kept;
  bool folderDropped = false;
  forEachTag(tags, delegate bool (const(char)[] item) {
    if (folderDropped || item[0] != '/') {
      if (kept.length) kept ~= "|";
      kept ~= item;
    } else {
      // this is the first folder: drop it
      folderDropped = true;
    }
    return true; // always walk the whole list
  });
  return kept;
}
397 // ////////////////////////////////////////////////////////////////////////// //
// One tag of a message, as collected by `getMessageTags()`.
398 static struct TagInfo {
399 uint tagid;
400 DynStr name;
// work flag for `twitMessage()`: the message was muted in this tag, so the
// mute should be propagated to its thread
401 bool wasUpdates;
405 //==========================================================================
407 // getMessageTags
409 //==========================================================================
// Appends to `tags` one `TagInfo` (tag id and tag name) for each distinct tag
// the message `uid` has in the view database (`threads` joined with `tagnames`).
// `tags` is not cleared first; `wasUpdates` of the new items is left `false`.
410 void getMessageTags (ref TagInfo[] tags, uint uid) {
411 auto stGetTags = LazyStatement!"View"(`
412 SELECT
413 DISTINCT(threads.tagid) AS tagid
414 , tn.tag AS name
415 FROM threads
416 INNER JOIN tagnames AS tn USING(tagid)
417 WHERE uid=:uid
418 ;`);
420 tags.reserve(64);
421 foreach (auto row; stGetTags.st.bind(":uid", uid).range) {
422 tags ~= TagInfo(row.tagid!uint, DynStr(row.name!SQ3Text));
427 //==========================================================================
429 // updateTwittedThreadsInTag
431 //==========================================================================
// Propagates a mute from the message `uid` to its replies inside the tag `tagid`.
//
// The first statement walks the thread down from `uid` with a recursive query
// (following `threads.parent`) and stores every reached message that is not
// muted yet into the `mutepairs` table. The second statement marks all those
// messages as `Mute.ThreadOther`, and changes `appearance` 0 to 1.
//
// `mutepairs` must already exist: it is created by `twitPrepare()`.
// NOTE(review): no transaction is opened here; `createTwitByMsgid()` calls this
// inside one, `twitMessage()` does not -- confirm this is intended.
432 void updateTwittedThreadsInTag (uint tagid, uint uid) {
// the seed row uses `muid` 0 and `mmute` 666, so it is never selected itself
433 auto stTempTbl = LazyStatement!"View"(`
434 INSERT INTO mutepairs
435 WITH RECURSIVE children(muid, paruid, mtagid, mmute) AS (
436 SELECT 0, :uid, :tagid, 666
437 UNION ALL
438 SELECT
439 tt.uid, tt.uid, mtagid, tt.mute
440 FROM children AS cc
441 INNER JOIN threads AS tt ON
442 tt.tagid=cc.mtagid AND
443 tt.parent=cc.paruid AND
444 tt.uid<>cc.muid AND
445 tt.uid<>cc.paruid
447 SELECT
448 muid AS muid
449 , mtagid AS mtagid
450 FROM children
451 WHERE muid<>0 AND mmute=0
452 ;`);
454 auto stFixMutes = LazyStatement!"View"(`
455 UPDATE threads
457 mute=:mute
458 , appearance=iif(appearance=0,1,appearance)
459 FROM (SELECT muid, mtagid FROM mutepairs) AS cc
460 WHERE uid=cc.muid AND tagid=cc.mtagid AND mute=0
461 ;`);
463 // update threads
// the table is shared between calls, so drop the leftovers first
464 dbView.execute(`DELETE FROM mutepairs;`);
465 stTempTbl.st
466 .bind(":uid", uid)
467 .bind(":tagid", tagid)
468 .doAll();
469 stFixMutes.st
470 .bind(":mute", Mute.ThreadOther)
471 .doAll();
475 //==========================================================================
477 // createTwitByMsgid
479 //==========================================================================
// Creates a "twit" (mute rule) for the Message-ID of the message `uid`, and
// immediately applies it to that message.
//
// Steps: find the Message-ID of `uid` in the view database; do nothing if it
// has none, or if the config database already has a rule with the same
// Message-ID and `glob`; otherwise insert the rule into `msgidtwits`, then mark
// the message as `Mute.ThreadStart` (clearing its `title`) in each of its tags,
// and propagate the mute down the thread.
//
// Params:
//   uid = message id; 0 is ignored
//   glob = tag glob stored in the rule; empty is ignored
// NOTE(review): `glob` is only stored here; it is not used to select the tags
// of the message that get muted (all of them are) -- confirm this is intended.
480 public void createTwitByMsgid (uint uid, const(char)[] glob="/dmars_ng/*") {
481 if (!uid) return;
482 if (glob.length == 0) return;
484 auto stGetMsgid = LazyStatement!"View"(`
485 SELECT
486 msgid AS msgid
487 FROM msgids
488 WHERE uid=:uid
489 LIMIT 1
490 ;`);
492 DynStr msgid;
493 foreach (auto row; stGetMsgid.st.bind(":uid", uid).range) msgid = row.msgid!SQ3Text;
494 if (msgid.length == 0) return;
// NOTE(review): the select list of the next statement is not visible in this
// listing; the code below only checks whether any row is returned.
496 auto stFindMsgidTwit = LazyStatement!"Conf"(`
497 SELECT
499 FROM msgidtwits
500 WHERE msgid=:msgid AND tagglob=:glob
501 LIMIT 1
502 ;`);
504 // check if we already have such twit
505 foreach (auto row; stFindMsgidTwit.st.bindConstText(":msgid", msgid.getData).bindConstText(":glob", glob).range) return;
// remember the new rule (stored with `etwitid` 0 and `automatic` 0)
507 transacted!"Conf"{
508 auto stAddMsgidTwit = LazyStatement!"Conf"(`
509 INSERT INTO msgidtwits
510 (etwitid, automatic, tagglob, msgid)
511 VALUES(0, 0,:tagglob,:msgid)
512 ;`);
514 stAddMsgidTwit.st
515 .bindConstText(":tagglob", glob)
516 .bindConstText(":msgid", msgid.getData)
517 .doAll();
520 TagInfo[] tags;
521 scope(exit) delete tags;
522 getMessageTags(ref tags, uid);
523 if (tags.length == 0) return; // just in case
// `updateTwittedThreadsInTag()` needs the temporary table
525 twitPrepare();
527 auto stUpdateMute = LazyStatement!"View"(`
528 UPDATE threads
530 mute=:mute
531 , title=NULL
532 , appearance=iif(appearance=0,1,appearance)
533 WHERE uid=:uid AND tagid=:tagid AND mute=0
534 ;`);
536 // mark the message as twitted
537 transacted!"View"{
538 foreach (ref TagInfo ti; tags) {
539 stUpdateMute.st
540 .bind(":uid", uid)
541 .bind(":tagid", ti.tagid)
542 .bind(":mute", Mute.ThreadStart)
543 .doAll();
// and mute the replies too
544 updateTwittedThreadsInTag(ti.tagid, uid);
550 //==========================================================================
552 // twitPrepare
554 //==========================================================================
// Creates (if it does not exist yet) the temporary `mutepairs` table in the
// view database. `updateTwittedThreadsInTag()` uses it as a scratch list of
// (message uid, tag id) pairs, so this must be called before it.
// NOTE(review): only the first two columns are visible in this listing.
555 public void twitPrepare () {
556 dbView.execute(`
557 CREATE TEMP TABLE IF NOT EXISTS mutepairs(
558 muid INTEGER
559 , mtagid INTEGER
565 //==========================================================================
567 // twitMessage
569 // set "mute" flag according to message filters
571 //==========================================================================
/**
 * Sets the "mute" state of the message `uid` according to the e-mail twit
 * rules, and according to the state of its thread.
 *
 * For each tag of the message:
 *   1. if the sender matches a rule from `emailtwits` whose glob matches the
 *      tag name, the message becomes `Mute.ThreadStart` (with the rule title);
 *   2. otherwise, if any of its ancestors in that tag is muted, the message
 *      becomes `Mute.ThreadOther`;
 *   3. for each tag where the message is muted, the mute is propagated to the
 *      replies with `updateTwittedThreadsInTag()`.
 *
 * The `mutepairs` temporary table must exist (see `twitPrepare()`).
 *
 * Params:
 *   uid = message id; 0 is ignored
 */
public void twitMessage (uint uid) {
  if (!uid) return;

  TagInfo[] tags;
  scope(exit) delete tags;
  getMessageTags(ref tags, uid);
  if (tags.length == 0) return; // just in case

  auto stUpdateMute = LazyStatement!"View"(`
    UPDATE threads
    SET
        mute=:mute
      , title=:title
      , appearance=iif(appearance=0,1,appearance)
    WHERE uid=:uid AND tagid=:tagid AND mute=0
  ;`);

  DynStr fromMail, fromName;

  // NOTE(review): this query looks redundant with `chiroGetMessageFrom()` below
  // (which receives both strings by reference); it is kept as it was -- confirm
  foreach (auto row; dbView.statement(`
    SELECT
        from_name AS fromName
      , from_mail AS fromMail
    FROM info
    WHERE uid=:uid
    LIMIT 1
  ;`).bind(":uid", uid).range)
  {
    fromMail = row.fromMail!SQ3Text;
    fromName = row.fromName!SQ3Text;
  }

  if (!chiroGetMessageFrom(uid, ref fromMail, ref fromName)) return;

  // number of tags in which the message is (or was found to be) muted
  uint ttcount = 0;
  if (fromMail.length != 0 || fromName.length != 0) {
    foreach (auto trow; dbConf.statement(`
      SELECT
          tagglob AS tagglob
        , email AS email
        , name AS name
        , title AS title
      FROM emailtwits
    ;`).range)
    {
      auto email = trow.email!SQ3Text;
      auto name = trow.name!SQ3Text;
      auto glob = trow.tagglob!SQ3Text;
      if (glob.length == 0 || (!email.length && !name.length)) continue; // just in case
      // check for filter match
      if (email.length && !globmatchCI(fromMail, email)) continue;
      if (name.length && !globmatchCI(fromName, name)) continue;
      auto title = trow.title!SQ3Text;
      // for all tags
      foreach (ref TagInfo ti; tags) {
        if (ti.wasUpdates) continue;
        if (!globmatch(ti.name, glob)) continue;
        stUpdateMute.st
          .bind(":uid", uid)
          .bind(":tagid", ti.tagid)
          .bind(":mute", Mute.ThreadStart)
          .bindConstText(":title", (title.length ? title : null), allowNull:true)
          .doAll();
        ti.wasUpdates = true;
        ++ttcount;
      }
      // every tag is processed, the remaining rules cannot change anything
      if (ttcount == tags.length) break;
    }
  }

  // mute it if it is not muted, but should be
  static auto statFindParentFor = LazyStatement!"View"(`
    SELECT mute AS mute, parent AS parent
    FROM threads
    WHERE tagid=:tagid AND uid=:uid
    LIMIT 1
  ;`);

  // the tags must be iterated by reference here: `wasUpdates` set in this loop
  // is what makes the final loop propagate the mute to the replies
  foreach (ref TagInfo ti; tags) {
    auto mute = chiroGetMessageMute(ti.tagid, uid);
    if (mute > Mute.Normal) {
      ti.wasUpdates = true; // just in case
      if (!ttcount) ttcount = 1;
      continue;
    }
    // walk up the thread; stop at the first muted ancestor, or at the thread root
    uint puid = uid;
    while (puid) {
      statFindParentFor.st
        .bind(":tagid", ti.tagid)
        .bind(":uid", puid);
      puid = 0;
      foreach (auto prow; statFindParentFor.st.range) {
        if (prow.mute!int > Mute.Normal) {
          chiroSetMessageMute(ti.tagid, uid, Mute.ThreadOther);
          ti.wasUpdates = true; // just in case
          if (!ttcount) ttcount = 1;
        } else {
          puid = prow.parent!uint;
        }
      }
    }
  }

  if (!ttcount) return;

  // update threads
  foreach (ref TagInfo ti; tags) {
    if (!ti.wasUpdates) continue;
    updateTwittedThreadsInTag(ti.tagid, uid);
  }
}
686 //==========================================================================
688 // updateViewDB
690 // check for new messages, and update view database
692 //==========================================================================
// Brings the view database up to date with the message storage.
//
// Every message whose uid is greater than the highest uid known to the view
// database is processed, in uid order:
//   * mail filters are run (unless `skipFilters` is set), then the bogo check
//     is asked if no filter decided spam/ham; this selects the destination tag
//     and the flags;
//   * the changed tag list is written back to the storage;
//   * the message is parsed and inserted into the view database;
//   * the message is linked into the threads of its threaded tags;
//   * twits (mutes) are applied with `twitMessage()`.
// At the end a `TagThreadsUpdatedEvent` is posted to the main window for each
// tag that got new messages.
//
// Params:
//   skipFilters = do not run filters and the bogo check; the messages keep
//                 their stored tags, and are inserted as already read
693 public void updateViewDB (bool skipFilters=false) {
694 uint maxViewUid = 0;
695 uint maxStoreUid = 0;
// `twitMessage()` below needs the temporary table
697 twitPrepare();
699 foreach (auto row; dbView.statement(`SELECT MAX(uid) AS uid FROM info;`).range) maxViewUid = row.uid!uint;
700 foreach (auto row; dbStore.statement(`SELECT MAX(uid) AS uid FROM messages;`).range) maxStoreUid = row.uid!uint;
702 if (maxViewUid >= maxStoreUid) return;
// "around", because some uids in the range may be missing or uninteresting
703 conwriteln("need to process around ", maxStoreUid-maxViewUid, " messages.");
// ids of all tags that got new messages (each id only once)
705 uint[] relinkTids;
706 relinkTids.reserve(64);
707 scope(exit) delete relinkTids;
709 foreach (uint uid; maxViewUid+1..maxStoreUid+1) {
710 DynStr msg, tags;
711 foreach (auto row; dbStore.statement(`
712 SELECT tags AS tags, ChiroUnpack(data) AS data FROM messages WHERE uid=:uid LIMIT 1
713 ;`).bind(":uid", uid).range)
715 msg = row.data!SQ3Text;
716 tags = row.tags!SQ3Text;
718 if (msg.length == 0 || tags.length == 0) continue; // not interesting
720 conwriteln("============ message #", uid, " ============");
722 DynStr acc = tags.extractAccount();
// the stored tag list is written back only if filtering changes it
723 DynStr origTags = tags;
// stays `null` when `skipFilters` is set; all uses below check for that
724 RealFilterHelper hlp;
725 scope(exit) delete hlp;
// NOTE(review): this flag is never set to `true` in this function, so the
// bogo training calls below are never executed -- confirm this is intended
726 bool markSpamHam = false; //!!!
728 if (!skipFilters) {
// the first folder tag is the default destination; filters may replace it
729 DynStr deftag = tags.extractFirstFolder();
730 tags = tags.removeFirstFolder();
732 hlp = new RealFilterHelper;
733 hlp.account = acc;
734 hlp.tag = deftag;
735 if (hlp.tag.length == 0) hlp.tag = "#hobo";
736 hlp.message = msg;
737 // filter
738 foreach (auto row; dbConf.statement(`SELECT filterid AS filterid, name AS name, body AS body FROM filters ORDER BY idx;`).range) {
739 //conwrite(" filter '", row.name!SQ3Text, "' (", row.filterid!uint, "): ");
740 bool goOn = false;
741 hlp.matched = false;
// an error in one filter is logged, and does not stop the others
742 try {
743 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">");
744 goOn = executeMailFilter(row.body!SQ3Text, hlp);
745 } catch (Exception e) {
746 conwriteln("ERROR IN FILTER '", row.name!SQ3Text, "': ", e.msg);
748 if (hlp.matched) {
749 conwriteln("...filter '", row.name!SQ3Text, " matched!");
751 //hlp.writeResult(); writeln;
752 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">: goon=", goOn, "; isstop=", hlp.isStop);
753 //assert(hlp.isStop == !goOn);
// the "stop" action ends the filtering of this message
754 if (hlp.isStop) break;
756 //write(" FINAL RESULT:"); hlp.writeResult(); writeln;
757 // done filtering
759 markSpamHam = false; //!!!
// ask the bogo check only if the filters did not decide spam/ham
760 if (!hlp.isSpam && !hlp.isHam) {
761 auto bogo = messageBogoCheck(uid);
762 if (bogo == Bogo.Spam) {
763 bool exists;
764 conwriteln("BOGO: SPAM message #", uid, "; from={", hlp.getFromName.getData, "}:<", hlp.getFromMail.getData, ">; to={",
765 hlp.getToName.getData, "}:<", hlp.getToMail.getData, ">; subj=", hlp.getSubj(out exists).getData);
766 hlp.performAction(hlp.Action.Spam);
767 markSpamHam = false;
771 if (hlp.isSpam) hlp.tag = "#spam"; // always
773 if (hlp.tag.length == 0) hlp.tag = deftag; // just in case
// check if the destination tag is already in the remaining tag list
774 bool hasTag = false;
775 forEachTag(tags, (xtag) {
776 if (xtag == hlp.tag) {
777 hasTag = true;
778 return false; // stop
780 return true; // go on
783 // `tags` should contain our new tags
// the destination tag goes first, where the removed folder tag was
784 if (!hasTag) {
785 DynStr tt = hlp.tag;
786 if (tags.length) {
787 tt ~= "|";
788 tt ~= tags;
790 tags = tt;
794 // update tags info in the storage
795 if (tags != origTags) {
796 transacted!"Store"{
797 dbStore.statement(`UPDATE messages SET tags=:tags WHERE uid=:uid;`)
798 .bind(":uid", uid)
799 .bindConstText(":tags", tags.getData)
800 .doAll();
804 // insert the message into the view db
// unfiltered messages are inserted as read; the "delete" flag wins over
// "purge"; read and spam messages are inserted as read
805 int appearance = (skipFilters ? Appearance.Read : Appearance.Unread);
806 if (hlp !is null && hlp.isDelete) appearance = Appearance.SoftDeleteFilter;
807 else if (hlp !is null && hlp.isPurge) appearance = Appearance.SoftDeletePurge;
808 if (hlp !is null && appearance == Appearance.Unread && (hlp.isRead || hlp.isSpam)) appearance = Appearance.Read;
809 if (hlp !is null && markSpamHam) {
810 if (hlp.isSpam) messageBogoMarkSpam(uid);
811 if (hlp.isHam) messageBogoMarkHam(uid);
// split the message with the SQL helper functions
814 uint msgtime = 0;
815 DynStr hdr, body;
816 foreach (auto trow; dbStore.statement(`
817 SELECT
818 ChiroExtractHeaders(:msgdata) AS headers
819 , ChiroExtractBody(:msgdata) AS body
820 , ChiroHdr_RecvTime(:msgdata) AS msgtime
821 ;`).bindConstText(":msgdata", msg.getData).range)
823 msgtime = trow.msgtime!uint;
824 hdr = trow.headers!SQ3Text;
825 body = trow.body!SQ3Text;
828 conwriteln("putting msg ", uid, " (time:", msgtime, "; appr=", appearance, ") to '", tags.getData, "'; oldtags='", origTags.getData, "'");
// NOTE(review): the end of this transaction block is not visible in this listing
830 transacted!"View"{
831 //dbView.beginTransaction();
832 //scope(success) dbView.commitTransaction();
833 //scope(failure) dbView.rollbackTransaction();
834 chiroParseAndInsertOneMessage(uid, msgtime, appearance, hdr, body, tags);
// the Message-ID is needed to find the messages that refer to this one
836 DynStr msgid;
837 foreach (auto mrow; dbView.statement(`SELECT msgid AS msgid FROM msgids WHERE uid=:uid LIMIT 1;`).bind(":uid", uid).range) {
838 msgid = mrow.msgid!SQ3Text;
840 //if (msgid.length == 0) return;
841 version(debug_updater) {
843 auto fo = VFile("zzz", "a");
844 fo.writeln("MSGUID: ", uid, "; MSGID: <", msgid.getData, ">");
847 conwriteln("MSGUID: ", uid, "; MSGID: <", msgid.getData, ">");
849 // collect tags to modify
// only threaded tags where the new message has no parent yet need linking
850 int[] taglist;
851 scope(exit) delete taglist;
852 taglist.reserve(16);
854 foreach (auto trow; dbView.statement(`SELECT tagid AS tagid, parent AS parent FROM threads WHERE uid=:uid;`)
855 .bind(":uid", uid).range)
857 immutable uint tid = trow.tagid!uint;
// every touched tag is remembered for the final notification
858 if (tid) {
859 bool found = false;
860 foreach (immutable uint tt; relinkTids) if (tt == tid) { found = true; break; }
861 if (!found) relinkTids ~= tid;
863 if (!tid || trow.parent!uint || !chiroIsTagThreaded(tid)) continue;
864 conwriteln(" tagid: ", tid, " (", chiroGetTagName(tid).getData, ")");
865 version(debug_updater) {
867 auto fo = VFile("zzz", "a");
868 fo.writeln(" tagid: ", tid, " (", chiroGetTagName(tid).getData, ")");
871 taglist ~= tid;
874 foreach (immutable uint tid; taglist) {
875 uint setUsAsParentFor = 0;
876 bool needFullRelink = false;
877 // check if there are any references to us, and fix them by full relink
878 if (!msgid.length) continue;
// look for a message in this tag that has us as its reference #0
879 foreach (auto nrow; dbView.statement(`
880 SELECT refids.uid AS uid, tt.parent AS parent
881 FROM refids
882 INNER JOIN(threads) AS tt
883 ON tt.tagid=:tagid AND tt.uid=refids.uid
884 WHERE idx=0 AND msgid=:msgid
885 LIMIT 1
886 ;`).bind(":tagid", tid).bindConstText(":msgid", msgid.getData).range)
// a parentless message can be simply attached to us; if it already has
// another parent, the whole tag is relinked
888 if (nrow.parent!uint == 0) {
889 setUsAsParentFor = nrow.uid!uint;
890 } else {
891 needFullRelink = true;
895 if (needFullRelink) {
896 //FIXME: make this faster!
897 conwriteln(" tid: ", tid, " (", chiroGetTagName(tid).getData, "); performing full relink...");
898 chiroSupportRelinkTagThreads(tid);
899 continue;
902 if (setUsAsParentFor) {
903 conwriteln(" tid: ", tid, " (", chiroGetTagName(tid).getData, "); settuing us (", uid, ") as a parent for ", setUsAsParentFor);
904 dbView.statement(`
905 UPDATE threads
906 SET parent=:uid
907 WHERE uid=:xuid AND tagid=:tagid
908 ;`).bind(":uid", uid).bind(":xuid", setUsAsParentFor).bind(":tagid", tid).doAll();
911 // find parent for us
// NOTE(review): `ORDER BY` inside the `IN` subquery does not order the outer
// result, so with several known references it is not defined which one
// `LIMIT 1` returns -- confirm
912 uint paruid = 0;
913 foreach (auto prow; dbView.statement(`
914 SELECT msgids.uid AS paruid
915 FROM msgids
916 INNER JOIN(threads) AS tt
917 ON tt.tagid=:tagid AND tt.uid=msgids.uid
918 WHERE msgids.uid<>:uid AND msgids.msgid IN (SELECT msgid FROM refids WHERE uid=:uid ORDER BY idx)
919 LIMIT 1
920 ;`).bind(":uid", uid).bind(":tagid", tid).range)
922 paruid = prow.paruid!uint;
924 conwriteln(" tid: ", tid, " (", chiroGetTagName(tid).getData, "); paruid=", paruid);
925 if (paruid && paruid != uid) {
926 dbView.statement(`UPDATE threads SET parent=:paruid WHERE uid=:uid AND tagid=:tagid;`)
927 .bind(":uid", uid)
928 .bind(":tagid", tid)
929 .bind(":paruid", paruid)
930 .doAll();
// apply the mute rules to the new message
934 twitMessage(uid);
938 // relink threads
// i.e. tell the main window (if it is still alive) which tags were changed
939 if (relinkTids.length) {
940 foreach (immutable uint tid; relinkTids) {
941 if (vbwin && !vbwin.closed) vbwin.postEvent(new TagThreadsUpdatedEvent(tid));
947 //==========================================================================
949 // checkerThread
951 //==========================================================================
952 void checkerThread (Tid ownerTid) {
953 uint accid = 0;
954 bool isError = false;
955 try {
956 receive(
957 (CheckCommand cmd) {
958 accid = cmd.accid;
962 if (accid == 0) {
963 ownerTid.send(ControlCommand(ControlCommand.Kind.CheckError, accid));
964 return;
967 bool found = false;
968 int checktime;
969 bool nosendauth;
970 bool debuglog;
971 uint nntplastindex;
972 DynStr name;
973 DynStr recvserver;
974 DynStr sendserver;
975 DynStr user;
976 DynStr pass;
977 DynStr inbox;
978 DynStr nntpgroup;
979 DynStr xemail;
981 foreach (auto arow; stmtAccInfo.st.bind(":accid", accid).range) {
982 // i found her!
983 found = true;
984 int upmins = arow.checktime!int;
985 if (upmins < 1) upmins = 1; else if (upmins > 100000) upmins = 100000;
986 checktime = upmins;
987 nosendauth = (arow.nosendauth!int > 0);
988 debuglog = (arow.debuglog!int > 0);
989 nntplastindex = arow.nntplastindex!uint;
990 name = arow.name!SQ3Text;
991 recvserver = arow.recvserver!SQ3Text;
992 sendserver = arow.sendserver!SQ3Text;
993 user = arow.user!SQ3Text;
994 pass = arow.pass!SQ3Text;
995 inbox = arow.inbox!SQ3Text;
996 nntpgroup = arow.nntpgroup!SQ3Text;
997 xemail = arow.email!SQ3Text;
1000 if (!found) {
1001 ownerTid.send(ControlCommand(ControlCommand.Kind.CheckError, accid));
1002 return;
1005 struct ToSend {
1006 uint uid;
1007 dynstring from;
1008 dynstring to;
1009 dynstring data;
1010 bool sent;
1012 ToSend[] sendQueue;
1013 scope(exit) {
1014 foreach (ref ToSend ss; sendQueue) { ss.from.clear; ss.to.clear; ss.data.clear; }
1015 sendQueue.length = 0;
1018 //FIXME: nntp sends!
1019 if (sendserver.length && (nntpgroup.length != 0 || xemail.length != 0)) {
1020 // check if we have something to send
1021 foreach (auto srow; dbView.statement(`
1022 SELECT uid AS uid, from_pop3 AS from_pop3, to_pop3 AS to_pop3, ChiroUnpack(data) AS data
1023 FROM unsent
1024 WHERE accid=:accid AND sendtime=0
1025 ;`).bind(":accid", accid).range)
1027 ToSend ss;
1028 ss.uid = srow.uid!uint;
1029 ss.from = srow.from_pop3!SQ3Text;
1030 ss.to = srow.to_pop3!SQ3Text;
1031 ss.data = srow.data!SQ3Text;
1032 sendQueue ~= ss;
1036 //FIXME: batch send!
1037 if (sendQueue.length) {
1038 conwriteln("sending ", sendQueue.length, " message", (sendQueue.length == 1 ? "" : "s"));
1039 foreach (ref ToSend ss; sendQueue) {
1040 try {
1041 if (nntpgroup.length == 0) {
1042 conwriteln("*** [", name, "]: connecting... (smtp)");
1043 SocketSMTP nsk = new SocketSMTP(sendserver.idup);
1044 scope(exit) { nsk.close(); delete nsk; }
1045 if (!nosendauth) {
1046 conwriteln("[", name, "]: authenticating...");
1047 nsk.auth(xemail.getData, user.getData, pass.getData);
1049 conwriteln("[", name, "]: sending (uid=", ss.uid, ")...");
1050 nsk.sendMessage(ss.from.getData, ss.to.getData, ss.data.getData);
1051 nsk.close();
1052 conwriteln("[", name, "]: closing...");
1053 } else {
1054 conwriteln("*** [", name, "]: connecting... (nntp)");
1055 SocketNNTP nsk = new SocketNNTP(recvserver.idup);
1056 scope(exit) { nsk.close(); delete nsk; }
1057 conwriteln("[", name, "]: selecting group (", nntpgroup, ")");
1058 nsk.selectGroup(nntpgroup.getData);
1059 conwriteln("[", name, "]: sending (uid=", ss.uid, ")...");
1060 nsk.doSend("POST");
1061 nsk.doSendRaw(ss.data.getData);
1062 conwriteln("[", name, "]: getting answer...");
1063 auto ln = nsk.readLine;
1064 conwriteln("[", name, "]: ", ln); // 340 Ok, recommended message-ID <o7dq4o$mpm$1@digitalmars.com>
1065 if (ln.length == 0 || ln[0] != '3') throw new Exception(ln.idup);
1066 conwriteln("[", name, "]: closing...");
1067 nsk.close();
1069 ss.sent = true;
1070 } catch (Exception e) {
1071 conwriteln("SENDING ERROR: ", e.msg);
1075 // mark sent messages
1076 transacted!"View"{
1077 foreach (ref ToSend ss; sendQueue) {
1078 if (ss.sent) {
1079 dbView.statement(`
1080 UPDATE unsent
1081 SET sendtime=CAST(strftime('%s','now') AS INTEGER), lastsendtime=CAST(strftime('%s','now') AS INTEGER)
1082 WHERE uid=:uid
1083 ;`).bind(":uid", ss.uid).doAll();
1084 } else {
1085 dbView.statement(`
1086 UPDATE unsent
1087 SET lastsendtime=CAST(strftime('%s','now') AS INTEGER)
1088 WHERE uid=:uid
1089 ;`).bind(":uid", ss.uid).doAll();
1096 conwriteln("checking account '", name, "' (", accid, ")...");
1098 stmtSetCheckTime.st.bind(":accid", accid).bind(":lastcheck", RunningAverageExp.GetTickCount()+checktime*60).doAll();
1100 // ////////////////////////////////////////////////////////////////// //
/**
 * Fetch new articles for the account being checked from its NNTP group.
 *
 * Connects to `recvserver`, selects `nntpgroup`, and downloads every article
 * numbered from `nntplastindex+1` up to the server's high-water mark.
 * Each non-empty article is inserted into the "Store" database, and the
 * account's `nntplastindex` in the "Conf" database is advanced right after
 * the article is stored, so an interrupted run resumes where it stopped.
 *
 * A failure to download a single article is logged and stops the loop;
 * it is not rethrown. Connection and group selection errors propagate
 * to the caller.
 */
void CheckNNTP () {
  auto nsk = new SocketNNTP(recvserver.idup);
  scope(exit) { nsk.close(); delete nsk; }

  nsk.selectGroup(nntpgroup);
  if (nsk.emptyGroup) {
    conwriteln("[", name, ":", nntpgroup, "]: no new articles (empty group)");
    return;
  }

  uint stnum = nntplastindex+1;
  if (stnum > nsk.hiwater) {
    conwriteln("[", name, ":", nntpgroup, "]: no new articles");
    return;
  }

  conwriteln("[", name, ":", nntpgroup, "]: ", nsk.hiwater+1-stnum, " (possible) new articles");

  // filtering will be done later; for now, every article gets the default
  // inbox tag plus the account tag (the tag string is the same for all articles)
  DynStr tags;
  if (inbox.length) tags ~= inbox;
  if (name.length) {
    if (tags.length) tags ~= "|";
    tags ~= "account:";
    tags ~= name;
  }
  if (tags.length == 0) tags = "#hobo";

  // download new articles
  foreach (immutable uint anum; stnum..nsk.hiwater+1) {
    DynStr msg;
    try {
      msg = nsk.getArticle(anum);
    } catch (Exception e) {
      // report the reason too, otherwise the failure is impossible to diagnose
      conwriteln("[", name, ":", nntpgroup, "]: error downloading article #", anum, ": ", e.msg);
      break;
    }
    if (msg.length == 0) continue; // this article is empty
    // insert article into the storage
    conwriteln("[", name, ":", nntpgroup, "]: storing article #", anum, " for '", tags.getData, "'...");
    transacted!"Store"{
      dbStore.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
        .bindConstText(":tags", tags.getData)
        .bindConstBlob(":data", msg.getData)
        .doAll();
    };
    // update account with the new highest nntp index
    transacted!"Conf"{
      dbConf.statement(`UPDATE accounts SET nntplastindex=:anum WHERE accid=:accid;`)
        .bind(":accid", accid)
        .bind(":anum", anum)
        .doAll();
    };
  }
}
1156 // ////////////////////////////////////////////////////////////////// //
/**
 * Fetch new mail for the account being checked.
 *
 * Despite the name, this talks POP3 (it uses `SocketPOP3`): connects to
 * `recvserver`, authenticates when `user` is set, and downloads messages
 * `1..newmsg` in order. Each non-empty message is inserted into the "Store"
 * database; every successfully downloaded message (empty or not) is then
 * deleted on the server.
 *
 * A failure to download a single message is logged and stops the loop;
 * it is not rethrown. Connection, authentication and deletion errors
 * propagate to the caller.
 */
void CheckSMTP () {
  conwriteln("*** [", name, "]: connecting...");
  auto pop3 = new SocketPOP3(recvserver.idup);
  scope(exit) { pop3.close(); delete pop3; }
  if (user.length) {
    conwriteln("[", name, "]: authenticating...");
    pop3.auth(user, pass);
  }

  auto newmsg = pop3.getNewMailCount;
  if (newmsg == 0) {
    conwriteln("[", name, "]: no new messages");
    return;
  }
  conwriteln("[", name, "]: ", newmsg, " new message", (newmsg > 1 ? "s" : ""));

  // every message gets the default inbox tag plus the account tag
  // (the tag string is the same for all messages, so build it once)
  DynStr tags;
  if (inbox.length) tags ~= inbox;
  if (name.length) {
    if (tags.length) tags ~= "|";
    tags ~= "account:";
    tags ~= name;
  }
  if (tags.length == 0) tags = "#hobo";

  foreach (immutable int popidx; 1..newmsg+1) {
    DynStr msg;
    try {
      msg = pop3.getMessage(popidx); // full message, with the ending dot
    } catch (Exception e) {
      // report the reason too, otherwise the failure is impossible to diagnose
      conwriteln("[", name, "]: error downloading message #", popidx, ": ", e.msg);
      break;
    }
    if (msg.length != 0) {
      // this path is only taken for accounts without an NNTP group,
      // so the log prefix is the account name alone
      conwriteln("[", name, "]: storing message #", popidx, " for '", tags.getData, "'...");
      transacted!"Store"{
        dbStore.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
          .bindConstText(":tags", tags.getData)
          .bindConstBlob(":data", msg.getData)
          .doAll();
      };
    }
    //auto msg = pop3.getMessage!true(popidx); // full message, with the ending dot, and exact terminators
    // the message is stored (or was empty); remove it from the server
    pop3.deleteMessage(popidx);
  }
}
1202 // ////////////////////////////////////////////////////////////////// //
1203 try {
1204 if (nntpgroup.length) CheckNNTP(); else CheckSMTP();
1205 } catch (Throwable e) {
1206 conwriteln("ERROR checking account '", name, "' (", accid, "): ", e.msg);
1207 isError = true;
1210 conwriteln("done checking account '", name, "' (", accid, ")...");
1212 if (vbwin && !vbwin.closed) {
1213 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "POSTING %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1214 vbwin.postEvent(new UpdatingAccountCompleteEvent(accid));
1215 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "POSTED %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1216 //sqlite3_sleep(1000);
1218 } catch (Throwable e) {
1219 // here, we are dead and fucked (the exact order doesn't matter)
1220 //import core.stdc.stdlib : abort;
1221 import core.stdc.stdio : fprintf, stderr;
1222 //import core.memory : GC;
1223 import core.thread : thread_suspendAll;
1224 //GC.disable(); // yeah
1225 //thread_suspendAll(); // stop right here, you criminal scum!
1226 auto s = e.toString();
1227 fprintf(stderr, "\n=== FATAL ===\n%.*s\n", cast(uint)s.length, s.ptr);
1228 //abort(); // die, you bitch!
1229 ownerTid.send(ControlCommand(ControlCommand.Kind.CheckError, accid));
1230 return;
1232 //if (vbwin) vbwin.postEvent(evDoConCommands); }
1233 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "SENDING %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1234 ownerTid.send(ControlCommand((isError ? ControlCommand.Kind.CheckError : ControlCommand.Kind.CheckDone), accid));
1235 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "SENDT %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1237 import core.memory : GC;
1238 GC.collect();
1239 GC.minimize();
1241 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "DONE with accid=%u\n", accid); }
1245 //==========================================================================
1247 // controlThread
1249 //==========================================================================
/**
 * Body of the scheduler thread that decides when accounts are checked.
 *
 * Keeps a list of pending/running account checks and loops on
 * `receiveTimeout`, handling `ControlCommand` messages:
 *   - `ForceUpdateAll` queues every eligible account regardless of its timer;
 *   - `DisableUpdates`/`EnableUpdates` toggle `updatesDisabled`;
 *   - `CheckDone`/`CheckError` (sent by checker threads) remove the finished
 *     account from the list and request a view database update;
 *   - `Quit` makes the loop drain running checks and exit;
 *   - `Ping` only wakes the loop up.
 *
 * On each pass at most one checker thread is spawned, and only when the
 * first list entry is not already in progress. When no check is in progress
 * `updateViewDB()` is called. A GC collection is run roughly once a minute.
 *
 * Params:
 *   ownerTid = thread that receives `ControlReply.Quit` when the loop ends
 *
 * Any `Throwable` escaping the loop is treated as fatal: all threads are
 * suspended, the error is printed to stderr and the process is aborted.
 */
void controlThread (Tid ownerTid) {
  import core.time;
  bool doQuit = false;
  try {
    static struct AccCheck {
      uint accid;
      bool inprogress;
      Tid tid;
    }

    AccCheck[] accidCheckList;
    accidCheckList.reserve(128);

    // drop the entry at `idx`, keeping the order of the remaining entries
    void removeCheckAt (usize idx) {
      foreach (immutable c; idx+1..accidCheckList.length) accidCheckList[c-1] = accidCheckList[c];
      accidCheckList.length -= 1;
    }

    dbConf.execute(`
      CREATE TEMP TABLE IF NOT EXISTS checktimes (
          accid INTEGER PRIMARY KEY UNIQUE /* unique, never zero */
        , lastcheck INTEGER NOT NULL DEFAULT 0
        , checking INTEGER NOT NULL DEFAULT 0
      );
    `);

    static stmtAllAccs = LazyStatement!"Conf"(`
      SELECT
          accid AS accid
        , checktime AS checktime
      FROM accounts
      WHERE nocheck=0 AND inbox<>''
      ORDER BY accid
    ;`);

    static stmtGetCheckTime = LazyStatement!"Conf"(`
      SELECT lastcheck AS lastcheck FROM checktimes WHERE accid=:accid LIMIT 1
    ;`);

    updateViewDB();

    MonoTime lastCollect = MonoTime.currTime;

    bool needUpdates = false;
    bool forceAll = false;
    for (;;) {
      if (doQuit && accidCheckList.length == 0) break;
      // poll quickly while quitting, once a second while there is work, otherwise once a minute
      receiveTimeout((doQuit ? 50.msecs : accidCheckList.length || needUpdates || forceAll ? 1.seconds : 60.seconds),
        (ControlCommand cmd) {
          final switch (cmd.type) {
            case ControlCommand.Kind.ForceUpdateAll: forceAll = true; break;
            case ControlCommand.Kind.Ping: break;
            case ControlCommand.Kind.Quit: doQuit = true; break;
            case ControlCommand.Kind.DisableUpdates: updatesDisabled = true; break;
            case ControlCommand.Kind.EnableUpdates: updatesDisabled = false; break;
            case ControlCommand.Kind.CheckDone:
            case ControlCommand.Kind.CheckError:
              needUpdates = true;
              if (accidCheckList.length) {
                // forget the account whose checker has just finished
                foreach (immutable idx, const ref AccCheck nfo; accidCheckList) {
                  if (nfo.accid == cmd.accid) {
                    removeCheckAt(idx);
                    break; // the list was modified, stop iterating
                  }
                }
                if (!doQuit && vbwin && !vbwin.closed && accidCheckList.length == 0) vbwin.postEvent(new UpdatingCompleteEvent());
              }
              break;
          }
        },
      );

      // purge entries without a valid account id
      for (usize idx = 0; idx < accidCheckList.length; ) {
        if (accidCheckList[idx].accid != 0) ++idx; else removeCheckAt(idx);
      }

      if (doQuit) {
        // when quitting, keep only the checks that are already running, and wait for them
        for (usize idx = 0; idx < accidCheckList.length; ) {
          if (accidCheckList[idx].inprogress) ++idx; else removeCheckAt(idx);
        }
        continue;
      }

      // queue the accounts that are due (or all eligible accounts, on a forced update)
      if (!needUpdates && !updatesDisabled) {
        ulong ctt = RunningAverageExp.GetTickCount();
        foreach (auto arow; stmtAllAccs.st.range) {
          bool found = false;
          foreach (const ref AccCheck nfo; accidCheckList) if (nfo.accid == arow.accid!uint) { found = true; break; }
          if (found) continue; // already queued
          // forced update?
          if (forceAll) {
            accidCheckList ~= AccCheck(arow.accid!uint);
            continue;
          }
          // check timeout (the interval is clamped to 1..100000 minutes)
          int upmins = arow.checktime!int;
          if (upmins < 1) upmins = 1; else if (upmins > 100000) upmins = 100000;
          ulong lastcheck = 0;
          foreach (auto crow; stmtGetCheckTime.st.bind(":accid", arow.accid!uint).range) lastcheck = crow.lastcheck!ulong;
          lastcheck += upmins*60; // next check time
          if (lastcheck < ctt) {
            // i found her!
            accidCheckList ~= AccCheck(arow.accid!uint);
          }
          /*
          else {
            conwriteln("check for accid ", arow.accid!uint, " in ", (lastcheck-ctt)/60, " minutes...");
          }
          */
        }
        forceAll = false;
      }

      // start the first queued check, unless it is already running
      if (!updatesDisabled) {
        foreach (ref AccCheck nfo; accidCheckList) {
          if (nfo.inprogress) break;
          // same guard as for the other window events: never post to a closed window
          if (vbwin && !vbwin.closed) vbwin.postEvent(new UpdatingAccountEvent(nfo.accid));
          nfo.tid = spawn(&checkerThread, thisTid);
          nfo.inprogress = true;
          nfo.tid.send(CheckCommand(nfo.accid));
          break;
        }
      }

      bool hasProgress = false;
      foreach (ref AccCheck nfo; accidCheckList) if (nfo.inprogress) { hasProgress = true; break; }
      if (!hasProgress) {
        updateViewDB();
        needUpdates = false;
      }

      if (!doQuit) {
        immutable ctt = MonoTime.currTime;
        if ((ctt-lastCollect).total!"minutes" >= 1) {
          import core.memory : GC;
          lastCollect = ctt;
          GC.collect();
          GC.minimize();
        }
      }
    }
    ownerTid.send(ControlReply.Quit);
  } catch (Throwable e) {
    // here, we are dead and fucked (the exact order doesn't matter)
    import core.stdc.stdlib : abort;
    import core.stdc.stdio : fprintf, stderr;
    import core.memory : GC;
    import core.thread : thread_suspendAll;
    GC.disable(); // yeah
    thread_suspendAll(); // stop right here, you criminal scum!
    auto s = e.toString();
    fprintf(stderr, "\n=== FATAL ===\n%.*s\n", cast(uint)s.length, s.ptr);
    abort(); // die, you bitch!
  }
}
1415 //==========================================================================
1417 // receiverDisable
1419 //==========================================================================
/// Mark the receiver as disabled by setting `rcDisabled`;
/// `receiverInit` refuses to start the control thread while it is set.
public void receiverDisable () { rcDisabled = true; }
1425 //==========================================================================
1427 // disableMailboxUpdates
1429 //==========================================================================
/// Ask the control thread to stop scheduling account checks.
/// Does nothing when the receiver was never started.
public void disableMailboxUpdates () {
  if (rcStarted) {
    auto cmd = ControlCommand(ControlCommand.Kind.DisableUpdates);
    controlThreadId.send(cmd);
  }
}
1436 //==========================================================================
1438 // enableMailboxUpdates
1440 //==========================================================================
/// Ask the control thread to resume scheduling account checks.
/// Does nothing when the receiver was never started.
public void enableMailboxUpdates () {
  if (rcStarted) {
    auto cmd = ControlCommand(ControlCommand.Kind.EnableUpdates);
    controlThreadId.send(cmd);
  }
}
1447 //==========================================================================
1449 // receiverForceUpdateAll
1451 //==========================================================================
/// Ask the control thread to queue a check of all eligible accounts,
/// ignoring their check timers. Does nothing when the receiver was never started.
public void receiverForceUpdateAll () {
  if (rcStarted) {
    auto cmd = ControlCommand(ControlCommand.Kind.ForceUpdateAll);
    controlThreadId.send(cmd);
  }
}
1458 //==========================================================================
1460 // receiverInit
1462 //==========================================================================
/// Start the control thread (owned by the calling thread) and remember its id.
/// Does nothing when the receiver is already running or was disabled
/// with `receiverDisable`.
public void receiverInit () {
  immutable bool canStart = !rcStarted && !rcDisabled;
  if (canStart) {
    controlThreadId = spawn(&controlThread, thisTid);
    rcStarted = true;
  }
}
1471 //==========================================================================
1473 // receiverDeinit
1475 //==========================================================================
1476 public void receiverDeinit () {
1477 if (!rcStarted) return;
1478 controlThreadId.send(ControlCommand(ControlCommand.Kind.Quit));
1479 bool done = false;
1480 while (!done) {
1481 receive(
1482 (ControlReply reply) {
1483 if (reply == ControlReply.Quit) done = true;