2 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
3 * Understanding is not required. Only obedience.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3 of the License ONLY.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // mail/nntp receiver thread
18 module receiver
is aliced
;
21 //version = debug_filter_helper;
22 //version = debug_updater;
24 import std
.concurrency
;
30 import iv
.timer
: DurTimer
= Timer
;
37 import chibackend
.net
;
41 static if (__traits(compiles
, () { import extfilter
; })) {
42 static import extfilter
;
43 enum HasExtFilter
= true;
45 enum HasExtFilter
= false;
49 // ////////////////////////////////////////////////////////////////////////// //
50 class RealFilterHelper
: FilterHelper
{
53 // only one of these can be set
54 ActFlagDelete
= 1u<<0,
57 // only one of these can be set
67 DynStr tag
; // destination tag
68 uint actFlags
; // see above
/// Destructor: clears the three string members (`account`, `tag`, `message`).
~this () nothrow @nogc {
  account.clear();
  tag.clear();
  message.clear();
}
/// `true` when the `ActFlagDelete` bit is set in `actFlags`.
final @property bool isDelete () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagDelete) != 0);
}
/// `true` when the `ActFlagPurge` bit is set in `actFlags`.
final @property bool isPurge () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagPurge) != 0);
}
/// `true` when the `ActFlagSpam` bit is set in `actFlags`.
final @property bool isSpam () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagSpam) != 0);
}
/// `true` when the `ActFlagHam` bit is set in `actFlags`.
final @property bool isHam () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagHam) != 0);
}
/// `true` when the `ActFlagRead` bit is set in `actFlags`.
final @property bool isRead () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagRead) != 0);
}
/// `true` when the `ActFlagStop` bit is set in `actFlags`.
final @property bool isStop () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagStop) != 0);
}
82 // called if a filter was matched
83 override void filterMatched () {
87 override DynStr
getAccount () {
91 override DynStr
getHeaderField (const(char)[] header
, out bool exists
) {
92 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
93 auto value
= findHeaderField(headers
, header
);
94 exists
= (value
!is null);
95 version(debug_filter_helper
) writeln("...getHeaderField(", header
, "): exists=", exists
, "; res=", value
);
99 override DynStr
getFromName () {
100 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
101 auto value
= findHeaderField(headers
, "From").extractName
;
102 version(debug_filter_helper
) writeln("...getFromName: res=", value
);
103 return DynStr(value
);
106 override DynStr
getFromMail () {
107 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
108 auto value
= findHeaderField(headers
, "From").extractMail
;
109 if (value
.length
== 0) value
= "nobody@nowhere";
110 version(debug_filter_helper
) writeln("...getFromMail: res=", value
);
111 return DynStr(value
);
114 override DynStr
getToName () {
115 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
116 auto value
= findHeaderField(headers
, "To").extractName
;
117 version(debug_filter_helper
) writeln("...getToName: res=", value
);
118 return DynStr(value
);
121 override DynStr
getToMail () {
122 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
123 auto value
= findHeaderField(headers
, "To").extractMail
;
124 if (value
.length
== 0) value
= "nobody@nowhere";
125 version(debug_filter_helper
) writeln("...getToMail: res=", value
);
126 return DynStr(value
);
129 override DynStr
getSubj (out bool exists
) {
130 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
131 auto value
= findHeaderField(headers
, "Subject");
132 exists
= (value
!is null);
133 if (exists
) value
= value
.decodeSubj
.subjRemoveRe
;
134 return DynStr(value
);
137 override DynStr
exec (const(char)[] command
) {
138 /*version(debug_filter_helper)*/ writeln("...exec: <", command
, ">");
139 //return DynStr("nothing");
140 import std
.stdio
: File
;
143 // write article to file
145 UUID id
= randomUUID();
147 void deleteTempFile () {
148 if (buf
.length
) try { import std
.file
: remove
; remove(buf
.getData
); } catch (Exception e
) {}
150 scope(exit
) deleteTempFile();
151 buf
.reserve(2+16*2+42);
152 buf
~= "/tmp/_temp_";
153 foreach (immutable ubyte b
; id
.data
[]) {
154 buf
~= "0123456789abcdef"[b
>>4];
155 buf
~= "0123456789abcdef"[b
&0x0f];
159 auto fo
= VFile(buf
.getData
, "w");
160 fo
.rawWriteExact(message
.getData
);
163 //!conwriteln("EXEC filter '", command, "'... (", buf.getData, ")");
164 auto pid
= pipeProcess([command
, /*"-D",*/ buf
.getData
], Redirect
.all
, null, Config
.none
, "/tmp");
165 string action
= pid
.stdout
.readln
.xstrip
;
166 bool doStop
= (action
.length
&& action
[0] == '-');
167 if (doStop
) action
= action
[1..$].xstrip
;
169 while (!pid
.stderr
.eof
) conwriteln(" :", pid
.stderr
.readln
.xstrip
, "|");
172 //!conwriteln("EXEC filter '", command, "' action: ", action, " (", doStop, ")");
173 return DynStr(action
);
174 } catch (Exception e
) {
175 conwriteln("EXEC filter error: ", e
.msg
);
180 override void move (const(char)[] dest
) {
181 version(debug_filter_helper
) writeln("...move: <", dest
, ">");
185 override void performAction (Action action
) {
186 version(debug_filter_helper
) writeln("...performAction: ", action
);
189 actFlags
&= ~(ActFlagDelete|ActFlagPurge
);
190 actFlags |
= ActFlagPurge
;
192 case Action
.SoftDelete
:
193 actFlags
&= ~(ActFlagDelete|ActFlagPurge
);
194 actFlags |
= ActFlagDelete
;
197 actFlags
&= ~(ActFlagSpam|ActFlagHam
);
198 actFlags |
= ActFlagSpam
;
201 actFlags
&= ~(ActFlagSpam|ActFlagHam
);
202 actFlags |
= ActFlagHam
;
205 actFlags |
= ActFlagRead
;
208 actFlags |
= ActFlagStop
;
212 import std
.conv
: to
;
213 throw new FilterSyntaxException("action "~action
.to
!string
~" should not end up in the handler");
218 override bool match (const(char)[] pat
, const(char)[] str, bool casesens
) {
219 version(debug_filter_helper
) writeln("...match: casesens=", casesens
, "; pat=<", pat
, ">; str=<", str, ">");
220 immutable bool bol
= (pat
.length
&& pat
[0] == '^');
221 if (bol
) pat
= pat
[1..$];
222 immutable bool eol
= (pat
.length
&& pat
[$-1] == '$');
223 if (eol
) pat
= pat
[0..$-1];
224 version(debug_filter_helper
) writeln("...match: bol=", bol
, "; eol=", eol
, "; pat=<", pat
, ">");
225 if (pat
.length
== 0) return (bol
&& eol ?
str.length
== 0 : false);
226 if (str.length
< pat
.length
) return false;
227 if (bol
&& eol
) { if (str.length
!= pat
.length
) return false; }
228 else if (bol
) str = str[0..pat
.length
];
229 else if (eol
) str = str[str.length
-pat
.length
..$];
231 return (str.indexOf(pat
) >= 0);
233 while (str.length
>= pat
.length
) {
234 if (str.startsWithCI(pat
)) {
235 //writeln("...match: HIT! str=<", str, ">");
239 //writeln("...match: skip; str=<", str, ">; pat=<", pat, ">");
241 //writeln("...match: FAIL!");
246 void writeResult() () const {
247 if (isDelete
) write(" softdelete");
248 if (isPurge
) write(" purge");
249 if (isSpam
) write(" spam");
250 if (isHam
) write(" ham");
251 if (isRead
) write(" read");
252 write("; dest tag: ", tag
.getData
);
257 // ////////////////////////////////////////////////////////////////////////// //
258 __gshared
bool updatesDisabled
= false;
259 __gshared
bool rcDisabled
= false;
260 __gshared
bool rcStarted
= false;
261 __gshared Tid controlThreadId
;
268 struct ControlCommand
{
281 // for CheckDone or CheckError
/// Builds a command of kind `atype`; `accid` is set to zero.
this (Kind atype) nothrow @safe @nogc {
  accid = 0;
  type = atype;
}
/// Builds a command of kind `atype` carrying the account id `aid`.
this (Kind atype, uint aid) nothrow @safe @nogc {
  accid = aid;
  type = atype;
}
290 struct CheckCommand
{
295 static stmtAccInfo
= LazyStatement
!"Conf"(`
298 , checktime AS checktime
299 , nosendauth AS nosendauth
300 , debuglog AS debuglog
301 , nntplastindex AS nntplastindex
303 , recvserver AS recvserver
304 , sendserver AS sendserver
308 , nntpgroup AS nntpgroup
316 static stmtSetCheckTime
= LazyStatement
!"Conf"(`
317 INSERT INTO checktimes(accid,lastcheck) VALUES(:accid,:lastcheck)
319 DO UPDATE SET lastcheck=:lastcheck
323 //==========================================================================
327 // return `false` from delegate to stop
329 //==========================================================================
330 void forEachTag (const(char)[] tags
, bool delegate (const(char)[] tag
) dg
) {
331 if (dg
is null) return;
333 while (tags
.length
) {
334 auto stp
= tags
.indexOf('|');
335 if (stp
< 0) stp
= cast(uint)tags
.length
;
336 auto tag
= tags
[0..stp
];
337 tags
= tags
[(stp
< tags
.length ? stp
+1 : tags
.length
)..$];
338 if (tag
.length
== 0) continue;
344 //==========================================================================
348 //==========================================================================
349 DynStr
extractAccount (const(char)[] tags
) {
350 auto stp
= tags
.indexOf("account:");
352 if (stp
== 0 || tags
[stp
-1] == '|') {
353 tags
= tags
[stp
+8..$];
354 stp
= tags
.indexOf('|');
355 if (stp
>= 0) tags
= tags
[0..stp
];
363 //==========================================================================
365 // extractFirstFolder
367 // can return empty string
369 //==========================================================================
370 DynStr
extractFirstFolder (const(char)[] tags
) {
372 forEachTag(tags
, (tag
) {
373 if (tag
[0] != '/') return true; // go on
375 return false; // stop
381 //==========================================================================
385 // can return empty tags string
387 //==========================================================================
388 DynStr
removeFirstFolder (const(char)[] tags
) {
390 bool seenFolder
= false;
391 forEachTag(tags
, (tag
) {
392 if (!seenFolder
&& tag
[0] == '/') {
395 if (res
.length
) res
~= "|";
398 return true; // go on
404 // ////////////////////////////////////////////////////////////////////////// //
405 static struct TagInfo
{
412 //==========================================================================
416 //==========================================================================
417 void getMessageTags (ref TagInfo
[] tags
, uint uid
) {
418 auto stGetTags
= LazyStatement
!"View"(`
420 DISTINCT(threads.tagid) AS tagid
423 INNER JOIN tagnames AS tn USING(tagid)
428 foreach (auto row
; stGetTags
.st
.bind(":uid", uid
).range
) {
429 tags
~= TagInfo(row
.tagid
!uint, DynStr(row
.name
!SQ3Text
));
434 //==========================================================================
436 // updateTwittedThreadsInTag
438 //==========================================================================
439 void updateTwittedThreadsInTag (uint tagid
, uint uid
) {
440 auto stTempTbl
= LazyStatement
!"View"(`
441 INSERT INTO mutepairs
442 WITH RECURSIVE children(muid, paruid, mtagid, mmute) AS (
443 SELECT 0, :uid, :tagid, 666
446 tt.uid, tt.uid, mtagid, tt.mute
448 INNER JOIN threads AS tt ON
449 tt.tagid=cc.mtagid AND
450 tt.parent=cc.paruid AND
458 WHERE muid<>0 AND mmute=0
461 auto stFixMutes
= LazyStatement
!"View"(`
465 , appearance=iif(appearance=0,1,appearance)
466 FROM (SELECT muid, mtagid FROM mutepairs) AS cc
467 WHERE uid=cc.muid AND tagid=cc.mtagid AND mute=0
471 dbView
.execute(`DELETE FROM mutepairs;`);
474 .bind(":tagid", tagid
)
477 .bind(":mute", Mute
.ThreadOther
)
482 //==========================================================================
486 //==========================================================================
487 public void createTwitByMsgid (uint uid
, const(char)[] glob
="/dmars_ng/*") {
489 if (glob
.length
== 0) return;
491 auto stGetMsgid
= LazyStatement
!"View"(`
500 foreach (auto row
; stGetMsgid
.st
.bind(":uid", uid
).range
) msgid
= row
.msgid
!SQ3Text
;
501 if (msgid
.length
== 0) return;
503 auto stFindMsgidTwit
= LazyStatement
!"Conf"(`
507 WHERE msgid=:msgid AND tagglob=:glob
511 // check if we already have such twit
512 foreach (auto row
; stFindMsgidTwit
.st
.bindConstText(":msgid", msgid
.getData
).bindConstText(":glob", glob
).range
) return;
515 auto stAddMsgidTwit
= LazyStatement
!"Conf"(`
516 INSERT INTO msgidtwits
517 (etwitid, automatic, tagglob, msgid)
518 VALUES(0, 0,:tagglob,:msgid)
522 .bindConstText(":tagglob", glob
)
523 .bindConstText(":msgid", msgid
.getData
)
528 scope(exit
) delete tags
;
529 getMessageTags(ref tags
, uid
);
530 if (tags
.length
== 0) return; // just in case
534 auto stUpdateMute
= LazyStatement
!"View"(`
539 , appearance=iif(appearance=0,1,appearance)
540 WHERE uid=:uid AND tagid=:tagid AND mute=0
543 // mark the message as twitted
545 foreach (ref TagInfo ti
; tags
) {
548 .bind(":tagid", ti
.tagid
)
549 .bind(":mute", Mute
.ThreadStart
)
551 updateTwittedThreadsInTag(ti
.tagid
, uid
);
557 //==========================================================================
561 //==========================================================================
562 public void twitPrepare () {
564 CREATE TEMP TABLE IF NOT EXISTS mutepairs(
572 //==========================================================================
576 // set "mute" flag according to message filters
578 //==========================================================================
579 public void twitMessage (uint uid
) {
583 scope(exit
) delete tags
;
584 getMessageTags(ref tags
, uid
);
585 if (tags
.length
== 0) return; // just in case
587 auto stUpdateMute
= LazyStatement
!"View"(`
592 , appearance=iif(appearance=0,1,appearance)
593 WHERE uid=:uid AND tagid=:tagid AND mute=0
596 DynStr fromMail
, fromName
;
598 foreach (auto row; dbView.statement(`
600 from_name AS fromName
601 , from_mail AS fromMail
605 ;`).bind(":uid", uid).range)
607 fromMail = row.fromMail!SQ3Text;
608 fromName = row.fromName!SQ3Text;
611 if (!chiroGetMessageFrom(uid
, ref fromMail
, ref fromName
)) return;
614 if (fromMail
.length
!= 0 || fromName
.length
!= 0) {
615 foreach (auto trow
; dbConf
.statement(`
624 auto email
= trow
.email
!SQ3Text
;
625 auto name
= trow
.name
!SQ3Text
;
626 auto glob
= trow
.tagglob
!SQ3Text
;
627 if (glob
.length
== 0 ||
(!email
.length
&& !name
.length
)) continue; // just in case
628 // check for filter match
629 if (email
.length
&& !globmatchCI(fromMail
, email
)) continue;
630 if (name
.length
&& !globmatchCI(fromName
, name
)) continue;
631 auto title
= trow
.title
!SQ3Text
;
633 foreach (ref TagInfo ti
; tags
) {
634 if (ti
.wasUpdates
) continue;
635 if (!globmatch(ti
.name
, glob
)) continue;
638 .bind(":tagid", ti
.tagid
)
639 .bind(":mute", Mute
.ThreadStart
)
640 .bindConstText(":title", (title
.length ? title
: null), allowNull
:true)
642 ti
.wasUpdates
= true;
645 if (ttcount
== tags
.length
) break;
649 // mute it if it is not muted, but should be
650 static auto statFindParentFor
= LazyStatement
!"View"(`
651 SELECT mute AS mute, parent AS parent
653 WHERE tagid=:tagid AND uid=:uid
657 foreach (TagInfo ti
; tags
) {
658 auto mute
= chiroGetMessageMute(ti
.tagid
, uid
);
659 if (mute
> Mute
.Normal
) {
660 ti
.wasUpdates
= true; // just in case
661 if (!ttcount
) ttcount
= 1;
667 .bind(":tagid", ti
.tagid
)
670 foreach (auto prow
; statFindParentFor
.st
.range
) {
671 if (prow
.mute
!int > Mute
.Normal
) {
672 chiroSetMessageMute(ti
.tagid
, uid
, Mute
.ThreadOther
);
673 ti
.wasUpdates
= true; // just in case
674 if (!ttcount
) ttcount
= 1;
676 puid
= prow
.parent
!uint;
683 if (!ttcount
) return;
686 foreach (ref TagInfo ti
; tags
) {
687 if (!ti
.wasUpdates
) continue;
688 updateTwittedThreadsInTag(ti
.tagid
, uid
);
693 //==========================================================================
697 // check for new messages, and update view database
699 //==========================================================================
700 public void updateViewDB (bool skipFilters
=false) {
702 uint maxStoreUid
= 0;
706 foreach (auto row
; dbView
.statement(`SELECT MAX(uid) AS uid FROM info;`).range
) maxViewUid
= row
.uid
!uint;
707 foreach (auto row
; dbStore
.statement(`SELECT MAX(uid) AS uid FROM messages;`).range
) maxStoreUid
= row
.uid
!uint;
709 if (maxViewUid
>= maxStoreUid
) return;
710 conwriteln("need to process around ", maxStoreUid
-maxViewUid
, " messages.");
713 relinkTids
.reserve(64);
714 scope(exit
) delete relinkTids
;
716 foreach (uint uid
; maxViewUid
+1..maxStoreUid
+1) {
718 foreach (auto row
; dbStore
.statement(`
719 SELECT tags AS tags, ChiroUnpack(data) AS data FROM messages WHERE uid=:uid LIMIT 1
720 ;`).bind(":uid", uid
).range
)
722 msg
= row
.data
!SQ3Text
;
723 tags
= row
.tags
!SQ3Text
;
725 if (msg
.length
== 0 || tags
.length
== 0) continue; // not interesting
727 conwriteln("============ message #", uid
, " ============");
729 DynStr acc
= tags
.extractAccount();
730 DynStr origTags
= tags
;
731 RealFilterHelper hlp
;
732 scope(exit
) delete hlp
;
733 bool markSpamHam
= false; //!!!
736 DynStr deftag
= tags
.extractFirstFolder();
737 tags
= tags
.removeFirstFolder();
739 hlp
= new RealFilterHelper
;
742 if (hlp
.tag
.length
== 0) hlp
.tag
= "#hobo";
745 foreach (auto row
; dbConf
.statement(`SELECT filterid AS filterid, name AS name, body AS body FROM filters ORDER BY idx;`).range
) {
746 //conwrite(" filter '", row.name!SQ3Text, "' (", row.filterid!uint, "): ");
750 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">");
751 goOn
= executeMailFilter(row
.body!SQ3Text
, hlp
);
752 } catch (Exception e
) {
753 conwriteln("ERROR IN FILTER '", row
.name
!SQ3Text
, "': ", e
.msg
);
756 conwriteln("...filter '", row
.name
!SQ3Text
, "' matched!");
758 //hlp.writeResult(); writeln;
759 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">: goon=", goOn, "; isstop=", hlp.isStop);
760 //assert(hlp.isStop == !goOn);
761 if (hlp
.isStop
) break;
763 //write(" FINAL RESULT:"); hlp.writeResult(); writeln;
766 markSpamHam
= false; //!!!
767 if (!hlp
.isSpam
&& !hlp
.isHam
) {
768 auto bogo
= messageBogoCheck(uid
);
769 if (bogo
== Bogo
.Spam
) {
771 conwriteln("BOGO: SPAM message #", uid
, "; from={", hlp
.getFromName
.getData
, "}:<", hlp
.getFromMail
.getData
, ">; to={",
772 hlp
.getToName
.getData
, "}:<", hlp
.getToMail
.getData
, ">; subj=", hlp
.getSubj(out exists
).getData
);
773 hlp
.performAction(hlp
.Action
.Spam
);
778 if (hlp
.isSpam
) hlp
.tag
= "#spam"; // always
780 if (hlp
.tag
.length
== 0) hlp
.tag
= deftag
; // just in case
782 forEachTag(tags
, (xtag
) {
783 if (xtag
== hlp
.tag
) {
785 return false; // stop
787 return true; // go on
790 // `tags` should contain our new tags
801 // update tags info in the storage
802 if (tags
!= origTags
) {
804 dbStore
.statement(`UPDATE messages SET tags=:tags WHERE uid=:uid;`)
806 .bindConstText(":tags", tags
.getData
)
811 // insert the message into the view db
812 int appearance
= (skipFilters ? Appearance
.Read
: Appearance
.Unread
);
813 if (hlp
!is null && hlp
.isDelete
) appearance
= Appearance
.SoftDeleteFilter
;
814 else if (hlp
!is null && hlp
.isPurge
) appearance
= Appearance
.SoftDeletePurge
;
815 if (hlp
!is null && appearance
== Appearance
.Unread
&& (hlp
.isRead || hlp
.isSpam
)) appearance
= Appearance
.Read
;
816 if (hlp
!is null && markSpamHam
) {
817 if (hlp
.isSpam
) messageBogoMarkSpam(uid
);
818 if (hlp
.isHam
) messageBogoMarkHam(uid
);
823 foreach (auto trow
; dbStore
.statement(`
825 ChiroExtractHeaders(:msgdata) AS headers
826 , ChiroExtractBody(:msgdata) AS body
827 , ChiroHdr_RecvTime(:msgdata) AS msgtime
828 ;`).bindConstText(":msgdata", msg
.getData
).range
)
830 msgtime
= trow
.msgtime
!uint;
831 hdr
= trow
.headers
!SQ3Text
;
832 body = trow
.body!SQ3Text
;
835 conwriteln("putting msg ", uid
, " (time:", msgtime
, "; appr=", appearance
, ") to '", tags
.getData
, "'; oldtags='", origTags
.getData
, "'");
838 //dbView.beginTransaction();
839 //scope(success) dbView.commitTransaction();
840 //scope(failure) dbView.rollbackTransaction();
841 chiroParseAndInsertOneMessage(uid
, msgtime
, appearance
, hdr
, body, tags
);
843 // custom filters, not in the repository
844 static if (HasExtFilter
) {
845 extfilter
.ExtInTrans(hdr
, body, tags
, uid
);
849 foreach (auto mrow
; dbView
.statement(`SELECT msgid AS msgid FROM msgids WHERE uid=:uid LIMIT 1;`).bind(":uid", uid
).range
) {
850 msgid
= mrow
.msgid
!SQ3Text
;
852 //if (msgid.length == 0) return;
853 version(debug_updater
) {
855 auto fo
= VFile("zzz", "a");
856 fo
.writeln("MSGUID: ", uid
, "; MSGID: <", msgid
.getData
, ">");
859 conwriteln("MSGUID: ", uid
, "; MSGID: <", msgid
.getData
, ">");
861 // collect tags to modify
863 scope(exit
) delete taglist
;
866 foreach (auto trow
; dbView
.statement(`SELECT tagid AS tagid, parent AS parent FROM threads WHERE uid=:uid;`)
867 .bind(":uid", uid
).range
)
869 immutable uint tid
= trow
.tagid
!uint;
872 foreach (immutable uint tt
; relinkTids
) if (tt
== tid
) { found
= true; break; }
873 if (!found
) relinkTids
~= tid
;
875 if (!tid || trow
.parent
!uint ||
!chiroIsTagThreaded(tid
)) continue;
876 conwriteln(" tagid: ", tid
, " (", chiroGetTagName(tid
).getData
, ")");
877 version(debug_updater
) {
879 auto fo
= VFile("zzz", "a");
880 fo
.writeln(" tagid: ", tid
, " (", chiroGetTagName(tid
).getData
, ")");
886 foreach (immutable uint tid
; taglist
) {
887 uint setUsAsParentFor
= 0;
888 bool needFullRelink
= false;
889 // check if there are any references to us, and fix them by full relink
890 if (!msgid
.length
) continue;
891 foreach (auto nrow
; dbView
.statement(`
892 SELECT refids.uid AS uid, tt.parent AS parent
894 INNER JOIN(threads) AS tt
895 ON tt.tagid=:tagid AND tt.uid=refids.uid
896 WHERE idx=0 AND msgid=:msgid
898 ;`).bind(":tagid", tid
).bindConstText(":msgid", msgid
.getData
).range
)
900 if (nrow
.parent
!uint == 0) {
901 setUsAsParentFor
= nrow
.uid
!uint;
903 needFullRelink
= true;
907 if (needFullRelink
) {
908 //FIXME: make this faster!
909 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); performing full relink...");
910 chiroSupportRelinkTagThreads(tid
);
914 if (setUsAsParentFor
) {
915 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); settuing us (", uid
, ") as a parent for ", setUsAsParentFor
);
919 WHERE uid=:xuid AND tagid=:tagid
920 ;`).bind(":uid", uid
).bind(":xuid", setUsAsParentFor
).bind(":tagid", tid
).doAll();
923 // find parent for us
925 foreach (auto prow
; dbView
.statement(`
926 SELECT msgids.uid AS paruid
928 INNER JOIN(threads) AS tt
929 ON tt.tagid=:tagid AND tt.uid=msgids.uid
930 WHERE msgids.uid<>:uid AND msgids.msgid IN (SELECT msgid FROM refids WHERE uid=:uid ORDER BY idx)
932 ;`).bind(":uid", uid
).bind(":tagid", tid
).range
)
934 paruid
= prow
.paruid
!uint;
936 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); paruid=", paruid
);
937 if (paruid
&& paruid
!= uid
) {
938 dbView
.statement(`UPDATE threads SET parent=:paruid WHERE uid=:uid AND tagid=:tagid;`)
941 .bind(":paruid", paruid
)
951 if (relinkTids
.length
) {
952 foreach (immutable uint tid
; relinkTids
) {
953 if (vbwin
&& !vbwin
.closed
) vbwin
.postEvent(new TagThreadsUpdatedEvent(tid
));
959 //==========================================================================
963 //==========================================================================
964 void checkerThread (Tid ownerTid
) {
966 bool isError
= false;
975 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
993 foreach (auto arow
; stmtAccInfo
.st
.bind(":accid", accid
).range
) {
996 int upmins
= arow
.checktime
!int;
997 if (upmins
< 1) upmins
= 1; else if (upmins
> 100000) upmins
= 100000;
999 nosendauth
= (arow
.nosendauth
!int > 0);
1000 debuglog
= (arow
.debuglog
!int > 0);
1001 nntplastindex
= arow
.nntplastindex
!uint;
1002 name
= arow
.name
!SQ3Text
;
1003 recvserver
= arow
.recvserver
!SQ3Text
;
1004 sendserver
= arow
.sendserver
!SQ3Text
;
1005 user
= arow
.user
!SQ3Text
;
1006 pass
= arow
.pass
!SQ3Text
;
1007 inbox
= arow
.inbox
!SQ3Text
;
1008 nntpgroup
= arow
.nntpgroup
!SQ3Text
;
1009 xemail
= arow
.email
!SQ3Text
;
1013 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
1026 foreach (ref ToSend ss
; sendQueue
) { ss
.from
.clear
; ss
.to
.clear
; ss
.data
.clear
; }
1027 sendQueue
.length
= 0;
1030 //FIXME: nntp sends!
1031 if (sendserver
.length
&& (nntpgroup
.length
!= 0 || xemail
.length
!= 0)) {
1032 // check if we have something to send
1033 foreach (auto srow
; dbView
.statement(`
1034 SELECT uid AS uid, from_pop3 AS from_pop3, to_pop3 AS to_pop3, ChiroUnpack(data) AS data
1036 WHERE accid=:accid AND sendtime=0
1037 ;`).bind(":accid", accid
).range
)
1040 ss
.uid
= srow
.uid
!uint;
1041 ss
.from
= srow
.from_pop3
!SQ3Text
;
1042 ss
.to
= srow
.to_pop3
!SQ3Text
;
1043 ss
.data
= srow
.data
!SQ3Text
;
1048 //FIXME: batch send!
1049 if (sendQueue
.length
) {
1050 conwriteln("sending ", sendQueue
.length
, " message", (sendQueue
.length
== 1 ?
"" : "s"));
1051 foreach (ref ToSend ss
; sendQueue
) {
1053 if (nntpgroup
.length
== 0) {
1054 conwriteln("*** [", name
, "]: connecting... (smtp)");
1055 SocketSMTP nsk
= new SocketSMTP(sendserver
.idup
);
1056 scope(exit
) { nsk
.close(); delete nsk
; }
1058 conwriteln("[", name
, "]: authenticating...");
1059 nsk
.auth(xemail
.getData
, user
.getData
, pass
.getData
);
1061 conwriteln("[", name
, "]: sending (uid=", ss
.uid
, ")...");
1062 nsk
.sendMessage(ss
.from
.getData
, ss
.to
.getData
, ss
.data
.getData
);
1064 conwriteln("[", name
, "]: closing...");
1066 conwriteln("*** [", name
, "]: connecting... (nntp)");
1067 SocketNNTP nsk
= new SocketNNTP(recvserver
.idup
);
1068 scope(exit
) { nsk
.close(); delete nsk
; }
1069 conwriteln("[", name
, "]: selecting group (", nntpgroup
, ")");
1070 nsk
.selectGroup(nntpgroup
.getData
);
1071 conwriteln("[", name
, "]: sending (uid=", ss
.uid
, ")...");
1073 nsk
.doSendRaw(ss
.data
.getData
);
1074 conwriteln("[", name
, "]: getting answer...");
1075 auto ln
= nsk
.readLine
;
1076 conwriteln("[", name
, "]: ", ln
); // 340 Ok, recommended message-ID <o7dq4o$mpm$1@digitalmars.com>
1077 if (ln
.length
== 0 || ln
[0] != '3') throw new Exception(ln
.idup
);
1078 conwriteln("[", name
, "]: closing...");
1082 } catch (Exception e
) {
1083 conwriteln("SENDING ERROR: ", e
.msg
);
1087 // mark sent messages
1089 foreach (ref ToSend ss
; sendQueue
) {
1093 SET sendtime=CAST(strftime('%s','now') AS INTEGER), lastsendtime=CAST(strftime('%s','now') AS INTEGER)
1095 ;`).bind(":uid", ss
.uid
).doAll();
1099 SET lastsendtime=CAST(strftime('%s','now') AS INTEGER)
1101 ;`).bind(":uid", ss
.uid
).doAll();
1108 conwriteln("checking account '", name
, "' (", accid
, ")...");
1110 stmtSetCheckTime
.st
.bind(":accid", accid
).bind(":lastcheck", RunningAverageExp
.GetTickCount()+checktime
*60).doAll();
1112 // ////////////////////////////////////////////////////////////////// //
1114 auto nsk
= new SocketNNTP(recvserver
.idup
);
1115 scope(exit
) { nsk
.close(); delete nsk
; }
1117 nsk
.selectGroup(nntpgroup
);
1118 if (nsk
.emptyGroup
) {
1119 conwriteln("[", name
, ":", nntpgroup
, "]: no new articles (empty group)");
1123 uint stnum
= nntplastindex
+1;
1124 if (stnum
> nsk
.hiwater
) {
1125 conwriteln("[", name
, ":", nntpgroup
, "]: no new articles");
1129 conwriteln("[", name
, ":", nntpgroup
, "]: ", nsk
.hiwater
+1-stnum
, " (possible) new articles");
1131 // download new articles
1132 foreach (immutable uint anum
; stnum
..nsk
.hiwater
+1) {
1135 msg
= nsk
.getArticle(anum
);
1136 } catch (Exception e
) {
1137 conwriteln("[", name
, ":", nntpgroup
, "]: error downloading article #", anum
);
1140 if (msg
.length
== 0) continue; // this article is empty
1141 // insert article into the storage
1142 // filtering will be done later, for now, insert with the default inbox
1144 if (inbox
.length
) tags
~= inbox
;
1146 if (tags
.length
) tags
~= "|";
1150 if (tags
.length
== 0) tags
= "#hobo";
1151 conwriteln("[", name
, ":", nntpgroup
, "]: storing article #", anum
, " for '", tags
.getData
, "'...");
1153 dbStore
.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1154 .bindConstText(":tags", tags
.getData
)
1155 .bindConstBlob(":data", msg
.getData
)
1158 // update account with the new highest nntp index
1160 dbConf
.statement(`UPDATE accounts SET nntplastindex=:anum WHERE accid=:accid;`)
1161 .bind(":accid", accid
)
1162 .bind(":anum", anum
)
1168 // ////////////////////////////////////////////////////////////////// //
1170 conwriteln("*** [", name
, "]: connecting...");
1171 auto pop3
= new SocketPOP3(recvserver
.idup
);
1172 scope(exit
) { pop3
.close(); delete pop3
; }
1174 conwriteln("[", name
, "]: authenticating...");
1175 pop3
.auth(user
, pass
);
1177 auto newmsg
= pop3
.getNewMailCount
;
1179 conwriteln("[", name
, "]: no new messages");
1182 conwriteln("[", name
, "]: ", newmsg
, " new message", (newmsg
> 1 ?
"s" : ""));
1183 foreach (immutable int popidx
; 1..newmsg
+1) {
1186 msg
= pop3
.getMessage(popidx
); // full message, with the ending dot
1187 } catch (Exception e
) {
1188 conwriteln("[", name
, "]: error downloading message #", popidx
);
1191 if (msg
.length
!= 0) {
1193 if (inbox
.length
) tags
~= inbox
;
1195 if (tags
.length
) tags
~= "|";
1199 if (tags
.length
== 0) tags
= "#hobo";
1200 conwriteln("[", name
, ":", nntpgroup
, "]: storing message #", popidx
, " for '", tags
.getData
, "'...");
1202 dbStore
.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1203 .bindConstText(":tags", tags
.getData
)
1204 .bindConstBlob(":data", msg
.getData
)
1208 //auto msg = pop3.getMessage!true(popidx); // full message, with the ending dot, and exact terminators
1210 pop3
.deleteMessage(popidx
);
1214 // ////////////////////////////////////////////////////////////////// //
1216 if (nntpgroup
.length
) CheckNNTP(); else CheckSMTP();
1217 } catch (Throwable e
) {
1218 conwriteln("ERROR checking account '", name
, "' (", accid
, "): ", e
.msg
);
1222 conwriteln("done checking account '", name
, "' (", accid
, ")...");
1224 if (vbwin
&& !vbwin
.closed
) {
1225 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "POSTING %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1226 vbwin
.postEvent(new UpdatingAccountCompleteEvent(accid
));
1227 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "POSTED %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1228 //sqlite3_sleep(1000);
1230 } catch (Throwable e
) {
1231 // here, we are dead and fucked (the exact order doesn't matter)
1232 //import core.stdc.stdlib : abort;
1233 import core
.stdc
.stdio
: fprintf
, stderr
;
1234 //import core.memory : GC;
1235 import core
.thread
: thread_suspendAll
;
1236 //GC.disable(); // yeah
1237 //thread_suspendAll(); // stop right here, you criminal scum!
1238 auto s
= e
.toString();
1239 fprintf(stderr
, "\n=== FATAL ===\n%.*s\n", cast(uint)s
.length
, s
.ptr
);
1240 //abort(); // die, you bitch!
1241 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
1244 //if (vbwin) vbwin.postEvent(evDoConCommands); }
1245 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "SENDING %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1246 ownerTid
.send(ControlCommand((isError ? ControlCommand
.Kind
.CheckError
: ControlCommand
.Kind
.CheckDone
), accid
));
1247 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "SENDT %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1249 import core
.memory
: GC
;
1253 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "DONE with accid=%u\n", accid); }
1257 //==========================================================================
1261 //==========================================================================
// Scheduler thread for account checks.
// Waits for ControlCommand messages with a timeout, keeps the list of accounts
// that are queued or being checked (accidCheckList), spawns one checkerThread
// per queued account, and removes an account from the list when its checker
// reports CheckDone or CheckError.
// Params:
//   ownerTid = thread that receives ControlReply.Quit when the loop has ended
// The loop ends once Quit was received and accidCheckList is empty.
// Any Throwable escaping the body is treated as fatal: the exception text is
// written to stderr and the process is aborted.
1262 void controlThread (Tid ownerTid
) {
// set by the Quit command; checked at the top of the loop
1264 bool doQuit
= false;
// record for one account check; the code below uses its accid, inprogress and tid members
1266 static struct AccCheck
{
// accounts that are queued for checking or are being checked right now
1272 AccCheck
[] accidCheckList
;
1273 accidCheckList
.reserve(128);
1276 CREATE TEMP TABLE IF NOT EXISTS checktimes (
1277 accid INTEGER PRIMARY KEY UNIQUE /* unique, never zero */
1278 , lastcheck INTEGER NOT NULL DEFAULT 0
1279 , checking INTEGER NOT NULL DEFAULT 0
1283 static stmtAllAccs
= LazyStatement
!"Conf"(`
1286 , checktime AS checktime
1288 WHERE nocheck=0 AND inbox<>''
1292 static stmtGetCheckTime
= LazyStatement
!"Conf"(`
1293 SELECT lastcheck AS lastcheck FROM checktimes WHERE accid=:accid LIMIT 1
1298 MonoTime lastCollect
= MonoTime
.currTime
;
1299 //accidCheckList ~= AccCheck();
// needUpdates and forceAll both shorten the receive timeout below; forceAll is set by ForceUpdateAll
1301 bool needUpdates
= false;
1302 bool forceAll
= false;
// leave the loop only after Quit was requested and no account is left in the list
1304 if (doQuit
&& accidCheckList
.length
== 0) break;
// wait for a command: 50 msecs while quitting, 1 second while the list is
// non-empty or needUpdates/forceAll is set, 60 seconds otherwise
1305 receiveTimeout((doQuit ?
50.msecs
: accidCheckList
.length || needUpdates || forceAll ?
1.seconds
: 60.seconds
),
1306 (ControlCommand cmd
) {
1307 final switch (cmd
.type
) {
1308 case ControlCommand
.Kind
.ForceUpdateAll
: forceAll
= true; break;
1309 case ControlCommand
.Kind
.Ping
: break;
1310 case ControlCommand
.Kind
.Quit
: doQuit
= true; break;
1311 case ControlCommand
.Kind
.DisableUpdates
: updatesDisabled
= true; break;
1312 case ControlCommand
.Kind
.EnableUpdates
: updatesDisabled
= false; break;
// a checker reported back (with or without an error): drop its list entry
1313 case ControlCommand
.Kind
.CheckDone
:
1314 case ControlCommand
.Kind
.CheckError
:
1316 if (accidCheckList
.length
) {
// find the entry that belongs to the reporting account
1317 foreach (immutable idx
, const ref AccCheck nfo
; accidCheckList
) {
1318 if (nfo
.accid
== cmd
.accid
) {
1319 //if (!doQuit && vbwin && !vbwin.closed) vbwin.postEvent(new UpdatingAccountCompleteEvent(nfo.accid));
// shift the tail down by one slot and shrink the list
1320 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1321 accidCheckList
.length
-= 1;
// the list is empty now: notify the window, unless quitting or the window is closed
1325 if (!doQuit
&& vbwin
&& !vbwin
.closed
&& accidCheckList
.length
== 0) vbwin
.postEvent(new UpdatingCompleteEvent());
// NOTE(review): presumably keeps entries with a non-zero accid and removes the others -- confirm
1332 for (usize idx
= 0; idx
< accidCheckList
.length
; ) {
1333 if (accidCheckList
[idx
].accid
!= 0) {
1336 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1337 accidCheckList
.length
-= 1;
// NOTE(review): presumably keeps entries that are in progress and removes the others -- confirm
1342 for (usize idx
= 0; idx
< accidCheckList
.length
; ) {
1343 if (accidCheckList
[idx
].inprogress
) {
1346 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1347 accidCheckList
.length
-= 1;
// scan the accounts returned by stmtAllAccs, but only when needUpdates is clear and updates are enabled
1353 if (!needUpdates
&& !updatesDisabled
) {
// current tick count; compared against lastcheck plus the check interval below
// (assumes both are in seconds, as the *60 and /60 conversions imply -- TODO confirm)
1354 ulong ctt
= RunningAverageExp
.GetTickCount();
1355 foreach (auto arow
; stmtAllAccs
.st
.range
) {
// skip accounts that already have an entry in the list
1357 foreach (const ref AccCheck nfo
; accidCheckList
) if (nfo
.accid
== arow
.accid
!uint) { found
= true; break; }
1358 if (found
) continue;
// NOTE(review): presumably the forced-update path, which queues the account without consulting its schedule -- confirm
1361 accidCheckList
~= AccCheck(arow
.accid
!uint);
// check interval in minutes, clamped to 1..100000
1365 int upmins
= arow
.checktime
!int;
1366 if (upmins
< 1) upmins
= 1; else if (upmins
> 100000) upmins
= 100000;
// last check time from the checktimes table; stays 0 when the account has no row there
1367 ulong lastcheck
= 0;
1368 foreach (auto crow
; stmtGetCheckTime
.st
.bind(":accid", arow
.accid
!uint).range
) lastcheck
= crow
.lastcheck
!ulong;
1369 lastcheck
+= upmins
*60; // next check time
// the next check time has already passed: queue the account
1370 if (lastcheck
< ctt
) {
1372 accidCheckList
~= AccCheck(arow
.accid
!uint);
// diagnostic output: minutes left until the next check (presumably the not-yet-due case -- confirm)
1376 conwriteln("check for accid ", arow.accid!uint, " in ", (lastcheck-ctt)/60, " minutes...");
// start checker threads for queued entries; the loop stops at the first entry that is already in progress
1383 if (!updatesDisabled
) {
1384 foreach (ref AccCheck nfo
; accidCheckList
) {
1385 if (nfo
.inprogress
) break;
// NOTE(review): the other postEvent call sites in this file also test vbwin.closed; this one does not -- confirm that is intended
1386 if (vbwin
) vbwin
.postEvent(new UpdatingAccountEvent(nfo
.accid
));
// spawn the checker with this thread as its owner, then tell it which account to check
1387 nfo
.tid
= spawn(&checkerThread
, thisTid
);
1388 nfo
.inprogress
= true;
1389 nfo
.tid
.send(CheckCommand(nfo
.accid
));
// is any check still running?
1394 bool hasProgress
= false;
1395 foreach (ref AccCheck nfo
; accidCheckList
) if (nfo
.inprogress
) { hasProgress
= true; break; }
1398 needUpdates
= false;
// GC housekeeping once at least a minute has passed since lastCollect
// (presumably runs a collection and resets lastCollect -- confirm)
1402 immutable ctt
= MonoTime
.currTime
;
1403 if ((ctt
-lastCollect
).total
!"minutes" >= 1) {
1404 import core
.memory
: GC
;
// the loop has ended: tell the owner thread that the control thread is done
1411 ownerTid
.send(ControlReply
.Quit
);
1412 } catch (Throwable e
) {
1413 // unrecoverable failure in the control thread (the exact order of the steps below does not matter)
1414 import core
.stdc
.stdlib
: abort
;
1415 import core
.stdc
.stdio
: fprintf
, stderr
;
1416 import core
.memory
: GC
;
1417 import core
.thread
: thread_suspendAll
;
1418 GC
.disable(); // no automatic collections from here on
1419 thread_suspendAll(); // freeze all other threads
// dump the full exception text to stderr using C stdio
1420 auto s
= e
.toString();
1421 fprintf(stderr
, "\n=== FATAL ===\n%.*s\n", cast(uint)s
.length
, s
.ptr
);
1422 abort(); // terminate the process
1427 //==========================================================================
1431 //==========================================================================
// Disables the receiver.
// NOTE(review): presumably sets rcDisabled, which receiverInit tests before spawning the control thread -- confirm
1432 public void receiverDisable () {
1437 //==========================================================================
1439 // disableMailboxUpdates
1441 //==========================================================================
// Sends the DisableUpdates command to the control thread, which sets its
// updatesDisabled flag on receipt. Does nothing when the receiver was not started.
1442 public void disableMailboxUpdates () {
// without a started receiver there is no control thread to talk to
1443 if (!rcStarted
) return;
1444 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.DisableUpdates
));
1448 //==========================================================================
1450 // enableMailboxUpdates
1452 //==========================================================================
// Sends the EnableUpdates command to the control thread, which clears its
// updatesDisabled flag on receipt. Does nothing when the receiver was not started.
1453 public void enableMailboxUpdates () {
// without a started receiver there is no control thread to talk to
1454 if (!rcStarted
) return;
1455 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.EnableUpdates
));
1459 //==========================================================================
1461 // receiverForceUpdateAll
1463 //==========================================================================
// Sends the ForceUpdateAll command to the control thread, which sets its
// forceAll flag on receipt. Does nothing when the receiver was not started.
1464 public void receiverForceUpdateAll () {
// without a started receiver there is no control thread to talk to
1465 if (!rcStarted
) return;
1466 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.ForceUpdateAll
));
1470 //==========================================================================
1474 //==========================================================================
// Starts the control thread with the calling thread as its owner and stores
// its Tid in controlThreadId. Does nothing when the receiver is already
// started or was disabled.
// NOTE(review): presumably also marks the receiver as started afterwards -- confirm
1475 public void receiverInit () {
// never start a second control thread
1476 if (rcStarted
) return;
// the receiver was switched off; do not start it
1477 if (rcDisabled
) return;
// the control thread gets this thread as the owner it reports to
1478 controlThreadId
= spawn(&controlThread
, thisTid
);
1483 //==========================================================================
1487 //==========================================================================
1488 public void receiverDeinit () {
1489 if (!rcStarted
) return;
1490 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.Quit
));
1494 (ControlReply reply
) {
1495 if (reply
== ControlReply
.Quit
) done
= true;