2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3 of the License ONLY.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // mail/nntp receiver thread
18 module receiver
is aliced
;
21 //version = debug_filter_helper;
22 //version = debug_updater;
24 import std
.concurrency
;
29 import iv
.timer
: DurTimer
= Timer
;
36 import chibackend
.net
;
41 // ////////////////////////////////////////////////////////////////////////// //
42 class RealFilterHelper
: FilterHelper
{
45 // only one of these can be set
46 ActFlagDelete
= 1u<<0,
49 // only one of these can be set
59 DynStr tag
; // destination tag
60 uint actFlags
; // see above
65 ~this () nothrow @nogc { account
.clear(); tag
.clear(); message
.clear(); }
67 final @property bool isDelete () const pure nothrow @safe @nogc { pragma(inline
, true); return !!(actFlags
&ActFlagDelete
); }
68 final @property bool isPurge () const pure nothrow @safe @nogc { pragma(inline
, true); return !!(actFlags
&ActFlagPurge
); }
69 final @property bool isSpam () const pure nothrow @safe @nogc { pragma(inline
, true); return !!(actFlags
&ActFlagSpam
); }
70 final @property bool isHam () const pure nothrow @safe @nogc { pragma(inline
, true); return !!(actFlags
&ActFlagHam
); }
71 final @property bool isRead () const pure nothrow @safe @nogc { pragma(inline
, true); return !!(actFlags
&ActFlagRead
); }
72 final @property bool isStop () const pure nothrow @safe @nogc { pragma(inline
, true); return !!(actFlags
&ActFlagStop
); }
74 // called if a filter was matched
75 override void filterMatched () {
79 override DynStr
getAccount () {
83 override DynStr
getHeaderField (const(char)[] header
, out bool exists
) {
84 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
85 auto value
= findHeaderField(headers
, header
);
86 exists
= (value
!is null);
87 version(debug_filter_helper
) writeln("...getHeaderField(", header
, "): exists=", exists
, "; res=", value
);
91 override DynStr
getFromName () {
92 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
93 auto value
= findHeaderField(headers
, "From").extractName
;
94 version(debug_filter_helper
) writeln("...getFromName: res=", value
);
98 override DynStr
getFromMail () {
99 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
100 auto value
= findHeaderField(headers
, "From").extractMail
;
101 if (value
.length
== 0) value
= "nobody@nowhere";
102 version(debug_filter_helper
) writeln("...getFromMail: res=", value
);
103 return DynStr(value
);
106 override DynStr
getToName () {
107 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
108 auto value
= findHeaderField(headers
, "To").extractName
;
109 version(debug_filter_helper
) writeln("...getToName: res=", value
);
110 return DynStr(value
);
113 override DynStr
getToMail () {
114 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
115 auto value
= findHeaderField(headers
, "To").extractMail
;
116 if (value
.length
== 0) value
= "nobody@nowhere";
117 version(debug_filter_helper
) writeln("...getToMail: res=", value
);
118 return DynStr(value
);
121 override DynStr
getSubj (out bool exists
) {
122 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
123 auto value
= findHeaderField(headers
, "Subject");
124 exists
= (value
!is null);
125 if (exists
) value
= value
.decodeSubj
.subjRemoveRe
;
126 return DynStr(value
);
129 override DynStr
exec (const(char)[] command
) {
130 /*version(debug_filter_helper)*/ writeln("...exec: <", command
, ">");
131 //return DynStr("nothing");
132 import std
.stdio
: File
;
135 // write article to file
137 UUID id
= randomUUID();
139 void deleteTempFile () {
140 if (buf
.length
) try { import std
.file
: remove
; remove(buf
.getData
); } catch (Exception e
) {}
142 scope(exit
) deleteTempFile();
143 buf
.reserve(2+16*2+42);
144 buf
~= "/tmp/_temp_";
145 foreach (immutable ubyte b
; id
.data
[]) {
146 buf
~= "0123456789abcdef"[b
>>4];
147 buf
~= "0123456789abcdef"[b
&0x0f];
151 auto fo
= VFile(buf
.getData
, "w");
152 fo
.rawWriteExact(message
.getData
);
155 //!conwriteln("EXEC filter '", command, "'... (", buf.getData, ")");
156 auto pid
= pipeProcess([command
, /*"-D",*/ buf
.getData
], Redirect
.all
, null, Config
.none
, "/tmp");
157 string action
= pid
.stdout
.readln
.xstrip
;
158 bool doStop
= (action
.length
&& action
[0] == '-');
159 if (doStop
) action
= action
[1..$].xstrip
;
161 while (!pid
.stderr
.eof
) conwriteln(" :", pid
.stderr
.readln
.xstrip
, "|");
164 //!conwriteln("EXEC filter '", command, "' action: ", action, " (", doStop, ")");
165 return DynStr(action
);
166 } catch (Exception e
) {
167 conwriteln("EXEC filter error: ", e
.msg
);
172 override void move (const(char)[] dest
) {
173 version(debug_filter_helper
) writeln("...move: <", dest
, ">");
177 override void performAction (Action action
) {
178 version(debug_filter_helper
) writeln("...performAction: ", action
);
181 actFlags
&= ~(ActFlagDelete|ActFlagPurge
);
182 actFlags |
= ActFlagPurge
;
184 case Action
.SoftDelete
:
185 actFlags
&= ~(ActFlagDelete|ActFlagPurge
);
186 actFlags |
= ActFlagDelete
;
189 actFlags
&= ~(ActFlagSpam|ActFlagHam
);
190 actFlags |
= ActFlagSpam
;
193 actFlags
&= ~(ActFlagSpam|ActFlagHam
);
194 actFlags |
= ActFlagHam
;
197 actFlags |
= ActFlagRead
;
200 actFlags |
= ActFlagStop
;
204 import std
.conv
: to
;
205 throw new FilterSyntaxException("action "~action
.to
!string
~" should not end up in the handler");
210 override bool match (const(char)[] pat
, const(char)[] str, bool casesens
) {
211 version(debug_filter_helper
) writeln("...match: casesens=", casesens
, "; pat=<", pat
, ">; str=<", str, ">");
212 immutable bool bol
= (pat
.length
&& pat
[0] == '^');
213 if (bol
) pat
= pat
[1..$];
214 immutable bool eol
= (pat
.length
&& pat
[$-1] == '$');
215 if (eol
) pat
= pat
[0..$-1];
216 version(debug_filter_helper
) writeln("...match: bol=", bol
, "; eol=", eol
, "; pat=<", pat
, ">");
217 if (pat
.length
== 0) return (bol
&& eol ?
str.length
== 0 : false);
218 if (str.length
< pat
.length
) return false;
219 if (bol
&& eol
) { if (str.length
!= pat
.length
) return false; }
220 else if (bol
) str = str[0..pat
.length
];
221 else if (eol
) str = str[str.length
-pat
.length
..$];
223 return (str.indexOf(pat
) >= 0);
225 while (str.length
>= pat
.length
) {
226 if (str.startsWithCI(pat
)) {
227 //writeln("...match: HIT! str=<", str, ">");
231 //writeln("...match: skip; str=<", str, ">; pat=<", pat, ">");
233 //writeln("...match: FAIL!");
238 void writeResult() () const {
239 if (isDelete
) write(" softdelete");
240 if (isPurge
) write(" purge");
241 if (isSpam
) write(" spam");
242 if (isHam
) write(" ham");
243 if (isRead
) write(" read");
244 write("; dest tag: ", tag
.getData
);
249 // ////////////////////////////////////////////////////////////////////////// //
250 __gshared
bool updatesDisabled
= false;
251 __gshared
bool rcDisabled
= false;
252 __gshared
bool rcStarted
= false;
253 __gshared Tid controlThreadId
;
260 struct ControlCommand
{
273 // for CheckDone or CheckError
277 this (Kind atype
) nothrow @safe @nogc { type
= atype
; accid
= 0; }
278 this (Kind atype
, uint aid
) nothrow @safe @nogc { type
= atype
; accid
= aid
; }
282 struct CheckCommand
{
287 static stmtAccInfo
= LazyStatement
!"Conf"(`
290 , checktime AS checktime
291 , nosendauth AS nosendauth
292 , debuglog AS debuglog
293 , nntplastindex AS nntplastindex
295 , recvserver AS recvserver
296 , sendserver AS sendserver
300 , nntpgroup AS nntpgroup
307 static stmtSetCheckTime
= LazyStatement
!"Conf"(`
308 INSERT INTO checktimes(accid,lastcheck) VALUES(:accid,:lastcheck)
310 DO UPDATE SET lastcheck=:lastcheck
314 //==========================================================================
318 // return `false` from delegate to stop
320 //==========================================================================
321 void forEachTag (const(char)[] tags
, bool delegate (const(char)[] tag
) dg
) {
322 if (dg
is null) return;
324 while (tags
.length
) {
325 auto stp
= tags
.indexOf('|');
326 if (stp
< 0) stp
= cast(uint)tags
.length
;
327 auto tag
= tags
[0..stp
];
328 tags
= tags
[(stp
< tags
.length ? stp
+1 : tags
.length
)..$];
329 if (tag
.length
== 0) continue;
335 //==========================================================================
339 //==========================================================================
340 DynStr
extractAccount (const(char)[] tags
) {
341 auto stp
= tags
.indexOf("account:");
343 if (stp
== 0 || tags
[stp
-1] == '|') {
344 tags
= tags
[stp
+8..$];
345 stp
= tags
.indexOf('|');
346 if (stp
>= 0) tags
= tags
[0..stp
];
354 //==========================================================================
356 // extractFirstFolder
358 // can return empty string
360 //==========================================================================
361 DynStr
extractFirstFolder (const(char)[] tags
) {
363 forEachTag(tags
, (tag
) {
364 if (tag
[0] != '/') return true; // go on
366 return false; // stop
372 //==========================================================================
376 // can return empty tags string
378 //==========================================================================
379 DynStr
removeFirstFolder (const(char)[] tags
) {
381 bool seenFolder
= false;
382 forEachTag(tags
, (tag
) {
383 if (!seenFolder
&& tag
[0] == '/') {
386 if (res
.length
) res
~= "|";
389 return true; // go on
395 // ////////////////////////////////////////////////////////////////////////// //
396 static struct TagInfo
{
403 //==========================================================================
407 //==========================================================================
408 void getMessageTags (ref TagInfo
[] tags
, uint uid
) {
409 auto stGetTags
= LazyStatement
!"View"(`
411 DISTINCT(threads.tagid) AS tagid
414 INNER JOIN tagnames AS tn USING(tagid)
419 foreach (auto row
; stGetTags
.st
.bind(":uid", uid
).range
) {
420 tags
~= TagInfo(row
.tagid
!uint, DynStr(row
.name
!SQ3Text
));
425 //==========================================================================
427 // updateTwittedThreadsInTag
429 //==========================================================================
430 void updateTwittedThreadsInTag (uint tagid
, uint uid
) {
431 auto stTempTbl
= LazyStatement
!"View"(`
432 INSERT INTO mutepairs
433 WITH RECURSIVE children(muid, paruid, mtagid, mmute) AS (
434 SELECT 0, :uid, :tagid, 666
437 tt.uid, tt.uid, mtagid, tt.mute
439 INNER JOIN threads AS tt ON
440 tt.tagid=cc.mtagid AND
441 tt.parent=cc.paruid AND
449 WHERE muid<>0 AND mmute=0
452 auto stFixMutes
= LazyStatement
!"View"(`
456 , appearance=iif(appearance=0,1,appearance)
457 FROM (SELECT muid, mtagid FROM mutepairs) AS cc
458 WHERE uid=cc.muid AND tagid=cc.mtagid AND mute=0
462 dbView
.execute(`DELETE FROM mutepairs;`);
465 .bind(":tagid", tagid
)
468 .bind(":mute", Mute
.ThreadOther
)
473 //==========================================================================
477 //==========================================================================
478 public void createTwitByMsgid (uint uid
, const(char)[] glob
="/dmars_ng/*") {
480 if (glob
.length
== 0) return;
482 auto stGetMsgid
= LazyStatement
!"View"(`
491 foreach (auto row
; stGetMsgid
.st
.bind(":uid", uid
).range
) msgid
= row
.msgid
!SQ3Text
;
492 if (msgid
.length
== 0) return;
494 auto stFindMsgidTwit
= LazyStatement
!"Conf"(`
498 WHERE msgid=:msgid AND tagglob=:glob
502 // check if we already have such twit
503 foreach (auto row
; stFindMsgidTwit
.st
.bindConstText(":msgid", msgid
.getData
).bindConstText(":glob", glob
).range
) return;
506 auto stAddMsgidTwit
= LazyStatement
!"Conf"(`
507 INSERT INTO msgidtwits
508 (etwitid, automatic, tagglob, msgid)
509 VALUES(0, 0,:tagglob,:msgid)
513 .bindConstText(":tagglob", glob
)
514 .bindConstText(":msgid", msgid
.getData
)
519 scope(exit
) delete tags
;
520 getMessageTags(ref tags
, uid
);
521 if (tags
.length
== 0) return; // just in case
525 auto stUpdateMute
= LazyStatement
!"View"(`
530 , appearance=iif(appearance=0,1,appearance)
531 WHERE uid=:uid AND tagid=:tagid AND mute=0
534 // mark the message as twitted
536 foreach (ref TagInfo ti
; tags
) {
539 .bind(":tagid", ti
.tagid
)
540 .bind(":mute", Mute
.ThreadStart
)
542 updateTwittedThreadsInTag(ti
.tagid
, uid
);
548 //==========================================================================
552 //==========================================================================
553 public void twitPrepare () {
555 CREATE TEMP TABLE IF NOT EXISTS mutepairs(
563 //==========================================================================
567 // set "mute" flag according to message filters
569 //==========================================================================
570 public void twitMessage (uint uid
) {
574 scope(exit
) delete tags
;
575 getMessageTags(ref tags
, uid
);
576 if (tags
.length
== 0) return; // just in case
578 auto stUpdateMute
= LazyStatement
!"View"(`
583 , appearance=iif(appearance=0,1,appearance)
584 WHERE uid=:uid AND tagid=:tagid AND mute=0
587 DynStr fromMail
, fromName
;
589 foreach (auto row; dbView.statement(`
591 from_name AS fromName
592 , from_mail AS fromMail
596 ;`).bind(":uid", uid).range)
598 fromMail = row.fromMail!SQ3Text;
599 fromName = row.fromName!SQ3Text;
602 if (!chiroGetMessageFrom(uid
, ref fromMail
, ref fromName
)) return;
603 if (fromMail
.length
== 0 && fromName
.length
== 0) return; // just in case
606 foreach (auto trow
; dbConf
.statement(`
615 auto email
= trow
.email
!SQ3Text
;
616 auto name
= trow
.name
!SQ3Text
;
617 auto glob
= trow
.tagglob
!SQ3Text
;
618 if (glob
.length
== 0 ||
(!email
.length
&& !name
.length
)) continue; // just in case
619 // check for filter match
620 if (email
.length
&& !globmatchCI(fromMail
, email
)) continue;
621 if (name
.length
&& !globmatchCI(fromName
, name
)) continue;
622 auto title
= trow
.title
!SQ3Text
;
624 foreach (ref TagInfo ti
; tags
) {
625 if (ti
.wasUpdates
) continue;
626 if (!globmatch(ti
.name
, glob
)) continue;
629 .bind(":tagid", ti
.tagid
)
630 .bind(":mute", Mute
.ThreadStart
)
631 .bindConstText(":title", (title
.length ? title
: null), allowNull
:true)
633 ti
.wasUpdates
= true;
636 if (ttcount
== tags
.length
) break;
639 if (!ttcount
) return;
642 foreach (ref TagInfo ti
; tags
) {
643 if (!ti
.wasUpdates
) continue;
644 updateTwittedThreadsInTag(ti
.tagid
, uid
);
649 //==========================================================================
653 // check for new messages, and update view database
655 //==========================================================================
656 void updateViewDB () {
658 uint maxStoreUid
= 0;
662 foreach (auto row
; dbView
.statement(`SELECT MAX(uid) AS uid FROM info;`).range
) maxViewUid
= row
.uid
!uint;
663 foreach (auto row
; dbStore
.statement(`SELECT MAX(uid) AS uid FROM messages;`).range
) maxStoreUid
= row
.uid
!uint;
665 if (maxViewUid
>= maxStoreUid
) return;
666 conwriteln("need to process around ", maxStoreUid
-maxViewUid
, " messages.");
669 relinkTids
.reserve(64);
670 scope(exit
) delete relinkTids
;
672 foreach (uint uid
; maxViewUid
+1..maxStoreUid
+1) {
673 conwriteln("============ message #", uid
, " ============");
675 foreach (auto row
; dbStore
.statement(`
676 SELECT tags AS tags, ChiroUnpack(data) AS data FROM messages WHERE uid=:uid LIMIT 1
677 ;`).bind(":uid", uid
).range
)
679 msg
= row
.data
!SQ3Text
;
680 tags
= row
.tags
!SQ3Text
;
682 if (msg
.length
== 0 || tags
.length
== 0) continue; // not interesting
684 DynStr acc
= tags
.extractAccount();
685 DynStr origTags
= tags
;
686 DynStr deftag
= tags
.extractFirstFolder();
687 tags
= tags
.removeFirstFolder();
689 auto hlp
= new RealFilterHelper
;
690 scope(exit
) delete hlp
;
693 if (hlp
.tag
.length
== 0) hlp
.tag
= "#hobo";
696 foreach (auto row
; dbConf
.statement(`SELECT filterid AS filterid, name AS name, body AS body FROM filters ORDER BY idx;`).range
) {
697 //conwrite(" filter '", row.name!SQ3Text, "' (", row.filterid!uint, "): ");
701 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">");
702 goOn
= executeMailFilter(row
.body!SQ3Text
, hlp
);
703 } catch (Exception e
) {
704 conwriteln("ERROR IN FILTER '", row
.name
!SQ3Text
, "': ", e
.msg
);
707 conwriteln("...filter '", row
.name
!SQ3Text
, " matched!");
709 //hlp.writeResult(); writeln;
710 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">: goon=", goOn, "; isstop=", hlp.isStop);
711 //assert(hlp.isStop == !goOn);
712 if (hlp
.isStop
) break;
714 //write(" FINAL RESULT:"); hlp.writeResult(); writeln;
717 bool markSpamHam
= false; //!!!
718 if (!hlp
.isSpam
&& !hlp
.isHam
) {
719 auto bogo
= messageBogoCheck(uid
);
720 if (bogo
== Bogo
.Spam
) {
722 conwriteln("BOGO: SPAM message #", uid
, "; from={", hlp
.getFromName
.getData
, "}:<", hlp
.getFromMail
.getData
, ">; to={",
723 hlp
.getToName
.getData
, "}:<", hlp
.getToMail
.getData
, ">; subj=", hlp
.getSubj(out exists
).getData
);
724 hlp
.performAction(hlp
.Action
.Spam
);
729 if (hlp
.isSpam
) hlp
.tag
= "#spam"; // always
731 if (hlp
.tag
.length
== 0) hlp
.tag
= deftag
; // just in case
733 forEachTag(tags
, (xtag
) {
734 if (xtag
== hlp
.tag
) {
736 return false; // stop
738 return true; // go on
741 // `tags` should contain our new tags
751 // update tags info in the storage
752 if (tags
!= origTags
) {
754 dbStore
.statement(`UPDATE messages SET tags=:tags WHERE uid=:uid;`)
756 .bindConstText(":tags", tags
.getData
)
761 // insert the message into the view db
762 int appearance
= Appearance
.Unread
;
763 if (hlp
.isDelete
) appearance
= Appearance
.SoftDeleteFilter
;
764 else if (hlp
.isPurge
) appearance
= Appearance
.SoftDeletePurge
;
765 if (appearance
== Appearance
.Unread
&& (hlp
.isRead || hlp
.isSpam
)) appearance
= Appearance
.Read
;
767 if (hlp
.isSpam
) messageBogoMarkSpam(uid
);
768 if (hlp
.isHam
) messageBogoMarkHam(uid
);
773 foreach (auto trow
; dbStore
.statement(`
775 ChiroExtractHeaders(:msgdata) AS headers
776 , ChiroExtractBody(:msgdata) AS body
777 , ChiroHdr_RecvTime(:msgdata) AS msgtime
778 ;`).bindConstText(":msgdata", msg
.getData
).range
)
780 msgtime
= trow
.msgtime
!uint;
781 hdr
= trow
.headers
!SQ3Text
;
782 body = trow
.body!SQ3Text
;
785 conwriteln("putting msg ", uid
, " (time:", msgtime
, "; appr=", appearance
, ") to '", tags
.getData
, "'; oldtags='", origTags
.getData
, "'");
788 //dbView.beginTransaction();
789 //scope(success) dbView.commitTransaction();
790 //scope(failure) dbView.rollbackTransaction();
791 chiroParseAndInsertOneMessage(uid
, msgtime
, appearance
, hdr
, body, tags
);
794 foreach (auto mrow
; dbView
.statement(`SELECT msgid AS msgid FROM msgids WHERE uid=:uid LIMIT 1;`).bind(":uid", uid
).range
) {
795 msgid
= mrow
.msgid
!SQ3Text
;
797 //if (msgid.length == 0) return;
798 version(debug_updater
) {
800 auto fo
= VFile("zzz", "a");
801 fo
.writeln("MSGUID: ", uid
, "; MSGID: <", msgid
.getData
, ">");
804 conwriteln("MSGUID: ", uid
, "; MSGID: <", msgid
.getData
, ">");
806 // collect tags to modify
808 scope(exit
) delete taglist
;
811 foreach (auto trow
; dbView
.statement(`SELECT tagid AS tagid, parent AS parent FROM threads WHERE uid=:uid;`)
812 .bind(":uid", uid
).range
)
814 immutable uint tid
= trow
.tagid
!uint;
817 foreach (immutable uint tt
; relinkTids
) if (tt
== tid
) { found
= true; break; }
818 if (!found
) relinkTids
~= tid
;
820 if (!tid || trow
.parent
!uint ||
!chiroIsTagThreaded(tid
)) continue;
821 conwriteln(" tagid: ", tid
, " (", chiroGetTagName(tid
).getData
, ")");
822 version(debug_updater
) {
824 auto fo
= VFile("zzz", "a");
825 fo
.writeln(" tagid: ", tid
, " (", chiroGetTagName(tid
).getData
, ")");
831 foreach (immutable uint tid
; taglist
) {
832 uint setUsAsParentFor
= 0;
833 bool needFullRelink
= false;
834 // check if there are any references to us, and fix them by full relink
835 if (!msgid
.length
) continue;
836 foreach (auto nrow
; dbView
.statement(`
837 SELECT refids.uid AS uid, tt.parent AS parent
839 INNER JOIN(threads) AS tt
840 ON tt.tagid=:tagid AND tt.uid=refids.uid
841 WHERE idx=0 AND msgid=:msgid
843 ;`).bind(":tagid", tid
).bindConstText(":msgid", msgid
.getData
).range
)
845 if (nrow
.parent
!uint == 0) {
846 setUsAsParentFor
= nrow
.uid
!uint;
848 needFullRelink
= true;
852 if (needFullRelink
) {
853 //FIXME: make this faster!
854 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); performing full relink...");
855 chiroSupportRelinkTagThreads(tid
);
859 if (setUsAsParentFor
) {
860 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); settuing us (", uid
, ") as a parent for ", setUsAsParentFor
);
864 WHERE uid=:xuid AND tagid=:tid
865 ;`).bind(":uid", uid
).bind(":xuid", setUsAsParentFor
).bind(":tagid", tid
).doAll();
868 // find parent for us
870 foreach (auto prow
; dbView
.statement(`
871 SELECT msgids.uid AS paruid
873 INNER JOIN(threads) AS tt
874 ON tt.tagid=:tagid AND tt.uid=msgids.uid
875 WHERE msgids.uid<>:uid AND msgids.msgid IN (SELECT msgid FROM refids WHERE uid=:uid ORDER BY idx)
877 ;`).bind(":uid", uid
).bind(":tagid", tid
).range
)
879 paruid
= prow
.paruid
!uint;
881 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); paruid=", paruid
);
882 if (paruid
&& paruid
!= uid
) {
883 dbView
.statement(`UPDATE threads SET parent=:paruid WHERE uid=:uid AND tagid=:tagid;`)
886 .bind(":paruid", paruid
)
896 if (relinkTids
.length
) {
897 foreach (immutable uint tid
; relinkTids
) {
898 if (vbwin
&& !vbwin
.closed
) vbwin
.postEvent(new TagThreadsUpdatedEvent(tid
));
904 //==========================================================================
908 //==========================================================================
909 void checkerThread (Tid ownerTid
) {
911 bool isError
= false;
920 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
937 foreach (auto arow
; stmtAccInfo
.st
.bind(":accid", accid
).range
) {
940 int upmins
= arow
.checktime
!int;
941 if (upmins
< 1) upmins
= 1; else if (upmins
> 100000) upmins
= 100000;
943 nosendauth
= (arow
.nosendauth
!int > 0);
944 debuglog
= (arow
.debuglog
!int > 0);
945 nntplastindex
= arow
.nntplastindex
!uint;
946 name
= arow
.name
!SQ3Text
;
947 recvserver
= arow
.recvserver
!SQ3Text
;
948 sendserver
= arow
.sendserver
!SQ3Text
;
949 user
= arow
.user
!SQ3Text
;
950 pass
= arow
.pass
!SQ3Text
;
951 inbox
= arow
.inbox
!SQ3Text
;
952 nntpgroup
= arow
.nntpgroup
!SQ3Text
;
956 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
960 conwriteln("checking account '", name
, "' (", accid
, ")...");
962 stmtSetCheckTime
.st
.bind(":accid", accid
).bind(":lastcheck", RunningAverageExp
.GetTickCount()+checktime
*60).doAll();
964 // ////////////////////////////////////////////////////////////////// //
966 auto nsk
= new SocketNNTP(recvserver
);
967 scope(exit
) nsk
.close();
969 nsk
.selectGroup(nntpgroup
);
970 if (nsk
.emptyGroup
) {
971 conwriteln("[", name
, ":", nntpgroup
, "]: no new articles (empty group)");
975 uint stnum
= nntplastindex
+1;
976 if (stnum
> nsk
.hiwater
) {
977 conwriteln("[", name
, ":", nntpgroup
, "]: no new articles");
981 conwriteln("[", name
, ":", nntpgroup
, "]: ", nsk
.hiwater
+1-stnum
, " (possible) new articles");
983 // download new articles
984 foreach (immutable uint anum
; stnum
..nsk
.hiwater
+1) {
987 msg
= nsk
.getArticle(anum
);
988 } catch (Exception e
) {
989 conwriteln("[", name
, ":", nntpgroup
, "]: error downloading article #", anum
);
992 if (msg
.length
== 0) continue; // this article is empty
993 // insert article into the storage
994 // filtering will be done later, for now, insert with the default inbox
996 if (inbox
.length
) tags
~= inbox
;
998 if (tags
.length
) tags
~= "|";
1002 if (tags
.length
== 0) tags
= "#hobo";
1003 conwriteln("[", name
, ":", nntpgroup
, "]: storing article #", anum
, " for '", tags
.getData
, "'...");
1005 dbStore
.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1006 .bindConstText(":tags", tags
.getData
)
1007 .bindConstBlob(":data", msg
.getData
)
1010 // update account with the new highest nntp index
1012 dbConf
.statement(`UPDATE accounts SET nntplastindex=:anum WHERE accid=:accid;`)
1013 .bind(":accid", accid
)
1014 .bind(":anum", anum
)
1020 // ////////////////////////////////////////////////////////////////// //
1022 conwriteln("*** [", name
, "]: connecting...");
1023 auto pop3
= new SocketPOP3(recvserver
);
1024 scope(exit
) pop3
.close();
1026 conwriteln("[", name
, "]: authenticating...");
1027 pop3
.auth(user
, pass
);
1029 auto newmsg
= pop3
.getNewMailCount
;
1031 conwriteln("[", name
, "]: no new messages");
1034 conwriteln("[", name
, "]: ", newmsg
, " new message", (newmsg
> 1 ?
"s" : ""));
1035 foreach (immutable int popidx
; 1..newmsg
+1) {
1038 msg
= pop3
.getMessage(popidx
); // full message, with the ending dot
1039 } catch (Exception e
) {
1040 conwriteln("[", name
, "]: error downloading message #", popidx
);
1043 if (msg
.length
!= 0) {
1045 if (inbox
.length
) tags
~= inbox
;
1047 if (tags
.length
) tags
~= "|";
1051 if (tags
.length
== 0) tags
= "#hobo";
1052 conwriteln("[", name
, ":", nntpgroup
, "]: storing message #", popidx
, " for '", tags
.getData
, "'...");
1054 dbStore
.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1055 .bindConstText(":tags", tags
.getData
)
1056 .bindConstBlob(":data", msg
.getData
)
1060 //auto msg = pop3.getMessage!true(popidx); // full message, with the ending dot, and exact terminators
1062 pop3
.deleteMessage(popidx
);
1066 // ////////////////////////////////////////////////////////////////// //
1068 if (nntpgroup
.length
) CheckNNTP(); else CheckSMTP();
1069 } catch (Exception e
) {
1070 conwriteln("ERROR checking account '", name
, "' (", accid
, "): ", e
.msg
);
1074 conwriteln("done checking account '", name
, "' (", accid
, ")...");
1076 if (vbwin
&& !vbwin
.closed
) {
1077 vbwin
.postEvent(new UpdatingAccountCompleteEvent(accid
));
1078 //sqlite3_sleep(1000);
1080 } catch (Throwable e
) {
1081 // here, we are dead and fucked (the exact order doesn't matter)
1082 //import core.stdc.stdlib : abort;
1083 import core
.stdc
.stdio
: fprintf
, stderr
;
1084 //import core.memory : GC;
1085 import core
.thread
: thread_suspendAll
;
1086 //GC.disable(); // yeah
1087 //thread_suspendAll(); // stop right here, you criminal scum!
1088 auto s
= e
.toString();
1089 fprintf(stderr
, "\n=== FATAL ===\n%.*s\n", cast(uint)s
.length
, s
.ptr
);
1090 //abort(); // die, you bitch!
1091 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
1094 //if (vbwin) vbwin.postEvent(evDoConCommands); }
1095 ownerTid
.send(ControlCommand((isError ? ControlCommand
.Kind
.CheckError
: ControlCommand
.Kind
.CheckDone
), accid
));
1099 //==========================================================================
1103 //==========================================================================
1104 void controlThread (Tid ownerTid
) {
1106 bool doQuit
= false;
1108 static struct AccCheck
{
1114 AccCheck
[] accidCheckList
;
1115 accidCheckList
.reserve(128);
1118 CREATE TEMP TABLE IF NOT EXISTS checktimes (
1119 accid INTEGER PRIMARY KEY UNIQUE /* unique, never zero */
1120 , lastcheck INTEGER NOT NULL DEFAULT 0
1121 , checking INTEGER NOT NULL DEFAULT 0
1125 static stmtAllAccs
= LazyStatement
!"Conf"(`
1128 , checktime AS checktime
1130 WHERE nocheck=0 AND inbox<>''
1134 static stmtGetCheckTime
= LazyStatement
!"Conf"(`
1135 SELECT lastcheck AS lastcheck FROM checktimes WHERE accid=:accid LIMIT 1
1140 MonoTime lastCollect
= MonoTime
.currTime
;
1141 //accidCheckList ~= AccCheck();
1143 bool needUpdates
= false;
1144 bool forceAll
= false;
1146 if (doQuit
&& accidCheckList
.length
== 0) break;
1147 receiveTimeout((doQuit ?
50.msecs
: accidCheckList
.length || needUpdates || forceAll ?
1.seconds
: 60.seconds
),
1148 (ControlCommand cmd
) {
1149 final switch (cmd
.type
) {
1150 case ControlCommand
.Kind
.ForceUpdateAll
: forceAll
= true; break;
1151 case ControlCommand
.Kind
.Ping
: break;
1152 case ControlCommand
.Kind
.Quit
: doQuit
= true; break;
1153 case ControlCommand
.Kind
.DisableUpdates
: updatesDisabled
= true; break;
1154 case ControlCommand
.Kind
.EnableUpdates
: updatesDisabled
= false; break;
1155 case ControlCommand
.Kind
.CheckDone
:
1156 case ControlCommand
.Kind
.CheckError
:
1158 if (accidCheckList
.length
) {
1159 foreach (immutable idx
, const ref AccCheck nfo
; accidCheckList
) {
1160 if (nfo
.accid
== cmd
.accid
) {
1161 //if (!doQuit && vbwin && !vbwin.closed) vbwin.postEvent(new UpdatingAccountCompleteEvent(nfo.accid));
1162 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1163 accidCheckList
.length
-= 1;
1167 if (!doQuit
&& vbwin
&& !vbwin
.closed
&& accidCheckList
.length
== 0) vbwin
.postEvent(new UpdatingCompleteEvent());
1174 for (usize idx
= 0; idx
< accidCheckList
.length
; ) {
1175 if (accidCheckList
[idx
].accid
!= 0) {
1178 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1179 accidCheckList
.length
-= 1;
1184 for (usize idx
= 0; idx
< accidCheckList
.length
; ) {
1185 if (accidCheckList
[idx
].inprogress
) {
1188 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1189 accidCheckList
.length
-= 1;
1195 if (!needUpdates
&& !updatesDisabled
) {
1196 ulong ctt
= RunningAverageExp
.GetTickCount();
1197 foreach (auto arow
; stmtAllAccs
.st
.range
) {
1199 foreach (const ref AccCheck nfo
; accidCheckList
) if (nfo
.accid
== arow
.accid
!uint) { found
= true; break; }
1200 if (found
) continue;
1203 accidCheckList
~= AccCheck(arow
.accid
!uint);
1207 int upmins
= arow
.checktime
!int;
1208 if (upmins
< 1) upmins
= 1; else if (upmins
> 100000) upmins
= 100000;
1209 ulong lastcheck
= 0;
1210 foreach (auto crow
; stmtGetCheckTime
.st
.bind(":accid", arow
.accid
!uint).range
) lastcheck
= crow
.lastcheck
!ulong;
1211 lastcheck
+= upmins
*60; // next check time
1212 if (lastcheck
< ctt
) {
1214 accidCheckList
~= AccCheck(arow
.accid
!uint);
1218 conwriteln("check for accid ", arow.accid!uint, " in ", (lastcheck-ctt)/60, " minutes...");
1225 if (!updatesDisabled
) {
1226 foreach (ref AccCheck nfo
; accidCheckList
) {
1227 if (nfo
.inprogress
) break;
1228 if (vbwin
) vbwin
.postEvent(new UpdatingAccountEvent(nfo
.accid
));
1229 nfo
.tid
= spawn(&checkerThread
, thisTid
);
1230 nfo
.inprogress
= true;
1231 nfo
.tid
.send(CheckCommand(nfo
.accid
));
1236 bool hasProgress
= false;
1237 foreach (ref AccCheck nfo
; accidCheckList
) if (nfo
.inprogress
) { hasProgress
= true; break; }
1240 needUpdates
= false;
1244 immutable ctt
= MonoTime
.currTime
;
1245 if ((ctt
-lastCollect
).total
!"minutes" >= 5) {
1246 import core
.memory
: GC
;
1253 ownerTid
.send(ControlReply
.Quit
);
1254 } catch (Throwable e
) {
1255 // here, we are dead and fucked (the exact order doesn't matter)
1256 import core
.stdc
.stdlib
: abort
;
1257 import core
.stdc
.stdio
: fprintf
, stderr
;
1258 import core
.memory
: GC
;
1259 import core
.thread
: thread_suspendAll
;
1260 GC
.disable(); // yeah
1261 thread_suspendAll(); // stop right here, you criminal scum!
1262 auto s
= e
.toString();
1263 fprintf(stderr
, "\n=== FATAL ===\n%.*s\n", cast(uint)s
.length
, s
.ptr
);
1264 abort(); // die, you bitch!
1269 //==========================================================================
1273 //==========================================================================
1274 public void receiverDisable () {
1279 //==========================================================================
1281 // disableMailboxUpdates
1283 //==========================================================================
1284 public void disableMailboxUpdates () {
1285 if (!rcStarted
) return;
1286 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.DisableUpdates
));
1290 //==========================================================================
1292 // enableMailboxUpdates
1294 //==========================================================================
1295 public void enableMailboxUpdates () {
1296 if (!rcStarted
) return;
1297 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.EnableUpdates
));
1301 //==========================================================================
1303 // receiverForceUpdateAll
1305 //==========================================================================
1306 public void receiverForceUpdateAll () {
1307 if (!rcStarted
) return;
1308 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.ForceUpdateAll
));
1312 //==========================================================================
1316 //==========================================================================
1317 public void receiverInit () {
1318 if (rcStarted
) return;
1319 if (rcDisabled
) return;
1320 controlThreadId
= spawn(&controlThread
, thisTid
);
1325 //==========================================================================
1329 //==========================================================================
1330 public void receiverDeinit () {
1331 if (!rcStarted
) return;
1332 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.Quit
));
1336 (ControlReply reply
) {
1337 if (reply
== ControlReply
.Quit
) done
= true;