adapted for new egra
[chiroptera.git] / receiver.d
blobf35148d105bac00c72eef79e23baadffd818eff5
1 /* E-Mail Client
2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3 of the License ONLY.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // mail/nntp receiver thread
18 module receiver is aliced;
19 private:
21 //version = debug_filter_helper;
22 //version = debug_updater;
24 import std.concurrency;
26 import iv.cmdcon;
27 import iv.strex;
28 import iv.sq3;
29 import iv.timer : DurTimer = Timer;
30 import iv.utfutil;
31 import iv.vfs.io;
33 import iv.egra;
35 import chibackend;
36 import chibackend.net;
38 import chievents;
41 // ////////////////////////////////////////////////////////////////////////// //
/**
 * Filter helper working on one raw message kept in memory.
 *
 * Header queries are answered from `message`; the outcome of the filters is
 * accumulated in `tag` (destination tag) and `actFlags` (action bits), and
 * `matched` is raised whenever a filter reports a match.
 */
class RealFilterHelper : FilterHelper {
public:
  /// Bits stored in `actFlags`.
  enum {
    // only one of these can be set
    ActFlagDelete = 1u<<0,
    ActFlagPurge = 1u<<1,

    // only one of these can be set
    ActFlagSpam = 1u<<2,
    ActFlagHam = 1u<<3,

    ActFlagRead = 1u<<4,
    ActFlagStop = 1u<<5,
  }

public:
  DynStr account;
  DynStr tag; // destination tag
  uint actFlags; // see above
  DynStr message;
  bool matched;

public:
  ~this () nothrow @nogc { account.clear(); tag.clear(); message.clear(); }

  final @property bool isDelete () const pure nothrow @safe @nogc { pragma(inline, true); return (actFlags&ActFlagDelete) != 0; }
  final @property bool isPurge () const pure nothrow @safe @nogc { pragma(inline, true); return (actFlags&ActFlagPurge) != 0; }
  final @property bool isSpam () const pure nothrow @safe @nogc { pragma(inline, true); return (actFlags&ActFlagSpam) != 0; }
  final @property bool isHam () const pure nothrow @safe @nogc { pragma(inline, true); return (actFlags&ActFlagHam) != 0; }
  final @property bool isRead () const pure nothrow @safe @nogc { pragma(inline, true); return (actFlags&ActFlagRead) != 0; }
  final @property bool isStop () const pure nothrow @safe @nogc { pragma(inline, true); return (actFlags&ActFlagStop) != 0; }

  // the header part of `message` (everything before `findHeadersEnd`)
  private auto headerBlock () {
    return message.getData[0..findHeadersEnd(message.getData)];
  }

  // clear every bit of `groupMask`, then raise `flag`
  private void setExclusive (uint groupMask, uint flag) {
    actFlags &= ~groupMask;
    actFlags |= flag;
  }

  // called if a filter was matched
  override void filterMatched () {
    matched = true;
  }

  override DynStr getAccount () {
    return account;
  }

  /// Returns the value of `header`; `exists` tells if the field was found at all.
  override DynStr getHeaderField (const(char)[] header, out bool exists) {
    auto value = findHeaderField(headerBlock(), header);
    exists = (value !is null);
    version(debug_filter_helper) writeln("...getHeaderField(", header, "): exists=", exists, "; res=", value);
    return DynStr(value);
  }

  /// Name part of the "From" field.
  override DynStr getFromName () {
    auto value = findHeaderField(headerBlock(), "From").extractName;
    version(debug_filter_helper) writeln("...getFromName: res=", value);
    return DynStr(value);
  }

  /// Mail part of the "From" field; "nobody@nowhere" when it is empty.
  override DynStr getFromMail () {
    auto value = findHeaderField(headerBlock(), "From").extractMail;
    if (value.length == 0) value = "nobody@nowhere";
    version(debug_filter_helper) writeln("...getFromMail: res=", value);
    return DynStr(value);
  }

  /// Name part of the "To" field.
  override DynStr getToName () {
    auto value = findHeaderField(headerBlock(), "To").extractName;
    version(debug_filter_helper) writeln("...getToName: res=", value);
    return DynStr(value);
  }

  /// Mail part of the "To" field; "nobody@nowhere" when it is empty.
  override DynStr getToMail () {
    auto value = findHeaderField(headerBlock(), "To").extractMail;
    if (value.length == 0) value = "nobody@nowhere";
    version(debug_filter_helper) writeln("...getToMail: res=", value);
    return DynStr(value);
  }

  /// Decoded subject with the reply prefix removed; `exists` is false if there is no "Subject" field.
  override DynStr getSubj (out bool exists) {
    auto value = findHeaderField(headerBlock(), "Subject");
    exists = (value !is null);
    if (exists) value = value.decodeSubj.subjRemoveRe;
    return DynStr(value);
  }

  /**
   * Runs an external filter program.
   *
   * The message is written to a temporary "/tmp/_temp_<hex uuid>.eml" file,
   * and `command` is started in "/tmp" with that file name as its only
   * argument. The first line of the program's stdout (stripped) is the
   * result; a leading '-' is cut off (the stop request it stands for is
   * parsed, but not used here). The temporary file is removed on exit.
   *
   * Returns: the action line, or an empty string if anything threw an Exception.
   */
  override DynStr exec (const(char)[] command) {
    /*version(debug_filter_helper)*/ writeln("...exec: <", command, ">");
    //return DynStr("nothing");
    import std.stdio : File;
    import std.process;
    try {
      // write article to file
      import std.uuid;
      static immutable string hexDigits = "0123456789abcdef";
      UUID id = randomUUID();
      DynStr tempName;
      // best-effort cleanup; does nothing while the name is still empty
      void removeTempFile () {
        if (tempName.length) try { import std.file : remove; remove(tempName.getData); } catch (Exception e) {}
      }
      scope(exit) removeTempFile();
      tempName.reserve(2+16*2+42);
      tempName ~= "/tmp/_temp_";
      foreach (immutable ubyte b; id.data[]) {
        tempName ~= hexDigits[b>>4];
        tempName ~= hexDigits[b&0x0f];
      }
      tempName ~= ".eml";

      auto fo = VFile(tempName.getData, "w");
      fo.rawWriteExact(message.getData);
      fo.close();

      //!conwriteln("EXEC filter '", command, "'... (", tempName.getData, ")");
      auto pid = pipeProcess([command, /*"-D",*/ tempName.getData], Redirect.all, null, Config.none, "/tmp");
      string action = pid.stdout.readln.xstrip;
      bool doStop = (action.length && action[0] == '-');
      if (doStop) action = action[1..$].xstrip;
      version(none) {
        while (!pid.stderr.eof) conwriteln(" :", pid.stderr.readln.xstrip, "|");
      }
      pid.pid.wait();
      //!conwriteln("EXEC filter '", command, "' action: ", action, " (", doStop, ")");
      return DynStr(action);
    } catch (Exception e) {
      conwriteln("EXEC filter error: ", e.msg);
    }
    return DynStr();
  }

  /// Sets the destination tag.
  override void move (const(char)[] dest) {
    version(debug_filter_helper) writeln("...move: <", dest, ">");
    tag = dest;
  }

  /**
   * Records a filter action in `actFlags`.
   *
   * Delete/SoftDelete replace each other, and so do Spam/Ham; Read and Stop
   * are simply added. Note that `Action.Delete` maps to the "purge" bit and
   * `Action.SoftDelete` to the "delete" bit.
   *
   * Throws: FilterSyntaxException for any other action.
   */
  override void performAction (Action action) {
    version(debug_filter_helper) writeln("...performAction: ", action);
    switch (action) {
      case Action.Delete: setExclusive(ActFlagDelete|ActFlagPurge, ActFlagPurge); break;
      case Action.SoftDelete: setExclusive(ActFlagDelete|ActFlagPurge, ActFlagDelete); break;
      case Action.Spam: setExclusive(ActFlagSpam|ActFlagHam, ActFlagSpam); break;
      case Action.Ham: setExclusive(ActFlagSpam|ActFlagHam, ActFlagHam); break;
      case Action.Read: actFlags |= ActFlagRead; break;
      case Action.Stop: actFlags |= ActFlagStop; break;
      default:
        {
          import std.conv : to;
          throw new FilterSyntaxException("action "~action.to!string~" should not end up in the handler");
        }
    }
  }

  /**
   * Substring match with optional anchors.
   *
   * A leading '^' anchors the pattern at the start of `str`, a trailing '$'
   * at its end; the rest of the pattern is taken literally. An empty pattern
   * matches only as "^$" against an empty string.
   */
  override bool match (const(char)[] pat, const(char)[] str, bool casesens) {
    version(debug_filter_helper) writeln("...match: casesens=", casesens, "; pat=<", pat, ">; str=<", str, ">");
    immutable bool bol = (pat.length && pat[0] == '^');
    if (bol) pat = pat[1..$];
    immutable bool eol = (pat.length && pat[$-1] == '$');
    if (eol) pat = pat[0..$-1];
    version(debug_filter_helper) writeln("...match: bol=", bol, "; eol=", eol, "; pat=<", pat, ">");
    if (pat.length == 0) return (bol && eol && str.length == 0);
    if (str.length < pat.length) return false;
    // cut `str` down to the only window an anchored pattern may occupy
    if (bol) {
      if (eol) {
        if (str.length != pat.length) return false;
      } else {
        str = str[0..pat.length];
      }
    } else if (eol) {
      str = str[str.length-pat.length..$];
    }
    if (casesens) return (str.indexOf(pat) >= 0);
    // case-insensitive: try every start offset (`str` is never shorter than `pat` here)
    foreach (immutable ofs; 0..str.length-pat.length+1) {
      if (str[ofs..$].startsWithCI(pat)) return true;
    }
    return false;
  }

  /// Prints the collected actions and the destination tag (debug aid).
  void writeResult() () const {
    if (isDelete) write(" softdelete");
    if (isPurge) write(" purge");
    if (isSpam) write(" spam");
    if (isHam) write(" ham");
    if (isRead) write(" read");
    write("; dest tag: ", tag.getData);
  }
}
249 // ////////////////////////////////////////////////////////////////////////// //
// receiver state shared between threads (`__gshared`, i.e. not thread-local)
__gshared bool updatesDisabled = false;
__gshared bool rcDisabled = false;
__gshared bool rcStarted = false;
__gshared Tid controlThreadId; // presumably the tid of `controlThread` -- the assignment is not visible in this part of the file
/// Reply message kinds; only `Quit` is defined.
enum ControlReply {
  Quit,
}
/// Message exchanged with the control thread; it must always be built with an explicit kind.
struct ControlCommand {
  enum Kind {
    Ping,
    ForceUpdateAll,
    Quit,

    CheckDone,
    CheckError,

    DisableUpdates,
    EnableUpdates,
  }

  Kind type;
  // for CheckDone or CheckError
  uint accid;

  @disable this ();
  /// Command without an account (`accid` is zero).
  this (Kind atype) nothrow @safe @nogc { this(atype, 0); }
  /// Command about account `aid`.
  this (Kind atype, uint aid) nothrow @safe @nogc { type = atype; accid = aid; }
}
/// Request sent to a checker thread: check the account with this id.
struct CheckCommand {
  uint accid;
}
// loads the settings of one account (by `:accid`) from the "Conf" database
static stmtAccInfo = LazyStatement!"Conf"(`
  SELECT
      accid AS accid
    , checktime AS checktime
    , nosendauth AS nosendauth
    , debuglog AS debuglog
    , nntplastindex AS nntplastindex
    , name AS name
    , recvserver AS recvserver
    , sendserver AS sendserver
    , user AS user
    , pass AS pass
    , inbox AS inbox
    , nntpgroup AS nntpgroup
  FROM accounts
  WHERE accid=:accid
  LIMIT 1
;`);
// upsert: remembers `:lastcheck` for account `:accid` in the `checktimes` table
static stmtSetCheckTime = LazyStatement!"Conf"(`
  INSERT INTO checktimes(accid,lastcheck) VALUES(:accid,:lastcheck)
  ON CONFLICT(accid)
  DO UPDATE SET lastcheck=:lastcheck
;`);
314 //==========================================================================
316 // forEachTag
318 // return `false` from delegate to stop
320 //==========================================================================
/**
 * Calls `dg` for every non-empty item of the '|'-separated list `tags`,
 * left to right.
 *
 * Iteration stops as soon as `dg` returns `false`. A null `dg` is ignored.
 */
void forEachTag (const(char)[] tags, bool delegate (const(char)[] tag) dg) {
  if (dg is null) return;
  while (tags.length) {
    const(char)[] tag;
    auto sep = tags.indexOf('|');
    if (sep < 0) {
      // last item: take everything, leave nothing
      tag = tags;
      tags = tags[$..$];
    } else {
      tag = tags[0..sep];
      tags = tags[sep+1..$];
    }
    if (tag.length == 0) continue; // skip empty items ("a||b")
    if (!dg(tag)) break;
  }
}
335 //==========================================================================
337 // extractAccount
339 //==========================================================================
/**
 * Extracts the account name from a '|'-separated tag list.
 *
 * The account is stored as an "account:<name>" item; the item must be the
 * first one, or directly follow a '|'. Text that merely contains "account:"
 * in the middle of another item is skipped and the search goes on (the search
 * used to be repeated from the same position, which never terminated).
 *
 * Returns: the name of the first account item (can be empty), or an empty
 * string if there is no such item.
 */
DynStr extractAccount (const(char)[] tags) {
  enum Prefix = "account:";
  for (;;) {
    auto pos = tags.indexOf(Prefix);
    if (pos < 0) break;
    if (pos == 0 || tags[pos-1] == '|') {
      auto name = tags[pos+Prefix.length..$];
      auto end = name.indexOf('|');
      if (end >= 0) name = name[0..end];
      return DynStr(name);
    }
    // not at an item boundary: resume right after the 'a' of this occurrence.
    // the remainder starts with "ccount:", so the next hit can never be at
    // position 0, and the boundary check above stays valid.
    tags = tags[pos+1..$];
  }
  return DynStr();
}
354 //==========================================================================
356 // extractFirstFolder
358 // can return empty string
360 //==========================================================================
/**
 * Returns the first item of the '|'-separated list `tags` that starts with
 * '/', or an empty string if there is none.
 */
DynStr extractFirstFolder (const(char)[] tags) {
  DynStr folder;
  forEachTag(tags, (tag) {
    immutable bool isFolder = (tag[0] == '/');
    if (isFolder) folder = tag;
    return !isFolder; // go on until a folder is seen
  });
  return folder;
}
372 //==========================================================================
374 // removeFirstFolder
376 // can return empty tags string
378 //==========================================================================
/**
 * Rebuilds the '|'-separated list `tags` without its first item that starts
 * with '/'. Empty items are dropped too; the result can be empty.
 */
DynStr removeFirstFolder (const(char)[] tags) {
  DynStr rest;
  bool folderDropped = false;
  forEachTag(tags, (tag) {
    if (folderDropped || tag[0] != '/') {
      if (rest.length) rest ~= "|";
      rest ~= tag;
    } else {
      folderDropped = true; // this is the one to omit
    }
    return true; // always process the whole list
  });
  return rest;
}
395 // ////////////////////////////////////////////////////////////////////////// //
/// One tag a message is filed under, plus a "was touched" mark used while twitting.
static struct TagInfo {
  uint tagid;
  DynStr name;
  bool wasUpdates; // set once the message was muted (or found muted) in this tag
}
403 //==========================================================================
405 // getMessageTags
407 //==========================================================================
/**
 * Appends to `tags` one `TagInfo` (id and name) for every distinct tag that
 * has a `threads` row for message `uid`.
 */
void getMessageTags (ref TagInfo[] tags, uint uid) {
  auto stGetTags = LazyStatement!"View"(`
    SELECT
        DISTINCT(threads.tagid) AS tagid
      , tn.tag AS name
    FROM threads
    INNER JOIN tagnames AS tn USING(tagid)
    WHERE uid=:uid
  ;`);

  tags.reserve(64);
  foreach (auto row; stGetTags.st.bind(":uid", uid).range) {
    immutable uint tagid = row.tagid!uint;
    tags ~= TagInfo(tagid, DynStr(row.name!SQ3Text));
  }
}
425 //==========================================================================
427 // updateTwittedThreadsInTag
429 //==========================================================================
/**
 * Propagates a mute from message `uid` to its descendants inside tag `tagid`.
 *
 * All not yet muted descendants (found by walking `threads.parent` links
 * recursively) are collected into the `mutepairs` table, and then marked with
 * `Mute.ThreadOther`; an `appearance` of 0 is raised to 1 along the way.
 * The `mutepairs` table must already exist (see `twitPrepare`).
 */
void updateTwittedThreadsInTag (uint tagid, uint uid) {
  auto stCollect = LazyStatement!"View"(`
    INSERT INTO mutepairs
      WITH RECURSIVE children(muid, paruid, mtagid, mmute) AS (
        SELECT 0, :uid, :tagid, 666
       UNION ALL
        SELECT
          tt.uid, tt.uid, mtagid, tt.mute
        FROM children AS cc
        INNER JOIN threads AS tt ON
          tt.tagid=cc.mtagid AND
          tt.parent=cc.paruid AND
          tt.uid<>cc.muid AND
          tt.uid<>cc.paruid
      )
      SELECT
          muid AS muid
        , mtagid AS mtagid
      FROM children
      WHERE muid<>0 AND mmute=0
  ;`);

  auto stApply = LazyStatement!"View"(`
    UPDATE threads
    SET
        mute=:mute
      , appearance=iif(appearance=0,1,appearance)
    FROM (SELECT muid, mtagid FROM mutepairs) AS cc
    WHERE uid=cc.muid AND tagid=cc.mtagid AND mute=0
  ;`);

  // start from an empty work table
  dbView.execute(`DELETE FROM mutepairs;`);

  // collect the descendants...
  auto collect = stCollect.st;
  collect.bind(":uid", uid).bind(":tagid", tagid).doAll();

  // ...and mute them
  auto apply = stApply.st;
  apply.bind(":mute", Mute.ThreadOther).doAll();
}
473 //==========================================================================
475 // createTwitByMsgid
477 //==========================================================================
/**
 * Creates a "twit by message id" rule for message `uid` and applies it.
 *
 * Nothing is done when `uid` is zero, `glob` is empty, the message has no
 * known message id, or a rule with the same message id and glob already
 * exists. Otherwise the rule is added to `msgidtwits`, the message is marked
 * with `Mute.ThreadStart` in every tag it belongs to (its title is reset to
 * NULL), and the mute is propagated to its replies.
 */
public void createTwitByMsgid (uint uid, const(char)[] glob="/dmars_ng/*") {
  if (!uid || glob.length == 0) return;

  auto stGetMsgid = LazyStatement!"View"(`
    SELECT
      msgid AS msgid
    FROM msgids
    WHERE uid=:uid
    LIMIT 1
  ;`);

  DynStr msgid;
  foreach (auto row; stGetMsgid.st.bind(":uid", uid).range) msgid = row.msgid!SQ3Text;
  if (msgid.length == 0) return;

  auto stFindMsgidTwit = LazyStatement!"Conf"(`
    SELECT
      1
    FROM msgidtwits
    WHERE msgid=:msgid AND tagglob=:glob
    LIMIT 1
  ;`);

  // check if we already have such twit
  bool alreadyKnown = false;
  foreach (auto row; stFindMsgidTwit.st.bindConstText(":msgid", msgid.getData).bindConstText(":glob", glob).range) {
    alreadyKnown = true;
    break;
  }
  if (alreadyKnown) return;

  // remember the new rule
  transacted!"Conf"{
    auto stAddMsgidTwit = LazyStatement!"Conf"(`
      INSERT INTO msgidtwits
              (etwitid, automatic, tagglob, msgid)
        VALUES(0, 0,:tagglob,:msgid)
    ;`);

    stAddMsgidTwit.st
      .bindConstText(":tagglob", glob)
      .bindConstText(":msgid", msgid.getData)
      .doAll();
  };

  TagInfo[] msgTags;
  scope(exit) delete msgTags;
  getMessageTags(ref msgTags, uid);
  if (msgTags.length == 0) return; // just in case

  // `updateTwittedThreadsInTag` needs the work table
  twitPrepare();

  auto stUpdateMute = LazyStatement!"View"(`
    UPDATE threads
    SET
        mute=:mute
      , title=NULL
      , appearance=iif(appearance=0,1,appearance)
    WHERE uid=:uid AND tagid=:tagid AND mute=0
  ;`);

  // mark the message as twitted
  transacted!"View"{
    foreach (ref TagInfo ti; msgTags) {
      stUpdateMute.st
        .bind(":uid", uid)
        .bind(":tagid", ti.tagid)
        .bind(":mute", Mute.ThreadStart)
        .doAll();
      updateTwittedThreadsInTag(ti.tagid, uid);
    }
  };
}
548 //==========================================================================
550 // twitPrepare
552 //==========================================================================
/**
 * Makes sure the temporary `mutepairs(muid, mtagid)` work table exists in the
 * view database. Safe to call repeatedly (`IF NOT EXISTS`).
 */
public void twitPrepare () {
  dbView.execute(`
    CREATE TEMP TABLE IF NOT EXISTS mutepairs(
        muid INTEGER
      , mtagid INTEGER
    );
  `);
}
563 //==========================================================================
565 // twitMessage
567 // set "mute" flag according to message filters
569 //==========================================================================
/**
 * Sets the "mute" flag of message `uid` according to the message filters.
 *
 * Steps:
 *   1. every `emailtwits` rule whose email/name globs match the sender mutes
 *      the message (`Mute.ThreadStart`) in all of its tags matching the
 *      rule's tag glob;
 *   2. in every tag where the message is still not muted, its ancestors are
 *      checked: a muted ancestor mutes the message with `Mute.ThreadOther`;
 *   3. for every tag touched by (1) or (2), or where the message was muted
 *      already, the mute is propagated to the replies.
 *
 * Does nothing for a zero `uid`, for a message without tags, or when
 * `chiroGetMessageFrom` fails.
 */
public void twitMessage (uint uid) {
  if (!uid) return;

  TagInfo[] tags;
  scope(exit) delete tags;
  getMessageTags(ref tags, uid);
  if (tags.length == 0) return; // just in case

  auto stUpdateMute = LazyStatement!"View"(`
    UPDATE threads
    SET
        mute=:mute
      , title=:title
      , appearance=iif(appearance=0,1,appearance)
    WHERE uid=:uid AND tagid=:tagid AND mute=0
  ;`);

  DynStr fromMail, fromName;

  // NOTE(review): this direct lookup looks redundant with the
  // `chiroGetMessageFrom` call below (which receives both strings by
  // reference) -- confirm, and drop one of them.
  foreach (auto row; dbView.statement(`
    SELECT
        from_name AS fromName
      , from_mail AS fromMail
    FROM info
    WHERE uid=:uid
    LIMIT 1
  ;`).bind(":uid", uid).range)
  {
    fromMail = row.fromMail!SQ3Text;
    fromName = row.fromName!SQ3Text;
  }

  if (!chiroGetMessageFrom(uid, ref fromMail, ref fromName)) return;

  uint ttcount = 0; // number of tags known to need thread propagation
  if (fromMail.length != 0 || fromName.length != 0) {
    foreach (auto trow; dbConf.statement(`
      SELECT
          tagglob AS tagglob
        , email AS email
        , name AS name
        , title AS title
      FROM emailtwits
    ;`).range)
    {
      auto email = trow.email!SQ3Text;
      auto name = trow.name!SQ3Text;
      auto glob = trow.tagglob!SQ3Text;
      if (glob.length == 0 || (!email.length && !name.length)) continue; // just in case
      // check for filter match
      if (email.length && !globmatchCI(fromMail, email)) continue;
      if (name.length && !globmatchCI(fromName, name)) continue;
      auto title = trow.title!SQ3Text;
      // for all tags
      foreach (ref TagInfo ti; tags) {
        if (ti.wasUpdates) continue;
        if (!globmatch(ti.name, glob)) continue;
        stUpdateMute.st
          .bind(":uid", uid)
          .bind(":tagid", ti.tagid)
          .bind(":mute", Mute.ThreadStart)
          .bindConstText(":title", (title.length ? title : null), allowNull:true)
          .doAll();
        ti.wasUpdates = true;
        ++ttcount;
      }
      // every tag is handled, no need to look at other rules
      if (ttcount == tags.length) break;
    }
  }

  // mute it if it is not muted, but should be
  static auto statFindParentFor = LazyStatement!"View"(`
    SELECT mute AS mute, parent AS parent
    FROM threads
    WHERE tagid=:tagid AND uid=:uid
    LIMIT 1
  ;`);

  // `ref` is essential here: the marks set below are read by the final loop.
  // (iterating by value silently dropped them, so replies were never updated
  // for tags handled only by this step.)
  foreach (ref TagInfo ti; tags) {
    auto mute = chiroGetMessageMute(ti.tagid, uid);
    if (mute > Mute.Normal) {
      ti.wasUpdates = true; // just in case
      if (!ttcount) ttcount = 1;
      continue;
    }
    // walk up the parent chain, starting with the message itself
    uint puid = uid;
    while (puid) {
      statFindParentFor.st
        .bind(":tagid", ti.tagid)
        .bind(":uid", puid);
      puid = 0; // stop, unless the row below gives us the next parent
      foreach (auto prow; statFindParentFor.st.range) {
        if (prow.mute!int > Mute.Normal) {
          chiroSetMessageMute(ti.tagid, uid, Mute.ThreadOther);
          ti.wasUpdates = true; // just in case
          if (!ttcount) ttcount = 1;
        } else {
          puid = prow.parent!uint;
        }
      }
    }
  }

  if (!ttcount) return;

  // `updateTwittedThreadsInTag` works through the `mutepairs` table; this is
  // a public entry point, so make sure the table is there (cheap, idempotent)
  twitPrepare();

  // update threads
  foreach (ref TagInfo ti; tags) {
    if (!ti.wasUpdates) continue;
    updateTwittedThreadsInTag(ti.tagid, uid);
  }
}
684 //==========================================================================
686 // updateViewDB
688 // check for new messages, and update view database
690 //==========================================================================
/**
 * Checks for new messages, and updates the view database.
 *
 * Every message that is in the storage database, but has a uid above the
 * highest one known to the view database, is processed in uid order:
 *   - mail filters are run over it (and the bogo check, if no filter decided
 *     on spam/ham);
 *   - the resulting tag list is written back to the storage if it changed;
 *   - the message is inserted into the view database, and linked into the
 *     threads of every threaded tag it ended up in;
 *   - twit (mute) rules are applied.
 * Finally, a `TagThreadsUpdatedEvent` is posted for every touched tag.
 */
void updateViewDB () {
  uint maxViewUid = 0;
  uint maxStoreUid = 0;

  twitPrepare();

  foreach (auto row; dbView.statement(`SELECT MAX(uid) AS uid FROM info;`).range) maxViewUid = row.uid!uint;
  foreach (auto row; dbStore.statement(`SELECT MAX(uid) AS uid FROM messages;`).range) maxStoreUid = row.uid!uint;

  if (maxViewUid >= maxStoreUid) return;
  conwriteln("need to process around ", maxStoreUid-maxViewUid, " messages.");

  // tags that got new messages (each id is recorded once)
  uint[] relinkTids;
  relinkTids.reserve(64);
  scope(exit) delete relinkTids;

  foreach (uint uid; maxViewUid+1..maxStoreUid+1) {
    conwriteln("============ message #", uid, " ============");
    DynStr msg, tags;
    foreach (auto row; dbStore.statement(`
      SELECT tags AS tags, ChiroUnpack(data) AS data FROM messages WHERE uid=:uid LIMIT 1
    ;`).bind(":uid", uid).range)
    {
      msg = row.data!SQ3Text;
      tags = row.tags!SQ3Text;
    }
    if (msg.length == 0 || tags.length == 0) continue; // not interesting

    DynStr acc = tags.extractAccount();
    DynStr origTags = tags;
    DynStr deftag = tags.extractFirstFolder();
    tags = tags.removeFirstFolder();

    auto hlp = new RealFilterHelper;
    scope(exit) delete hlp;
    hlp.account = acc;
    hlp.tag = deftag;
    if (hlp.tag.length == 0) hlp.tag = "#hobo";
    hlp.message = msg;
    // filter
    foreach (auto row; dbConf.statement(`SELECT filterid AS filterid, name AS name, body AS body FROM filters ORDER BY idx;`).range) {
      //conwrite("  filter '", row.name!SQ3Text, "' (", row.filterid!uint, "): ");
      bool goOn = false;
      hlp.matched = false;
      try {
        //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">");
        goOn = executeMailFilter(row.body!SQ3Text, hlp);
      } catch (Exception e) {
        // a broken filter must not stop the others
        conwriteln("ERROR IN FILTER '", row.name!SQ3Text, "': ", e.msg);
      }
      if (hlp.matched) {
        conwriteln("...filter '", row.name!SQ3Text, "' matched!");
      }
      //hlp.writeResult(); writeln;
      //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">: goon=", goOn, "; isstop=", hlp.isStop);
      //assert(hlp.isStop == !goOn);
      if (hlp.isStop) break;
    }
    //write("  FINAL RESULT:"); hlp.writeResult(); writeln;
    // done filtering

    bool markSpamHam = false; //!!!
    if (!hlp.isSpam && !hlp.isHam) {
      auto bogo = messageBogoCheck(uid);
      if (bogo == Bogo.Spam) {
        bool exists;
        conwriteln("BOGO: SPAM message #", uid, "; from={", hlp.getFromName.getData, "}:<", hlp.getFromMail.getData, ">; to={",
                   hlp.getToName.getData, "}:<", hlp.getToMail.getData, ">; subj=", hlp.getSubj(out exists).getData);
        hlp.performAction(hlp.Action.Spam);
        markSpamHam = false;
      }
    }

    if (hlp.isSpam) hlp.tag = "#spam"; // always

    if (hlp.tag.length == 0) hlp.tag = deftag; // just in case
    bool hasTag = false;
    forEachTag(tags, (xtag) {
      if (xtag == hlp.tag) {
        hasTag = true;
        return false; // stop
      }
      return true; // go on
    });

    // `tags` should contain our new tags
    if (!hasTag) {
      DynStr tt = hlp.tag;
      if (tags.length) {
        tt ~= "|";
        tt ~= tags;
      }
      tags = tt;
    }

    // update tags info in the storage
    if (tags != origTags) {
      transacted!"Store"{
        dbStore.statement(`UPDATE messages SET tags=:tags WHERE uid=:uid;`)
          .bind(":uid", uid)
          .bindConstText(":tags", tags.getData)
          .doAll();
      };
    }

    // insert the message into the view db
    int appearance = Appearance.Unread;
    if (hlp.isDelete) appearance = Appearance.SoftDeleteFilter;
    else if (hlp.isPurge) appearance = Appearance.SoftDeletePurge;
    if (appearance == Appearance.Unread && (hlp.isRead || hlp.isSpam)) appearance = Appearance.Read;
    if (markSpamHam) {
      if (hlp.isSpam) messageBogoMarkSpam(uid);
      if (hlp.isHam) messageBogoMarkHam(uid);
    }

    uint msgtime = 0;
    DynStr hdr, body;
    foreach (auto trow; dbStore.statement(`
      SELECT
          ChiroExtractHeaders(:msgdata) AS headers
        , ChiroExtractBody(:msgdata) AS body
        , ChiroHdr_RecvTime(:msgdata) AS msgtime
    ;`).bindConstText(":msgdata", msg.getData).range)
    {
      msgtime = trow.msgtime!uint;
      hdr = trow.headers!SQ3Text;
      body = trow.body!SQ3Text;
    }

    conwriteln("putting msg ", uid, " (time:", msgtime, "; appr=", appearance, ") to '", tags.getData, "'; oldtags='", origTags.getData, "'");

    transacted!"View"{
      //dbView.beginTransaction();
      //scope(success) dbView.commitTransaction();
      //scope(failure) dbView.rollbackTransaction();
      chiroParseAndInsertOneMessage(uid, msgtime, appearance, hdr, body, tags);

      DynStr msgid;
      foreach (auto mrow; dbView.statement(`SELECT msgid AS msgid FROM msgids WHERE uid=:uid LIMIT 1;`).bind(":uid", uid).range) {
        msgid = mrow.msgid!SQ3Text;
      }
      //if (msgid.length == 0) return;
      version(debug_updater) {
        {
          auto fo = VFile("zzz", "a");
          fo.writeln("MSGUID: ", uid, "; MSGID: <", msgid.getData, ">");
        }
      }
      conwriteln("MSGUID: ", uid, "; MSGID: <", msgid.getData, ">");

      // collect tags to modify
      int[] taglist;
      scope(exit) delete taglist;
      taglist.reserve(16);

      foreach (auto trow; dbView.statement(`SELECT tagid AS tagid, parent AS parent FROM threads WHERE uid=:uid;`)
                                .bind(":uid", uid).range)
      {
        immutable uint tid = trow.tagid!uint;
        if (tid) {
          bool found = false;
          foreach (immutable uint tt; relinkTids) if (tt == tid) { found = true; break; }
          if (!found) relinkTids ~= tid;
        }
        // only parentless messages in threaded tags need linking
        if (!tid || trow.parent!uint || !chiroIsTagThreaded(tid)) continue;
        conwriteln("  tagid: ", tid, " (", chiroGetTagName(tid).getData, ")");
        version(debug_updater) {
          {
            auto fo = VFile("zzz", "a");
            fo.writeln("  tagid: ", tid, " (", chiroGetTagName(tid).getData, ")");
          }
        }
        taglist ~= tid;
      }

      foreach (immutable uint tid; taglist) {
        uint setUsAsParentFor = 0;
        bool needFullRelink = false;
        // check if there are any references to us, and fix them by full relink
        if (!msgid.length) continue;
        foreach (auto nrow; dbView.statement(`
          SELECT refids.uid AS uid, tt.parent AS parent
          FROM refids
          INNER JOIN(threads) AS tt
            ON tt.tagid=:tagid AND tt.uid=refids.uid
          WHERE idx=0 AND msgid=:msgid
          LIMIT 1
        ;`).bind(":tagid", tid).bindConstText(":msgid", msgid.getData).range)
        {
          if (nrow.parent!uint == 0) {
            setUsAsParentFor = nrow.uid!uint;
          } else {
            needFullRelink = true;
          }
        }

        if (needFullRelink) {
          //FIXME: make this faster!
          conwriteln("  tid: ", tid, " (", chiroGetTagName(tid).getData, "); performing full relink...");
          chiroSupportRelinkTagThreads(tid);
          continue;
        }

        if (setUsAsParentFor) {
          conwriteln("  tid: ", tid, " (", chiroGetTagName(tid).getData, "); settuing us (", uid, ") as a parent for ", setUsAsParentFor);
          // the placeholder must be `:tagid`, because that is the name bound
          // below (it was `:tid`, which was never bound)
          dbView.statement(`
            UPDATE threads
            SET parent=:uid
            WHERE uid=:xuid AND tagid=:tagid
          ;`).bind(":uid", uid).bind(":xuid", setUsAsParentFor).bind(":tagid", tid).doAll();
        }

        // find parent for us
        uint paruid = 0;
        foreach (auto prow; dbView.statement(`
          SELECT msgids.uid AS paruid
          FROM msgids
          INNER JOIN(threads) AS tt
            ON tt.tagid=:tagid AND tt.uid=msgids.uid
          WHERE msgids.uid<>:uid AND msgids.msgid IN (SELECT msgid FROM refids WHERE uid=:uid ORDER BY idx)
          LIMIT 1
        ;`).bind(":uid", uid).bind(":tagid", tid).range)
        {
          paruid = prow.paruid!uint;
        }
        conwriteln("  tid: ", tid, " (", chiroGetTagName(tid).getData, "); paruid=", paruid);
        if (paruid && paruid != uid) {
          dbView.statement(`UPDATE threads SET parent=:paruid WHERE uid=:uid AND tagid=:tagid;`)
            .bind(":uid", uid)
            .bind(":tagid", tid)
            .bind(":paruid", paruid)
            .doAll();
        }
      }
    };
    twitMessage(uid);
  }

  // relink threads
  if (relinkTids.length) {
    foreach (immutable uint tid; relinkTids) {
      if (vbwin && !vbwin.closed) vbwin.postEvent(new TagThreadsUpdatedEvent(tid));
    }
  }
}
939 //==========================================================================
941 // checkerThread
943 //==========================================================================
/**
 * Thread body: checks one account for new messages.
 *
 * Waits for a `CheckCommand`, loads the account settings, records the next
 * check time, and then downloads new messages either from the NNTP group
 * (when the account has one) or from the POP3 mailbox. Downloaded messages
 * are only put into the storage database; filtering is done later.
 *
 * Exactly one `ControlCommand` is sent back to `ownerTid`:
 * `CheckDone` on success, `CheckError` on any failure (including a zero or
 * unknown account id).
 */
void checkerThread (Tid ownerTid) {
  uint accid = 0;
  bool isError = false;
  try {
    receive(
      (CheckCommand cmd) {
        accid = cmd.accid;
      }
    );

    if (accid == 0) {
      ownerTid.send(ControlCommand(ControlCommand.Kind.CheckError, accid));
      return;
    }

    bool found = false;
    int checktime;
    bool nosendauth;
    bool debuglog;
    uint nntplastindex;
    DynStr name;
    DynStr recvserver;
    DynStr sendserver;
    DynStr user;
    DynStr pass;
    DynStr inbox;
    DynStr nntpgroup;

    foreach (auto arow; stmtAccInfo.st.bind(":accid", accid).range) {
      // i found her!
      found = true;
      // check interval, clamped to [1..100000]
      int upmins = arow.checktime!int;
      if (upmins < 1) upmins = 1; else if (upmins > 100000) upmins = 100000;
      checktime = upmins;
      nosendauth = (arow.nosendauth!int > 0);
      debuglog = (arow.debuglog!int > 0);
      nntplastindex = arow.nntplastindex!uint;
      name = arow.name!SQ3Text;
      recvserver = arow.recvserver!SQ3Text;
      sendserver = arow.sendserver!SQ3Text;
      user = arow.user!SQ3Text;
      pass = arow.pass!SQ3Text;
      inbox = arow.inbox!SQ3Text;
      nntpgroup = arow.nntpgroup!SQ3Text;
    }

    if (!found) {
      ownerTid.send(ControlCommand(ControlCommand.Kind.CheckError, accid));
      return;
    }

    conwriteln("checking account '", name, "' (", accid, ")...");

    stmtSetCheckTime.st.bind(":accid", accid).bind(":lastcheck", RunningAverageExp.GetTickCount()+checktime*60).doAll();

    // ////////////////////////////////////////////////////////////////// //
    // tags for a freshly received message: "<inbox>|account:<name>",
    // or "#hobo" when the account has neither an inbox nor a name.
    // filtering will be done later, for now, use the default inbox.
    DynStr buildTags () {
      DynStr tags;
      if (inbox.length) tags ~= inbox;
      if (name.length) {
        if (tags.length) tags ~= "|";
        tags ~= "account:";
        tags ~= name;
      }
      if (tags.length == 0) tags = "#hobo";
      return tags;
    }

    // ////////////////////////////////////////////////////////////////// //
    void CheckNNTP () {
      auto nsk = new SocketNNTP(recvserver);
      scope(exit) nsk.close();

      nsk.selectGroup(nntpgroup);
      if (nsk.emptyGroup) {
        conwriteln("[", name, ":", nntpgroup, "]: no new articles (empty group)");
        return;
      }

      uint stnum = nntplastindex+1;
      if (stnum > nsk.hiwater) {
        conwriteln("[", name, ":", nntpgroup, "]: no new articles");
        return;
      }

      conwriteln("[", name, ":", nntpgroup, "]: ", nsk.hiwater+1-stnum, " (possible) new articles");

      // download new articles
      foreach (immutable uint anum; stnum..nsk.hiwater+1) {
        DynStr msg;
        try {
          msg = nsk.getArticle(anum);
        } catch (Exception e) {
          conwriteln("[", name, ":", nntpgroup, "]: error downloading article #", anum);
          break;
        }
        if (msg.length == 0) continue; // this article is empty
        // insert article into the storage
        DynStr tags = buildTags();
        conwriteln("[", name, ":", nntpgroup, "]: storing article #", anum, " for '", tags.getData, "'...");
        transacted!"Store"{
          dbStore.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
            .bindConstText(":tags", tags.getData)
            .bindConstBlob(":data", msg.getData)
            .doAll();
        };
        // update account with the new highest nntp index
        transacted!"Conf"{
          dbConf.statement(`UPDATE accounts SET nntplastindex=:anum WHERE accid=:accid;`)
            .bind(":accid", accid)
            .bind(":anum", anum)
            .doAll();
        };
      }
    }

    // ////////////////////////////////////////////////////////////////// //
    // despite the name, this receives mail via POP3
    void CheckSMTP () {
      conwriteln("*** [", name, "]: connecting...");
      auto pop3 = new SocketPOP3(recvserver);
      scope(exit) pop3.close();
      if (user.length) {
        conwriteln("[", name, "]: authenticating...");
        pop3.auth(user, pass);
      }
      auto newmsg = pop3.getNewMailCount;
      if (newmsg == 0) {
        conwriteln("[", name, "]: no new messages");
        return;
      }
      conwriteln("[", name, "]: ", newmsg, " new message", (newmsg > 1 ? "s" : ""));
      foreach (immutable int popidx; 1..newmsg+1) {
        DynStr msg;
        try {
          msg = pop3.getMessage(popidx); // full message, with the ending dot
        } catch (Exception e) {
          conwriteln("[", name, "]: error downloading message #", popidx);
          break;
        }
        if (msg.length != 0) {
          DynStr tags = buildTags();
          // same "[name]" prefix as the other mailbox messages (this used to
          // print the NNTP group, which is always empty on this path)
          conwriteln("[", name, "]: storing message #", popidx, " for '", tags.getData, "'...");
          transacted!"Store"{
            dbStore.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
              .bindConstText(":tags", tags.getData)
              .bindConstBlob(":data", msg.getData)
              .doAll();
          };
        }
        //auto msg = pop3.getMessage!true(popidx); // full message, with the ending dot, and exact terminators
        // process
        // the message is stored by now (or it was empty), remove it from the server
        pop3.deleteMessage(popidx);
      }
    }

    // ////////////////////////////////////////////////////////////////// //
    try {
      if (nntpgroup.length) CheckNNTP(); else CheckSMTP();
    } catch (Throwable e) {
      conwriteln("ERROR checking account '", name, "' (", accid, "): ", e.msg);
      isError = true;
    }

    conwriteln("done checking account '", name, "' (", accid, ")...");

    if (vbwin && !vbwin.closed) {
      vbwin.postEvent(new UpdatingAccountCompleteEvent(accid));
      //sqlite3_sleep(1000);
    }
  } catch (Throwable e) {
    // here, we are dead and fucked (the exact order doesn't matter)
    //import core.stdc.stdlib : abort;
    import core.stdc.stdio : fprintf, stderr;
    //import core.memory : GC;
    import core.thread : thread_suspendAll;
    //GC.disable(); // yeah
    //thread_suspendAll(); // stop right here, you criminal scum!
    auto s = e.toString();
    fprintf(stderr, "\n=== FATAL ===\n%.*s\n", cast(uint)s.length, s.ptr);
    //abort(); // die, you bitch!
    ownerTid.send(ControlCommand(ControlCommand.Kind.CheckError, accid));
    return;
  }
  //if (vbwin) vbwin.postEvent(evDoConCommands); }
  ownerTid.send(ControlCommand((isError ? ControlCommand.Kind.CheckError : ControlCommand.Kind.CheckDone), accid));
}
1134 //==========================================================================
1136 // controlThread
1138 //==========================================================================
// Receiver control thread.
//
// Runs a message loop that reacts to ControlCommand messages, finds the accounts
// that are due for a check, starts checkerThread for them (only the first list
// entry is ever started, so checks run one after another), and calls
// updateViewDB() whenever no check is in progress. After Quit was requested and
// the check list has drained, ControlReply.Quit is sent to `ownerTid`.
// Any Throwable escaping the loop is printed to stderr and the process is aborted.
//
// NOTE(review): lines consisting only of punctuation (closing braces and the like)
// are missing from this listing; the comments below describe the visible statements.
1139 void controlThread (Tid ownerTid) {
1140 import core.time;
1141 bool doQuit = false;
1142 try {
// one queued or running account check
1143 static struct AccCheck {
1144 uint accid;
// set when a checker thread has been spawned for this entry
1145 bool inprogress;
// the spawned checker thread (assigned together with `inprogress`)
1146 Tid tid;
// accounts waiting for a check or being checked; new entries are appended at the end
1149 AccCheck[] accidCheckList;
1150 accidCheckList.reserve(128);
// temporary table with check bookkeeping per account
1152 dbConf.execute(`
1153 CREATE TEMP TABLE IF NOT EXISTS checktimes (
1154 accid INTEGER PRIMARY KEY UNIQUE /* unique, never zero */
1155 , lastcheck INTEGER NOT NULL DEFAULT 0
1156 , checking INTEGER NOT NULL DEFAULT 0
// every account eligible for checking: `nocheck` is 0 and the inbox tag is not empty
1160 static stmtAllAccs = LazyStatement!"Conf"(`
1161 SELECT
1162 accid AS accid
1163 , checktime AS checktime
1164 FROM accounts
1165 WHERE nocheck=0 AND inbox<>''
1166 ORDER BY accid
1167 ;`);
// last recorded check time of one account (this function only reads `checktimes`)
1169 static stmtGetCheckTime = LazyStatement!"Conf"(`
1170 SELECT lastcheck AS lastcheck FROM checktimes WHERE accid=:accid LIMIT 1
1171 ;`);
// initial updateViewDB() call before the loop starts
1173 updateViewDB();
// time of the last forced GC run (see the end of the loop body)
1175 MonoTime lastCollect = MonoTime.currTime;
1176 //accidCheckList ~= AccCheck();
// set when a checker reported back; cleared after updateViewDB() ran with no checker in progress
1178 bool needUpdates = false;
// set by ForceUpdateAll; makes the next account scan queue every eligible account
1179 bool forceAll = false;
1180 for (;;) {
// leave only after Quit was requested and the list has drained
1181 if (doQuit && accidCheckList.length == 0) break;
// wait for a command: 50 ms while quitting, 1 s while work is pending, 60 s when idle
1182 receiveTimeout((doQuit ? 50.msecs : accidCheckList.length || needUpdates || forceAll ? 1.seconds : 60.seconds),
1183 (ControlCommand cmd) {
1184 final switch (cmd.type) {
1185 case ControlCommand.Kind.ForceUpdateAll: forceAll = true; break;
// Ping has no handler body; receiving it just ends the wait
1186 case ControlCommand.Kind.Ping: break;
1187 case ControlCommand.Kind.Quit: doQuit = true; break;
1188 case ControlCommand.Kind.DisableUpdates: updatesDisabled = true; break;
1189 case ControlCommand.Kind.EnableUpdates: updatesDisabled = false; break;
// a checker finished (successfully or not): drop its entry from the list
1190 case ControlCommand.Kind.CheckDone:
1191 case ControlCommand.Kind.CheckError:
1192 needUpdates = true;
1193 if (accidCheckList.length) {
1194 foreach (immutable idx, const ref AccCheck nfo; accidCheckList) {
1195 if (nfo.accid == cmd.accid) {
1196 //if (!doQuit && vbwin && !vbwin.closed) vbwin.postEvent(new UpdatingAccountCompleteEvent(nfo.accid));
// shift the tail down by one slot and shrink the array
1197 foreach (immutable c; idx+1..accidCheckList.length) accidCheckList[c-1] = accidCheckList[c];
1198 accidCheckList.length -= 1;
1199 break;
// the list is empty now: notify the window that the whole update is complete
1202 if (!doQuit && vbwin && !vbwin.closed && accidCheckList.length == 0) vbwin.postEvent(new UpdatingCompleteEvent());
1204 break;
// drop entries whose account id is zero
1209 for (usize idx = 0; idx < accidCheckList.length; ) {
1210 if (accidCheckList[idx].accid != 0) {
1211 ++idx;
1212 } else {
1213 foreach (immutable c; idx+1..accidCheckList.length) accidCheckList[c-1] = accidCheckList[c];
1214 accidCheckList.length -= 1;
// quitting: forget the checks that were not started yet, then keep polling
// until the running ones report back (the loop exits at its top once the list is empty)
1218 if (doQuit) {
1219 for (usize idx = 0; idx < accidCheckList.length; ) {
1220 if (accidCheckList[idx].inprogress) {
1221 ++idx;
1222 } else {
1223 foreach (immutable c; idx+1..accidCheckList.length) accidCheckList[c-1] = accidCheckList[c];
1224 accidCheckList.length -= 1;
1227 continue;
// look for accounts that are due; skipped while results are pending or updates are disabled
1230 if (!needUpdates && !updatesDisabled) {
// NOTE(review): presumably seconds, as it is compared with lastcheck+minutes*60 below — confirm
1231 ulong ctt = RunningAverageExp.GetTickCount();
1232 foreach (auto arow; stmtAllAccs.st.range) {
// accounts that are already in the list are not queued twice
1233 bool found = false;
1234 foreach (const ref AccCheck nfo; accidCheckList) if (nfo.accid == arow.accid!uint) { found = true; break; }
1235 if (found) continue;
1236 // forced update?
1237 if (forceAll) {
1238 accidCheckList ~= AccCheck(arow.accid!uint);
1239 continue;
1241 // check timeout
// check interval in minutes, clamped to 1..100000
1242 int upmins = arow.checktime!int;
1243 if (upmins < 1) upmins = 1; else if (upmins > 100000) upmins = 100000;
// stays 0 when `checktimes` has no row for this account
1244 ulong lastcheck = 0;
1245 foreach (auto crow; stmtGetCheckTime.st.bind(":accid", arow.accid!uint).range) lastcheck = crow.lastcheck!ulong;
1246 lastcheck += upmins*60; // next check time
1247 if (lastcheck < ctt) {
1248 // the account is due: queue it
1249 accidCheckList ~= AccCheck(arow.accid!uint);
// NOTE(review): punctuation-only lines around this branch are missing from the listing, so it is
// unclear whether this `else` is live code or sits inside a block comment — confirm against the repository
1252 else {
1253 conwriteln("check for accid ", arow.accid!uint, " in ", (lastcheck-ctt)/60, " minutes...");
// a forced scan is one-shot
1257 forceAll = false;
// start the next check: the loop below never gets past the first entry,
// so a new checker is spawned only when the head of the list is not running yet
1260 if (!updatesDisabled) {
1261 foreach (ref AccCheck nfo; accidCheckList) {
1262 if (nfo.inprogress) break;
1263 if (vbwin) vbwin.postEvent(new UpdatingAccountEvent(nfo.accid));
1264 nfo.tid = spawn(&checkerThread, thisTid);
1265 nfo.inprogress = true;
1266 nfo.tid.send(CheckCommand(nfo.accid));
1267 break;
// no checker is running: call updateViewDB() and clear the pending flag
1271 bool hasProgress = false;
1272 foreach (ref AccCheck nfo; accidCheckList) if (nfo.inprogress) { hasProgress = true; break; }
1273 if (!hasProgress) {
1274 updateViewDB();
1275 needUpdates = false;
// force a GC cycle once at least five minutes have passed since the previous one
1278 if (!doQuit) {
1279 immutable ctt = MonoTime.currTime;
1280 if ((ctt-lastCollect).total!"minutes" >= 5) {
1281 import core.memory : GC;
1282 lastCollect = ctt;
1283 GC.collect();
1284 GC.minimize();
// the loop has ended: tell the owner that the control thread is done
1288 ownerTid.send(ControlReply.Quit);
1289 } catch (Throwable e) {
1290 // fatal error: nothing can be recovered here (the exact order doesn't matter)
1291 import core.stdc.stdlib : abort;
1292 import core.stdc.stdio : fprintf, stderr;
1293 import core.memory : GC;
1294 import core.thread : thread_suspendAll;
1295 GC.disable(); // no more collections
1296 thread_suspendAll(); // freeze all other threads
1297 auto s = e.toString();
1298 fprintf(stderr, "\n=== FATAL ===\n%.*s\n", cast(uint)s.length, s.ptr);
1299 abort(); // terminate the process
1304 //==========================================================================
1306 // receiverDisable
1308 //==========================================================================
// Marks the receiver as disabled. `receiverInit()` returns without starting
// the control thread while this flag is set.
public void receiverDisable () { rcDisabled = true; }
1314 //==========================================================================
1316 // disableMailboxUpdates
1318 //==========================================================================
// Asks the control thread to stop scheduling mailbox checks (it sets its
// `updatesDisabled` flag on receipt). Does nothing when the receiver was not started.
public void disableMailboxUpdates () {
  if (rcStarted) {
    controlThreadId.send(ControlCommand(ControlCommand.Kind.DisableUpdates));
  }
}
1325 //==========================================================================
1327 // enableMailboxUpdates
1329 //==========================================================================
// Asks the control thread to resume scheduling mailbox checks (it clears its
// `updatesDisabled` flag on receipt). Does nothing when the receiver was not started.
public void enableMailboxUpdates () {
  if (rcStarted) {
    controlThreadId.send(ControlCommand(ControlCommand.Kind.EnableUpdates));
  }
}
1336 //==========================================================================
1338 // receiverForceUpdateAll
1340 //==========================================================================
// Asks the control thread to queue every eligible account for a check on its
// next scan, regardless of the per-account check interval.
// Does nothing when the receiver was not started.
public void receiverForceUpdateAll () {
  if (rcStarted) {
    controlThreadId.send(ControlCommand(ControlCommand.Kind.ForceUpdateAll));
  }
}
1347 //==========================================================================
1349 // receiverInit
1351 //==========================================================================
// Starts the receiver control thread, with the calling thread as its owner.
// Does nothing when the receiver is already running or was disabled
// with `receiverDisable()`.
public void receiverInit () {
  if (rcStarted || rcDisabled) return;
  controlThreadId = spawn(&controlThread, thisTid);
  rcStarted = true;
}
1360 //==========================================================================
1362 // receiverDeinit
1364 //==========================================================================
1365 public void receiverDeinit () {
1366 if (!rcStarted) return;
1367 controlThreadId.send(ControlCommand(ControlCommand.Kind.Quit));
1368 bool done = false;
1369 while (!done) {
1370 receive(
1371 (ControlReply reply) {
1372 if (reply == ControlReply.Quit) done = true;