Changeset 219:64d0a76281b8
- Timestamp:
- 17/04/11 10:55:21 (13 months ago)
- Branch:
- default
- Files:
-
- 2 modified
-
imapclient/imapclient.py (modified) (9 diffs)
-
livetest.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
imapclient/imapclient.py
r218 r219 6 6 import response_lexer 7 7 from operator import itemgetter 8 import socket 9 import time 8 10 import warnings 9 11 10 try: 11 import imaplib2 as imaplib 12 using_imaplib2 = True 13 except ImportError: 14 using_imaplib2 = False 15 import imaplib 16 12 import imaplib 17 13 #imaplib.Debug = 5 18 14 … … 26 22 27 23 28 __all__ = ['IMAPClient', 'DELETED', 'SEEN', 'ANSWERED', 'FLAGGED', 'DRAFT', 29 'RECENT', 'using_imaplib2'] 24 __all__ = ['IMAPClient', 'DELETED', 'SEEN', 'ANSWERED', 'FLAGGED', 'DRAFT', 'RECENT'] 30 25 31 26 from response_parser import parse_response, parse_fetch_response … … 34 29 if 'XLIST' not in imaplib.Commands: 35 30 imaplib.Commands['XLIST'] = imaplib.Commands['LIST'] 31 32 # ...and IDLE 33 if 'IDLE' not in imaplib.Commands: 34 imaplib.Commands['IDLE'] = imaplib.Commands['APPEND'] 36 35 37 36 … … 116 115 self.use_uid = use_uid 117 116 self.folder_encode = True 117 self._idle_tag = None 118 118 119 119 … … 295 295 typ, data = self._imap.select(self._encode_folder_name(folder), readonly) 296 296 self._checkok('select', typ, data) 297 if using_imaplib2: 298 untagged_responses = self._response_to_dict(self._imap.untagged_responses) 299 else: 300 untagged_responses = self._imap.untagged_responses 301 return self._process_select_response(untagged_responses) 302 303 def _response_to_dict(self, resp): 304 dictresp = {} 305 306 if len(resp) > 0: 307 for item in resp: 308 dictresp[item[0]] = item[1] 309 310 return dictresp 297 return self._process_select_response(self._imap.untagged_responses) 311 298 312 299 def _process_select_response(self, resp): … … 326 313 return out 327 314 328 def idle(self , timeout=None, callback=None, **kwargs):315 def idle(self): 329 316 """Put server into IDLE mode. 330 317 … … 341 328 #XXX what about finding out what the IDLE data was? 342 329 """ 343 if not using_imaplib2: 344 raise self.Error('The imaplib2 module is required to use IDLE') 345 346 if callback: 347 cb_arg_was_given = 'cb_arg' in kwargs 348 def _wrapped_cb(resp): 349 if resp[0]: 350 success = True 351 typ, data = resp[0] 352 response = (typ, data[0]) 330 self._idle_tag = self._imap._command('IDLE') 331 resp = self._imap._get_response() 332 if resp is not None: 333 raise self.Error('Unexpected IDLE response: %s' % resp) 334 335 def idle_check(self, timeout=None): 336 """XXX 337 """ 338 timeouts = 0 339 resps = [] 340 start_t = time.time() 341 def done(): 342 if timeout is None: 343 if resps and timeouts > 1: 344 return True 345 else: 346 if time.time() - start_t >= timeout: 347 return True 348 return False 349 350 # make the socket non-blocking so the timeout can be 351 # implemented for this call 352 self._imap.sock.settimeout(0.1) 353 try: 354 while not done(): 355 try: 356 line = self._imap._get_line() 357 except socket.timeout: 358 timeouts += 1 353 359 else: 354 success = False 355 response = resp[2] 356 if cb_arg_was_given: 357 cb_arg = resp[1] 358 callback(success, response, cb_arg) 359 else: 360 callback(success, response) 361 self._imap.idle(timeout, callback=_wrapped_cb, cb_arg=kwargs.get('cb_arg')) 362 else: 363 typ, data = self._imap.idle(timeout) 364 self._checkok('idle', typ, data) 365 return data[0] 366 360 resps.append(_parse_idle_response(line)) 361 return resps 362 finally: 363 self._imap.sock.settimeout(None) 364 365 def idle_done(self): 366 """XXX 367 """ 368 self._imap.send('DONE\r\n') 369 # Slurp up any remaining IDLE data until the IDLE is done 370 tag = self._idle_tag 371 tagged_commands = self._imap.tagged_commands 372 resps = [] 373 while True: 374 line = self._imap._get_response() 375 if tagged_commands[tag]: 376 break 377 resps.append(_parse_idle_response(line)) 378 self._idle_tag = None 379 typ, data = tagged_commands.pop(tag) 380 self._checkok('idle', typ, data) 381 return data[0], resps 382 367 383 def folder_status(self, folder, what=None): 368 384 """Requests the status from folder. … … 592 608 else: 593 609 tag = self._imap._command('FETCH', msg_list, parts_list, modifiers_list) 594 if using_imaplib2: 595 typ, data = self._imap._command_complete(tag, 'FETCH') 596 else: 597 typ, data = self._imap._command_complete('FETCH', tag) 610 typ, data = self._imap._command_complete('FETCH', tag) 598 611 self._checkok('fetch', typ, data) 599 612 typ, data = self._imap._untagged_response(typ, data, 'FETCH') … … 791 804 arg = arg.replace('"', '\\"') 792 805 return '"%s"' % arg 806 807 808 def _parse_idle_response(text): 809 assert text.startswith('* ') 810 text = text[2:] 811 if text.startswith(('OK ', 'NO ')): 812 return tuple(text.split(' ', 1)) 813 return parse_response([text]) 814 -
livetest.py
r218 r219 9 9 import os 10 10 import sys 11 import time12 11 import threading 13 12 from datetime import datetime … … 453 452 454 453 def test_idle(self): 455 if not imapclient.using_imaplib2: 456 return self.skipTest("imaplib2 is not installed") 457 458 idle_event = threading.Event() 459 cb_data = {} 460 def cb(success, response, cb_arg): 461 idle_event.set() 462 cb_data['success'] = success 463 cb_data['idle_response'] = response 464 cb_data['cb_arg'] = cb_arg 454 if not self.client.has_capability('IDLE'): 455 return self.skipTest("Server doesn't support IDLE") 456 457 #XXX update the example too 458 #XXX timeout check 459 #XXX out of order and interrupted idle 465 460 466 461 # Start main connection idling 467 462 self.client.select_folder('INBOX') 468 self.client.idle( timeout=20, callback=cb, cb_arg='foo')463 self.client.idle() 469 464 470 465 # Start a new connection and upload a new message … … 474 469 client2.logout() 475 470 476 idle_event.wait(15) # Wait for IDLE callback to trigger 477 478 self.assertTrue(idle_event.is_set()) 479 self.assertTrue(cb_data['success']) 480 idle_status, idle_text = cb_data['idle_response'] 481 self.assertEqual(idle_status, 'OK') 482 # Can't be too specific as different servers return different text 483 self.assertIn('idle', idle_text.lower()) 484 self.assertEqual(cb_data['cb_arg'], 'foo') 471 # Check for the idle data 472 responses = self.client.idle_check(timeout=1) 473 text, more_responses = self.client.idle_done() 474 475 self.assertIn((1, 'EXISTS'), responses) 476 self.assertIn('idle', text.lower()) 477 self.assertIsInstance(more_responses, list) 485 478 486 479 return LiveTest
