| | 170 | class TestIdle(IMAPClientTest): |
| | 171 | |
| | 172 | def test_idle(self): |
| | 173 | self.client._imap._command.return_value = sentinel.tag |
| | 174 | self.client._imap._get_response.return_value = None |
| | 175 | |
| | 176 | self.client.idle() |
| | 177 | |
| | 178 | self.client._imap._command.assert_called_with('IDLE') |
| | 179 | self.assertEqual(self.client._idle_tag, sentinel.tag) |
| | 180 | |
| | 181 | def test_idle_check_blocking(self): |
| | 182 | self.client._imap.sock = Mock() |
| | 183 | counter = itertools.count() |
| | 184 | def fake_get_line(): |
| | 185 | count = counter.next() |
| | 186 | if count == 0: |
| | 187 | return '* 1 EXISTS' |
| | 188 | elif count == 1: |
| | 189 | return '* 0 EXPUNGE' |
| | 190 | else: |
| | 191 | raise socket.timeout |
| | 192 | self.client._imap._get_line = fake_get_line |
| | 193 | |
| | 194 | responses = self.client.idle_check() |
| | 195 | self.assertListEqual([(1, 'EXISTS'), (0, 'EXPUNGE')], responses) |
| | 196 | |
| | 197 | def test_idle_check_timeout(self): |
| | 198 | self.client._imap.sock = Mock() |
| | 199 | def fake_get_line(): |
| | 200 | time.sleep(0.1) |
| | 201 | raise socket.timeout |
| | 202 | self.client._imap._get_line = fake_get_line |
| | 203 | |
| | 204 | t0 = time.time() |
| | 205 | responses = self.client.idle_check(timeout=0.5) |
| | 206 | t1 = time.time() |
| | 207 | |
| | 208 | self.assertListEqual([], responses) |
| | 209 | self.assertLess(t1 - t0, 0.8) |
| | 210 | |
| | 211 | def test_idle_check_with_data(self): |
| | 212 | self.client._imap.sock = Mock() |
| | 213 | |
| | 214 | counter = itertools.count() |
| | 215 | def fake_get_line(): |
| | 216 | count = counter.next() |
| | 217 | if count == 0: |
| | 218 | return '* 99 EXISTS' |
| | 219 | else: |
| | 220 | raise socket.timeout |
| | 221 | self.client._imap._get_line = fake_get_line |
| | 222 | |
| | 223 | t0 = time.time() |
| | 224 | responses = self.client.idle_check() |
| | 225 | t1 = time.time() |
| | 226 | |
| | 227 | self.assertListEqual([(99, 'EXISTS')], responses) |
| | 228 | self.assertLess(t1 - t0, 0.4) |
| | 229 | |
| | 230 | def test_idle_done(self): |
| | 231 | self.client._idle_tag = sentinel.tag |
| | 232 | self.client._imap.tagged_commands = {sentinel.tag: None} |
| | 233 | |
| | 234 | counter = itertools.count() |
| | 235 | def fake_get_response(): |
| | 236 | count = counter.next() |
| | 237 | if count == 0: |
| | 238 | return '* 99 EXISTS' |
| | 239 | self.client._imap.tagged_commands[sentinel.tag] = ('OK', ['Idle done']) |
| | 240 | self.client._imap._get_response = fake_get_response |
| | 241 | |
| | 242 | text, responses = self.client.idle_done() |
| | 243 | |
| | 244 | self.assertEqual(self.client._imap.tagged_commands, {}) |
| | 245 | self.assertEqual(text, 'Idle done') |
| | 246 | self.assertListEqual([(99, 'EXISTS')], responses) |
| | 247 | |