2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3 of the License ONLY.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // mail/nntp receiver thread
18 module receiver
is aliced
;
21 //version = debug_filter_helper;
22 //version = debug_updater;
24 import std
.concurrency
;
29 import iv
.timer
: DurTimer
= Timer
;
36 import chibackend
.net
;
41 // ////////////////////////////////////////////////////////////////////////// //
42 class RealFilterHelper
: FilterHelper
{
45 // only one of these can be set
46 ActFlagDelete
= 1u<<0,
49 // only one of these can be set
59 DynStr tag
; // destination tag
60 uint actFlags
; // see above
// Destructor: clears the `account`, `tag` and `message` members, in that order.
~this () nothrow @nogc {
  account.clear();
  tag.clear();
  message.clear();
}
// True when the `ActFlagDelete` bit is set in `actFlags`.
final @property bool isDelete () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagDelete) != 0);
}
// True when the `ActFlagPurge` bit is set in `actFlags`.
final @property bool isPurge () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagPurge) != 0);
}
// True when the `ActFlagSpam` bit is set in `actFlags`.
final @property bool isSpam () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagSpam) != 0);
}
// True when the `ActFlagHam` bit is set in `actFlags`.
final @property bool isHam () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagHam) != 0);
}
// True when the `ActFlagRead` bit is set in `actFlags`.
final @property bool isRead () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagRead) != 0);
}
// True when the `ActFlagStop` bit is set in `actFlags`.
final @property bool isStop () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagStop) != 0);
}
74 // called if a filter was matched
75 override void filterMatched () {
79 override DynStr
getAccount () {
83 override DynStr
getHeaderField (const(char)[] header
, out bool exists
) {
84 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
85 auto value
= findHeaderField(headers
, header
);
86 exists
= (value
!is null);
87 version(debug_filter_helper
) writeln("...getHeaderField(", header
, "): exists=", exists
, "; res=", value
);
91 override DynStr
getFromName () {
92 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
93 auto value
= findHeaderField(headers
, "From").extractName
;
94 version(debug_filter_helper
) writeln("...getFromName: res=", value
);
// Returns the address part of the message's "From" header as a DynStr.
// The header area is the prefix of `message` data up to `findHeadersEnd`;
// the field is located with `findHeaderField` and reduced with `extractMail`.
// When that yields an empty value, the placeholder "nobody@nowhere" is
// returned instead, so the result is never empty.
98 override DynStr
getFromMail () {
99 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
100 auto value
= findHeaderField(headers
, "From").extractMail
;
101 if (value
.length
== 0) value
= "nobody@nowhere";
102 version(debug_filter_helper
) writeln("...getFromMail: res=", value
);
103 return DynStr(value
);
// Returns the name part of the message's "To" header as a DynStr.
// The header area is the prefix of `message` data up to `findHeadersEnd`;
// the field is located with `findHeaderField` and reduced with `extractName`.
// No fallback is applied here, so the result may be empty.
106 override DynStr
getToName () {
107 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
108 auto value
= findHeaderField(headers
, "To").extractName
;
109 version(debug_filter_helper
) writeln("...getToName: res=", value
);
110 return DynStr(value
);
// Returns the address part of the message's "To" header as a DynStr.
// The header area is the prefix of `message` data up to `findHeadersEnd`;
// the field is located with `findHeaderField` and reduced with `extractMail`.
// When that yields an empty value, the placeholder "nobody@nowhere" is
// returned instead, so the result is never empty.
113 override DynStr
getToMail () {
114 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
115 auto value
= findHeaderField(headers
, "To").extractMail
;
116 if (value
.length
== 0) value
= "nobody@nowhere";
117 version(debug_filter_helper
) writeln("...getToMail: res=", value
);
118 return DynStr(value
);
// Returns the message subject as a DynStr.
// `exists` is set to true when `findHeaderField` returned a non-null value
// for "Subject" in the header area (the prefix of `message` data up to
// `findHeadersEnd`), and to false otherwise.
// When the header is present its value is passed through `decodeSubj` and
// then `subjRemoveRe` (presumably decoding the header and dropping a reply
// prefix -- confirm against those helpers). When it is absent, the null
// value is wrapped as-is.
121 override DynStr
getSubj (out bool exists
) {
122 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
123 auto value
= findHeaderField(headers
, "Subject");
124 exists
= (value
!is null);
125 if (exists
) value
= value
.decodeSubj
.subjRemoveRe
;
126 return DynStr(value
);
129 override DynStr
exec (const(char)[] command
) {
130 /*version(debug_filter_helper)*/ writeln("...exec: <", command
, ">");
131 //return DynStr("nothing");
132 import std
.stdio
: File
;
135 // write article to file
137 UUID id
= randomUUID();
139 void deleteTempFile () {
140 if (buf
.length
) try { import std
.file
: remove
; remove(buf
.getData
); } catch (Exception e
) {}
142 scope(exit
) deleteTempFile();
143 buf
.reserve(2+16*2+42);
144 buf
~= "/tmp/_temp_";
145 foreach (immutable ubyte b
; id
.data
[]) {
146 buf
~= "0123456789abcdef"[b
>>4];
147 buf
~= "0123456789abcdef"[b
&0x0f];
151 auto fo
= VFile(buf
.getData
, "w");
152 fo
.rawWriteExact(message
.getData
);
155 //!conwriteln("EXEC filter '", command, "'... (", buf.getData, ")");
156 auto pid
= pipeProcess([command
, /*"-D",*/ buf
.getData
], Redirect
.all
, null, Config
.none
, "/tmp");
157 string action
= pid
.stdout
.readln
.xstrip
;
158 bool doStop
= (action
.length
&& action
[0] == '-');
159 if (doStop
) action
= action
[1..$].xstrip
;
161 while (!pid
.stderr
.eof
) conwriteln(" :", pid
.stderr
.readln
.xstrip
, "|");
164 //!conwriteln("EXEC filter '", command, "' action: ", action, " (", doStop, ")");
165 return DynStr(action
);
166 } catch (Exception e
) {
167 conwriteln("EXEC filter error: ", e
.msg
);
172 override void move (const(char)[] dest
) {
173 version(debug_filter_helper
) writeln("...move: <", dest
, ">");
177 override void performAction (Action action
) {
178 version(debug_filter_helper
) writeln("...performAction: ", action
);
181 actFlags
&= ~(ActFlagDelete|ActFlagPurge
);
182 actFlags |
= ActFlagPurge
;
184 case Action
.SoftDelete
:
185 actFlags
&= ~(ActFlagDelete|ActFlagPurge
);
186 actFlags |
= ActFlagDelete
;
189 actFlags
&= ~(ActFlagSpam|ActFlagHam
);
190 actFlags |
= ActFlagSpam
;
193 actFlags
&= ~(ActFlagSpam|ActFlagHam
);
194 actFlags |
= ActFlagHam
;
197 actFlags |
= ActFlagRead
;
200 actFlags |
= ActFlagStop
;
204 import std
.conv
: to
;
205 throw new FilterSyntaxException("action "~action
.to
!string
~" should not end up in the handler");
210 override bool match (const(char)[] pat
, const(char)[] str, bool casesens
) {
211 version(debug_filter_helper
) writeln("...match: casesens=", casesens
, "; pat=<", pat
, ">; str=<", str, ">");
212 immutable bool bol
= (pat
.length
&& pat
[0] == '^');
213 if (bol
) pat
= pat
[1..$];
214 immutable bool eol
= (pat
.length
&& pat
[$-1] == '$');
215 if (eol
) pat
= pat
[0..$-1];
216 version(debug_filter_helper
) writeln("...match: bol=", bol
, "; eol=", eol
, "; pat=<", pat
, ">");
217 if (pat
.length
== 0) return (bol
&& eol ?
str.length
== 0 : false);
218 if (str.length
< pat
.length
) return false;
219 if (bol
&& eol
) { if (str.length
!= pat
.length
) return false; }
220 else if (bol
) str = str[0..pat
.length
];
221 else if (eol
) str = str[str.length
-pat
.length
..$];
223 return (str.indexOf(pat
) >= 0);
225 while (str.length
>= pat
.length
) {
226 if (str.startsWithCI(pat
)) {
227 //writeln("...match: HIT! str=<", str, ">");
231 //writeln("...match: skip; str=<", str, ">; pat=<", pat, ">");
233 //writeln("...match: FAIL!");
// Debug helper: prints the currently selected actions and the destination tag
// using `write`. One word is emitted per set flag, in the fixed order
// softdelete, purge, spam, ham, read, followed by "; dest tag: " and the
// contents of `tag`. The stop flag is not printed.
// Declared as a template with an empty parameter list (`() ()`).
238 void writeResult() () const {
239 if (isDelete
) write(" softdelete");
240 if (isPurge
) write(" purge");
241 if (isSpam
) write(" spam");
242 if (isHam
) write(" ham");
243 if (isRead
) write(" read");
244 write("; dest tag: ", tag
.getData
);
249 // ////////////////////////////////////////////////////////////////////////// //
250 __gshared
bool updatesDisabled
= false;
251 __gshared
bool rcDisabled
= false;
252 __gshared
bool rcStarted
= false;
253 __gshared Tid controlThreadId
;
260 struct ControlCommand
{
273 // for CheckDone or CheckError
// Builds a command of kind `atype` that carries no account: `accid` is set to 0.
this (Kind atype) nothrow @safe @nogc {
  accid = 0;
  type = atype;
}
// Builds a command of kind `atype` that refers to the account with id `aid`.
this (Kind atype, uint aid) nothrow @safe @nogc {
  accid = aid;
  type = atype;
}
282 struct CheckCommand
{
287 static stmtAccInfo
= LazyStatement
!"Conf"(`
290 , checktime AS checktime
291 , nosendauth AS nosendauth
292 , debuglog AS debuglog
293 , nntplastindex AS nntplastindex
295 , recvserver AS recvserver
296 , sendserver AS sendserver
300 , nntpgroup AS nntpgroup
307 static stmtSetCheckTime
= LazyStatement
!"Conf"(`
308 INSERT INTO checktimes(accid,lastcheck) VALUES(:accid,:lastcheck)
310 DO UPDATE SET lastcheck=:lastcheck
314 //==========================================================================
318 // return `false` from delegate to stop
320 //==========================================================================
321 void forEachTag (const(char)[] tags
, bool delegate (const(char)[] tag
) dg
) {
322 if (dg
is null) return;
324 while (tags
.length
) {
325 auto stp
= tags
.indexOf('|');
326 if (stp
< 0) stp
= cast(uint)tags
.length
;
327 auto tag
= tags
[0..stp
];
328 tags
= tags
[(stp
< tags
.length ? stp
+1 : tags
.length
)..$];
329 if (tag
.length
== 0) continue;
335 //==========================================================================
339 //==========================================================================
340 DynStr
extractAccount (const(char)[] tags
) {
341 auto stp
= tags
.indexOf("account:");
343 if (stp
== 0 || tags
[stp
-1] == '|') {
344 tags
= tags
[stp
+8..$];
345 stp
= tags
.indexOf('|');
346 if (stp
>= 0) tags
= tags
[0..stp
];
354 //==========================================================================
356 // extractFirstFolder
358 // can return empty string
360 //==========================================================================
361 DynStr
extractFirstFolder (const(char)[] tags
) {
363 forEachTag(tags
, (tag
) {
364 if (tag
[0] != '/') return true; // go on
366 return false; // stop
372 //==========================================================================
376 // can return empty tags string
378 //==========================================================================
379 DynStr
removeFirstFolder (const(char)[] tags
) {
381 bool seenFolder
= false;
382 forEachTag(tags
, (tag
) {
383 if (!seenFolder
&& tag
[0] == '/') {
386 if (res
.length
) res
~= "|";
389 return true; // go on
395 // ////////////////////////////////////////////////////////////////////////// //
396 static struct TagInfo
{
403 //==========================================================================
407 //==========================================================================
408 void getMessageTags (ref TagInfo
[] tags
, uint uid
) {
409 auto stGetTags
= LazyStatement
!"View"(`
411 DISTINCT(threads.tagid) AS tagid
414 INNER JOIN tagnames AS tn USING(tagid)
419 foreach (auto row
; stGetTags
.st
.bind(":uid", uid
).range
) {
420 tags
~= TagInfo(row
.tagid
!uint, DynStr(row
.name
!SQ3Text
));
425 //==========================================================================
427 // updateTwittedThreadsInTag
429 //==========================================================================
430 void updateTwittedThreadsInTag (uint tagid
, uint uid
) {
431 auto stTempTbl
= LazyStatement
!"View"(`
432 INSERT INTO mutepairs
433 WITH RECURSIVE children(muid, paruid, mtagid, mmute) AS (
434 SELECT 0, :uid, :tagid, 666
437 tt.uid, tt.uid, mtagid, tt.mute
439 INNER JOIN threads AS tt ON
440 tt.tagid=cc.mtagid AND
441 tt.parent=cc.paruid AND
449 WHERE muid<>0 AND mmute=0
452 auto stFixMutes
= LazyStatement
!"View"(`
456 , appearance=iif(appearance=0,1,appearance)
457 FROM (SELECT muid, mtagid FROM mutepairs) AS cc
458 WHERE uid=cc.muid AND tagid=cc.mtagid AND mute=0
462 dbView
.execute(`DELETE FROM mutepairs;`);
465 .bind(":tagid", tagid
)
468 .bind(":mute", Mute
.ThreadOther
)
473 //==========================================================================
477 //==========================================================================
478 public void createTwitByMsgid (uint uid
, const(char)[] glob
="/dmars_ng/*") {
480 if (glob
.length
== 0) return;
482 auto stGetMsgid
= LazyStatement
!"View"(`
491 foreach (auto row
; stGetMsgid
.st
.bind(":uid", uid
).range
) msgid
= row
.msgid
!SQ3Text
;
492 if (msgid
.length
== 0) return;
494 auto stFindMsgidTwit
= LazyStatement
!"Conf"(`
498 WHERE msgid=:msgid AND tagglob=:glob
502 // check if we already have such twit
503 foreach (auto row
; stFindMsgidTwit
.st
.bindConstText(":msgid", msgid
.getData
).bindConstText(":glob", glob
).range
) return;
506 auto stAddMsgidTwit
= LazyStatement
!"Conf"(`
507 INSERT INTO msgidtwits
508 (etwitid, automatic, tagglob, msgid)
509 VALUES(0, 0,:tagglob,:msgid)
513 .bindConstText(":tagglob", glob
)
514 .bindConstText(":msgid", msgid
.getData
)
519 scope(exit
) delete tags
;
520 getMessageTags(ref tags
, uid
);
521 if (tags
.length
== 0) return; // just in case
525 auto stUpdateMute
= LazyStatement
!"View"(`
530 , appearance=iif(appearance=0,1,appearance)
531 WHERE uid=:uid AND tagid=:tagid AND mute=0
534 // mark the message as twitted
536 foreach (ref TagInfo ti
; tags
) {
539 .bind(":tagid", ti
.tagid
)
540 .bind(":mute", Mute
.ThreadStart
)
542 updateTwittedThreadsInTag(ti
.tagid
, uid
);
548 //==========================================================================
552 //==========================================================================
553 public void twitPrepare () {
555 CREATE TEMP TABLE IF NOT EXISTS mutepairs(
563 //==========================================================================
567 // set "mute" flag according to message filters
569 //==========================================================================
// Applies the twit (mute) rules to message `uid`.
//
// Visible flow:
//  1. collect the message tags with `getMessageTags`; return if there are none;
//  2. fetch the sender with `chiroGetMessageFrom`; return if that fails;
//  3. when a sender mail or name is known, walk the twit rows read from
//     `dbConf`: a row applies when its non-empty `email`/`name` globs match
//     the sender (`globmatchCI`); every not-yet-updated tag whose name
//     matches the row's `tagglob` (`globmatch`) is bound with
//     `Mute.ThreadStart` and flagged via `wasUpdates`;
//  4. for every tag, flag it when the message is already muted, or mute the
//     message with `Mute.ThreadOther` when the parent lookup
//     (`statFindParentFor`) reports a muted row;
//  5. if anything was flagged (`ttcount` != 0), call
//     `updateTwittedThreadsInTag` for each flagged tag.
//
// Params:
//   uid = uid of the message to process.
570 public void twitMessage (uint uid
) {
574 scope(exit
) delete tags
;
575 getMessageTags(ref tags
, uid
);
576 if (tags
.length
== 0) return; // just in case
578 auto stUpdateMute
= LazyStatement
!"View"(`
583 , appearance=iif(appearance=0,1,appearance)
584 WHERE uid=:uid AND tagid=:tagid AND mute=0
587 DynStr fromMail
, fromName
;
589 foreach (auto row; dbView.statement(`
591 from_name AS fromName
592 , from_mail AS fromMail
596 ;`).bind(":uid", uid).range)
598 fromMail = row.fromMail!SQ3Text;
599 fromName = row.fromName!SQ3Text;
602 if (!chiroGetMessageFrom(uid
, ref fromMail
, ref fromName
)) return;
605 if (fromMail
.length
!= 0 || fromName
.length
!= 0) {
606 foreach (auto trow
; dbConf
.statement(`
615 auto email
= trow
.email
!SQ3Text
;
616 auto name
= trow
.name
!SQ3Text
;
617 auto glob
= trow
.tagglob
!SQ3Text
;
618 if (glob
.length
== 0 ||
(!email
.length
&& !name
.length
)) continue; // just in case
619 // check for filter match
620 if (email
.length
&& !globmatchCI(fromMail
, email
)) continue;
621 if (name
.length
&& !globmatchCI(fromName
, name
)) continue;
622 auto title
= trow
.title
!SQ3Text
;
624 foreach (ref TagInfo ti
; tags
) {
625 if (ti
.wasUpdates
) continue;
626 if (!globmatch(ti
.name
, glob
)) continue;
629 .bind(":tagid", ti
.tagid
)
630 .bind(":mute", Mute
.ThreadStart
)
631 .bindConstText(":title", (title
.length ? title
: null), allowNull
:true)
633 ti
.wasUpdates
= true;
636 if (ttcount
== tags
.length
) break;
640 // mute it if it is not muted, but should be
641 static auto statFindParentFor
= LazyStatement
!"View"(`
642 SELECT mute AS mute, parent AS parent
644 WHERE tagid=:tagid AND uid=:uid
648 foreach (ref TagInfo ti
; tags
) {
// NOTE: `ti` must be taken by reference. TagInfo is a struct, so a by-value
// loop variable is a copy and the `wasUpdates` writes below would be lost,
// making the final loop skip `updateTwittedThreadsInTag` for these tags.
649 auto mute
= chiroGetMessageMute(ti
.tagid
, uid
);
650 if (mute
> Mute
.Normal
) {
651 ti
.wasUpdates
= true; // just in case
652 if (!ttcount
) ttcount
= 1;
658 .bind(":tagid", ti
.tagid
)
661 foreach (auto prow
; statFindParentFor
.st
.range
) {
662 if (prow
.mute
!int > Mute
.Normal
) {
663 chiroSetMessageMute(ti
.tagid
, uid
, Mute
.ThreadOther
);
664 ti
.wasUpdates
= true; // just in case
665 if (!ttcount
) ttcount
= 1;
667 puid
= prow
.parent
!uint;
674 if (!ttcount
) return;
677 foreach (ref TagInfo ti
; tags
) {
678 if (!ti
.wasUpdates
) continue;
679 updateTwittedThreadsInTag(ti
.tagid
, uid
);
684 //==========================================================================
688 // check for new messages, and update view database
690 //==========================================================================
691 void updateViewDB () {
693 uint maxStoreUid
= 0;
697 foreach (auto row
; dbView
.statement(`SELECT MAX(uid) AS uid FROM info;`).range
) maxViewUid
= row
.uid
!uint;
698 foreach (auto row
; dbStore
.statement(`SELECT MAX(uid) AS uid FROM messages;`).range
) maxStoreUid
= row
.uid
!uint;
700 if (maxViewUid
>= maxStoreUid
) return;
701 conwriteln("need to process around ", maxStoreUid
-maxViewUid
, " messages.");
704 relinkTids
.reserve(64);
705 scope(exit
) delete relinkTids
;
707 foreach (uint uid
; maxViewUid
+1..maxStoreUid
+1) {
708 conwriteln("============ message #", uid
, " ============");
710 foreach (auto row
; dbStore
.statement(`
711 SELECT tags AS tags, ChiroUnpack(data) AS data FROM messages WHERE uid=:uid LIMIT 1
712 ;`).bind(":uid", uid
).range
)
714 msg
= row
.data
!SQ3Text
;
715 tags
= row
.tags
!SQ3Text
;
717 if (msg
.length
== 0 || tags
.length
== 0) continue; // not interesting
719 DynStr acc
= tags
.extractAccount();
720 DynStr origTags
= tags
;
721 DynStr deftag
= tags
.extractFirstFolder();
722 tags
= tags
.removeFirstFolder();
724 auto hlp
= new RealFilterHelper
;
725 scope(exit
) delete hlp
;
728 if (hlp
.tag
.length
== 0) hlp
.tag
= "#hobo";
731 foreach (auto row
; dbConf
.statement(`SELECT filterid AS filterid, name AS name, body AS body FROM filters ORDER BY idx;`).range
) {
732 //conwrite(" filter '", row.name!SQ3Text, "' (", row.filterid!uint, "): ");
736 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">");
737 goOn
= executeMailFilter(row
.body!SQ3Text
, hlp
);
738 } catch (Exception e
) {
739 conwriteln("ERROR IN FILTER '", row
.name
!SQ3Text
, "': ", e
.msg
);
742 conwriteln("...filter '", row
.name
!SQ3Text
, " matched!");
744 //hlp.writeResult(); writeln;
745 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">: goon=", goOn, "; isstop=", hlp.isStop);
746 //assert(hlp.isStop == !goOn);
747 if (hlp
.isStop
) break;
749 //write(" FINAL RESULT:"); hlp.writeResult(); writeln;
752 bool markSpamHam
= false; //!!!
753 if (!hlp
.isSpam
&& !hlp
.isHam
) {
754 auto bogo
= messageBogoCheck(uid
);
755 if (bogo
== Bogo
.Spam
) {
757 conwriteln("BOGO: SPAM message #", uid
, "; from={", hlp
.getFromName
.getData
, "}:<", hlp
.getFromMail
.getData
, ">; to={",
758 hlp
.getToName
.getData
, "}:<", hlp
.getToMail
.getData
, ">; subj=", hlp
.getSubj(out exists
).getData
);
759 hlp
.performAction(hlp
.Action
.Spam
);
764 if (hlp
.isSpam
) hlp
.tag
= "#spam"; // always
766 if (hlp
.tag
.length
== 0) hlp
.tag
= deftag
; // just in case
768 forEachTag(tags
, (xtag
) {
769 if (xtag
== hlp
.tag
) {
771 return false; // stop
773 return true; // go on
776 // `tags` should contain our new tags
786 // update tags info in the storage
787 if (tags
!= origTags
) {
789 dbStore
.statement(`UPDATE messages SET tags=:tags WHERE uid=:uid;`)
791 .bindConstText(":tags", tags
.getData
)
796 // insert the message into the view db
797 int appearance
= Appearance
.Unread
;
798 if (hlp
.isDelete
) appearance
= Appearance
.SoftDeleteFilter
;
799 else if (hlp
.isPurge
) appearance
= Appearance
.SoftDeletePurge
;
800 if (appearance
== Appearance
.Unread
&& (hlp
.isRead || hlp
.isSpam
)) appearance
= Appearance
.Read
;
802 if (hlp
.isSpam
) messageBogoMarkSpam(uid
);
803 if (hlp
.isHam
) messageBogoMarkHam(uid
);
808 foreach (auto trow
; dbStore
.statement(`
810 ChiroExtractHeaders(:msgdata) AS headers
811 , ChiroExtractBody(:msgdata) AS body
812 , ChiroHdr_RecvTime(:msgdata) AS msgtime
813 ;`).bindConstText(":msgdata", msg
.getData
).range
)
815 msgtime
= trow
.msgtime
!uint;
816 hdr
= trow
.headers
!SQ3Text
;
817 body = trow
.body!SQ3Text
;
820 conwriteln("putting msg ", uid
, " (time:", msgtime
, "; appr=", appearance
, ") to '", tags
.getData
, "'; oldtags='", origTags
.getData
, "'");
823 //dbView.beginTransaction();
824 //scope(success) dbView.commitTransaction();
825 //scope(failure) dbView.rollbackTransaction();
826 chiroParseAndInsertOneMessage(uid
, msgtime
, appearance
, hdr
, body, tags
);
829 foreach (auto mrow
; dbView
.statement(`SELECT msgid AS msgid FROM msgids WHERE uid=:uid LIMIT 1;`).bind(":uid", uid
).range
) {
830 msgid
= mrow
.msgid
!SQ3Text
;
832 //if (msgid.length == 0) return;
833 version(debug_updater
) {
835 auto fo
= VFile("zzz", "a");
836 fo
.writeln("MSGUID: ", uid
, "; MSGID: <", msgid
.getData
, ">");
839 conwriteln("MSGUID: ", uid
, "; MSGID: <", msgid
.getData
, ">");
841 // collect tags to modify
843 scope(exit
) delete taglist
;
846 foreach (auto trow
; dbView
.statement(`SELECT tagid AS tagid, parent AS parent FROM threads WHERE uid=:uid;`)
847 .bind(":uid", uid
).range
)
849 immutable uint tid
= trow
.tagid
!uint;
852 foreach (immutable uint tt
; relinkTids
) if (tt
== tid
) { found
= true; break; }
853 if (!found
) relinkTids
~= tid
;
855 if (!tid || trow
.parent
!uint ||
!chiroIsTagThreaded(tid
)) continue;
856 conwriteln(" tagid: ", tid
, " (", chiroGetTagName(tid
).getData
, ")");
857 version(debug_updater
) {
859 auto fo
= VFile("zzz", "a");
860 fo
.writeln(" tagid: ", tid
, " (", chiroGetTagName(tid
).getData
, ")");
866 foreach (immutable uint tid
; taglist
) {
867 uint setUsAsParentFor
= 0;
868 bool needFullRelink
= false;
869 // check if there are any references to us, and fix them by full relink
870 if (!msgid
.length
) continue;
871 foreach (auto nrow
; dbView
.statement(`
872 SELECT refids.uid AS uid, tt.parent AS parent
874 INNER JOIN(threads) AS tt
875 ON tt.tagid=:tagid AND tt.uid=refids.uid
876 WHERE idx=0 AND msgid=:msgid
878 ;`).bind(":tagid", tid
).bindConstText(":msgid", msgid
.getData
).range
)
880 if (nrow
.parent
!uint == 0) {
881 setUsAsParentFor
= nrow
.uid
!uint;
883 needFullRelink
= true;
887 if (needFullRelink
) {
888 //FIXME: make this faster!
889 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); performing full relink...");
890 chiroSupportRelinkTagThreads(tid
);
894 if (setUsAsParentFor
) {
895 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); settuing us (", uid
, ") as a parent for ", setUsAsParentFor
);
899 WHERE uid=:xuid AND tagid=:tid
900 ;`).bind(":uid", uid
).bind(":xuid", setUsAsParentFor
).bind(":tagid", tid
).doAll();
903 // find parent for us
905 foreach (auto prow
; dbView
.statement(`
906 SELECT msgids.uid AS paruid
908 INNER JOIN(threads) AS tt
909 ON tt.tagid=:tagid AND tt.uid=msgids.uid
910 WHERE msgids.uid<>:uid AND msgids.msgid IN (SELECT msgid FROM refids WHERE uid=:uid ORDER BY idx)
912 ;`).bind(":uid", uid
).bind(":tagid", tid
).range
)
914 paruid
= prow
.paruid
!uint;
916 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); paruid=", paruid
);
917 if (paruid
&& paruid
!= uid
) {
918 dbView
.statement(`UPDATE threads SET parent=:paruid WHERE uid=:uid AND tagid=:tagid;`)
921 .bind(":paruid", paruid
)
931 if (relinkTids
.length
) {
932 foreach (immutable uint tid
; relinkTids
) {
933 if (vbwin
&& !vbwin
.closed
) vbwin
.postEvent(new TagThreadsUpdatedEvent(tid
));
939 //==========================================================================
943 //==========================================================================
944 void checkerThread (Tid ownerTid
) {
946 bool isError
= false;
955 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
972 foreach (auto arow
; stmtAccInfo
.st
.bind(":accid", accid
).range
) {
975 int upmins
= arow
.checktime
!int;
976 if (upmins
< 1) upmins
= 1; else if (upmins
> 100000) upmins
= 100000;
978 nosendauth
= (arow
.nosendauth
!int > 0);
979 debuglog
= (arow
.debuglog
!int > 0);
980 nntplastindex
= arow
.nntplastindex
!uint;
981 name
= arow
.name
!SQ3Text
;
982 recvserver
= arow
.recvserver
!SQ3Text
;
983 sendserver
= arow
.sendserver
!SQ3Text
;
984 user
= arow
.user
!SQ3Text
;
985 pass
= arow
.pass
!SQ3Text
;
986 inbox
= arow
.inbox
!SQ3Text
;
987 nntpgroup
= arow
.nntpgroup
!SQ3Text
;
991 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
995 conwriteln("checking account '", name
, "' (", accid
, ")...");
997 stmtSetCheckTime
.st
.bind(":accid", accid
).bind(":lastcheck", RunningAverageExp
.GetTickCount()+checktime
*60).doAll();
999 // ////////////////////////////////////////////////////////////////// //
1001 auto nsk
= new SocketNNTP(recvserver
.idup
);
1002 scope(exit
) nsk
.close();
1004 nsk
.selectGroup(nntpgroup
);
1005 if (nsk
.emptyGroup
) {
1006 conwriteln("[", name
, ":", nntpgroup
, "]: no new articles (empty group)");
1010 uint stnum
= nntplastindex
+1;
1011 if (stnum
> nsk
.hiwater
) {
1012 conwriteln("[", name
, ":", nntpgroup
, "]: no new articles");
1016 conwriteln("[", name
, ":", nntpgroup
, "]: ", nsk
.hiwater
+1-stnum
, " (possible) new articles");
1018 // download new articles
1019 foreach (immutable uint anum
; stnum
..nsk
.hiwater
+1) {
1022 msg
= nsk
.getArticle(anum
);
1023 } catch (Exception e
) {
1024 conwriteln("[", name
, ":", nntpgroup
, "]: error downloading article #", anum
);
1027 if (msg
.length
== 0) continue; // this article is empty
1028 // insert article into the storage
1029 // filtering will be done later, for now, insert with the default inbox
1031 if (inbox
.length
) tags
~= inbox
;
1033 if (tags
.length
) tags
~= "|";
1037 if (tags
.length
== 0) tags
= "#hobo";
1038 conwriteln("[", name
, ":", nntpgroup
, "]: storing article #", anum
, " for '", tags
.getData
, "'...");
1040 dbStore
.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1041 .bindConstText(":tags", tags
.getData
)
1042 .bindConstBlob(":data", msg
.getData
)
1045 // update account with the new highest nntp index
1047 dbConf
.statement(`UPDATE accounts SET nntplastindex=:anum WHERE accid=:accid;`)
1048 .bind(":accid", accid
)
1049 .bind(":anum", anum
)
1055 // ////////////////////////////////////////////////////////////////// //
1057 conwriteln("*** [", name
, "]: connecting...");
1058 auto pop3
= new SocketPOP3(recvserver
.idup
);
1059 scope(exit
) pop3
.close();
1061 conwriteln("[", name
, "]: authenticating...");
1062 pop3
.auth(user
, pass
);
1064 auto newmsg
= pop3
.getNewMailCount
;
1066 conwriteln("[", name
, "]: no new messages");
1069 conwriteln("[", name
, "]: ", newmsg
, " new message", (newmsg
> 1 ?
"s" : ""));
1070 foreach (immutable int popidx
; 1..newmsg
+1) {
1073 msg
= pop3
.getMessage(popidx
); // full message, with the ending dot
1074 } catch (Exception e
) {
1075 conwriteln("[", name
, "]: error downloading message #", popidx
);
1078 if (msg
.length
!= 0) {
1080 if (inbox
.length
) tags
~= inbox
;
1082 if (tags
.length
) tags
~= "|";
1086 if (tags
.length
== 0) tags
= "#hobo";
1087 conwriteln("[", name
, ":", nntpgroup
, "]: storing message #", popidx
, " for '", tags
.getData
, "'...");
1089 dbStore
.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1090 .bindConstText(":tags", tags
.getData
)
1091 .bindConstBlob(":data", msg
.getData
)
1095 //auto msg = pop3.getMessage!true(popidx); // full message, with the ending dot, and exact terminators
1097 pop3
.deleteMessage(popidx
);
1101 // ////////////////////////////////////////////////////////////////// //
1103 if (nntpgroup
.length
) CheckNNTP(); else CheckSMTP();
1104 } catch (Throwable e
) {
1105 conwriteln("ERROR checking account '", name
, "' (", accid
, "): ", e
.msg
);
1109 conwriteln("done checking account '", name
, "' (", accid
, ")...");
1111 if (vbwin
&& !vbwin
.closed
) {
1112 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "POSTING %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1113 vbwin
.postEvent(new UpdatingAccountCompleteEvent(accid
));
1114 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "POSTED %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1115 //sqlite3_sleep(1000);
1117 } catch (Throwable e
) {
1118 // here, we are dead and fucked (the exact order doesn't matter)
1119 //import core.stdc.stdlib : abort;
1120 import core
.stdc
.stdio
: fprintf
, stderr
;
1121 //import core.memory : GC;
1122 import core
.thread
: thread_suspendAll
;
1123 //GC.disable(); // yeah
1124 //thread_suspendAll(); // stop right here, you criminal scum!
1125 auto s
= e
.toString();
1126 fprintf(stderr
, "\n=== FATAL ===\n%.*s\n", cast(uint)s
.length
, s
.ptr
);
1127 //abort(); // die, you bitch!
1128 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
1131 //if (vbwin) vbwin.postEvent(evDoConCommands); }
1132 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "SENDING %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1133 ownerTid
.send(ControlCommand((isError ? ControlCommand
.Kind
.CheckError
: ControlCommand
.Kind
.CheckDone
), accid
));
1134 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "SENDT %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1136 import core
.memory
: GC
;
1140 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "DONE with accid=%u\n", accid); }
1144 //==========================================================================
1148 //==========================================================================
1149 void controlThread (Tid ownerTid
) {
1151 bool doQuit
= false;
1153 static struct AccCheck
{
1159 AccCheck
[] accidCheckList
;
1160 accidCheckList
.reserve(128);
1163 CREATE TEMP TABLE IF NOT EXISTS checktimes (
1164 accid INTEGER PRIMARY KEY UNIQUE /* unique, never zero */
1165 , lastcheck INTEGER NOT NULL DEFAULT 0
1166 , checking INTEGER NOT NULL DEFAULT 0
1170 static stmtAllAccs
= LazyStatement
!"Conf"(`
1173 , checktime AS checktime
1175 WHERE nocheck=0 AND inbox<>''
1179 static stmtGetCheckTime
= LazyStatement
!"Conf"(`
1180 SELECT lastcheck AS lastcheck FROM checktimes WHERE accid=:accid LIMIT 1
1185 MonoTime lastCollect
= MonoTime
.currTime
;
1186 //accidCheckList ~= AccCheck();
1188 bool needUpdates
= false;
1189 bool forceAll
= false;
1191 if (doQuit
&& accidCheckList
.length
== 0) break;
1192 receiveTimeout((doQuit ?
50.msecs
: accidCheckList
.length || needUpdates || forceAll ?
1.seconds
: 60.seconds
),
1193 (ControlCommand cmd
) {
1194 final switch (cmd
.type
) {
1195 case ControlCommand
.Kind
.ForceUpdateAll
: forceAll
= true; break;
1196 case ControlCommand
.Kind
.Ping
: break;
1197 case ControlCommand
.Kind
.Quit
: doQuit
= true; break;
1198 case ControlCommand
.Kind
.DisableUpdates
: updatesDisabled
= true; break;
1199 case ControlCommand
.Kind
.EnableUpdates
: updatesDisabled
= false; break;
1200 case ControlCommand
.Kind
.CheckDone
:
1201 case ControlCommand
.Kind
.CheckError
:
1203 if (accidCheckList
.length
) {
1204 foreach (immutable idx
, const ref AccCheck nfo
; accidCheckList
) {
1205 if (nfo
.accid
== cmd
.accid
) {
1206 //if (!doQuit && vbwin && !vbwin.closed) vbwin.postEvent(new UpdatingAccountCompleteEvent(nfo.accid));
1207 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1208 accidCheckList
.length
-= 1;
1212 if (!doQuit
&& vbwin
&& !vbwin
.closed
&& accidCheckList
.length
== 0) vbwin
.postEvent(new UpdatingCompleteEvent());
1219 for (usize idx
= 0; idx
< accidCheckList
.length
; ) {
1220 if (accidCheckList
[idx
].accid
!= 0) {
1223 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1224 accidCheckList
.length
-= 1;
1229 for (usize idx
= 0; idx
< accidCheckList
.length
; ) {
1230 if (accidCheckList
[idx
].inprogress
) {
1233 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1234 accidCheckList
.length
-= 1;
1240 if (!needUpdates
&& !updatesDisabled
) {
1241 ulong ctt
= RunningAverageExp
.GetTickCount();
1242 foreach (auto arow
; stmtAllAccs
.st
.range
) {
1244 foreach (const ref AccCheck nfo
; accidCheckList
) if (nfo
.accid
== arow
.accid
!uint) { found
= true; break; }
1245 if (found
) continue;
1248 accidCheckList
~= AccCheck(arow
.accid
!uint);
1252 int upmins
= arow
.checktime
!int;
1253 if (upmins
< 1) upmins
= 1; else if (upmins
> 100000) upmins
= 100000;
1254 ulong lastcheck
= 0;
1255 foreach (auto crow
; stmtGetCheckTime
.st
.bind(":accid", arow
.accid
!uint).range
) lastcheck
= crow
.lastcheck
!ulong;
1256 lastcheck
+= upmins
*60; // next check time
1257 if (lastcheck
< ctt
) {
1259 accidCheckList
~= AccCheck(arow
.accid
!uint);
1263 conwriteln("check for accid ", arow.accid!uint, " in ", (lastcheck-ctt)/60, " minutes...");
1270 if (!updatesDisabled
) {
1271 foreach (ref AccCheck nfo
; accidCheckList
) {
1272 if (nfo
.inprogress
) break;
1273 if (vbwin
) vbwin
.postEvent(new UpdatingAccountEvent(nfo
.accid
));
1274 nfo
.tid
= spawn(&checkerThread
, thisTid
);
1275 nfo
.inprogress
= true;
1276 nfo
.tid
.send(CheckCommand(nfo
.accid
));
1281 bool hasProgress
= false;
1282 foreach (ref AccCheck nfo
; accidCheckList
) if (nfo
.inprogress
) { hasProgress
= true; break; }
1285 needUpdates
= false;
1289 immutable ctt
= MonoTime
.currTime
;
1290 if ((ctt
-lastCollect
).total
!"minutes" >= 1) {
1291 import core
.memory
: GC
;
1298 ownerTid
.send(ControlReply
.Quit
);
1299 } catch (Throwable e
) {
1300 // here, we are dead and fucked (the exact order doesn't matter)
1301 import core
.stdc
.stdlib
: abort
;
1302 import core
.stdc
.stdio
: fprintf
, stderr
;
1303 import core
.memory
: GC
;
1304 import core
.thread
: thread_suspendAll
;
1305 GC
.disable(); // yeah
1306 thread_suspendAll(); // stop right here, you criminal scum!
1307 auto s
= e
.toString();
1308 fprintf(stderr
, "\n=== FATAL ===\n%.*s\n", cast(uint)s
.length
, s
.ptr
);
1309 abort(); // die, you bitch!
1314 //==========================================================================
1318 //==========================================================================
1319 public void receiverDisable () {
1324 //==========================================================================
1326 // disableMailboxUpdates
1328 //==========================================================================
// Asks the control thread to stop scheduling mailbox checks.
// Does nothing when the receiver has not been started (`rcStarted` is false).
// The request is sent to `controlThreadId` as a `DisableUpdates` command;
// `controlThread` reacts to it by setting `updatesDisabled` to true.
1329 public void disableMailboxUpdates () {
1330 if (!rcStarted
) return;
1331 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.DisableUpdates
));
1335 //==========================================================================
1337 // enableMailboxUpdates
1339 //==========================================================================
// Asks the control thread to resume scheduling mailbox checks.
// Does nothing when the receiver has not been started (`rcStarted` is false).
// The request is sent to `controlThreadId` as an `EnableUpdates` command;
// `controlThread` reacts to it by setting `updatesDisabled` to false.
1340 public void enableMailboxUpdates () {
1341 if (!rcStarted
) return;
1342 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.EnableUpdates
));
1346 //==========================================================================
1348 // receiverForceUpdateAll
1350 //==========================================================================
// Asks the control thread to check all accounts now.
// Does nothing when the receiver has not been started (`rcStarted` is false).
// The request is sent to `controlThreadId` as a `ForceUpdateAll` command;
// `controlThread` reacts to it by raising its `forceAll` flag.
1351 public void receiverForceUpdateAll () {
1352 if (!rcStarted
) return;
1353 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.ForceUpdateAll
));
1357 //==========================================================================
1361 //==========================================================================
1362 public void receiverInit () {
1363 if (rcStarted
) return;
1364 if (rcDisabled
) return;
1365 controlThreadId
= spawn(&controlThread
, thisTid
);
1370 //==========================================================================
1374 //==========================================================================
1375 public void receiverDeinit () {
1376 if (!rcStarted
) return;
1377 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.Quit
));
1381 (ControlReply reply
) {
1382 if (reply
== ControlReply
.Quit
) done
= true;