slightly better "mark unread" timer management
[chiroptera.git] / receiver.d
blobbef39536e3160b06ba1ee674a97047a101843baa
1 /* E-Mail Client
2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3 of the License ONLY.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // mail/nntp receiver thread
18 module receiver is aliced;
19 private:
21 //version = debug_filter_helper;
22 //version = debug_updater;
24 import std.concurrency;
26 import iv.cmdcon;
27 import iv.strex;
28 import iv.sq3;
29 import iv.timer : DurTimer = Timer;
30 import iv.utfutil;
31 import iv.vfs.io;
33 import egui;
35 import chibackend;
36 import chibackend.net;
38 import chievents;
41 // ////////////////////////////////////////////////////////////////////////// //
// Filter-script helper backed by one raw message (`message`) and the name of the
// account it belongs to (`account`). The overrides below are the callbacks of the
// `FilterHelper` interface; their outcome is accumulated in `tag`, `actFlags`
// and `matched`.
42 class RealFilterHelper : FilterHelper {
43 public:
// bit flags stored in `actFlags`
44 enum {
45 // only one of these can be set
46 ActFlagDelete = 1u<<0,
47 ActFlagPurge = 1u<<1,
49 // only one of these can be set
50 ActFlagSpam = 1u<<2,
51 ActFlagHam = 1u<<3,
53 ActFlagRead = 1u<<4,
54 ActFlagStop = 1u<<5,
57 public:
58 DynStr account;
59 DynStr tag; // destination tag
60 uint actFlags; // see above
// raw message text; the header getters below slice their headers out of it
61 DynStr message;
// set by `filterMatched()`; the caller resets it before running each filter
62 bool matched;
64 public:
65 ~this () nothrow @nogc { account.clear(); tag.clear(); message.clear(); }
// flag accessors: each one tests a single bit of `actFlags`
67 final @property bool isDelete () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagDelete); }
68 final @property bool isPurge () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagPurge); }
69 final @property bool isSpam () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagSpam); }
70 final @property bool isHam () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagHam); }
71 final @property bool isRead () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagRead); }
72 final @property bool isStop () const pure nothrow @safe @nogc { pragma(inline, true); return !!(actFlags&ActFlagStop); }
74 // called if a filter was matched
75 override void filterMatched () {
76 matched = true;
// returns the `account` field as is
79 override DynStr getAccount () {
80 return account;
// looks `header` up in the header part of `message`;
// `exists` is set to `false` when the lookup result is `null`
83 override DynStr getHeaderField (const(char)[] header, out bool exists) {
84 auto headers = message.getData[0..findHeadersEnd(message.getData)];
85 auto value = findHeaderField(headers, header);
86 exists = (value !is null);
87 version(debug_filter_helper) writeln("...getHeaderField(", header, "): exists=", exists, "; res=", value);
88 return DynStr(value);
// name part (via `extractName`) of the "From" header
91 override DynStr getFromName () {
92 auto headers = message.getData[0..findHeadersEnd(message.getData)];
93 auto value = findHeaderField(headers, "From").extractName;
94 version(debug_filter_helper) writeln("...getFromName: res=", value);
95 return DynStr(value);
// mail part (via `extractMail`) of the "From" header; "nobody@nowhere" when it is empty
98 override DynStr getFromMail () {
99 auto headers = message.getData[0..findHeadersEnd(message.getData)];
100 auto value = findHeaderField(headers, "From").extractMail;
101 if (value.length == 0) value = "nobody@nowhere";
102 version(debug_filter_helper) writeln("...getFromMail: res=", value);
103 return DynStr(value);
// name part (via `extractName`) of the "To" header
106 override DynStr getToName () {
107 auto headers = message.getData[0..findHeadersEnd(message.getData)];
108 auto value = findHeaderField(headers, "To").extractName;
109 version(debug_filter_helper) writeln("...getToName: res=", value);
110 return DynStr(value);
// mail part (via `extractMail`) of the "To" header; "nobody@nowhere" when it is empty
113 override DynStr getToMail () {
114 auto headers = message.getData[0..findHeadersEnd(message.getData)];
115 auto value = findHeaderField(headers, "To").extractMail;
116 if (value.length == 0) value = "nobody@nowhere";
117 version(debug_filter_helper) writeln("...getToMail: res=", value);
118 return DynStr(value);
// "Subject" header; when present it is passed through `decodeSubj` and `subjRemoveRe`
121 override DynStr getSubj (out bool exists) {
122 auto headers = message.getData[0..findHeadersEnd(message.getData)];
123 auto value = findHeaderField(headers, "Subject");
124 exists = (value !is null);
125 if (exists) value = value.decodeSubj.subjRemoveRe;
126 return DynStr(value);
// Runs the external filter program `command`.
// The message is written to "/tmp/_temp_<32 hex digits>.eml", the program is
// started in "/tmp" with that file name as its only argument, and the first line
// of its stdout (stripped) is returned. A leading '-' is removed from that line;
// the `doStop` flag computed from it is only referenced by a disabled log line.
// Any `Exception` is logged and an empty string is returned instead.
// NOTE(review): all three streams are piped (`Redirect.all`), but only one stdout
// line is read and the stderr reader is under `version(none)`; a chatty child
// could presumably block on a full pipe before `wait()` returns -- confirm.
129 override DynStr exec (const(char)[] command) {
130 /*version(debug_filter_helper)*/ writeln("...exec: <", command, ">");
131 //return DynStr("nothing");
132 import std.stdio : File;
133 import std.process;
134 try {
135 // write article to file
136 import std.uuid;
137 UUID id = randomUUID();
138 DynStr buf;
// best-effort removal: errors from `remove` are ignored
139 void deleteTempFile () {
140 if (buf.length) try { import std.file : remove; remove(buf.getData); } catch (Exception e) {}
// runs on every way out of the `try` block, including exceptions
142 scope(exit) deleteTempFile();
143 buf.reserve(2+16*2+42);
144 buf ~= "/tmp/_temp_";
// append the UUID bytes as lowercase hex, two digits per byte
145 foreach (immutable ubyte b; id.data[]) {
146 buf ~= "0123456789abcdef"[b>>4];
147 buf ~= "0123456789abcdef"[b&0x0f];
149 buf ~= ".eml";
151 auto fo = VFile(buf.getData, "w");
152 fo.rawWriteExact(message.getData);
153 fo.close();
155 //!conwriteln("EXEC filter '", command, "'... (", buf.getData, ")");
156 auto pid = pipeProcess([command, /*"-D",*/ buf.getData], Redirect.all, null, Config.none, "/tmp");
157 string action = pid.stdout.readln.xstrip;
158 bool doStop = (action.length && action[0] == '-');
159 if (doStop) action = action[1..$].xstrip;
160 version(none) {
161 while (!pid.stderr.eof) conwriteln(" :", pid.stderr.readln.xstrip, "|");
163 pid.pid.wait();
164 //!conwriteln("EXEC filter '", command, "' action: ", action, " (", doStop, ")");
165 return DynStr(action);
166 } catch (Exception e) {
167 conwriteln("EXEC filter error: ", e.msg);
169 return DynStr();
// remembers `dest` as the destination tag
172 override void move (const(char)[] dest) {
173 version(debug_filter_helper) writeln("...move: <", dest, ">");
174 tag = dest;
// Records `action` in `actFlags`.
// Note the naming: `Action.Delete` sets the *purge* flag, `Action.SoftDelete`
// sets the *delete* flag. Delete/purge and spam/ham are mutually exclusive pairs,
// so the pair is cleared before one of its bits is set.
// Any other action throws `FilterSyntaxException`.
177 override void performAction (Action action) {
178 version(debug_filter_helper) writeln("...performAction: ", action);
179 switch (action) {
180 case Action.Delete:
181 actFlags &= ~(ActFlagDelete|ActFlagPurge);
182 actFlags |= ActFlagPurge;
183 break;
184 case Action.SoftDelete:
185 actFlags &= ~(ActFlagDelete|ActFlagPurge);
186 actFlags |= ActFlagDelete;
187 break;
188 case Action.Spam:
189 actFlags &= ~(ActFlagSpam|ActFlagHam);
190 actFlags |= ActFlagSpam;
191 break;
192 case Action.Ham:
193 actFlags &= ~(ActFlagSpam|ActFlagHam);
194 actFlags |= ActFlagHam;
195 break;
196 case Action.Read:
197 actFlags |= ActFlagRead;
198 break;
199 case Action.Stop:
200 actFlags |= ActFlagStop;
201 break;
202 default:
204 import std.conv : to;
205 throw new FilterSyntaxException("action "~action.to!string~" should not end up in the handler");
// Simple pattern matcher: plain substring search, with an optional leading '^'
// (match at the start of `str`) and an optional trailing '$' (match at the end).
// An empty pattern matches only as "^$" against an empty string.
210 override bool match (const(char)[] pat, const(char)[] str, bool casesens) {
211 version(debug_filter_helper) writeln("...match: casesens=", casesens, "; pat=<", pat, ">; str=<", str, ">");
212 immutable bool bol = (pat.length && pat[0] == '^');
213 if (bol) pat = pat[1..$];
214 immutable bool eol = (pat.length && pat[$-1] == '$');
215 if (eol) pat = pat[0..$-1];
216 version(debug_filter_helper) writeln("...match: bol=", bol, "; eol=", eol, "; pat=<", pat, ">");
217 if (pat.length == 0) return (bol && eol ? str.length == 0 : false);
218 if (str.length < pat.length) return false;
// anchors are implemented by cutting `str` down to the only window that may match
219 if (bol && eol) { if (str.length != pat.length) return false; }
220 else if (bol) str = str[0..pat.length];
221 else if (eol) str = str[str.length-pat.length..$];
222 if (casesens) {
223 return (str.indexOf(pat) >= 0);
224 } else {
// case-insensitive search: try every start position in turn
225 while (str.length >= pat.length) {
226 if (str.startsWithCI(pat)) {
227 //writeln("...match: HIT! str=<", str, ">");
228 return true;
230 str = str[1..$];
231 //writeln("...match: skip; str=<", str, ">; pat=<", pat, ">");
233 //writeln("...match: FAIL!");
234 return false;
// debug dump of the collected flags (the stop flag is not printed) and the destination tag
238 void writeResult() () const {
239 if (isDelete) write(" softdelete");
240 if (isPurge) write(" purge");
241 if (isSpam) write(" spam");
242 if (isHam) write(" ham");
243 if (isRead) write(" read");
244 write("; dest tag: ", tag.getData);
249 // ////////////////////////////////////////////////////////////////////////// //
// Module-wide state, shared between all threads (`__gshared`).
// `updatesDisabled` is set/cleared by the DisableUpdates/EnableUpdates commands
// handled in `controlThread`.
// NOTE(review): `rcDisabled`, `rcStarted` and `controlThreadId` are not touched in
// the visible part of this file; presumably they track the receiver/control
// thread lifecycle -- confirm against the code that starts the thread.
250 __gshared bool updatesDisabled = false;
251 __gshared bool rcDisabled = false;
252 __gshared bool rcStarted = false;
253 __gshared Tid controlThreadId;
// Reply kinds; `Quit` is the only member declared.
// NOTE(review): not used in the visible part of this file -- presumably sent back
// to the owner when the control thread terminates; confirm.
256 enum ControlReply {
257 Quit,
// Message handled by `controlThread`.
// `CheckDone`/`CheckError` are sent by `checkerThread` and carry the account id
// in `accid`; the other kinds are constructed with `accid` set to 0.
260 struct ControlCommand {
261 enum Kind {
// handled as a no-op by `controlThread`
262 Ping,
// sets the `forceAll` flag in `controlThread`
263 ForceUpdateAll,
// sets the `doQuit` flag in `controlThread`
264 Quit,
// result reports from a checker thread
266 CheckDone,
267 CheckError,
// set / clear the global `updatesDisabled` flag
269 DisableUpdates,
270 EnableUpdates,
272 Kind type;
273 // for CheckDone or CheckError
274 uint accid;
// no default construction: a kind must always be given
276 @disable this ();
277 this (Kind atype) nothrow @safe @nogc { type = atype; accid = 0; }
278 this (Kind atype, uint aid) nothrow @safe @nogc { type = atype; accid = aid; }
// Message received by `checkerThread`: the id of the account to check
// (`checkerThread` treats an id of 0 as an error).
282 struct CheckCommand {
283 uint accid;
// Statement over the "Conf" database: loads the settings row of one account,
// selected by `:accid`. Used by `checkerThread`.
287 static stmtAccInfo = LazyStatement!"Conf"(`
288 SELECT
289 accid AS accid
290 , checktime AS checktime
291 , nosendauth AS nosendauth
292 , debuglog AS debuglog
293 , nntplastindex AS nntplastindex
294 , name AS name
295 , recvserver AS recvserver
296 , sendserver AS sendserver
297 , user AS user
298 , pass AS pass
299 , inbox AS inbox
300 , nntpgroup AS nntpgroup
301 FROM accounts
302 WHERE accid=:accid
303 LIMIT 1
304 ;`);
// Statement over the "Conf" database: upserts the `lastcheck` value of account
// `:accid` in `checktimes` (insert, or update the existing row on an `accid` conflict).
307 static stmtSetCheckTime = LazyStatement!"Conf"(`
308 INSERT INTO checktimes(accid,lastcheck) VALUES(:accid,:lastcheck)
309 ON CONFLICT(accid)
310 DO UPDATE SET lastcheck=:lastcheck
311 ;`);
314 //==========================================================================
316 // forEachTag
318 // return `false` from delegate to stop
320 //==========================================================================
// Walks the '|'-separated `tags` list from left to right and hands every
// non-empty item to `dg`. Iteration ends early as soon as `dg` returns `false`.
// Does nothing when `dg` is `null`.
void forEachTag (const(char)[] tags, bool delegate (const(char)[] tag) dg) {
  if (dg is null) return;
  auto anchor = tags; // kept from the original code; it is never read
  for (;;) {
    if (tags.length == 0) return;
    const(char)[] cur;
    auto sep = tags.indexOf('|');
    if (sep >= 0) {
      // split at the separator, and drop the separator itself
      cur = tags[0..sep];
      tags = tags[sep+1..$];
    } else {
      // last item: take everything that is left
      cur = tags;
      tags = tags[$..$];
    }
    // empty items (as in "a||b") are silently skipped
    if (cur.length != 0 && !dg(cur)) return;
  }
}
335 //==========================================================================
337 // extractAccount
339 //==========================================================================
// Returns the account name stored in the '|'-separated `tags` list, i.e. the
// text that follows "account:" in the first tag that starts with that prefix.
// Returns an empty string when there is no such tag.
//
// The previous implementation looked for the first "account:" substring and
// looped forever when that occurrence was in the middle of a tag (for example
// "/box|xaccount:foo"), because the search position was never advanced. This
// version walks the list tag by tag, so it always terminates; for every input
// the old code could handle, the result is the same.
DynStr extractAccount (const(char)[] tags) {
  enum Prefix = "account:";
  while (tags.length) {
    // cut the next tag off the list
    auto sep = tags.indexOf('|');
    auto tag = (sep < 0 ? tags : tags[0..sep]);
    // only a prefix at the very start of a tag counts
    if (tag.length >= Prefix.length && tag[0..Prefix.length] == Prefix) {
      return DynStr(tag[Prefix.length..$]);
    }
    if (sep < 0) break; // that was the last tag
    tags = tags[sep+1..$];
  }
  return DynStr();
}
354 //==========================================================================
356 // extractFirstFolder
358 // can return empty string
360 //==========================================================================
// Returns the first tag of the '|'-separated `tags` list that starts with '/'.
// The result is empty when the list has no such tag.
DynStr extractFirstFolder (const(char)[] tags) {
  DynStr folder;
  forEachTag(tags, (tag) {
    // `forEachTag` never passes empty tags, so `tag[0]` is always valid
    immutable bool isFolder = (tag[0] == '/');
    if (isFolder) folder = tag;
    return !isFolder; // keep scanning until a folder tag is seen
  });
  return folder;
}
372 //==========================================================================
374 // removeFirstFolder
376 // can return empty tags string
378 //==========================================================================
// Rebuilds the '|'-separated `tags` list without its first tag that starts
// with '/'. All other tags are kept in their original order (later folder tags
// included); empty items are dropped. The result can be an empty string.
DynStr removeFirstFolder (const(char)[] tags) {
  DynStr rest;
  bool folderDropped = false;
  forEachTag(tags, (tag) {
    // `forEachTag` never passes empty tags, so `tag[0]` is always valid
    if (tag[0] == '/' && !folderDropped) {
      folderDropped = true; // only the first folder tag is removed
      return true;
    }
    if (rest.length != 0) rest ~= "|";
    rest ~= tag;
    return true; // always walk the whole list
  });
  return rest;
}
395 // ////////////////////////////////////////////////////////////////////////// //
// One tag a message is filed under, as collected by `getMessageTags`.
// The struct is built positionally as `TagInfo(tagid, name)`, so the field
// order matters.
396 static struct TagInfo {
397 uint tagid;
398 DynStr name;
// set by `twitMessage` once a mute rule has been applied for this tag
399 bool wasUpdates;
403 //==========================================================================
405 // getMessageTags
407 //==========================================================================
// Appends to `tags` one `TagInfo` (tag id and tag name) for every distinct tag
// the message `uid` has a `threads` row in. Existing elements of `tags` are kept.
408 void getMessageTags (ref TagInfo[] tags, uint uid) {
409 auto stGetTags = LazyStatement!"View"(`
410 SELECT
411 DISTINCT(threads.tagid) AS tagid
412 , tn.tag AS name
413 FROM threads
414 INNER JOIN tagnames AS tn USING(tagid)
415 WHERE uid=:uid
416 ;`);
418 tags.reserve(64);
419 foreach (auto row; stGetTags.st.bind(":uid", uid).range) {
420 tags ~= TagInfo(row.tagid!uint, DynStr(row.name!SQ3Text));
425 //==========================================================================
427 // updateTwittedThreadsInTag
429 //==========================================================================
// Propagates a mute from message `uid` to its replies inside tag `tagid`.
// The `mutepairs` table (see `twitPrepare`) is emptied and refilled with every
// not yet muted (mute=0) descendant of `uid` in that tag; those rows then get
// `Mute.ThreadOther`, and an `appearance` of 0 is changed to 1.
430 void updateTwittedThreadsInTag (uint tagid, uint uid) {
// recursive walk over `threads.parent`, seeded with a pseudo-row for `uid` itself
// (muid=0, mmute=666) that the final `WHERE muid<>0 AND mmute=0` filters out again
431 auto stTempTbl = LazyStatement!"View"(`
432 INSERT INTO mutepairs
433 WITH RECURSIVE children(muid, paruid, mtagid, mmute) AS (
434 SELECT 0, :uid, :tagid, 666
435 UNION ALL
436 SELECT
437 tt.uid, tt.uid, mtagid, tt.mute
438 FROM children AS cc
439 INNER JOIN threads AS tt ON
440 tt.tagid=cc.mtagid AND
441 tt.parent=cc.paruid AND
442 tt.uid<>cc.muid AND
443 tt.uid<>cc.paruid
445 SELECT
446 muid AS muid
447 , mtagid AS mtagid
448 FROM children
449 WHERE muid<>0 AND mmute=0
450 ;`);
// applies `:mute` to every collected (uid, tagid) pair that is still unmuted
452 auto stFixMutes = LazyStatement!"View"(`
453 UPDATE threads
455 mute=:mute
456 , appearance=iif(appearance=0,1,appearance)
457 FROM (SELECT muid, mtagid FROM mutepairs) AS cc
458 WHERE uid=cc.muid AND tagid=cc.mtagid AND mute=0
459 ;`);
461 // update threads
462 dbView.execute(`DELETE FROM mutepairs;`);
463 stTempTbl.st
464 .bind(":uid", uid)
465 .bind(":tagid", tagid)
466 .doAll();
467 stFixMutes.st
468 .bind(":mute", Mute.ThreadOther)
469 .doAll();
473 //==========================================================================
475 // createTwitByMsgid
477 //==========================================================================
// Creates a message-id based twit (mute rule) for message `uid`, limited to
// tags matching `glob`, and immediately mutes the message and its replies in
// every tag the message is filed under.
// Does nothing when `uid` is 0, `glob` is empty, the message has no message-id,
// or an identical (msgid, glob) rule already exists.
478 public void createTwitByMsgid (uint uid, const(char)[] glob="/dmars_ng/*") {
479 if (!uid) return;
480 if (glob.length == 0) return;
482 auto stGetMsgid = LazyStatement!"View"(`
483 SELECT
484 msgid AS msgid
485 FROM msgids
486 WHERE uid=:uid
487 LIMIT 1
488 ;`);
490 DynStr msgid;
491 foreach (auto row; stGetMsgid.st.bind(":uid", uid).range) msgid = row.msgid!SQ3Text;
492 if (msgid.length == 0) return;
494 auto stFindMsgidTwit = LazyStatement!"Conf"(`
495 SELECT
497 FROM msgidtwits
498 WHERE msgid=:msgid AND tagglob=:glob
499 LIMIT 1
500 ;`);
502 // check if we already have such twit
// any returned row means "already there": leave without doing anything
503 foreach (auto row; stFindMsgidTwit.st.bindConstText(":msgid", msgid.getData).bindConstText(":glob", glob).range) return;
// store the new rule in the "Conf" database
505 transacted!"Conf"{
506 auto stAddMsgidTwit = LazyStatement!"Conf"(`
507 INSERT INTO msgidtwits
508 (etwitid, automatic, tagglob, msgid)
509 VALUES(0, 0,:tagglob,:msgid)
510 ;`);
512 stAddMsgidTwit.st
513 .bindConstText(":tagglob", glob)
514 .bindConstText(":msgid", msgid.getData)
515 .doAll();
518 TagInfo[] tags;
519 scope(exit) delete tags;
520 getMessageTags(ref tags, uid);
521 if (tags.length == 0) return; // just in case
// make sure the `mutepairs` temp table used by `updateTwittedThreadsInTag` exists
523 twitPrepare();
// the custom title is cleared here (title=NULL), unlike in `twitMessage`
525 auto stUpdateMute = LazyStatement!"View"(`
526 UPDATE threads
528 mute=:mute
529 , title=NULL
530 , appearance=iif(appearance=0,1,appearance)
531 WHERE uid=:uid AND tagid=:tagid AND mute=0
532 ;`);
534 // mark the message as twitted
// NOTE(review): every tag of the message is muted here; `glob` is only stored
// in the rule and is not matched against the tag names in this function
535 transacted!"View"{
536 foreach (ref TagInfo ti; tags) {
537 stUpdateMute.st
538 .bind(":uid", uid)
539 .bind(":tagid", ti.tagid)
540 .bind(":mute", Mute.ThreadStart)
541 .doAll();
542 updateTwittedThreadsInTag(ti.tagid, uid);
548 //==========================================================================
550 // twitPrepare
552 //==========================================================================
// Creates (if it does not exist yet) the temporary `mutepairs` table in the view
// database; `updateTwittedThreadsInTag` uses it as scratch space for
// (message uid, tag id) pairs.
553 public void twitPrepare () {
554 dbView.execute(`
555 CREATE TEMP TABLE IF NOT EXISTS mutepairs(
556 muid INTEGER
557 , mtagid INTEGER
563 //==========================================================================
565 // twitMessage
567 // set "mute" flag according to message filters
569 //==========================================================================
// Applies the e-mail/name twit rules from `emailtwits` to message `uid`.
// For every rule whose e-mail and/or name glob matches the sender, each tag of
// the message whose name matches the rule's tag glob gets `Mute.ThreadStart`
// and the rule's title; the mute is then propagated to the replies in the
// affected tags. Each tag is updated at most once.
570 public void twitMessage (uint uid) {
571 if (!uid) return;
573 TagInfo[] tags;
574 scope(exit) delete tags;
575 getMessageTags(ref tags, uid);
576 if (tags.length == 0) return; // just in case
578 auto stUpdateMute = LazyStatement!"View"(`
579 UPDATE threads
581 mute=:mute
582 , title=:title
583 , appearance=iif(appearance=0,1,appearance)
584 WHERE uid=:uid AND tagid=:tagid AND mute=0
585 ;`);
587 DynStr fromMail, fromName;
// NOTE(review): both this query and the `chiroGetMessageFrom()` call after it
// fill `fromMail`/`fromName`; one of the two looks redundant -- confirm which
// one is live code
589 foreach (auto row; dbView.statement(`
590 SELECT
591 from_name AS fromName
592 , from_mail AS fromMail
593 FROM info
594 WHERE uid=:uid
595 LIMIT 1
596 ;`).bind(":uid", uid).range)
598 fromMail = row.fromMail!SQ3Text;
599 fromName = row.fromName!SQ3Text;
602 if (!chiroGetMessageFrom(uid, ref fromMail, ref fromName)) return;
603 if (fromMail.length == 0 && fromName.length == 0) return; // just in case
// number of tags that got a mute so far
605 uint ttcount = 0;
606 foreach (auto trow; dbConf.statement(`
607 SELECT
608 tagglob AS tagglob
609 , email AS email
610 , name AS name
611 , title AS title
612 FROM emailtwits
613 ;`).range)
615 auto email = trow.email!SQ3Text;
616 auto name = trow.name!SQ3Text;
617 auto glob = trow.tagglob!SQ3Text;
618 if (glob.length == 0 || (!email.length && !name.length)) continue; // just in case
619 // check for filter match
// a rule with both an e-mail and a name glob must match on both
620 if (email.length && !globmatchCI(fromMail, email)) continue;
621 if (name.length && !globmatchCI(fromName, name)) continue;
622 auto title = trow.title!SQ3Text;
623 // for all tags
624 foreach (ref TagInfo ti; tags) {
625 if (ti.wasUpdates) continue;
626 if (!globmatch(ti.name, glob)) continue;
627 stUpdateMute.st
628 .bind(":uid", uid)
629 .bind(":tagid", ti.tagid)
630 .bind(":mute", Mute.ThreadStart)
// an empty rule title is passed as `null`, with `allowNull` set
631 .bindConstText(":title", (title.length ? title : null), allowNull:true)
632 .doAll();
633 ti.wasUpdates = true;
634 ++ttcount;
// every tag has been handled: no need to look at further rules
636 if (ttcount == tags.length) break;
639 if (!ttcount) return;
641 // update threads
642 foreach (ref TagInfo ti; tags) {
643 if (!ti.wasUpdates) continue;
644 updateTwittedThreadsInTag(ti.tagid, uid);
649 //==========================================================================
651 // updateViewDB
653 // check for new messages, and update view database
655 //==========================================================================
// Brings the view database up to date with the message storage.
// Every message whose uid is above the highest uid known to the view database
// is run through the user filters and the bogo (spam) check, gets its final tag
// list written back to the storage, is inserted into the view database, is
// linked into the thread trees of its threaded tags, and is finally checked
// against the twit (mute) rules. At the end a `TagThreadsUpdatedEvent` is posted
// to the main window for every tag that received a message.
656 void updateViewDB () {
657 uint maxViewUid = 0;
658 uint maxStoreUid = 0;
660 twitPrepare();
662 foreach (auto row; dbView.statement(`SELECT MAX(uid) AS uid FROM info;`).range) maxViewUid = row.uid!uint;
663 foreach (auto row; dbStore.statement(`SELECT MAX(uid) AS uid FROM messages;`).range) maxStoreUid = row.uid!uint;
665 if (maxViewUid >= maxStoreUid) return;
666 conwriteln("need to process around ", maxStoreUid-maxViewUid, " messages.");
// ids of all tags that got at least one new message (no duplicates)
668 uint[] relinkTids;
669 relinkTids.reserve(64);
670 scope(exit) delete relinkTids;
672 foreach (uint uid; maxViewUid+1..maxStoreUid+1) {
673 conwriteln("============ message #", uid, " ============");
674 DynStr msg, tags;
675 foreach (auto row; dbStore.statement(`
676 SELECT tags AS tags, ChiroUnpack(data) AS data FROM messages WHERE uid=:uid LIMIT 1
677 ;`).bind(":uid", uid).range)
679 msg = row.data!SQ3Text;
680 tags = row.tags!SQ3Text;
// uids without a row, or rows without data or tags, are skipped
682 if (msg.length == 0 || tags.length == 0) continue; // not interesting
// split the stored tag list: account name, first folder (the default
// destination), and everything else
684 DynStr acc = tags.extractAccount();
685 DynStr origTags = tags;
686 DynStr deftag = tags.extractFirstFolder();
687 tags = tags.removeFirstFolder();
689 auto hlp = new RealFilterHelper;
690 scope(exit) delete hlp;
691 hlp.account = acc;
692 hlp.tag = deftag;
693 if (hlp.tag.length == 0) hlp.tag = "#hobo";
694 hlp.message = msg;
695 // filter
// filters run in `idx` order; an error in one filter is logged and does not
// abort the remaining filters
696 foreach (auto row; dbConf.statement(`SELECT filterid AS filterid, name AS name, body AS body FROM filters ORDER BY idx;`).range) {
697 //conwrite(" filter '", row.name!SQ3Text, "' (", row.filterid!uint, "): ");
698 bool goOn = false;
699 hlp.matched = false;
700 try {
701 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">");
702 goOn = executeMailFilter(row.body!SQ3Text, hlp);
703 } catch (Exception e) {
704 conwriteln("ERROR IN FILTER '", row.name!SQ3Text, "': ", e.msg);
706 if (hlp.matched) {
707 conwriteln("...filter '", row.name!SQ3Text, " matched!");
709 //hlp.writeResult(); writeln;
710 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">: goon=", goOn, "; isstop=", hlp.isStop);
711 //assert(hlp.isStop == !goOn);
// `goOn` is not consulted; only the helper's stop flag ends the filter chain
712 if (hlp.isStop) break;
714 //write(" FINAL RESULT:"); hlp.writeResult(); writeln;
715 // done filtering
// `markSpamHam` is never set to `true` in this function, so the bogo training
// calls further below are effectively disabled
717 bool markSpamHam = false; //!!!
// ask the bogo filter only when no user filter decided spam/ham already
718 if (!hlp.isSpam && !hlp.isHam) {
719 auto bogo = messageBogoCheck(uid);
720 if (bogo == Bogo.Spam) {
721 bool exists;
722 conwriteln("BOGO: SPAM message #", uid, "; from={", hlp.getFromName.getData, "}:<", hlp.getFromMail.getData, ">; to={",
723 hlp.getToName.getData, "}:<", hlp.getToMail.getData, ">; subj=", hlp.getSubj(out exists).getData);
724 hlp.performAction(hlp.Action.Spam);
725 markSpamHam = false;
729 if (hlp.isSpam) hlp.tag = "#spam"; // always
731 if (hlp.tag.length == 0) hlp.tag = deftag; // just in case
// is the destination tag already present among the remaining tags?
732 bool hasTag = false;
733 forEachTag(tags, (xtag) {
734 if (xtag == hlp.tag) {
735 hasTag = true;
736 return false; // stop
738 return true; // go on
741 // `tags` should contain our new tags
// the destination tag is put in front of the remaining tags
742 if (!hasTag) {
743 DynStr tt = hlp.tag;
744 if (tags.length) {
745 tt ~= "|";
746 tt ~= tags;
748 tags = tt;
751 // update tags info in the storage
752 if (tags != origTags) {
753 transacted!"Store"{
754 dbStore.statement(`UPDATE messages SET tags=:tags WHERE uid=:uid;`)
755 .bind(":uid", uid)
756 .bindConstText(":tags", tags.getData)
757 .doAll();
761 // insert the message into the view db
// delete/purge decide the appearance first; otherwise read or spam messages
// are inserted as already read
762 int appearance = Appearance.Unread;
763 if (hlp.isDelete) appearance = Appearance.SoftDeleteFilter;
764 else if (hlp.isPurge) appearance = Appearance.SoftDeletePurge;
765 if (appearance == Appearance.Unread && (hlp.isRead || hlp.isSpam)) appearance = Appearance.Read;
766 if (markSpamHam) {
767 if (hlp.isSpam) messageBogoMarkSpam(uid);
768 if (hlp.isHam) messageBogoMarkHam(uid);
// split the raw message into headers, body and receive time using the
// Chiro* SQL functions
771 uint msgtime = 0;
772 DynStr hdr, body;
773 foreach (auto trow; dbStore.statement(`
774 SELECT
775 ChiroExtractHeaders(:msgdata) AS headers
776 , ChiroExtractBody(:msgdata) AS body
777 , ChiroHdr_RecvTime(:msgdata) AS msgtime
778 ;`).bindConstText(":msgdata", msg.getData).range)
780 msgtime = trow.msgtime!uint;
781 hdr = trow.headers!SQ3Text;
782 body = trow.body!SQ3Text;
785 conwriteln("putting msg ", uid, " (time:", msgtime, "; appr=", appearance, ") to '", tags.getData, "'; oldtags='", origTags.getData, "'");
// insertion and thread linking are done in one view-database transaction
787 transacted!"View"{
788 //dbView.beginTransaction();
789 //scope(success) dbView.commitTransaction();
790 //scope(failure) dbView.rollbackTransaction();
791 chiroParseAndInsertOneMessage(uid, msgtime, appearance, hdr, body, tags);
793 DynStr msgid;
794 foreach (auto mrow; dbView.statement(`SELECT msgid AS msgid FROM msgids WHERE uid=:uid LIMIT 1;`).bind(":uid", uid).range) {
795 msgid = mrow.msgid!SQ3Text;
797 //if (msgid.length == 0) return;
798 version(debug_updater) {
800 auto fo = VFile("zzz", "a");
801 fo.writeln("MSGUID: ", uid, "; MSGID: <", msgid.getData, ">");
804 conwriteln("MSGUID: ", uid, "; MSGID: <", msgid.getData, ">");
806 // collect tags to modify
// `taglist`: threaded tags in which the new message has no parent yet
807 int[] taglist;
808 scope(exit) delete taglist;
809 taglist.reserve(16);
811 foreach (auto trow; dbView.statement(`SELECT tagid AS tagid, parent AS parent FROM threads WHERE uid=:uid;`)
812 .bind(":uid", uid).range)
814 immutable uint tid = trow.tagid!uint;
// remember every touched tag once, for the notifications at the end
815 if (tid) {
816 bool found = false;
817 foreach (immutable uint tt; relinkTids) if (tt == tid) { found = true; break; }
818 if (!found) relinkTids ~= tid;
820 if (!tid || trow.parent!uint || !chiroIsTagThreaded(tid)) continue;
821 conwriteln(" tagid: ", tid, " (", chiroGetTagName(tid).getData, ")");
822 version(debug_updater) {
824 auto fo = VFile("zzz", "a");
825 fo.writeln(" tagid: ", tid, " (", chiroGetTagName(tid).getData, ")");
828 taglist ~= tid;
831 foreach (immutable uint tid; taglist) {
832 uint setUsAsParentFor = 0;
833 bool needFullRelink = false;
834 // check if there are any references to us, and fix them by full relink
// without a message-id nobody can reference this message
835 if (!msgid.length) continue;
// look for a message in this tag whose first reference (idx=0) is our message-id
836 foreach (auto nrow; dbView.statement(`
837 SELECT refids.uid AS uid, tt.parent AS parent
838 FROM refids
839 INNER JOIN(threads) AS tt
840 ON tt.tagid=:tagid AND tt.uid=refids.uid
841 WHERE idx=0 AND msgid=:msgid
842 LIMIT 1
843 ;`).bind(":tagid", tid).bindConstText(":msgid", msgid.getData).range)
// a parentless referrer can simply be adopted; one that already has a parent
// forces a full relink of the tag
845 if (nrow.parent!uint == 0) {
846 setUsAsParentFor = nrow.uid!uint;
847 } else {
848 needFullRelink = true;
852 if (needFullRelink) {
853 //FIXME: make this faster!
854 conwriteln(" tid: ", tid, " (", chiroGetTagName(tid).getData, "); performing full relink...");
855 chiroSupportRelinkTagThreads(tid);
856 continue;
859 if (setUsAsParentFor) {
860 conwriteln(" tid: ", tid, " (", chiroGetTagName(tid).getData, "); settuing us (", uid, ") as a parent for ", setUsAsParentFor);
// NOTE(review): the statement text names the parameter `:tid`, but the call
// binds `:tagid` -- this looks like a mismatch that would leave `:tid` unbound;
// confirm and make the two names agree
861 dbView.statement(`
862 UPDATE threads
863 SET parent=:uid
864 WHERE uid=:xuid AND tagid=:tid
865 ;`).bind(":uid", uid).bind(":xuid", setUsAsParentFor).bind(":tagid", tid).doAll();
868 // find parent for us
// any other message in this tag whose message-id is among our references
869 uint paruid = 0;
870 foreach (auto prow; dbView.statement(`
871 SELECT msgids.uid AS paruid
872 FROM msgids
873 INNER JOIN(threads) AS tt
874 ON tt.tagid=:tagid AND tt.uid=msgids.uid
875 WHERE msgids.uid<>:uid AND msgids.msgid IN (SELECT msgid FROM refids WHERE uid=:uid ORDER BY idx)
876 LIMIT 1
877 ;`).bind(":uid", uid).bind(":tagid", tid).range)
879 paruid = prow.paruid!uint;
881 conwriteln(" tid: ", tid, " (", chiroGetTagName(tid).getData, "); paruid=", paruid);
882 if (paruid && paruid != uid) {
883 dbView.statement(`UPDATE threads SET parent=:paruid WHERE uid=:uid AND tagid=:tagid;`)
884 .bind(":uid", uid)
885 .bind(":tagid", tid)
886 .bind(":paruid", paruid)
887 .doAll();
// apply the twit (mute) rules to the freshly inserted message
891 twitMessage(uid);
895 // relink threads
// no relinking is done here; the main window (if it is still open) is only
// notified about every touched tag
896 if (relinkTids.length) {
897 foreach (immutable uint tid; relinkTids) {
898 if (vbwin && !vbwin.closed) vbwin.postEvent(new TagThreadsUpdatedEvent(tid));
904 //==========================================================================
906 // checkerThread
908 //==========================================================================
// Worker thread body: checks one account for new messages.
// It waits for a single `CheckCommand`, loads the account settings, stores the
// next-check value in `checktimes`, downloads new messages (NNTP when the
// account has a newsgroup set, POP3 otherwise) into the message storage, and
// finally reports `CheckDone` or `CheckError` (with the account id) to `ownerTid`.
909 void checkerThread (Tid ownerTid) {
910 uint accid = 0;
911 bool isError = false;
912 try {
913 receive(
914 (CheckCommand cmd) {
915 accid = cmd.accid;
// account id 0 is rejected right away
919 if (accid == 0) {
920 ownerTid.send(ControlCommand(ControlCommand.Kind.CheckError, accid));
921 return;
// local copies of the account settings
924 bool found = false;
925 int checktime;
926 bool nosendauth;
927 bool debuglog;
928 uint nntplastindex;
929 DynStr name;
930 DynStr recvserver;
931 DynStr sendserver;
932 DynStr user;
933 DynStr pass;
934 DynStr inbox;
935 DynStr nntpgroup;
937 foreach (auto arow; stmtAccInfo.st.bind(":accid", accid).range) {
938 // i found her!
939 found = true;
// the configured check time is clamped to 1..100000
940 int upmins = arow.checktime!int;
941 if (upmins < 1) upmins = 1; else if (upmins > 100000) upmins = 100000;
942 checktime = upmins;
943 nosendauth = (arow.nosendauth!int > 0);
944 debuglog = (arow.debuglog!int > 0);
945 nntplastindex = arow.nntplastindex!uint;
946 name = arow.name!SQ3Text;
947 recvserver = arow.recvserver!SQ3Text;
948 sendserver = arow.sendserver!SQ3Text;
949 user = arow.user!SQ3Text;
950 pass = arow.pass!SQ3Text;
951 inbox = arow.inbox!SQ3Text;
952 nntpgroup = arow.nntpgroup!SQ3Text;
// unknown account id
955 if (!found) {
956 ownerTid.send(ControlCommand(ControlCommand.Kind.CheckError, accid));
957 return;
960 conwriteln("checking account '", name, "' (", accid, ")...");
// the stored value is "current tick count + checktime*60"
// NOTE(review): presumably both are in seconds, i.e. this is the time of the
// next check -- confirm the unit of `GetTickCount()`
962 stmtSetCheckTime.st.bind(":accid", accid).bind(":lastcheck", RunningAverageExp.GetTickCount()+checktime*60).doAll();
964 // ////////////////////////////////////////////////////////////////// //
// Downloads the articles numbered `nntplastindex+1` .. `hiwater` of the
// account's newsgroup and stores each of them. Stops at the first download
// error; empty articles are skipped without advancing `nntplastindex`.
965 void CheckNNTP () {
966 auto nsk = new SocketNNTP(recvserver);
967 scope(exit) nsk.close();
969 nsk.selectGroup(nntpgroup);
970 if (nsk.emptyGroup) {
971 conwriteln("[", name, ":", nntpgroup, "]: no new articles (empty group)");
972 return;
975 uint stnum = nntplastindex+1;
976 if (stnum > nsk.hiwater) {
977 conwriteln("[", name, ":", nntpgroup, "]: no new articles");
978 return;
981 conwriteln("[", name, ":", nntpgroup, "]: ", nsk.hiwater+1-stnum, " (possible) new articles");
983 // download new articles
984 foreach (immutable uint anum; stnum..nsk.hiwater+1) {
985 DynStr msg;
986 try {
987 msg = nsk.getArticle(anum);
988 } catch (Exception e) {
989 conwriteln("[", name, ":", nntpgroup, "]: error downloading article #", anum);
990 break;
992 if (msg.length == 0) continue; // this article is empty
993 // insert article into the storage
994 // filtering will be done later, for now, insert with the default inbox
// initial tag list: "<inbox>|account:<name>", or "#hobo" when both are empty
995 DynStr tags;
996 if (inbox.length) tags ~= inbox;
997 if (name.length) {
998 if (tags.length) tags ~= "|";
999 tags ~= "account:";
1000 tags ~= name;
1002 if (tags.length == 0) tags = "#hobo";
1003 conwriteln("[", name, ":", nntpgroup, "]: storing article #", anum, " for '", tags.getData, "'...");
1004 transacted!"Store"{
1005 dbStore.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1006 .bindConstText(":tags", tags.getData)
1007 .bindConstBlob(":data", msg.getData)
1008 .doAll();
1010 // update account with the new highest nntp index
// done after every stored article
1011 transacted!"Conf"{
1012 dbConf.statement(`UPDATE accounts SET nntplastindex=:anum WHERE accid=:accid;`)
1013 .bind(":accid", accid)
1014 .bind(":anum", anum)
1015 .doAll();
1020 // ////////////////////////////////////////////////////////////////// //
// Downloads new mail. Despite the name, this talks POP3 (`SocketPOP3`).
// Messages 1..newmsg are fetched in order, stored, and deleted from the
// server; the loop stops at the first download error.
1021 void CheckSMTP () {
1022 conwriteln("*** [", name, "]: connecting...");
1023 auto pop3 = new SocketPOP3(recvserver);
1024 scope(exit) pop3.close();
// authentication is skipped when no user name is configured
1025 if (user.length) {
1026 conwriteln("[", name, "]: authenticating...");
1027 pop3.auth(user, pass);
1029 auto newmsg = pop3.getNewMailCount;
1030 if (newmsg == 0) {
1031 conwriteln("[", name, "]: no new messages");
1032 return;
1034 conwriteln("[", name, "]: ", newmsg, " new message", (newmsg > 1 ? "s" : ""));
1035 foreach (immutable int popidx; 1..newmsg+1) {
1036 DynStr msg;
1037 try {
1038 msg = pop3.getMessage(popidx); // full message, with the ending dot
1039 } catch (Exception e) {
1040 conwriteln("[", name, "]: error downloading message #", popidx);
1041 break;
1043 if (msg.length != 0) {
// initial tag list: "<inbox>|account:<name>", or "#hobo" when both are empty
1044 DynStr tags;
1045 if (inbox.length) tags ~= inbox;
1046 if (name.length) {
1047 if (tags.length) tags ~= "|";
1048 tags ~= "account:";
1049 tags ~= name;
1051 if (tags.length == 0) tags = "#hobo";
1052 conwriteln("[", name, ":", nntpgroup, "]: storing message #", popidx, " for '", tags.getData, "'...");
1053 transacted!"Store"{
1054 dbStore.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1055 .bindConstText(":tags", tags.getData)
1056 .bindConstBlob(":data", msg.getData)
1057 .doAll();
1060 //auto msg = pop3.getMessage!true(popidx); // full message, with the ending dot, and exact terminators
1061 // process
// NOTE(review): this appears to run for empty messages as well -- confirm
1062 pop3.deleteMessage(popidx);
1066 // ////////////////////////////////////////////////////////////////// //
// a non-empty newsgroup selects NNTP; any exception from the check marks the
// run as failed but still falls through to the final notification
1067 try {
1068 if (nntpgroup.length) CheckNNTP(); else CheckSMTP();
1069 } catch (Exception e) {
1070 conwriteln("ERROR checking account '", name, "' (", accid, "): ", e.msg);
1071 isError = true;
1074 conwriteln("done checking account '", name, "' (", accid, ")...");
// tell the main window (if it is still open) that this account is done
1076 if (vbwin && !vbwin.closed) {
1077 vbwin.postEvent(new UpdatingAccountCompleteEvent(accid));
1078 //sqlite3_sleep(1000);
// last resort: anything thrown above (including `Error`s) is printed to
// stderr and reported as `CheckError`
1080 } catch (Throwable e) {
1081 // here, we are dead and fucked (the exact order doesn't matter)
1082 //import core.stdc.stdlib : abort;
1083 import core.stdc.stdio : fprintf, stderr;
1084 //import core.memory : GC;
1085 import core.thread : thread_suspendAll;
1086 //GC.disable(); // yeah
1087 //thread_suspendAll(); // stop right here, you criminal scum!
1088 auto s = e.toString();
1089 fprintf(stderr, "\n=== FATAL ===\n%.*s\n", cast(uint)s.length, s.ptr);
1090 //abort(); // die, you bitch!
1091 ownerTid.send(ControlCommand(ControlCommand.Kind.CheckError, accid));
1092 return;
1094 //if (vbwin) vbwin.postEvent(evDoConCommands); }
// normal exit: report success or failure of the check
1095 ownerTid.send(ControlCommand((isError ? ControlCommand.Kind.CheckError : ControlCommand.Kind.CheckDone), accid));
1099 //==========================================================================
1101 // controlThread
1103 //==========================================================================
// Control loop that schedules account checks and reports back to ownerTid.
//
// Each pass of the main loop:
//   1. waits for a ControlCommand with a timeout and updates the local flags;
//   2. prunes accidCheckList (the queue of accounts waiting for or undergoing a check);
//   3. unless quitting, scans the accounts table and queues accounts that are due;
//   4. starts a checkerThread for the head of the queue if that entry is not yet in progress;
//   5. calls updateViewDB() once no queued entry is in progress;
//   6. runs a GC collection at most once every five minutes.
// After the loop ends, ControlReply.Quit is sent to ownerTid.
//
// Params:
//   ownerTid = thread that receives the final ControlReply.Quit message.
//
// NOTE(review): lines holding only a closing brace are absent from this listing,
// so the nesting described in the comments below is inferred -- confirm against the real file.
1104 void controlThread (Tid ownerTid) {
1105 import core.time;
1106 bool doQuit = false;
1107 try {
// One queued account check: the account id, whether a checker was already started for it,
// and the Tid returned by spawn for that checker.
1108 static struct AccCheck {
1109 uint accid;
1110 bool inprogress;
1111 Tid tid;
1114 AccCheck[] accidCheckList;
1115 accidCheckList.reserve(128);
// Database setup: a temporary checktimes table in the Conf database, plus two lazily
// prepared statements -- one listing accounts with nocheck=0 and a non-empty inbox,
// one fetching lastcheck for a single account.
1117 dbConf.execute(`
1118 CREATE TEMP TABLE IF NOT EXISTS checktimes (
1119 accid INTEGER PRIMARY KEY UNIQUE /* unique, never zero */
1120 , lastcheck INTEGER NOT NULL DEFAULT 0
1121 , checking INTEGER NOT NULL DEFAULT 0
1125 static stmtAllAccs = LazyStatement!"Conf"(`
1126 SELECT
1127 accid AS accid
1128 , checktime AS checktime
1129 FROM accounts
1130 WHERE nocheck=0 AND inbox<>''
1131 ORDER BY accid
1132 ;`);
1134 static stmtGetCheckTime = LazyStatement!"Conf"(`
1135 SELECT lastcheck AS lastcheck FROM checktimes WHERE accid=:accid LIMIT 1
1136 ;`);
1138 updateViewDB();
// Timestamp of the last explicit GC pass (see the five-minute check near the end of the loop).
1140 MonoTime lastCollect = MonoTime.currTime;
1141 //accidCheckList ~= AccCheck();
// needUpdates: a checker reported completion, so the view database must be refreshed.
// forceAll: a ForceUpdateAll command arrived; the next scan queues accounts regardless of their interval.
1143 bool needUpdates = false;
1144 bool forceAll = false;
1145 for (;;) {
// Leave only after a Quit request AND once the queue has drained.
1146 if (doQuit && accidCheckList.length == 0) break;
// Wait time: 50 ms while quitting, 1 s while there is pending work
// (queued accounts, a pending view refresh, or a forced update), otherwise 60 s.
1147 receiveTimeout((doQuit ? 50.msecs : accidCheckList.length || needUpdates || forceAll ? 1.seconds : 60.seconds),
1148 (ControlCommand cmd) {
1149 final switch (cmd.type) {
1150 case ControlCommand.Kind.ForceUpdateAll: forceAll = true; break;
// Ping changes no state; receiving it still ends the wait above.
1151 case ControlCommand.Kind.Ping: break;
1152 case ControlCommand.Kind.Quit: doQuit = true; break;
1153 case ControlCommand.Kind.DisableUpdates: updatesDisabled = true; break;
1154 case ControlCommand.Kind.EnableUpdates: updatesDisabled = false; break;
// A checker finished (successfully or not): request a view refresh and drop its queue entry.
1155 case ControlCommand.Kind.CheckDone:
1156 case ControlCommand.Kind.CheckError:
1157 needUpdates = true;
1158 if (accidCheckList.length) {
1159 foreach (immutable idx, const ref AccCheck nfo; accidCheckList) {
1160 if (nfo.accid == cmd.accid) {
1161 //if (!doQuit && vbwin && !vbwin.closed) vbwin.postEvent(new UpdatingAccountCompleteEvent(nfo.accid));
// Remove entry idx by shifting the tail down one slot and shrinking the array;
// the break right after keeps the foreach from continuing over the modified array.
1162 foreach (immutable c; idx+1..accidCheckList.length) accidCheckList[c-1] = accidCheckList[c];
1163 accidCheckList.length -= 1;
1164 break;
// Tell the UI that the whole update round is over once the queue is empty
// (only when not quitting and the window is still open).
1167 if (!doQuit && vbwin && !vbwin.closed && accidCheckList.length == 0) vbwin.postEvent(new UpdatingCompleteEvent());
1169 break;
// Sweep out entries whose accid is zero (same shift-down removal as above).
1174 for (usize idx = 0; idx < accidCheckList.length; ) {
1175 if (accidCheckList[idx].accid != 0) {
1176 ++idx;
1177 } else {
1178 foreach (immutable c; idx+1..accidCheckList.length) accidCheckList[c-1] = accidCheckList[c];
1179 accidCheckList.length -= 1;
// While quitting: discard entries that were never started, keep the ones in progress,
// and skip the rest of the pass so that no new checks are scheduled.
1183 if (doQuit) {
1184 for (usize idx = 0; idx < accidCheckList.length; ) {
1185 if (accidCheckList[idx].inprogress) {
1186 ++idx;
1187 } else {
1188 foreach (immutable c; idx+1..accidCheckList.length) accidCheckList[c-1] = accidCheckList[c];
1189 accidCheckList.length -= 1;
1192 continue;
// Scan for due accounts only when no view refresh is pending and updates are enabled.
1195 if (!needUpdates && !updatesDisabled) {
// NOTE(review): ctt is compared against lastcheck + upmins*60, so GetTickCount presumably
// returns seconds -- confirm the unit in RunningAverageExp.
1196 ulong ctt = RunningAverageExp.GetTickCount();
1197 foreach (auto arow; stmtAllAccs.st.range) {
// Skip accounts that are already queued.
1198 bool found = false;
1199 foreach (const ref AccCheck nfo; accidCheckList) if (nfo.accid == arow.accid!uint) { found = true; break; }
1200 if (found) continue;
1201 // forced update?
1202 if (forceAll) {
1203 accidCheckList ~= AccCheck(arow.accid!uint);
1204 continue;
1206 // check timeout
// The per-account interval is clamped to the range 1..100000 before use.
1207 int upmins = arow.checktime!int;
1208 if (upmins < 1) upmins = 1; else if (upmins > 100000) upmins = 100000;
// lastcheck stays 0 when checktimes has no row for this account.
1209 ulong lastcheck = 0;
1210 foreach (auto crow; stmtGetCheckTime.st.bind(":accid", arow.accid!uint).range) lastcheck = crow.lastcheck!ulong;
1211 lastcheck += upmins*60; // next check time
1212 if (lastcheck < ctt) {
1213 // i found her!
1214 accidCheckList ~= AccCheck(arow.accid!uint);
// NOTE(review): the lines between the if above and this else are missing from the listing,
// so it is unclear whether this branch is live or disabled -- confirm.
1217 else {
1218 conwriteln("check for accid ", arow.accid!uint, " in ", (lastcheck-ctt)/60, " minutes...");
// The forced-update request is consumed here.
1222 forceAll = false;
// Start a checker for the head of the queue. The loop body always ends in a break,
// so only the first entry is examined: nothing happens if it is already in progress.
1225 if (!updatesDisabled) {
1226 foreach (ref AccCheck nfo; accidCheckList) {
1227 if (nfo.inprogress) break;
1228 if (vbwin) vbwin.postEvent(new UpdatingAccountEvent(nfo.accid));
1229 nfo.tid = spawn(&checkerThread, thisTid);
1230 nfo.inprogress = true;
1231 nfo.tid.send(CheckCommand(nfo.accid));
1232 break;
// Refresh the view database only while no checker is running, then clear the pending flag.
1236 bool hasProgress = false;
1237 foreach (ref AccCheck nfo; accidCheckList) if (nfo.inprogress) { hasProgress = true; break; }
1238 if (!hasProgress) {
1239 updateViewDB();
1240 needUpdates = false;
// Explicit GC pass (collect, then minimize) at most once every five minutes of monotonic time.
1243 if (!doQuit) {
1244 immutable ctt = MonoTime.currTime;
1245 if ((ctt-lastCollect).total!"minutes" >= 5) {
1246 import core.memory : GC;
1247 lastCollect = ctt;
1248 GC.collect();
1249 GC.minimize();
// Main loop finished: acknowledge the Quit request to the owner.
1253 ownerTid.send(ControlReply.Quit);
// Any Throwable is treated as fatal: the GC is disabled, all other threads are suspended,
// the error text is printed to stderr and the process is aborted.
1254 } catch (Throwable e) {
1255 // here, we are dead and fucked (the exact order doesn't matter)
1256 import core.stdc.stdlib : abort;
1257 import core.stdc.stdio : fprintf, stderr;
1258 import core.memory : GC;
1259 import core.thread : thread_suspendAll;
1260 GC.disable(); // yeah
1261 thread_suspendAll(); // stop right here, you criminal scum!
1262 auto s = e.toString();
1263 fprintf(stderr, "\n=== FATAL ===\n%.*s\n", cast(uint)s.length, s.ptr);
1264 abort(); // die, you bitch!
1269 //==========================================================================
1271 // receiverDisable
1273 //==========================================================================
/// Sets the rcDisabled flag. receiverInit() returns without starting
/// the control thread while this flag is set.
public void receiverDisable () {
  rcDisabled = true;
}
1279 //==========================================================================
1281 // disableMailboxUpdates
1283 //==========================================================================
/// Sends a DisableUpdates command to the control thread, which then stops
/// scanning for due accounts and starting new checkers.
/// Does nothing if the receiver has not been started.
public void disableMailboxUpdates () {
  if (rcStarted) {
    controlThreadId.send(ControlCommand(ControlCommand.Kind.DisableUpdates));
  }
}
1290 //==========================================================================
1292 // enableMailboxUpdates
1294 //==========================================================================
/// Sends an EnableUpdates command to the control thread, clearing the flag
/// that DisableUpdates sets.
/// Does nothing if the receiver has not been started.
public void enableMailboxUpdates () {
  if (rcStarted) {
    controlThreadId.send(ControlCommand(ControlCommand.Kind.EnableUpdates));
  }
}
1301 //==========================================================================
1303 // receiverForceUpdateAll
1305 //==========================================================================
/// Sends a ForceUpdateAll command to the control thread; its next account scan
/// then queues every listed account that is not already queued, ignoring the
/// per-account check interval.
/// Does nothing if the receiver has not been started.
public void receiverForceUpdateAll () {
  if (rcStarted) {
    controlThreadId.send(ControlCommand(ControlCommand.Kind.ForceUpdateAll));
  }
}
1312 //==========================================================================
1314 // receiverInit
1316 //==========================================================================
/// Starts the control thread, passing the calling thread as its owner,
/// and records that the receiver is running.
/// Does nothing if the receiver is already started or was disabled.
public void receiverInit () {
  // same evaluation order as two separate guards: rcStarted first, then rcDisabled
  immutable bool mayStart = !rcStarted && !rcDisabled;
  if (mayStart) {
    controlThreadId = spawn(&controlThread, thisTid);
    rcStarted = true;
  }
}
1325 //==========================================================================
1327 // receiverDeinit
1329 //==========================================================================
1330 public void receiverDeinit () {
1331 if (!rcStarted) return;
1332 controlThreadId.send(ControlCommand(ControlCommand.Kind.Quit));
1333 bool done = false;
1334 while (!done) {
1335 receive(
1336 (ControlReply reply) {
1337 if (reply == ControlReply.Quit) done = true;