root/imapclient/response_lexer.py

Revision 229:85569119917a, 5.2 KB (checked in by Menno Smits <menno@…>, 8 months ago)

Halfway through re-working how BODY/BODYSTRUCTURE is handled so the parser doesn't have to treat paren lists without whitespace between them specially.

The failing tests light the way.

Line 
1# Copyright (c) 2011, Menno Smits
2# Released subject to the New BSD License
3# Please see http://en.wikipedia.org/wiki/BSD_licenses
4
5"""
6A lexical analyzer class for IMAP responses.
7
8Although Lexer does all the work, TokenSource is probably the class to
9use for external callers.
10"""
11
12# This was heavily inspired by (ie, ripped off from) python 2.6's shlex
13# module with further inspiration from the patch in
14# http://bugs.python.org/issue7594, but redone to be specific to IMAP's
15# requirements while offering nice performance by using generators everywhere.
16
# TokenSource is the entry point the module docstring recommends to external
# callers, so it is exported alongside Lexer.
__all__ = ["Lexer", "TokenSource"]
18
19
class TokenSource(object):
    """
    Iterable front-end for Lexer.

    Builds a Lexer over the given response records, exposes the resulting
    token stream through iteration, and gives access to the literal
    attached to the record that is currently being tokenised.
    """

    def __init__(self, text):
        # ``text`` is iterated lazily: one LiteralHandlingIter is created
        # per element, only when the lexer asks for the next source.
        lexer = Lexer()
        lexer.sources = (
            LiteralHandlingIter(lexer, record) for record in text
        )
        self.lex = lexer
        self.src = iter(lexer)

    @property
    def current_literal(self):
        """The ``literal`` attribute of the lexer's current source."""
        source = self.lex.current_source
        return source.literal

    def __iter__(self):
        # Always hands back the same underlying token iterator.
        return self.src
38   
39
class Lexer(object):
    """A lexical analyzer class for IMAP.

    ``sources`` must be set to an iterable of source objects before the
    lexer is iterated.  Iterating each source must produce a character
    stream that also offers a ``push(char)`` method for putting a
    character back.  While a source is being tokenised it is available
    as ``current_source``.
    """

    CTRL_CHARS = ''.join([chr(ch) for ch in range(32)])
    SPECIALS = r' ()%"[' + CTRL_CHARS
    ALL_CHARS = [chr(ch) for ch in range(256)]
    # Computed as a set difference rather than a filtering comprehension:
    # on Python 3 the condition of a comprehension in a class body cannot
    # see class-level names such as SPECIALS and would raise NameError.
    NON_SPECIALS = frozenset(ALL_CHARS) - frozenset(SPECIALS)
    WHITESPACE = frozenset(' \t\r\n')

    def __init__(self):
        self.sources = None
        self.current_source = None

    def read_until(self, stream_i, end_char, escape=True):
        """Consume ``stream_i`` up to and including ``end_char``.

        When ``escape`` is true a backslash escapes the character that
        follows it.  The backslash itself is dropped if it precedes
        another backslash or ``end_char`` and is kept otherwise.

        Returns the characters read (after escape processing) with
        ``end_char`` appended.  Raises ValueError if the stream runs out
        before an unescaped ``end_char`` is seen.
        """
        token = ''
        try:
            for nextchar in stream_i:
                if escape and nextchar == "\\":
                    escaper = nextchar
                    # Builtin next() works on both Python 2.6+ and
                    # Python 3 iterators (the .next() method is 2.x only).
                    nextchar = next(stream_i)
                    if nextchar != escaper and nextchar != end_char:
                        token += escaper
                elif nextchar == end_char:
                    break
                token += nextchar
            else:
                # Stream exhausted without finding end_char.
                raise ValueError("No closing %r" % end_char)
        except StopIteration:
            # Stream ended directly after a backslash.
            raise ValueError("No closing %r" % end_char)
        return token + end_char

    def read_token_stream(self, stream_i):
        """Generate the tokens found in the character stream ``stream_i``.

        Runs of non-special characters form a token, and a ``[...]``
        section is kept inside the token it occurs in.  A double-quoted
        string is yielded as one token including its quotes.  Any other
        special character ends the current token and is yielded as a
        token of its own.  Whitespace separates tokens and is not
        yielded.
        """
        whitespace = self.WHITESPACE
        wordchars = self.NON_SPECIALS
        read_until = self.read_until

        while True:
            # whitespace
            for nextchar in stream_i:
                if nextchar not in whitespace:
                    # Put the first real character back so that the loop
                    # below sees it.
                    stream_i.push(nextchar)
                    break    # done skipping over the whitespace

            # non whitespace
            token = ''
            for nextchar in stream_i:
                if nextchar in wordchars:
                    token += nextchar
                elif nextchar == '[':
                    token += nextchar + read_until(stream_i, ']', escape=False)
                else:
                    if nextchar in whitespace:
                        yield token
                    elif nextchar == '"':
                        assert not token
                        yield nextchar + read_until(stream_i, nextchar)
                    else:
                        # Other punctuation, eg. "(". This ends the current token.
                        if token:
                            yield token
                        yield nextchar
                    break
            else:
                # Stream exhausted: flush any pending token and stop.
                if token:
                    yield token
                break

    def __iter__(self):
        "Generate tokens"
        for source in self.sources:
            self.current_source = source
            for tok in self.read_token_stream(iter(source)):
                yield tok
113
114
115# imaplib has poor handling of 'literals' - it both fails to remove the
116# {size} marker, and fails to keep responses grouped into the same logical
117# 'line'.  What we end up with is a list of response 'records', where each
118# record is either a simple string, or tuple of (str_with_lit, literal) -
119# where str_with_lit is a string with the {xxx} marker at its end.  Note
120# that each elt of this list does *not* correspond 1:1 with the untagged
121# responses.
122# (http://bugs.python.org/issue5045 also has comments about this)
123# So: we have a special file-like object for each of these records.  When
124# a string literal is finally processed, we peek into this file-like object
125# to grab the literal.
class LiteralHandlingIter(object):
    """Character source for one response record, with its literal (if any).

    ``resp_record`` is either a plain string, or a ``(text, literal)``
    tuple in which ``text`` ends with the ``{...}`` literal marker.  The
    text is stored as ``src_text`` and the literal as ``literal`` (None
    for a plain string record).
    """

    def __init__(self, lexer, resp_record):
        self.lexer = lexer
        if isinstance(resp_record, tuple):
            # A 'record' with a string which includes a literal marker, and
            # the literal itself.
            src_text, self.literal = resp_record
            assert src_text.endswith("}"), src_text
            self.src_text = src_text
        else:
            # just a line with no literals.
            self.src_text = resp_record
            self.literal = None

    def __iter__(self):
        # A fresh pushable character iterator over the record text is
        # created on every call.
        return PushableIterator(self.src_text)
142
143
class PushableIterator(object):
    """Iterator wrapper that allows items to be pushed back.

    Pushed items are returned before anything further is taken from the
    wrapped iterable, most recently pushed first.
    """

    # Not referenced by any code in this class.
    NO_MORE = object()

    def __init__(self, it):
        self.it = iter(it)
        self.pushed = []

    def __iter__(self):
        return self

    def __next__(self):
        """Return the last pushed item, or else the next wrapped item.

        Raises StopIteration once nothing is pushed and the wrapped
        iterator is exhausted.
        """
        if self.pushed:
            return self.pushed.pop()
        # Builtin next() works on both Python 2.6+ and Python 3.
        return next(self.it)

    # Python 2 spells the iterator-protocol method ``next``; keep it so
    # existing callers of ``.next()`` continue to work.
    next = __next__

    def push(self, item):
        """Queue ``item`` to be returned by the next call to next()."""
        self.pushed.append(item)
162
163       
164       
Note: See TracBrowser for help on using the browser.