8 Public functions: Internaldate2tuple
14 # Author: Piers Lauder <piers@cs.su.oz.au> December 1997.
16 # Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998.
20 import binascii
, re
, socket
, string
, time
, random
, sys
27 AllowedVersions
= ('IMAP4REV1', 'IMAP4') # Most recent first
33 'APPEND': ('AUTH', 'SELECTED'),
34 'AUTHENTICATE': ('NONAUTH',),
35 'CAPABILITY': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
36 'CHECK': ('SELECTED',),
37 'CLOSE': ('SELECTED',),
38 'COPY': ('SELECTED',),
39 'CREATE': ('AUTH', 'SELECTED'),
40 'DELETE': ('AUTH', 'SELECTED'),
41 'EXAMINE': ('AUTH', 'SELECTED'),
42 'EXPUNGE': ('SELECTED',),
43 'FETCH': ('SELECTED',),
44 'LIST': ('AUTH', 'SELECTED'),
45 'LOGIN': ('NONAUTH',),
46 'LOGOUT': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
47 'LSUB': ('AUTH', 'SELECTED'),
48 'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
49 'PARTIAL': ('SELECTED',),
50 'RENAME': ('AUTH', 'SELECTED'),
51 'SEARCH': ('SELECTED',),
52 'SELECT': ('AUTH', 'SELECTED'),
53 'STATUS': ('AUTH', 'SELECTED'),
54 'STORE': ('SELECTED',),
55 'SUBSCRIBE': ('AUTH', 'SELECTED'),
57 'UNSUBSCRIBE': ('AUTH', 'SELECTED'),
60 # Patterns to match server responses
# '+' continuation request; any text after "+ " is captured as 'data'.
Continuation = re.compile(r'\+( (?P<data>.*))?')

# Parenthesised flag list following the word FLAGS, captured as 'flags'.
Flags = re.compile(r'.*FLAGS \((?P<flags>[^\)]*)\)')
64 InternalDate
= re
.compile(r
'.*INTERNALDATE "'
65 r
'(?P<day>[ 123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
66 r
' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
67 r
' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
# Line ending in a literal marker "{<size>}"; 'size' is the digit string.
Literal = re.compile(r'.*{(?P<size>\d+)}$')

# Bracketed response code "[TYPE data]"; 'data' is None when absent.
Response_code = re.compile(r'\[(?P<type>[A-Z-]+)( (?P<data>[^\]]*))?\]')

# Untagged response "* TYPE data".
Untagged_response = re.compile(r'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')

# Untagged status "* <number> TYPE data2"; the number is captured as 'data'.
Untagged_status = re.compile(r'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?')
78 """IMAP4 client class.
80 Instantiate with: IMAP4([host[, port]])
82 host - host's name (default: localhost);
83 port - port number (default: standard IMAP4 port).
85 All IMAP4rev1 commands are supported by methods of the same
88 All arguments to commands are converted to strings, except for
89 AUTHENTICATE, and the last argument to APPEND which is passed as
90 an IMAP4 literal. If necessary (the string contains
91 white-space and isn't enclosed with either parentheses or
92 double quotes) each string is quoted. However, the 'password'
93 argument to the LOGIN command is always quoted.
95 Each command returns a tuple: (type, [data, ...]) where 'type'
96 is usually 'OK' or 'NO', and 'data' is either the text from the
97 tagged response, or untagged results from command.
99 Errors raise the exception class <instance>.error("<reason>").
100 IMAP4 server errors raise <instance>.abort("<reason>"),
101 which is a sub-class of 'error'. Mailbox status changes
102 from READ-WRITE to READ-ONLY raise the exception class
103 <instance>.readonly("<reason>"), which is a sub-class of 'abort'.
105 "error" exceptions imply a program error.
106 "abort" exceptions imply the connection should be reset, and
107 the command re-tried.
108 "readonly" exceptions imply the command should be re-tried.
110 Note: to use this module, you must read the RFCs pertaining
111 to the IMAP4 protocol, as the semantics of the arguments to
112 each IMAP4 command are left to the invoker, not to mention
# Exception hierarchy: readonly -> abort -> error -> Exception.

class error(Exception):
    # Logical errors - debug required
    pass

class abort(error):
    # Service errors - close and retry
    pass

class readonly(abort):
    # Mailbox status changed to READ-ONLY
    pass

# Matches any character outside the set that may appear unquoted
# in a command argument.
mustquote = re.compile(r"[^\w!#$%&'*+,.:;<=>?^`|~-]")
122 def __init__(self
, host
= '', port
= IMAP4_PORT
):
126 self
.state
= 'LOGOUT'
127 self
.literal
= None # A literal argument to a command
128 self
.tagged_commands
= {} # Tagged commands awaiting response
129 self
.untagged_responses
= {} # {typ: [data, ...], ...}
130 self
.continuation_response
= '' # Last continuation response
131 self
.is_readonly
= None # READ-ONLY desired state
134 # Open socket to server.
136 self
.open(host
, port
)
138 # Create unique tag for this session,
139 # and compile tagged response matcher.
141 self
.tagpre
= Int2AP(random
.randint(0, 31999))
142 self
.tagre
= re
.compile(r
'(?P<tag>'
144 + r
'\d+) (?P<type>[A-Z]+) (?P<data>.*)')
146 # Get server welcome message,
147 # request and store CAPABILITY response.
151 _mesg('new IMAP4 connection, tag=%s' % self
.tagpre
)
153 self
.welcome
= self
._get
_response
()
154 if self
.untagged_responses
.has_key('PREAUTH'):
156 elif self
.untagged_responses
.has_key('OK'):
157 self
.state
= 'NONAUTH'
159 raise self
.error(self
.welcome
)
162 self
._simple
_command
(cap
)
163 if not self
.untagged_responses
.has_key(cap
):
164 raise self
.error('no CAPABILITY response from server')
165 self
.capabilities
= tuple(string
.split(string
.upper(self
.untagged_responses
[cap
][-1])))
169 _mesg('CAPABILITIES: %s' % `self
.capabilities`
)
171 for version
in AllowedVersions
:
172 if not version
in self
.capabilities
:
174 self
.PROTOCOL_VERSION
= version
177 raise self
.error('server not IMAP4 compliant')
def __getattr__(self, attr):
    """Resolve UPPERCASE command names to the lowercase methods.

    Python calls this only after normal attribute lookup has failed.
    If 'attr' is a key of the module-level Commands table, the
    attribute named by the lower-cased 'attr' is returned.

    Raises AttributeError for any other name.
    """
    # Allow UPPERCASE variants of IMAP4 command methods.
    if Commands.has_key(attr):
        # A plain getattr() performs the same lookup that the old
        # eval("self.<name>") did, without compiling and executing
        # a constructed source string.
        return getattr(self, string.lower(attr))
    raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
def open(self, host, port):
    """Setup 'self.sock' and 'self.file'.

    host - name of the host to connect to;
    port - port number to connect to.

    Both values are also recorded as 'self.host' and 'self.port'.
    """
    # Use the arguments actually passed in; previously they were
    # ignored and the connection was made to self.host/self.port,
    # which had to be set beforehand by the caller.
    self.host = host
    self.port = port
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.connect((host, port))
    # File object over the socket, opened for reading.
    self.file = self.sock.makefile('r')
199 """Return most recent 'RECENT' responses if any exist,
200 else prompt server for an update using the 'NOOP' command.
202 (typ, [data]) = <instance>.recent()
204 'data' is None if no new messages,
205 else list of RECENT responses, most recent last.
208 typ
, dat
= self
._untagged
_response
('OK', [None], name
)
211 typ
, dat
= self
.noop() # Prod server for response
212 return self
._untagged
_response
(typ
, dat
, name
)
def response(self, code):
    """Fetch the stored untagged data for response 'code'.

    (code, [data]) = <instance>.response(code)

    The lookup is delegated to _untagged_response() using the
    upper-cased 'code' as the response name, with 'code' itself and
    [None] passed as the 'typ' and 'dat' arguments.
    """
    lookup = self._untagged_response
    wanted = string.upper(code)
    return lookup(code, [None], wanted)
226 """Return socket instance used to connect to IMAP4 server.
228 socket = <instance>.socket()
237 def append(self
, mailbox
, flags
, date_time
, message
):
238 """Append message to named mailbox.
240 (typ, [data]) = <instance>.append(mailbox, flags, date_time, message)
242 All args except `message' can be None.
248 if (flags
[0],flags
[-1]) != ('(',')'):
249 flags
= '(%s)' % flags
253 date_time
= Time2Internaldate(date_time
)
256 self
.literal
= message
257 return self
._simple
_command
(name
, mailbox
, flags
, date_time
)
260 def authenticate(self
, mechanism
, authobject
):
261 """Authenticate command - requires response processing.
263 'mechanism' specifies which authentication mechanism is to
264 be used - it must appear in <instance>.capabilities in the
265 form AUTH=<mechanism>.
267 'authobject' must be a callable object:
269 data = authobject(response)
271 It will be called to process server continuation responses.
272 It should return data that will be encoded and sent to server.
273 It should return None if the client abort response '*' should
276 mech
= string
.upper(mechanism
)
277 cap
= 'AUTH=%s' % mech
278 if not cap
in self
.capabilities
:
279 raise self
.error("Server doesn't allow %s authentication." % mech
)
280 self
.literal
= _Authenticator(authobject
).process
281 typ
, dat
= self
._simple
_command
('AUTHENTICATE', mech
)
283 raise self
.error(dat
[-1])
289 """Checkpoint mailbox on server.
291 (typ, [data]) = <instance>.check()
293 return self
._simple
_command
('CHECK')
297 """Close currently selected mailbox.
299 Deleted messages are removed from writable mailbox.
300 This is the recommended command before 'LOGOUT'.
302 (typ, [data]) = <instance>.close()
305 typ
, dat
= self
._simple
_command
('CLOSE')
def copy(self, message_set, new_mailbox):
    """Issue a COPY of 'message_set' to the mailbox 'new_mailbox'.

    (typ, [data]) = <instance>.copy(message_set, new_mailbox)
    """
    result = self._simple_command('COPY', message_set, new_mailbox)
    return result
def create(self, mailbox):
    """Issue a CREATE for the mailbox named 'mailbox'.

    (typ, [data]) = <instance>.create(mailbox)
    """
    result = self._simple_command('CREATE', mailbox)
    return result
def delete(self, mailbox):
    """Issue a DELETE for the mailbox named 'mailbox'.

    (typ, [data]) = <instance>.delete(mailbox)
    """
    result = self._simple_command('DELETE', mailbox)
    return result
336 """Permanently remove deleted items from selected mailbox.
338 Generates 'EXPUNGE' response for each deleted message.
340 (typ, [data]) = <instance>.expunge()
342 'data' is list of 'EXPUNGE'd message numbers in order received.
345 typ
, dat
= self
._simple
_command
(name
)
346 return self
._untagged
_response
(typ
, dat
, name
)
349 def fetch(self
, message_set
, message_parts
):
350 """Fetch (parts of) messages.
352 (typ, [data, ...]) = <instance>.fetch(message_set, message_parts)
354 'data' are tuples of message part envelope and data.
357 typ
, dat
= self
._simple
_command
(name
, message_set
, message_parts
)
358 return self
._untagged
_response
(typ
, dat
, name
)
361 def list(self
, directory
='""', pattern
='*'):
362 """List mailbox names in directory matching pattern.
364 (typ, [data]) = <instance>.list(directory='""', pattern='*')
366 'data' is list of LIST responses.
369 typ
, dat
= self
._simple
_command
(name
, directory
, pattern
)
370 return self
._untagged
_response
(typ
, dat
, name
)
373 def login(self
, user
, password
):
374 """Identify client using plaintext password.
376 (typ, [data]) = <instance>.login(user, password)
378 NB: 'password' will be quoted.
380 #if not 'AUTH=LOGIN' in self.capabilities:
381 # raise self.error("Server doesn't allow LOGIN authentication." % mech)
382 typ
, dat
= self
._simple
_command
('LOGIN', user
, self
._quote
(password
))
384 raise self
.error(dat
[-1])
390 """Shutdown connection to server.
392 (typ, [data]) = <instance>.logout()
394 Returns server 'BYE' response.
396 self
.state
= 'LOGOUT'
397 try: typ
, dat
= self
._simple
_command
('LOGOUT')
398 except: typ
, dat
= 'NO', ['%s: %s' % sys
.exc_info()[:2]]
401 if self
.untagged_responses
.has_key('BYE'):
402 return 'BYE', self
.untagged_responses
['BYE']
406 def lsub(self
, directory
='""', pattern
='*'):
407 """List 'subscribed' mailbox names in directory matching pattern.
409 (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*')
411 'data' are tuples of message part envelope and data.
414 typ
, dat
= self
._simple
_command
(name
, directory
, pattern
)
415 return self
._untagged
_response
(typ
, dat
, name
)
419 """Send NOOP command.
421 (typ, data) = <instance>.noop()
425 _dump_ur(self
.untagged_responses
)
426 return self
._simple
_command
('NOOP')
429 def partial(self
, message_num
, message_part
, start
, length
):
430 """Fetch truncated part of a message.
432 (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length)
434 'data' is tuple of message part envelope and data.
437 typ
, dat
= self
._simple
_command
(name
, message_num
, message_part
, start
, length
)
438 return self
._untagged
_response
(typ
, dat
, 'FETCH')
def rename(self, oldmailbox, newmailbox):
    """Issue a RENAME of mailbox 'oldmailbox' to 'newmailbox'.

    (typ, data) = <instance>.rename(oldmailbox, newmailbox)
    """
    result = self._simple_command('RENAME', oldmailbox, newmailbox)
    return result
449 def search(self
, charset
, *criteria
):
450 """Search mailbox for matching messages.
452 (typ, [data]) = <instance>.search(charset, criterium, ...)
454 'data' is space separated list of matching message numbers.
458 charset
= 'CHARSET ' + charset
459 typ
, dat
= apply(self
._simple
_command
, (name
, charset
) + criteria
)
460 return self
._untagged
_response
(typ
, dat
, name
)
463 def select(self
, mailbox
='INBOX', readonly
=None):
466 Flush all untagged responses.
468 (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=None)
470 'data' is count of messages in mailbox ('EXISTS' response).
472 # Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY')
473 self
.untagged_responses
= {} # Flush old responses.
474 self
.is_readonly
= readonly
479 typ
, dat
= self
._simple
_command
(name
, mailbox
)
481 self
.state
= 'AUTH' # Might have been 'SELECTED'
483 self
.state
= 'SELECTED'
484 if self
.untagged_responses
.has_key('READ-ONLY') \
488 _dump_ur(self
.untagged_responses
)
489 raise self
.readonly('%s is not writable' % mailbox
)
490 return typ
, self
.untagged_responses
.get('EXISTS', [None])
493 def status(self
, mailbox
, names
):
494 """Request named status conditions for mailbox.
496 (typ, [data]) = <instance>.status(mailbox, names)
499 if self
.PROTOCOL_VERSION
== 'IMAP4':
500 raise self
.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name
)
501 typ
, dat
= self
._simple
_command
(name
, mailbox
, names
)
502 return self
._untagged
_response
(typ
, dat
, name
)
def store(self, message_set, command, flag_list):
    """Issue a STORE to alter flag dispositions for messages.

    (typ, [data]) = <instance>.store(message_set, command, flag_list)

    The result is whatever _untagged_response() yields for the
    'FETCH' response name after the command completes.
    """
    status, payload = self._simple_command(
        'STORE', message_set, command, flag_list)
    # Results of a STORE are collected under the 'FETCH' response name.
    return self._untagged_response(status, payload, 'FETCH')
def subscribe(self, mailbox):
    """Issue a SUBSCRIBE for the mailbox named 'mailbox'.

    (typ, [data]) = <instance>.subscribe(mailbox)
    """
    result = self._simple_command('SUBSCRIBE', mailbox)
    return result
522 def uid(self
, command
, *args
):
523 """Execute "command arg ..." with messages identified by UID,
524 rather than message number.
526 (typ, [data]) = <instance>.uid(command, arg1, arg2, ...)
528 Returns response appropriate to 'command'.
530 command
= string
.upper(command
)
531 if not Commands
.has_key(command
):
532 raise self
.error("Unknown IMAP4 UID command: %s" % command
)
533 if self
.state
not in Commands
[command
]:
534 raise self
.error('command %s illegal in state %s'
535 % (command
, self
.state
))
537 typ
, dat
= apply(self
._simple
_command
, (name
, command
) + args
)
538 if command
== 'SEARCH':
542 return self
._untagged
_response
(typ
, dat
, name
)
def unsubscribe(self, mailbox):
    """Issue an UNSUBSCRIBE for the mailbox named 'mailbox'.

    (typ, [data]) = <instance>.unsubscribe(mailbox)
    """
    result = self._simple_command('UNSUBSCRIBE', mailbox)
    return result
def xatom(self, name, *args):
    """Allow simple extension commands
    notified by server in CAPABILITY response.

    (typ, [data]) = <instance>.xatom(name, arg, ...)

    'name' must begin with 'X' and appear in <instance>.capabilities;
    otherwise <instance>.error is raised.
    """
    # Slice instead of index so that an empty name is rejected with
    # the normal error rather than an IndexError.
    if name[:1] != 'X' or not name in self.capabilities:
        raise self.error('unknown extension command: %s' % name)
    # _command() indexes Commands[name] to check the connection state.
    # Extension commands are not listed there, so register the name
    # first; otherwise every accepted extension command would fail
    # with a KeyError.  The states in which an extension is legal are
    # not known here, so all connected states are permitted and the
    # server is left to reject the command if it is inappropriate.
    if not Commands.has_key(name):
        Commands[name] = ('NONAUTH', 'AUTH', 'SELECTED')
    return apply(self._simple_command, (name,) + args)
568 def _append_untagged(self
, typ
, dat
):
570 if dat
is None: dat
= ''
571 ur
= self
.untagged_responses
574 _mesg('untagged_responses[%s] %s += ["%s"]' %
575 (typ
, len(ur
.get(typ
,'')), dat
))
582 def _check_bye(self
):
583 bye
= self
.untagged_responses
.get('BYE')
585 raise self
.abort(bye
[-1])
588 def _command(self
, name
, *args
):
590 if self
.state
not in Commands
[name
]:
593 'command %s illegal in state %s' % (name
, self
.state
))
595 for typ
in ('OK', 'NO', 'BAD'):
596 if self
.untagged_responses
.has_key(typ
):
597 del self
.untagged_responses
[typ
]
599 if self
.untagged_responses
.has_key('READ-ONLY') \
600 and not self
.is_readonly
:
601 raise self
.readonly('mailbox status changed to READ-ONLY')
603 tag
= self
._new
_tag
()
604 data
= '%s %s' % (tag
, name
)
606 if arg
is None: continue
607 data
= '%s %s' % (data
, self
._checkquote
(arg
))
609 literal
= self
.literal
610 if literal
is not None:
612 if type(literal
) is type(self
._command
):
616 data
= '%s {%s}' % (data
, len(literal
))
625 self
.sock
.send('%s%s' % (data
, CRLF
))
626 except socket
.error
, val
:
627 raise self
.abort('socket error: %s' % val
)
633 # Wait for continuation response
635 while self
._get
_response
():
636 if self
.tagged_commands
[tag
]: # BAD/NO?
642 literal
= literator(self
.continuation_response
)
646 _mesg('write literal size %s' % len(literal
))
649 self
.sock
.send(literal
)
651 except socket
.error
, val
:
652 raise self
.abort('socket error: %s' % val
)
660 def _command_complete(self
, name
, tag
):
663 typ
, data
= self
._get
_tagged
_response
(tag
)
664 except self
.abort
, val
:
665 raise self
.abort('command: %s => %s' % (name
, val
))
666 except self
.error
, val
:
667 raise self
.error('command: %s => %s' % (name
, val
))
670 raise self
.error('%s command error: %s %s' % (name
, typ
, data
))
674 def _get_response(self
):
676 # Read response and store.
678 # Returns None for continuation responses,
679 # otherwise first response line received.
681 resp
= self
._get
_line
()
683 # Command completion response?
685 if self
._match
(self
.tagre
, resp
):
686 tag
= self
.mo
.group('tag')
687 if not self
.tagged_commands
.has_key(tag
):
688 raise self
.abort('unexpected tagged response: %s' % resp
)
690 typ
= self
.mo
.group('type')
691 dat
= self
.mo
.group('data')
692 self
.tagged_commands
[tag
] = (typ
, [dat
])
696 # '*' (untagged) responses?
698 if not self
._match
(Untagged_response
, resp
):
699 if self
._match
(Untagged_status
, resp
):
700 dat2
= self
.mo
.group('data2')
703 # Only other possibility is '+' (continuation) response...
705 if self
._match
(Continuation
, resp
):
706 self
.continuation_response
= self
.mo
.group('data')
707 return None # NB: indicates continuation
709 raise self
.abort("unexpected response: '%s'" % resp
)
711 typ
= self
.mo
.group('type')
712 dat
= self
.mo
.group('data')
713 if dat
is None: dat
= '' # Null untagged response
714 if dat2
: dat
= dat
+ ' ' + dat2
716 # Is there a literal to come?
718 while self
._match
(Literal
, dat
):
720 # Read literal direct from connection.
722 size
= string
.atoi(self
.mo
.group('size'))
725 _mesg('read literal size %s' % size
)
726 data
= self
.file.read(size
)
728 # Store response with literal as tuple
730 self
._append
_untagged
(typ
, (dat
, data
))
732 # Read trailer - possibly containing another literal
734 dat
= self
._get
_line
()
736 self
._append
_untagged
(typ
, dat
)
738 # Bracketed response information?
740 if typ
in ('OK', 'NO', 'BAD') and self
._match
(Response_code
, dat
):
741 self
._append
_untagged
(self
.mo
.group('type'), self
.mo
.group('data'))
744 if self
.debug
>= 1 and typ
in ('NO', 'BAD', 'BYE'):
745 _mesg('%s response: %s' % (typ
, dat
))
750 def _get_tagged_response(self
, tag
):
753 result
= self
.tagged_commands
[tag
]
754 if result
is not None:
755 del self
.tagged_commands
[tag
]
758 # Some have reported "unexpected response" exceptions.
759 # Note that ignoring them here causes loops.
760 # Instead, send me details of the unexpected response and
761 # I'll update the code in `_get_response()'.
765 except self
.abort
, val
:
774 line
= self
.file.readline()
776 raise self
.abort('socket error: EOF')
778 # Protocol mandates all lines terminated by CRLF
789 def _match(self
, cre
, s
):
791 # Run compiled regular expression match method on 's'.
792 # Save result, return success.
794 self
.mo
= cre
.match(s
)
796 if self
.mo
is not None and self
.debug
>= 5:
797 _mesg("\tmatched r'%s' => %s" % (cre
.pattern
, `self
.mo
.groups()`
))
798 return self
.mo
is not None
803 tag
= '%s%s' % (self
.tagpre
, self
.tagnum
)
804 self
.tagnum
= self
.tagnum
+ 1
805 self
.tagged_commands
[tag
] = None
809 def _checkquote(self
, arg
):
811 # Must quote command args if non-alphanumeric chars present,
812 # and not already quoted.
814 if type(arg
) is not type(''):
816 if (arg
[0],arg
[-1]) in (('(',')'),('"','"')):
818 if self
.mustquote
.search(arg
) is None:
820 return self
._quote
(arg
)
823 def _quote(self
, arg
):
825 arg
= string
.replace(arg
, '\\', '\\\\')
826 arg
= string
.replace(arg
, '"', '\\"')
def _simple_command(self, name, *args):
    """Send command 'name' with 'args' and return the completed result.

    The value returned by _command() is passed on as the 'tag'
    argument of _command_complete(), whose result is returned.
    """
    finish = self._command_complete
    pending = apply(self._command, (name,) + args)
    return finish(name, pending)
836 def _untagged_response(self
, typ
, dat
, name
):
840 if not self
.untagged_responses
.has_key(name
):
842 data
= self
.untagged_responses
[name
]
845 _mesg('untagged_responses[%s] => %s' % (name
, data
))
846 del self
.untagged_responses
[name
]
851 class _Authenticator
:
853 """Private class to provide en/decoding
854 for base64-based authentication conversation.
857 def __init__(self
, mechinst
):
858 self
.mech
= mechinst
# Callable object to provide/process data
860 def process(self
, data
):
861 ret
= self
.mech(self
.decode(data
))
863 return '*' # Abort conversation
864 return self
.encode(ret
)
866 def encode(self
, inp
):
868 # Invoke binascii.b2a_base64 iteratively with
869 # short even length buffers, strip the trailing
870 # line feed from the result and append. "Even"
871 # means a number that factors to both 6 and 8,
872 # so when it gets to the end of the 8-bit input
873 # there's no partial 6-bit output.
883 e
= binascii
.b2a_base64(t
)
888 def decode(self
, inp
):
891 return binascii
.a2b_base64(inp
)
895 Mon2num
= {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
896 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
898 def Internaldate2tuple(resp
):
900 """Convert IMAP4 INTERNALDATE to UT.
902 Returns Python time module tuple.
905 mo
= InternalDate
.match(resp
)
909 mon
= Mon2num
[mo
.group('mon')]
910 zonen
= mo
.group('zonen')
912 for name
in ('day', 'year', 'hour', 'min', 'sec', 'zoneh', 'zonem'):
913 exec "%s = string.atoi(mo.group('%s'))" % (name
, name
)
915 # INTERNALDATE timezone must be subtracted to get UT
917 zone
= (zoneh
*60 + zonem
)*60
921 tt
= (year
, mon
, day
, hour
, min, sec
, -1, -1, -1)
923 utc
= time
.mktime(tt
)
925 # Following is necessary because the time module has no 'mkgmtime'.
926 # 'mktime' assumes arg in local timezone, so adds timezone/altzone.
928 lt
= time
.localtime(utc
)
929 if time
.daylight
and lt
[-1]:
930 zone
= zone
+ time
.altzone
932 zone
= zone
+ time
.timezone
934 return time
.localtime(utc
- zone
)
940 """Convert integer to A-P string representation."""
942 val
= ''; AP
= 'ABCDEFGHIJKLMNOP'
945 num
, mod
= divmod(num
, 16)
951 def ParseFlags(resp
):
953 """Convert IMAP4 flags response to python tuple."""
955 mo
= Flags
.match(resp
)
959 return tuple(string
.split(mo
.group('flags')))
962 def Time2Internaldate(date_time
):
964 """Convert 'date_time' to IMAP4 INTERNALDATE representation.
966 Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'
969 dttype
= type(date_time
)
970 if dttype
is type(1) or dttype
is type(1.1):
971 tt
= time
.localtime(date_time
)
972 elif dttype
is type(()):
974 elif dttype
is type(""):
975 return date_time
# Assume in correct format
976 else: raise ValueError
978 dt
= time
.strftime("%d-%b-%Y %H:%M:%S", tt
)
981 if time
.daylight
and tt
[-1]:
984 zone
= -time
.timezone
985 return '"' + dt
+ " %+02d%02d" % divmod(zone
/60, 60) + '"'
991 def _mesg(s
, secs
=None):
994 tm
= time
.strftime('%M:%S', time
.localtime(secs
))
995 sys
.stderr
.write(' %s.%02d %s\n' % (tm
, (secs
*100)%100, s
))
999 # Dump untagged responses (in `dict').
1004 l
= map(lambda x
,j
=j
:'%s: "%s"' % (x
[0], x
[1][0] and j(x
[1], '" "') or ''), l
)
1005 _mesg('untagged responses dump:%s%s' % (t
, j(l
, t
)))
1007 _cmd_log
= [] # Last `_cmd_log_len' interactions
1011 # Keep log of last `_cmd_log_len' interactions for debugging.
1012 if len(_cmd_log
) == _cmd_log_len
:
1014 _cmd_log
.append((time
.time(), line
))
1017 _mesg('last %d IMAP4 interactions:' % len(_cmd_log
))
1018 for secs
,line
in _cmd_log
:
1023 if __name__
== '__main__':
1025 import getopt
, getpass
, sys
1028 optlist
, args
= getopt
.getopt(sys
.argv
[1:], 'd:')
1029 except getopt
.error
, val
:
1032 for opt
,val
in optlist
:
1036 if not args
: args
= ('',)
1040 USER
= getpass
.getuser()
1041 PASSWD
= getpass
.getpass("IMAP password for %s on %s" % (USER
, host
or "localhost"))
1043 test_mesg
= 'From: %s@localhost\nSubject: IMAP4 test\n\ndata...\n' % USER
1045 ('login', (USER
, PASSWD
)),
1046 ('create', ('/tmp/xxx 1',)),
1047 ('rename', ('/tmp/xxx 1', '/tmp/yyy')),
1048 ('CREATE', ('/tmp/yyz 2',)),
1049 ('append', ('/tmp/yyz 2', None, None, test_mesg
)),
1050 ('list', ('/tmp', 'yy*')),
1051 ('select', ('/tmp/yyz 2',)),
1052 ('search', (None, 'SUBJECT', 'test')),
1053 ('partial', ('1', 'RFC822', 1, 1024)),
1054 ('store', ('1', 'FLAGS', '(\Deleted)')),
1062 ('response',('UIDVALIDITY',)),
1063 ('uid', ('SEARCH', 'ALL')),
1064 ('response', ('EXISTS',)),
1065 ('append', (None, None, None, test_mesg
)),
1071 _mesg('%s %s' % (cmd
, args
))
1072 typ
, dat
= apply(eval('M.%s' % cmd
), args
)
1073 _mesg('%s => %s %s' % (cmd
, typ
, dat
))
1078 _mesg('PROTOCOL_VERSION = %s' % M
.PROTOCOL_VERSION
)
1080 for cmd
,args
in test_seq1
:
1083 for ml
in run('list', ('/tmp/', 'yy%')):
1084 mo
= re
.match(r
'.*"([^"]+)"$', ml
)
1085 if mo
: path
= mo
.group(1)
1086 else: path
= string
.split(ml
)[-1]
1087 run('delete', (path
,))
1089 for cmd
,args
in test_seq2
:
1090 dat
= run(cmd
, args
)
1092 if (cmd
,args
) != ('uid', ('SEARCH', 'ALL')):
1095 uid
= string
.split(dat
[-1])
1096 if not uid
: continue
1097 run('uid', ('FETCH', '%s' % uid
[-1],
1098 '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)'))
1100 print '\nAll tests OK.'
1103 print '\nTests failed.'
1107 If you would like to see debugging output,