/*
 * coded by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
 * Understanding is not required. Only obedience.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3 of the License ONLY.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
17 // mail/nntp receiver thread
18 module receiver
is aliced
;
21 //version = debug_filter_helper;
22 //version = debug_updater;
24 import std
.concurrency
;
30 import iv
.timer
: DurTimer
= Timer
;
37 import chibackend
.net
;
42 // ////////////////////////////////////////////////////////////////////////// //
43 class RealFilterHelper
: FilterHelper
{
46 // only one of these can be set
47 ActFlagDelete
= 1u<<0,
50 // only one of these can be set
60 DynStr tag
; // destination tag
61 uint actFlags
; // see above
// destructor: clears the `account`, `tag` and `message` members
~this () nothrow @nogc {
  account.clear();
  tag.clear();
  message.clear();
}
// `true` when the `ActFlagDelete` bit is set in `actFlags`
final @property bool isDelete () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagDelete) != 0);
}
// `true` when the `ActFlagPurge` bit is set in `actFlags`
final @property bool isPurge () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagPurge) != 0);
}
// `true` when the `ActFlagSpam` bit is set in `actFlags`
final @property bool isSpam () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagSpam) != 0);
}
// `true` when the `ActFlagHam` bit is set in `actFlags`
final @property bool isHam () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagHam) != 0);
}
// `true` when the `ActFlagRead` bit is set in `actFlags`
final @property bool isRead () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagRead) != 0);
}
// `true` when the `ActFlagStop` bit is set in `actFlags`
final @property bool isStop () const pure nothrow @safe @nogc {
  pragma(inline, true);
  return ((actFlags&ActFlagStop) != 0);
}
75 // called if a filter was matched
76 override void filterMatched () {
80 override DynStr
getAccount () {
84 override DynStr
getHeaderField (const(char)[] header
, out bool exists
) {
85 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
86 auto value
= findHeaderField(headers
, header
);
87 exists
= (value
!is null);
88 version(debug_filter_helper
) writeln("...getHeaderField(", header
, "): exists=", exists
, "; res=", value
);
92 override DynStr
getFromName () {
93 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
94 auto value
= findHeaderField(headers
, "From").extractName
;
95 version(debug_filter_helper
) writeln("...getFromName: res=", value
);
99 override DynStr
getFromMail () {
100 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
101 auto value
= findHeaderField(headers
, "From").extractMail
;
102 if (value
.length
== 0) value
= "nobody@nowhere";
103 version(debug_filter_helper
) writeln("...getFromMail: res=", value
);
104 return DynStr(value
);
107 override DynStr
getToName () {
108 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
109 auto value
= findHeaderField(headers
, "To").extractName
;
110 version(debug_filter_helper
) writeln("...getToName: res=", value
);
111 return DynStr(value
);
114 override DynStr
getToMail () {
115 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
116 auto value
= findHeaderField(headers
, "To").extractMail
;
117 if (value
.length
== 0) value
= "nobody@nowhere";
118 version(debug_filter_helper
) writeln("...getToMail: res=", value
);
119 return DynStr(value
);
122 override DynStr
getSubj (out bool exists
) {
123 auto headers
= message
.getData
[0..findHeadersEnd(message
.getData
)];
124 auto value
= findHeaderField(headers
, "Subject");
125 exists
= (value
!is null);
126 if (exists
) value
= value
.decodeSubj
.subjRemoveRe
;
127 return DynStr(value
);
130 override DynStr
exec (const(char)[] command
) {
131 /*version(debug_filter_helper)*/ writeln("...exec: <", command
, ">");
132 //return DynStr("nothing");
133 import std
.stdio
: File
;
136 // write article to file
138 UUID id
= randomUUID();
140 void deleteTempFile () {
141 if (buf
.length
) try { import std
.file
: remove
; remove(buf
.getData
); } catch (Exception e
) {}
143 scope(exit
) deleteTempFile();
144 buf
.reserve(2+16*2+42);
145 buf
~= "/tmp/_temp_";
146 foreach (immutable ubyte b
; id
.data
[]) {
147 buf
~= "0123456789abcdef"[b
>>4];
148 buf
~= "0123456789abcdef"[b
&0x0f];
152 auto fo
= VFile(buf
.getData
, "w");
153 fo
.rawWriteExact(message
.getData
);
156 //!conwriteln("EXEC filter '", command, "'... (", buf.getData, ")");
157 auto pid
= pipeProcess([command
, /*"-D",*/ buf
.getData
], Redirect
.all
, null, Config
.none
, "/tmp");
158 string action
= pid
.stdout
.readln
.xstrip
;
159 bool doStop
= (action
.length
&& action
[0] == '-');
160 if (doStop
) action
= action
[1..$].xstrip
;
162 while (!pid
.stderr
.eof
) conwriteln(" :", pid
.stderr
.readln
.xstrip
, "|");
165 //!conwriteln("EXEC filter '", command, "' action: ", action, " (", doStop, ")");
166 return DynStr(action
);
167 } catch (Exception e
) {
168 conwriteln("EXEC filter error: ", e
.msg
);
173 override void move (const(char)[] dest
) {
174 version(debug_filter_helper
) writeln("...move: <", dest
, ">");
178 override void performAction (Action action
) {
179 version(debug_filter_helper
) writeln("...performAction: ", action
);
182 actFlags
&= ~(ActFlagDelete|ActFlagPurge
);
183 actFlags |
= ActFlagPurge
;
185 case Action
.SoftDelete
:
186 actFlags
&= ~(ActFlagDelete|ActFlagPurge
);
187 actFlags |
= ActFlagDelete
;
190 actFlags
&= ~(ActFlagSpam|ActFlagHam
);
191 actFlags |
= ActFlagSpam
;
194 actFlags
&= ~(ActFlagSpam|ActFlagHam
);
195 actFlags |
= ActFlagHam
;
198 actFlags |
= ActFlagRead
;
201 actFlags |
= ActFlagStop
;
205 import std
.conv
: to
;
206 throw new FilterSyntaxException("action "~action
.to
!string
~" should not end up in the handler");
211 override bool match (const(char)[] pat
, const(char)[] str, bool casesens
) {
212 version(debug_filter_helper
) writeln("...match: casesens=", casesens
, "; pat=<", pat
, ">; str=<", str, ">");
213 immutable bool bol
= (pat
.length
&& pat
[0] == '^');
214 if (bol
) pat
= pat
[1..$];
215 immutable bool eol
= (pat
.length
&& pat
[$-1] == '$');
216 if (eol
) pat
= pat
[0..$-1];
217 version(debug_filter_helper
) writeln("...match: bol=", bol
, "; eol=", eol
, "; pat=<", pat
, ">");
218 if (pat
.length
== 0) return (bol
&& eol ?
str.length
== 0 : false);
219 if (str.length
< pat
.length
) return false;
220 if (bol
&& eol
) { if (str.length
!= pat
.length
) return false; }
221 else if (bol
) str = str[0..pat
.length
];
222 else if (eol
) str = str[str.length
-pat
.length
..$];
224 return (str.indexOf(pat
) >= 0);
226 while (str.length
>= pat
.length
) {
227 if (str.startsWithCI(pat
)) {
228 //writeln("...match: HIT! str=<", str, ">");
232 //writeln("...match: skip; str=<", str, ">; pat=<", pat, ">");
234 //writeln("...match: FAIL!");
239 void writeResult() () const {
240 if (isDelete
) write(" softdelete");
241 if (isPurge
) write(" purge");
242 if (isSpam
) write(" spam");
243 if (isHam
) write(" ham");
244 if (isRead
) write(" read");
245 write("; dest tag: ", tag
.getData
);
250 // ////////////////////////////////////////////////////////////////////////// //
// module-level state shared between all threads (`__gshared`, i.e. not thread-local)
__gshared {
  // set by the `DisableUpdates` control command, reset by `EnableUpdates`
  bool updatesDisabled = false;
  // NOTE(review): the meaning of the two `rc*` flags is not visible in this part of the file
  bool rcDisabled = false;
  bool rcStarted = false;
  // presumably the `Tid` of the thread running `controlThread` -- TODO confirm where it is assigned
  Tid controlThreadId;
}
261 struct ControlCommand
{
274 // for CheckDone or CheckError
// creates a command of the given kind that carries no account id (`accid` is set to 0)
this (Kind atype) nothrow @safe @nogc {
  accid = 0;
  type = atype;
}
// creates a command of the given kind for the account with id `aid`
this (Kind atype, uint aid) nothrow @safe @nogc {
  accid = aid;
  type = atype;
}
283 struct CheckCommand
{
288 static stmtAccInfo
= LazyStatement
!"Conf"(`
291 , checktime AS checktime
292 , nosendauth AS nosendauth
293 , debuglog AS debuglog
294 , nntplastindex AS nntplastindex
296 , recvserver AS recvserver
297 , sendserver AS sendserver
301 , nntpgroup AS nntpgroup
309 static stmtSetCheckTime
= LazyStatement
!"Conf"(`
310 INSERT INTO checktimes(accid,lastcheck) VALUES(:accid,:lastcheck)
312 DO UPDATE SET lastcheck=:lastcheck
316 //==========================================================================
320 // return `false` from delegate to stop
322 //==========================================================================
323 void forEachTag (const(char)[] tags
, bool delegate (const(char)[] tag
) dg
) {
324 if (dg
is null) return;
326 while (tags
.length
) {
327 auto stp
= tags
.indexOf('|');
328 if (stp
< 0) stp
= cast(uint)tags
.length
;
329 auto tag
= tags
[0..stp
];
330 tags
= tags
[(stp
< tags
.length ? stp
+1 : tags
.length
)..$];
331 if (tag
.length
== 0) continue;
337 //==========================================================================
341 //==========================================================================
342 DynStr
extractAccount (const(char)[] tags
) {
343 auto stp
= tags
.indexOf("account:");
345 if (stp
== 0 || tags
[stp
-1] == '|') {
346 tags
= tags
[stp
+8..$];
347 stp
= tags
.indexOf('|');
348 if (stp
>= 0) tags
= tags
[0..stp
];
356 //==========================================================================
358 // extractFirstFolder
360 // can return empty string
362 //==========================================================================
363 DynStr
extractFirstFolder (const(char)[] tags
) {
365 forEachTag(tags
, (tag
) {
366 if (tag
[0] != '/') return true; // go on
368 return false; // stop
374 //==========================================================================
378 // can return empty tags string
380 //==========================================================================
381 DynStr
removeFirstFolder (const(char)[] tags
) {
383 bool seenFolder
= false;
384 forEachTag(tags
, (tag
) {
385 if (!seenFolder
&& tag
[0] == '/') {
388 if (res
.length
) res
~= "|";
391 return true; // go on
397 // ////////////////////////////////////////////////////////////////////////// //
398 static struct TagInfo
{
405 //==========================================================================
409 //==========================================================================
410 void getMessageTags (ref TagInfo
[] tags
, uint uid
) {
411 auto stGetTags
= LazyStatement
!"View"(`
413 DISTINCT(threads.tagid) AS tagid
416 INNER JOIN tagnames AS tn USING(tagid)
421 foreach (auto row
; stGetTags
.st
.bind(":uid", uid
).range
) {
422 tags
~= TagInfo(row
.tagid
!uint, DynStr(row
.name
!SQ3Text
));
427 //==========================================================================
429 // updateTwittedThreadsInTag
431 //==========================================================================
432 void updateTwittedThreadsInTag (uint tagid
, uint uid
) {
433 auto stTempTbl
= LazyStatement
!"View"(`
434 INSERT INTO mutepairs
435 WITH RECURSIVE children(muid, paruid, mtagid, mmute) AS (
436 SELECT 0, :uid, :tagid, 666
439 tt.uid, tt.uid, mtagid, tt.mute
441 INNER JOIN threads AS tt ON
442 tt.tagid=cc.mtagid AND
443 tt.parent=cc.paruid AND
451 WHERE muid<>0 AND mmute=0
454 auto stFixMutes
= LazyStatement
!"View"(`
458 , appearance=iif(appearance=0,1,appearance)
459 FROM (SELECT muid, mtagid FROM mutepairs) AS cc
460 WHERE uid=cc.muid AND tagid=cc.mtagid AND mute=0
464 dbView
.execute(`DELETE FROM mutepairs;`);
467 .bind(":tagid", tagid
)
470 .bind(":mute", Mute
.ThreadOther
)
475 //==========================================================================
479 //==========================================================================
480 public void createTwitByMsgid (uint uid
, const(char)[] glob
="/dmars_ng/*") {
482 if (glob
.length
== 0) return;
484 auto stGetMsgid
= LazyStatement
!"View"(`
493 foreach (auto row
; stGetMsgid
.st
.bind(":uid", uid
).range
) msgid
= row
.msgid
!SQ3Text
;
494 if (msgid
.length
== 0) return;
496 auto stFindMsgidTwit
= LazyStatement
!"Conf"(`
500 WHERE msgid=:msgid AND tagglob=:glob
504 // check if we already have such twit
505 foreach (auto row
; stFindMsgidTwit
.st
.bindConstText(":msgid", msgid
.getData
).bindConstText(":glob", glob
).range
) return;
508 auto stAddMsgidTwit
= LazyStatement
!"Conf"(`
509 INSERT INTO msgidtwits
510 (etwitid, automatic, tagglob, msgid)
511 VALUES(0, 0,:tagglob,:msgid)
515 .bindConstText(":tagglob", glob
)
516 .bindConstText(":msgid", msgid
.getData
)
521 scope(exit
) delete tags
;
522 getMessageTags(ref tags
, uid
);
523 if (tags
.length
== 0) return; // just in case
527 auto stUpdateMute
= LazyStatement
!"View"(`
532 , appearance=iif(appearance=0,1,appearance)
533 WHERE uid=:uid AND tagid=:tagid AND mute=0
536 // mark the message as twitted
538 foreach (ref TagInfo ti
; tags
) {
541 .bind(":tagid", ti
.tagid
)
542 .bind(":mute", Mute
.ThreadStart
)
544 updateTwittedThreadsInTag(ti
.tagid
, uid
);
550 //==========================================================================
554 //==========================================================================
555 public void twitPrepare () {
557 CREATE TEMP TABLE IF NOT EXISTS mutepairs(
565 //==========================================================================
569 // set "mute" flag according to message filters
571 //==========================================================================
572 public void twitMessage (uint uid
) {
576 scope(exit
) delete tags
;
577 getMessageTags(ref tags
, uid
);
578 if (tags
.length
== 0) return; // just in case
580 auto stUpdateMute
= LazyStatement
!"View"(`
585 , appearance=iif(appearance=0,1,appearance)
586 WHERE uid=:uid AND tagid=:tagid AND mute=0
589 DynStr fromMail
, fromName
;
591 foreach (auto row; dbView.statement(`
593 from_name AS fromName
594 , from_mail AS fromMail
598 ;`).bind(":uid", uid).range)
600 fromMail = row.fromMail!SQ3Text;
601 fromName = row.fromName!SQ3Text;
604 if (!chiroGetMessageFrom(uid
, ref fromMail
, ref fromName
)) return;
607 if (fromMail
.length
!= 0 || fromName
.length
!= 0) {
608 foreach (auto trow
; dbConf
.statement(`
617 auto email
= trow
.email
!SQ3Text
;
618 auto name
= trow
.name
!SQ3Text
;
619 auto glob
= trow
.tagglob
!SQ3Text
;
620 if (glob
.length
== 0 ||
(!email
.length
&& !name
.length
)) continue; // just in case
621 // check for filter match
622 if (email
.length
&& !globmatchCI(fromMail
, email
)) continue;
623 if (name
.length
&& !globmatchCI(fromName
, name
)) continue;
624 auto title
= trow
.title
!SQ3Text
;
626 foreach (ref TagInfo ti
; tags
) {
627 if (ti
.wasUpdates
) continue;
628 if (!globmatch(ti
.name
, glob
)) continue;
631 .bind(":tagid", ti
.tagid
)
632 .bind(":mute", Mute
.ThreadStart
)
633 .bindConstText(":title", (title
.length ? title
: null), allowNull
:true)
635 ti
.wasUpdates
= true;
638 if (ttcount
== tags
.length
) break;
642 // mute it if it is not muted, but should be
643 static auto statFindParentFor
= LazyStatement
!"View"(`
644 SELECT mute AS mute, parent AS parent
646 WHERE tagid=:tagid AND uid=:uid
650 foreach (TagInfo ti
; tags
) {
651 auto mute
= chiroGetMessageMute(ti
.tagid
, uid
);
652 if (mute
> Mute
.Normal
) {
653 ti
.wasUpdates
= true; // just in case
654 if (!ttcount
) ttcount
= 1;
660 .bind(":tagid", ti
.tagid
)
663 foreach (auto prow
; statFindParentFor
.st
.range
) {
664 if (prow
.mute
!int > Mute
.Normal
) {
665 chiroSetMessageMute(ti
.tagid
, uid
, Mute
.ThreadOther
);
666 ti
.wasUpdates
= true; // just in case
667 if (!ttcount
) ttcount
= 1;
669 puid
= prow
.parent
!uint;
676 if (!ttcount
) return;
679 foreach (ref TagInfo ti
; tags
) {
680 if (!ti
.wasUpdates
) continue;
681 updateTwittedThreadsInTag(ti
.tagid
, uid
);
686 //==========================================================================
690 // check for new messages, and update view database
692 //==========================================================================
693 public void updateViewDB (bool skipFilters
=false) {
695 uint maxStoreUid
= 0;
699 foreach (auto row
; dbView
.statement(`SELECT MAX(uid) AS uid FROM info;`).range
) maxViewUid
= row
.uid
!uint;
700 foreach (auto row
; dbStore
.statement(`SELECT MAX(uid) AS uid FROM messages;`).range
) maxStoreUid
= row
.uid
!uint;
702 if (maxViewUid
>= maxStoreUid
) return;
703 conwriteln("need to process around ", maxStoreUid
-maxViewUid
, " messages.");
706 relinkTids
.reserve(64);
707 scope(exit
) delete relinkTids
;
709 foreach (uint uid
; maxViewUid
+1..maxStoreUid
+1) {
711 foreach (auto row
; dbStore
.statement(`
712 SELECT tags AS tags, ChiroUnpack(data) AS data FROM messages WHERE uid=:uid LIMIT 1
713 ;`).bind(":uid", uid
).range
)
715 msg
= row
.data
!SQ3Text
;
716 tags
= row
.tags
!SQ3Text
;
718 if (msg
.length
== 0 || tags
.length
== 0) continue; // not interesting
720 conwriteln("============ message #", uid
, " ============");
722 DynStr acc
= tags
.extractAccount();
723 DynStr origTags
= tags
;
724 RealFilterHelper hlp
;
725 scope(exit
) delete hlp
;
726 bool markSpamHam
= false; //!!!
729 DynStr deftag
= tags
.extractFirstFolder();
730 tags
= tags
.removeFirstFolder();
732 hlp
= new RealFilterHelper
;
735 if (hlp
.tag
.length
== 0) hlp
.tag
= "#hobo";
738 foreach (auto row
; dbConf
.statement(`SELECT filterid AS filterid, name AS name, body AS body FROM filters ORDER BY idx;`).range
) {
739 //conwrite(" filter '", row.name!SQ3Text, "' (", row.filterid!uint, "): ");
743 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">");
744 goOn
= executeMailFilter(row
.body!SQ3Text
, hlp
);
745 } catch (Exception e
) {
746 conwriteln("ERROR IN FILTER '", row
.name
!SQ3Text
, "': ", e
.msg
);
749 conwriteln("...filter '", row
.name
!SQ3Text
, " matched!");
751 //hlp.writeResult(); writeln;
752 //version(debug_filter_helper) writeln("::: <", row.body!SQ3Text, ">: goon=", goOn, "; isstop=", hlp.isStop);
753 //assert(hlp.isStop == !goOn);
754 if (hlp
.isStop
) break;
756 //write(" FINAL RESULT:"); hlp.writeResult(); writeln;
759 markSpamHam
= false; //!!!
760 if (!hlp
.isSpam
&& !hlp
.isHam
) {
761 auto bogo
= messageBogoCheck(uid
);
762 if (bogo
== Bogo
.Spam
) {
764 conwriteln("BOGO: SPAM message #", uid
, "; from={", hlp
.getFromName
.getData
, "}:<", hlp
.getFromMail
.getData
, ">; to={",
765 hlp
.getToName
.getData
, "}:<", hlp
.getToMail
.getData
, ">; subj=", hlp
.getSubj(out exists
).getData
);
766 hlp
.performAction(hlp
.Action
.Spam
);
771 if (hlp
.isSpam
) hlp
.tag
= "#spam"; // always
773 if (hlp
.tag
.length
== 0) hlp
.tag
= deftag
; // just in case
775 forEachTag(tags
, (xtag
) {
776 if (xtag
== hlp
.tag
) {
778 return false; // stop
780 return true; // go on
783 // `tags` should contain our new tags
794 // update tags info in the storage
795 if (tags
!= origTags
) {
797 dbStore
.statement(`UPDATE messages SET tags=:tags WHERE uid=:uid;`)
799 .bindConstText(":tags", tags
.getData
)
804 // insert the message into the view db
805 int appearance
= (skipFilters ? Appearance
.Read
: Appearance
.Unread
);
806 if (hlp
!is null && hlp
.isDelete
) appearance
= Appearance
.SoftDeleteFilter
;
807 else if (hlp
!is null && hlp
.isPurge
) appearance
= Appearance
.SoftDeletePurge
;
808 if (hlp
!is null && appearance
== Appearance
.Unread
&& (hlp
.isRead || hlp
.isSpam
)) appearance
= Appearance
.Read
;
809 if (hlp
!is null && markSpamHam
) {
810 if (hlp
.isSpam
) messageBogoMarkSpam(uid
);
811 if (hlp
.isHam
) messageBogoMarkHam(uid
);
816 foreach (auto trow
; dbStore
.statement(`
818 ChiroExtractHeaders(:msgdata) AS headers
819 , ChiroExtractBody(:msgdata) AS body
820 , ChiroHdr_RecvTime(:msgdata) AS msgtime
821 ;`).bindConstText(":msgdata", msg
.getData
).range
)
823 msgtime
= trow
.msgtime
!uint;
824 hdr
= trow
.headers
!SQ3Text
;
825 body = trow
.body!SQ3Text
;
828 conwriteln("putting msg ", uid
, " (time:", msgtime
, "; appr=", appearance
, ") to '", tags
.getData
, "'; oldtags='", origTags
.getData
, "'");
831 //dbView.beginTransaction();
832 //scope(success) dbView.commitTransaction();
833 //scope(failure) dbView.rollbackTransaction();
834 chiroParseAndInsertOneMessage(uid
, msgtime
, appearance
, hdr
, body, tags
);
837 foreach (auto mrow
; dbView
.statement(`SELECT msgid AS msgid FROM msgids WHERE uid=:uid LIMIT 1;`).bind(":uid", uid
).range
) {
838 msgid
= mrow
.msgid
!SQ3Text
;
840 //if (msgid.length == 0) return;
841 version(debug_updater
) {
843 auto fo
= VFile("zzz", "a");
844 fo
.writeln("MSGUID: ", uid
, "; MSGID: <", msgid
.getData
, ">");
847 conwriteln("MSGUID: ", uid
, "; MSGID: <", msgid
.getData
, ">");
849 // collect tags to modify
851 scope(exit
) delete taglist
;
854 foreach (auto trow
; dbView
.statement(`SELECT tagid AS tagid, parent AS parent FROM threads WHERE uid=:uid;`)
855 .bind(":uid", uid
).range
)
857 immutable uint tid
= trow
.tagid
!uint;
860 foreach (immutable uint tt
; relinkTids
) if (tt
== tid
) { found
= true; break; }
861 if (!found
) relinkTids
~= tid
;
863 if (!tid || trow
.parent
!uint ||
!chiroIsTagThreaded(tid
)) continue;
864 conwriteln(" tagid: ", tid
, " (", chiroGetTagName(tid
).getData
, ")");
865 version(debug_updater
) {
867 auto fo
= VFile("zzz", "a");
868 fo
.writeln(" tagid: ", tid
, " (", chiroGetTagName(tid
).getData
, ")");
874 foreach (immutable uint tid
; taglist
) {
875 uint setUsAsParentFor
= 0;
876 bool needFullRelink
= false;
877 // check if there are any references to us, and fix them by full relink
878 if (!msgid
.length
) continue;
879 foreach (auto nrow
; dbView
.statement(`
880 SELECT refids.uid AS uid, tt.parent AS parent
882 INNER JOIN(threads) AS tt
883 ON tt.tagid=:tagid AND tt.uid=refids.uid
884 WHERE idx=0 AND msgid=:msgid
886 ;`).bind(":tagid", tid
).bindConstText(":msgid", msgid
.getData
).range
)
888 if (nrow
.parent
!uint == 0) {
889 setUsAsParentFor
= nrow
.uid
!uint;
891 needFullRelink
= true;
895 if (needFullRelink
) {
896 //FIXME: make this faster!
897 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); performing full relink...");
898 chiroSupportRelinkTagThreads(tid
);
902 if (setUsAsParentFor
) {
903 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); settuing us (", uid
, ") as a parent for ", setUsAsParentFor
);
907 WHERE uid=:xuid AND tagid=:tagid
908 ;`).bind(":uid", uid
).bind(":xuid", setUsAsParentFor
).bind(":tagid", tid
).doAll();
911 // find parent for us
913 foreach (auto prow
; dbView
.statement(`
914 SELECT msgids.uid AS paruid
916 INNER JOIN(threads) AS tt
917 ON tt.tagid=:tagid AND tt.uid=msgids.uid
918 WHERE msgids.uid<>:uid AND msgids.msgid IN (SELECT msgid FROM refids WHERE uid=:uid ORDER BY idx)
920 ;`).bind(":uid", uid
).bind(":tagid", tid
).range
)
922 paruid
= prow
.paruid
!uint;
924 conwriteln(" tid: ", tid
, " (", chiroGetTagName(tid
).getData
, "); paruid=", paruid
);
925 if (paruid
&& paruid
!= uid
) {
926 dbView
.statement(`UPDATE threads SET parent=:paruid WHERE uid=:uid AND tagid=:tagid;`)
929 .bind(":paruid", paruid
)
939 if (relinkTids
.length
) {
940 foreach (immutable uint tid
; relinkTids
) {
941 if (vbwin
&& !vbwin
.closed
) vbwin
.postEvent(new TagThreadsUpdatedEvent(tid
));
947 //==========================================================================
951 //==========================================================================
952 void checkerThread (Tid ownerTid
) {
954 bool isError
= false;
963 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
981 foreach (auto arow
; stmtAccInfo
.st
.bind(":accid", accid
).range
) {
984 int upmins
= arow
.checktime
!int;
985 if (upmins
< 1) upmins
= 1; else if (upmins
> 100000) upmins
= 100000;
987 nosendauth
= (arow
.nosendauth
!int > 0);
988 debuglog
= (arow
.debuglog
!int > 0);
989 nntplastindex
= arow
.nntplastindex
!uint;
990 name
= arow
.name
!SQ3Text
;
991 recvserver
= arow
.recvserver
!SQ3Text
;
992 sendserver
= arow
.sendserver
!SQ3Text
;
993 user
= arow
.user
!SQ3Text
;
994 pass
= arow
.pass
!SQ3Text
;
995 inbox
= arow
.inbox
!SQ3Text
;
996 nntpgroup
= arow
.nntpgroup
!SQ3Text
;
997 xemail
= arow
.email
!SQ3Text
;
1001 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
1014 foreach (ref ToSend ss
; sendQueue
) { ss
.from
.clear
; ss
.to
.clear
; ss
.data
.clear
; }
1015 sendQueue
.length
= 0;
1018 //FIXME: nntp sends!
1019 if (sendserver
.length
&& (nntpgroup
.length
!= 0 || xemail
.length
!= 0)) {
1020 // check if we have something to send
1021 foreach (auto srow
; dbView
.statement(`
1022 SELECT uid AS uid, from_pop3 AS from_pop3, to_pop3 AS to_pop3, ChiroUnpack(data) AS data
1024 WHERE accid=:accid AND sendtime=0
1025 ;`).bind(":accid", accid
).range
)
1028 ss
.uid
= srow
.uid
!uint;
1029 ss
.from
= srow
.from_pop3
!SQ3Text
;
1030 ss
.to
= srow
.to_pop3
!SQ3Text
;
1031 ss
.data
= srow
.data
!SQ3Text
;
1036 //FIXME: batch send!
1037 if (sendQueue
.length
) {
1038 conwriteln("sending ", sendQueue
.length
, " message", (sendQueue
.length
== 1 ?
"" : "s"));
1039 foreach (ref ToSend ss
; sendQueue
) {
1041 if (nntpgroup
.length
== 0) {
1042 conwriteln("*** [", name
, "]: connecting... (smtp)");
1043 SocketSMTP nsk
= new SocketSMTP(sendserver
.idup
);
1044 scope(exit
) { nsk
.close(); delete nsk
; }
1046 conwriteln("[", name
, "]: authenticating...");
1047 nsk
.auth(xemail
.getData
, user
.getData
, pass
.getData
);
1049 conwriteln("[", name
, "]: sending (uid=", ss
.uid
, ")...");
1050 nsk
.sendMessage(ss
.from
.getData
, ss
.to
.getData
, ss
.data
.getData
);
1052 conwriteln("[", name
, "]: closing...");
1054 conwriteln("*** [", name
, "]: connecting... (nntp)");
1055 SocketNNTP nsk
= new SocketNNTP(recvserver
.idup
);
1056 scope(exit
) { nsk
.close(); delete nsk
; }
1057 conwriteln("[", name
, "]: selecting group (", nntpgroup
, ")");
1058 nsk
.selectGroup(nntpgroup
.getData
);
1059 conwriteln("[", name
, "]: sending (uid=", ss
.uid
, ")...");
1061 nsk
.doSendRaw(ss
.data
.getData
);
1062 conwriteln("[", name
, "]: getting answer...");
1063 auto ln
= nsk
.readLine
;
1064 conwriteln("[", name
, "]: ", ln
); // 340 Ok, recommended message-ID <o7dq4o$mpm$1@digitalmars.com>
1065 if (ln
.length
== 0 || ln
[0] != '3') throw new Exception(ln
.idup
);
1066 conwriteln("[", name
, "]: closing...");
1070 } catch (Exception e
) {
1071 conwriteln("SENDING ERROR: ", e
.msg
);
1075 // mark sent messages
1077 foreach (ref ToSend ss
; sendQueue
) {
1081 SET sendtime=CAST(strftime('%s','now') AS INTEGER), lastsendtime=CAST(strftime('%s','now') AS INTEGER)
1083 ;`).bind(":uid", ss
.uid
).doAll();
1087 SET lastsendtime=CAST(strftime('%s','now') AS INTEGER)
1089 ;`).bind(":uid", ss
.uid
).doAll();
1096 conwriteln("checking account '", name
, "' (", accid
, ")...");
1098 stmtSetCheckTime
.st
.bind(":accid", accid
).bind(":lastcheck", RunningAverageExp
.GetTickCount()+checktime
*60).doAll();
1100 // ////////////////////////////////////////////////////////////////// //
1102 auto nsk
= new SocketNNTP(recvserver
.idup
);
1103 scope(exit
) { nsk
.close(); delete nsk
; }
1105 nsk
.selectGroup(nntpgroup
);
1106 if (nsk
.emptyGroup
) {
1107 conwriteln("[", name
, ":", nntpgroup
, "]: no new articles (empty group)");
1111 uint stnum
= nntplastindex
+1;
1112 if (stnum
> nsk
.hiwater
) {
1113 conwriteln("[", name
, ":", nntpgroup
, "]: no new articles");
1117 conwriteln("[", name
, ":", nntpgroup
, "]: ", nsk
.hiwater
+1-stnum
, " (possible) new articles");
1119 // download new articles
1120 foreach (immutable uint anum
; stnum
..nsk
.hiwater
+1) {
1123 msg
= nsk
.getArticle(anum
);
1124 } catch (Exception e
) {
1125 conwriteln("[", name
, ":", nntpgroup
, "]: error downloading article #", anum
);
1128 if (msg
.length
== 0) continue; // this article is empty
1129 // insert article into the storage
1130 // filtering will be done later, for now, insert with the default inbox
1132 if (inbox
.length
) tags
~= inbox
;
1134 if (tags
.length
) tags
~= "|";
1138 if (tags
.length
== 0) tags
= "#hobo";
1139 conwriteln("[", name
, ":", nntpgroup
, "]: storing article #", anum
, " for '", tags
.getData
, "'...");
1141 dbStore
.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1142 .bindConstText(":tags", tags
.getData
)
1143 .bindConstBlob(":data", msg
.getData
)
1146 // update account with the new highest nntp index
1148 dbConf
.statement(`UPDATE accounts SET nntplastindex=:anum WHERE accid=:accid;`)
1149 .bind(":accid", accid
)
1150 .bind(":anum", anum
)
1156 // ////////////////////////////////////////////////////////////////// //
1158 conwriteln("*** [", name
, "]: connecting...");
1159 auto pop3
= new SocketPOP3(recvserver
.idup
);
1160 scope(exit
) { pop3
.close(); delete pop3
; }
1162 conwriteln("[", name
, "]: authenticating...");
1163 pop3
.auth(user
, pass
);
1165 auto newmsg
= pop3
.getNewMailCount
;
1167 conwriteln("[", name
, "]: no new messages");
1170 conwriteln("[", name
, "]: ", newmsg
, " new message", (newmsg
> 1 ?
"s" : ""));
1171 foreach (immutable int popidx
; 1..newmsg
+1) {
1174 msg
= pop3
.getMessage(popidx
); // full message, with the ending dot
1175 } catch (Exception e
) {
1176 conwriteln("[", name
, "]: error downloading message #", popidx
);
1179 if (msg
.length
!= 0) {
1181 if (inbox
.length
) tags
~= inbox
;
1183 if (tags
.length
) tags
~= "|";
1187 if (tags
.length
== 0) tags
= "#hobo";
1188 conwriteln("[", name
, ":", nntpgroup
, "]: storing message #", popidx
, " for '", tags
.getData
, "'...");
1190 dbStore
.statement(`INSERT INTO messages(tags, data) VALUES(:tags, ChiroPack(:data));`)
1191 .bindConstText(":tags", tags
.getData
)
1192 .bindConstBlob(":data", msg
.getData
)
1196 //auto msg = pop3.getMessage!true(popidx); // full message, with the ending dot, and exact terminators
1198 pop3
.deleteMessage(popidx
);
1202 // ////////////////////////////////////////////////////////////////// //
1204 if (nntpgroup
.length
) CheckNNTP(); else CheckSMTP();
1205 } catch (Throwable e
) {
1206 conwriteln("ERROR checking account '", name
, "' (", accid
, "): ", e
.msg
);
1210 conwriteln("done checking account '", name
, "' (", accid
, ")...");
1212 if (vbwin
&& !vbwin
.closed
) {
1213 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "POSTING %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1214 vbwin
.postEvent(new UpdatingAccountCompleteEvent(accid
));
1215 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "POSTED %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1216 //sqlite3_sleep(1000);
1218 } catch (Throwable e
) {
1219 // here, we are dead and fucked (the exact order doesn't matter)
1220 //import core.stdc.stdlib : abort;
1221 import core
.stdc
.stdio
: fprintf
, stderr
;
1222 //import core.memory : GC;
1223 import core
.thread
: thread_suspendAll
;
1224 //GC.disable(); // yeah
1225 //thread_suspendAll(); // stop right here, you criminal scum!
1226 auto s
= e
.toString();
1227 fprintf(stderr
, "\n=== FATAL ===\n%.*s\n", cast(uint)s
.length
, s
.ptr
);
1228 //abort(); // die, you bitch!
1229 ownerTid
.send(ControlCommand(ControlCommand
.Kind
.CheckError
, accid
));
1232 //if (vbwin) vbwin.postEvent(evDoConCommands); }
1233 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "SENDING %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1234 ownerTid
.send(ControlCommand((isError ? ControlCommand
.Kind
.CheckError
: ControlCommand
.Kind
.CheckDone
), accid
));
1235 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "SENDT %s: accid=%u\n", (isError ? "ERROR".ptr : "DONE".ptr), accid); }
1237 import core
.memory
: GC
;
1241 //{ import core.stdc.stdio : fprintf, stderr; fprintf(stderr, "DONE with accid=%u\n", accid); }
1245 //==========================================================================
1249 //==========================================================================
// controlThread: entry point of the control thread (started via spawn(&controlThread, thisTid)).
// Params:
//   ownerTid = thread that is sent ControlReply.Quit when the command loop ends.
// The loop waits for ControlCommand messages with receiveTimeout, keeps a list of
// accounts that are queued for a check or being checked, spawns checkerThread
// workers for them, and removes accounts again when CheckDone/CheckError arrives.
// Any Throwable is treated as fatal: its text is printed to stderr and abort() is called.
1250 void controlThread (Tid ownerTid
) {
// raised by the Quit command; the loop exits only once the check list is empty as well
1252 bool doQuit
= false;
// bookkeeping record for one account; the code below uses its accid, tid and inprogress members
1254 static struct AccCheck
{
// accounts waiting for a check or currently being checked (capacity reserved up front)
1260 AccCheck
[] accidCheckList
;
1261 accidCheckList
.reserve(128);
1264 CREATE TEMP TABLE IF NOT EXISTS checktimes (
1265 accid INTEGER PRIMARY KEY UNIQUE /* unique, never zero */
1266 , lastcheck INTEGER NOT NULL DEFAULT 0
1267 , checking INTEGER NOT NULL DEFAULT 0
// statement enumerating the accounts to poll: only rows with nocheck=0 and a non-empty inbox
1271 static stmtAllAccs
= LazyStatement
!"Conf"(`
1274 , checktime AS checktime
1276 WHERE nocheck=0 AND inbox<>''
1280 static stmtGetCheckTime
= LazyStatement
!"Conf"(`
1281 SELECT lastcheck AS lastcheck FROM checktimes WHERE accid=:accid LIMIT 1
// reference time for the once-a-minute GC block near the end of the loop
1286 MonoTime lastCollect
= MonoTime
.currTime
;
1287 //accidCheckList ~= AccCheck();
// forceAll is raised by the ForceUpdateAll command; either flag shortens the receive timeout below
1289 bool needUpdates
= false;
1290 bool forceAll
= false;
// leave the loop only after Quit was requested AND no account is left in the list
1292 if (doQuit
&& accidCheckList
.length
== 0) break;
// wait for a command: 50 ms while quitting, 1 s while the list is non-empty or
// needUpdates/forceAll is set, otherwise 60 s
1293 receiveTimeout((doQuit ?
50.msecs
: accidCheckList
.length || needUpdates || forceAll ?
1.seconds
: 60.seconds
),
1294 (ControlCommand cmd
) {
1295 final switch (cmd
.type
) {
1296 case ControlCommand
.Kind
.ForceUpdateAll
: forceAll
= true; break;
// Ping: no action is taken for this command
1297 case ControlCommand
.Kind
.Ping
: break;
1298 case ControlCommand
.Kind
.Quit
: doQuit
= true; break;
1299 case ControlCommand
.Kind
.DisableUpdates
: updatesDisabled
= true; break;
1300 case ControlCommand
.Kind
.EnableUpdates
: updatesDisabled
= false; break;
// a check finished (successfully or with an error): drop that account from the list
1301 case ControlCommand
.Kind
.CheckDone
:
1302 case ControlCommand
.Kind
.CheckError
:
1304 if (accidCheckList
.length
) {
1305 foreach (immutable idx
, const ref AccCheck nfo
; accidCheckList
) {
1306 if (nfo
.accid
== cmd
.accid
) {
1307 //if (!doQuit && vbwin && !vbwin.closed) vbwin.postEvent(new UpdatingAccountCompleteEvent(nfo.accid));
// delete element idx: shift the tail one slot down, then shrink the array by one
1308 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1309 accidCheckList
.length
-= 1;
// nothing left in the list: notify the window that updating is complete
// (skipped when quitting, or when the window is missing or closed)
1313 if (!doQuit
&& vbwin
&& !vbwin
.closed
&& accidCheckList
.length
== 0) vbwin
.postEvent(new UpdatingCompleteEvent());
// NOTE(review): list sweep keyed on accid; presumably entries with accid == 0 are the
// ones removed by the shift-and-shrink below -- confirm against the full branch
1320 for (usize idx
= 0; idx
< accidCheckList
.length
; ) {
1321 if (accidCheckList
[idx
].accid
!= 0) {
1324 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1325 accidCheckList
.length
-= 1;
// NOTE(review): same sweep keyed on inprogress; presumably entries that are not being
// checked yet are the ones removed -- confirm against the full branch
1330 for (usize idx
= 0; idx
< accidCheckList
.length
; ) {
1331 if (accidCheckList
[idx
].inprogress
) {
1334 foreach (immutable c
; idx
+1..accidCheckList
.length
) accidCheckList
[c
-1] = accidCheckList
[c
];
1335 accidCheckList
.length
-= 1;
// scan the accounts for ones that should be queued (not done while updates are disabled)
1341 if (!needUpdates
&& !updatesDisabled
) {
// current tick value that the per-account next-check time is compared against
1342 ulong ctt
= RunningAverageExp
.GetTickCount();
1343 foreach (auto arow
; stmtAllAccs
.st
.range
) {
// accounts that are already in the list are skipped
1345 foreach (const ref AccCheck nfo
; accidCheckList
) if (nfo
.accid
== arow
.accid
!uint) { found
= true; break; }
1346 if (found
) continue;
// NOTE(review): enqueue without a time test; presumably guarded by forceAll -- confirm
1349 accidCheckList
~= AccCheck(arow
.accid
!uint);
// configured check interval, clamped to the range 1..100000
1353 int upmins
= arow
.checktime
!int;
1354 if (upmins
< 1) upmins
= 1; else if (upmins
> 100000) upmins
= 100000;
// last recorded check time for this account; stays 0 when checktimes has no row for it
1355 ulong lastcheck
= 0;
1356 foreach (auto crow
; stmtGetCheckTime
.st
.bind(":accid", arow
.accid
!uint).range
) lastcheck
= crow
.lastcheck
!ulong;
1357 lastcheck
+= upmins
*60; // next check time
// the next check time has already passed: queue the account
1358 if (lastcheck
< ctt
) {
1360 accidCheckList
~= AccCheck(arow
.accid
!uint);
// log the remaining time (NOTE(review): presumably the else-branch of the test above -- confirm)
1364 conwriteln("check for accid ", arow.accid!uint, " in ", (lastcheck-ctt)/60, " minutes...");
// start checker threads for queued accounts (never while updates are disabled)
1371 if (!updatesDisabled
) {
1372 foreach (ref AccCheck nfo
; accidCheckList
) {
// stop at the first entry that is already being checked
1373 if (nfo
.inprogress
) break;
// announce the check to the window, spawn a checker owned by this thread,
// mark the entry as running and send the checker the account id
1374 if (vbwin
) vbwin
.postEvent(new UpdatingAccountEvent(nfo
.accid
));
1375 nfo
.tid
= spawn(&checkerThread
, thisTid
);
1376 nfo
.inprogress
= true;
1377 nfo
.tid
.send(CheckCommand(nfo
.accid
));
// is at least one check still running?
1382 bool hasProgress
= false;
1383 foreach (ref AccCheck nfo
; accidCheckList
) if (nfo
.inprogress
) { hasProgress
= true; break; }
1386 needUpdates
= false;
// once at least a minute has passed since lastCollect: GC housekeeping
// (NOTE(review): presumably a GC collection plus a lastCollect refresh -- confirm)
1390 immutable ctt
= MonoTime
.currTime
;
1391 if ((ctt
-lastCollect
).total
!"minutes" >= 1) {
1392 import core
.memory
: GC
;
// tell the owner thread that the control loop has finished
1399 ownerTid
.send(ControlReply
.Quit
);
1400 } catch (Throwable e
) {
1401 // here, we are dead and fucked (the exact order doesn't matter)
1402 import core
.stdc
.stdlib
: abort
;
1403 import core
.stdc
.stdio
: fprintf
, stderr
;
1404 import core
.memory
: GC
;
1405 import core
.thread
: thread_suspendAll
;
// disable the GC, suspend all threads, dump the exception text to stderr, then abort the process
1406 GC
.disable(); // yeah
1407 thread_suspendAll(); // stop right here, you criminal scum!
1408 auto s
= e
.toString();
1409 fprintf(stderr
, "\n=== FATAL ===\n%.*s\n", cast(uint)s
.length
, s
.ptr
);
1410 abort(); // die, you bitch!
1415 //==========================================================================
1419 //==========================================================================
// NOTE(review): presumably sets the rcDisabled flag that receiverInit tests before
// starting the control thread -- confirm against the function body.
1420 public void receiverDisable () {
1425 //==========================================================================
1427 // disableMailboxUpdates
1429 //==========================================================================
// Sends the DisableUpdates command to the control thread.
// Does nothing when rcStarted is false.
1430 public void disableMailboxUpdates () {
1431 if (!rcStarted
) return;
// controlThread handles this command by setting updatesDisabled to true, which stops it
// from scanning accounts and from spawning new checker threads
1432 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.DisableUpdates
));
1436 //==========================================================================
1438 // enableMailboxUpdates
1440 //==========================================================================
// Sends the EnableUpdates command to the control thread.
// Does nothing when rcStarted is false.
1441 public void enableMailboxUpdates () {
1442 if (!rcStarted
) return;
// controlThread handles this command by resetting updatesDisabled to false
1443 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.EnableUpdates
));
1447 //==========================================================================
1449 // receiverForceUpdateAll
1451 //==========================================================================
// Sends the ForceUpdateAll command to the control thread.
// Does nothing when rcStarted is false.
1452 public void receiverForceUpdateAll () {
1453 if (!rcStarted
) return;
// controlThread handles this command by setting its forceAll flag
1454 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.ForceUpdateAll
));
1458 //==========================================================================
1462 //==========================================================================
// Spawns the control thread and stores its Tid in controlThreadId.
// Returns without doing anything when rcStarted or rcDisabled is set.
1463 public void receiverInit () {
1464 if (rcStarted
) return;
1465 if (rcDisabled
) return;
// the calling thread is passed as the owner, i.e. the thread that controlThread
// sends ControlReply.Quit to when its loop ends
1466 controlThreadId
= spawn(&controlThread
, thisTid
);
1471 //==========================================================================
1475 //==========================================================================
1476 public void receiverDeinit () {
1477 if (!rcStarted
) return;
1478 controlThreadId
.send(ControlCommand(ControlCommand
.Kind
.Quit
));
1482 (ControlReply reply
) {
1483 if (reply
== ControlReply
.Quit
) done
= true;