| 51 | | options = None |
| 52 | | |
| 53 | | class LiveServerTest(unittest2.TestCase): |
| 54 | | |
| 55 | | def setUp(self): |
| 56 | | self.client = imapclient.IMAPClient(options.host, use_uid=options.use_uid, |
| 57 | | ssl=options.ssl) |
| 58 | | self.client.login(options.username, options.password) |
| 59 | | self.clear_folders() |
| 60 | | self.unsub_all_folders() |
| 61 | | self.client.select_folder('INBOX') |
| 62 | | |
| 63 | | def tearDown(self): |
| 64 | | self.client.logout() |
| 65 | | |
| 66 | | def just_folder_names(self, dat): |
| 67 | | ret = [] |
| 68 | | for _, _, folder_name in dat: |
| 69 | | # gmail's "special" folders start with '[' |
| 70 | | if not folder_name.startswith('['): |
| 71 | | ret.append(folder_name) |
| 72 | | return ret |
| 73 | | |
| 74 | | def all_folder_names(self): |
| 75 | | return self.just_folder_names(self.client.list_folders()) |
| 76 | | |
| 77 | | def all_sub_folder_names(self): |
| 78 | | return self.just_folder_names(self.client.list_sub_folders()) |
| 79 | | |
| 80 | | def clear_folders(self): |
| 81 | | self.client.folder_encode = False |
| 82 | | for folder in self.all_folder_names(): |
| 83 | | if folder.upper() != 'INBOX': |
| | 50 | |
| | 51 | def createLiveTestClass(host, username, password, port, ssl, use_uid): |
| | 52 | |
| | 53 | class LiveTest(unittest2.TestCase): |
| | 54 | |
| | 55 | def setUp(self): |
| | 56 | self.client = imapclient.IMAPClient(host, port=port, use_uid=use_uid, ssl=ssl) |
| | 57 | self.client.login(username, password) |
| | 58 | self.clear_folders() |
| | 59 | self.unsub_all_folders() |
| | 60 | self.client.select_folder('INBOX') |
| | 61 | |
| | 62 | def tearDown(self): |
| | 63 | self.client.logout() |
| | 64 | |
| | 65 | def just_folder_names(self, dat): |
| | 66 | ret = [] |
| | 67 | for _, _, folder_name in dat: |
| | 68 | # gmail's "special" folders start with '[' |
| | 69 | if not folder_name.startswith('['): |
| | 70 | ret.append(folder_name) |
| | 71 | return ret |
| | 72 | |
| | 73 | def all_folder_names(self): |
| | 74 | return self.just_folder_names(self.client.list_folders()) |
| | 75 | |
| | 76 | def all_sub_folder_names(self): |
| | 77 | return self.just_folder_names(self.client.list_sub_folders()) |
| | 78 | |
| | 79 | def clear_folders(self): |
| | 80 | self.client.folder_encode = False |
| | 81 | for folder in self.all_folder_names(): |
| | 82 | if folder.upper() != 'INBOX': |
| | 83 | self.client.delete_folder(folder) |
| | 84 | self.client.folder_encode = True |
| | 85 | self.clear_folder('INBOX') |
| | 86 | |
| | 87 | def clear_folder(self, folder): |
| | 88 | self.client.select_folder(folder) |
| | 89 | self.client.delete_messages(self.client.search()) |
| | 90 | self.client.expunge() |
| | 91 | |
| | 92 | def unsub_all_folders(self): |
| | 93 | for folder in self.all_sub_folder_names(): |
| | 94 | self.client.unsubscribe_folder(folder) |
| | 95 | |
| | 96 | def is_gmail(self): |
| | 97 | return self.client._imap.host == 'imap.gmail.com' |
| | 98 | |
| | 99 | def test_capabilities(self): |
| | 100 | caps = self.client.capabilities() |
| | 101 | self.assertIsInstance(caps, tuple) |
| | 102 | self.assertGreater(len(caps), 1) |
| | 103 | for cap in caps: |
| | 104 | self.assertTrue(self.client.has_capability(cap)) |
| | 105 | self.assertFalse(self.client.has_capability('WONT EXIST')) |
| | 106 | |
| | 107 | def test_select_and_close(self): |
| | 108 | resp = self.client.select_folder('INBOX') |
| | 109 | self.assertIsInstance(resp['EXISTS'], int) |
| | 110 | self.assertEqual(resp['EXISTS'], 0) |
| | 111 | self.assertIsInstance(resp['RECENT'], int) |
| | 112 | self.assertIsInstance(resp['FLAGS'], tuple) |
| | 113 | self.assertGreater(len(resp['FLAGS']), 1) |
| | 114 | self.client.close_folder() |
| | 115 | |
| | 116 | def test_list_folders(self): |
| | 117 | some_folders = ['simple', r'foo\bar', r'test"folder"', u'L\xffR'] |
| | 118 | for name in some_folders: |
| | 119 | self.client.create_folder(name) |
| | 120 | |
| | 121 | folders = self.all_folder_names() |
| | 122 | self.assertGreater(len(folders), 1, 'No folders visible on server') |
| | 123 | self.assertIn('INBOX', [f.upper() for f in folders], 'INBOX not seen') |
| | 124 | |
| | 125 | for name in some_folders: |
| | 126 | self.assertIn(name, folders) |
| | 127 | |
| | 128 | #TODO: test LIST with wildcards |
| | 129 | |
| | 130 | def test_gmail_xlist(self): |
| | 131 | caps = self.client.capabilities() |
| | 132 | if self.is_gmail(): |
| | 133 | self.assertIn("XLIST", caps, "expected XLIST in Gmail's capabilities") |
| | 134 | |
| | 135 | def test_xlist(self): |
| | 136 | if not self.client.has_capability('XLIST'): |
| | 137 | return self.skipTest("Server doesn't support XLIST") |
| | 138 | |
| | 139 | result = self.client.xlist_folders() |
| | 140 | self.assertGreater(len(result), 0, 'No folders returned by XLIST') |
| | 141 | for flags, _, _ in result: |
| | 142 | if '\\INBOX' in [flag.upper() for flag in flags]: |
| | 143 | break |
| | 144 | else: |
| | 145 | self.fail('INBOX not returned in XLIST output') |
| | 146 | |
| | 147 | def test_subscriptions(self): |
| | 148 | test_folders = ['foobar', |
| | 149 | 'stuff & things', |
| | 150 | u'test & \u2622'] |
| | 151 | |
| | 152 | for folder in test_folders: |
| | 153 | self.client.create_folder(folder) |
| | 154 | |
| | 155 | all_folders = sorted(self.all_folder_names()) |
| | 156 | |
| | 157 | for folder in all_folders: |
| | 158 | self.client.subscribe_folder(folder) |
| | 159 | |
| | 160 | self.assertListEqual(all_folders, sorted(self.all_sub_folder_names())) |
| | 161 | |
| | 162 | for folder in all_folders: |
| | 163 | self.client.unsubscribe_folder(folder) |
| | 164 | self.assertListEqual(self.all_sub_folder_names(), []) |
| | 165 | |
| | 166 | self.assertRaises(imapclient.IMAPClient.Error, |
| | 167 | self.client.subscribe_folder, |
| | 168 | 'this folder is not likely to exist') |
| | 169 | |
| | 170 | |
| | 171 | def test_folders(self): |
| | 172 | self.assertTrue(self.client.folder_exists('INBOX')) |
| | 173 | self.assertFalse(self.client.folder_exists('this is very unlikely to exist')) |
| | 174 | |
| | 175 | test_folders = ['foobar', |
| | 176 | '"foobar"', |
| | 177 | 'foo "bar"', |
| | 178 | 'stuff & things', |
| | 179 | u'test & \u2622', |
| | 180 | '123'] |
| | 181 | |
| | 182 | for folder in test_folders: |
| | 183 | self.assertFalse(self.client.folder_exists(folder)) |
| | 184 | |
| | 185 | self.client.create_folder(folder) |
| | 186 | |
| | 187 | self.assertTrue(self.client.folder_exists(folder)) |
| | 188 | self.assertIn(folder, self.all_folder_names()) |
| | 189 | |
| | 190 | self.client.select_folder(folder) |
| | 191 | self.client.close_folder() |
| | 192 | |
| 85 | | self.client.folder_encode = True |
| 86 | | self.clear_folder('INBOX') |
| 87 | | |
| 88 | | def clear_folder(self, folder): |
| 89 | | self.client.select_folder(folder) |
| 90 | | self.client.delete_messages(self.client.search()) |
| 91 | | self.client.expunge() |
| 92 | | |
| 93 | | def unsub_all_folders(self): |
| 94 | | for folder in self.all_sub_folder_names(): |
| 95 | | self.client.unsubscribe_folder(folder) |
| 96 | | |
| 97 | | def is_gmail(self): |
| 98 | | return self.client._imap.host == 'imap.gmail.com' |
| 99 | | |
| 100 | | def test_capabilities(self): |
| 101 | | caps = self.client.capabilities() |
| 102 | | self.assertIsInstance(caps, tuple) |
| 103 | | self.assertGreater(len(caps), 1) |
| 104 | | for cap in caps: |
| 105 | | self.assertTrue(self.client.has_capability(cap)) |
| 106 | | self.assertFalse(self.client.has_capability('WONT EXIST')) |
| 107 | | |
| 108 | | def test_select_and_close(self): |
| 109 | | resp = self.client.select_folder('INBOX') |
| 110 | | self.assertIsInstance(resp['EXISTS'], int) |
| 111 | | self.assertEqual(resp['EXISTS'], 0) |
| 112 | | self.assertIsInstance(resp['RECENT'], int) |
| 113 | | self.assertIsInstance(resp['FLAGS'], tuple) |
| 114 | | self.assertGreater(len(resp['FLAGS']), 1) |
| 115 | | self.client.close_folder() |
| 116 | | |
| 117 | | def test_list_folders(self): |
| 118 | | some_folders = ['simple', r'foo\bar', r'test"folder"', u'L\xffR'] |
| 119 | | for name in some_folders: |
| 120 | | self.client.create_folder(name) |
| 121 | | |
| 122 | | folders = self.all_folder_names() |
| 123 | | self.assertGreater(len(folders), 1, 'No folders visible on server') |
| 124 | | self.assertIn('INBOX', [f.upper() for f in folders], 'INBOX not seen') |
| 125 | | |
| 126 | | for name in some_folders: |
| 127 | | self.assertIn(name, folders) |
| 128 | | |
| 129 | | #TODO: test LIST with wildcards |
| 130 | | |
| 131 | | def test_gmail_xlist(self): |
| 132 | | caps = self.client.capabilities() |
| 133 | | if self.is_gmail(): |
| 134 | | self.assertIn("XLIST", caps, "expected XLIST in Gmail's capabilities") |
| 135 | | |
| 136 | | def test_xlist(self): |
| 137 | | if not self.client.has_capability('XLIST'): |
| 138 | | return self.skipTest("Server doesn't support XLIST") |
| 139 | | |
| 140 | | result = self.client.xlist_folders() |
| 141 | | self.assertGreater(len(result), 0, 'No folders returned by XLIST') |
| 142 | | for flags, _, _ in result: |
| 143 | | if '\\INBOX' in [flag.upper() for flag in flags]: |
| 144 | | break |
| | 194 | self.assertFalse(self.client.folder_exists(folder)) |
| | 195 | |
| | 196 | |
| | 197 | def test_status(self): |
| | 198 | # Default behaviour should return 5 keys |
| | 199 | self.assertEqual(len(self.client.folder_status('INBOX')), 5) |
| | 200 | |
| | 201 | new_folder = u'test \u2622' |
| | 202 | self.client.create_folder(new_folder) |
| | 203 | try: |
| | 204 | status = self.client.folder_status(new_folder) |
| | 205 | self.assertEqual(status['MESSAGES'], 0) |
| | 206 | self.assertEqual(status['RECENT'], 0) |
| | 207 | self.assertEqual(status['UNSEEN'], 0) |
| | 208 | |
| | 209 | # Add a message to the folder, it should show up now. |
| | 210 | self.client.append(new_folder, SIMPLE_MESSAGE) |
| | 211 | |
| | 212 | status = self.client.folder_status(new_folder) |
| | 213 | self.assertEqual(status['MESSAGES'], 1) |
| | 214 | if not self.is_gmail(): |
| | 215 | self.assertEqual(status['RECENT'], 1) |
| | 216 | self.assertEqual(status['UNSEEN'], 1) |
| | 217 | finally: |
| | 218 | self.client.delete_folder(new_folder) |
| | 219 | |
| | 220 | def test_append(self): |
| | 221 | # Message time microseconds are set to 0 because the server will return |
| | 222 | # time with only seconds precision. |
| | 223 | msg_time = datetime.now().replace(microsecond=0) |
| | 224 | |
| | 225 | # Append message |
| | 226 | resp = self.client.append('INBOX', SIMPLE_MESSAGE, ('abc', 'def'), msg_time) |
| | 227 | self.assertIsInstance(resp, str) |
| | 228 | |
| | 229 | # Retrieve the just added message and check that all looks well |
| | 230 | self.assertEqual(self.client.select_folder('INBOX')['EXISTS'], 1) |
| | 231 | |
| | 232 | resp = self.client.fetch(self.client.search()[0], ('RFC822', 'FLAGS', 'INTERNALDATE')) |
| | 233 | |
| | 234 | self.assertEqual(len(resp), 1) |
| | 235 | msginfo = resp.values()[0] |
| | 236 | |
| | 237 | # Time should match the time we specified |
| | 238 | returned_msg_time = msginfo['INTERNALDATE'] |
| | 239 | self.assertIsNone(returned_msg_time.tzinfo) |
| | 240 | self.assertEqual(returned_msg_time, msg_time) |
| | 241 | |
| | 242 | # Flags should be the same |
| | 243 | self.assertIn('abc', msginfo['FLAGS']) |
| | 244 | self.assertIn('def', msginfo['FLAGS']) |
| | 245 | |
| | 246 | # Message body should match |
| | 247 | self.assertEqual(msginfo['RFC822'], SIMPLE_MESSAGE) |
| | 248 | |
| | 249 | |
| | 250 | def test_flags(self): |
| | 251 | self.client.append('INBOX', SIMPLE_MESSAGE) |
| | 252 | msgid = self.client.search()[0] |
| | 253 | |
| | 254 | def _flagtest(func, args, expected_flags): |
| | 255 | answer = func(msgid, *args) |
| | 256 | self.assertTrue(answer.has_key(msgid)) |
| | 257 | answer_flags = set(answer[msgid]) |
| | 258 | answer_flags.discard(r'\Recent') # Might be present but don't care |
| | 259 | self.assertSetEqual(answer_flags, set(expected_flags)) |
| | 260 | |
| | 261 | base_flags = ['abc', 'def'] |
| | 262 | _flagtest(self.client.set_flags, [base_flags], base_flags) |
| | 263 | _flagtest(self.client.get_flags, [], base_flags) |
| | 264 | _flagtest(self.client.add_flags, ['boo'], base_flags + ['boo']) |
| | 265 | _flagtest(self.client.remove_flags, ['boo'], base_flags) |
| | 266 | |
| | 267 | def test_search(self): |
| | 268 | # Add some test messages |
| | 269 | msg_tmpl = 'Subject: %s\r\n\r\nBody' |
| | 270 | subjects = ('a', 'b', 'c') |
| | 271 | for subject in subjects: |
| | 272 | msg = msg_tmpl % subject |
| | 273 | if subject == 'c': |
| | 274 | flags = (imapclient.DELETED,) |
| | 275 | else: |
| | 276 | flags = () |
| | 277 | self.client.append('INBOX', msg, flags) |
| | 278 | |
| | 279 | # Check we see all messages |
| | 280 | messages_all = self.client.search('ALL') |
| | 281 | if self.is_gmail(): |
| | 282 | # Gmail seems to never return deleted items. |
| | 283 | self.assertEqual(len(messages_all), len(subjects) - 1) |
| 216 | | self.assertEqual(status['RECENT'], 1) |
| 217 | | self.assertEqual(status['UNSEEN'], 1) |
| 218 | | finally: |
| 219 | | self.client.delete_folder(new_folder) |
| 220 | | |
| 221 | | def test_append(self): |
| 222 | | # Message time microseconds are set to 0 because the server will return |
| 223 | | # time with only seconds precision. |
| 224 | | msg_time = datetime.now().replace(microsecond=0) |
| 225 | | |
| 226 | | # Append message |
| 227 | | resp = self.client.append('INBOX', SIMPLE_MESSAGE, ('abc', 'def'), msg_time) |
| 228 | | self.assertIsInstance(resp, str) |
| 229 | | |
| 230 | | # Retrieve the just added message and check that all looks well |
| 231 | | self.assertEqual(self.client.select_folder('INBOX')['EXISTS'], 1) |
| 232 | | |
| 233 | | resp = self.client.fetch(self.client.search()[0], ('RFC822', 'FLAGS', 'INTERNALDATE')) |
| 234 | | |
| 235 | | self.assertEqual(len(resp), 1) |
| 236 | | msginfo = resp.values()[0] |
| 237 | | |
| 238 | | # Time should match the time we specified |
| 239 | | returned_msg_time = msginfo['INTERNALDATE'] |
| 240 | | self.assertIsNone(returned_msg_time.tzinfo) |
| 241 | | self.assertEqual(returned_msg_time, msg_time) |
| 242 | | |
| 243 | | # Flags should be the same |
| 244 | | self.assertIn('abc', msginfo['FLAGS']) |
| 245 | | self.assertIn('def', msginfo['FLAGS']) |
| 246 | | |
| 247 | | # Message body should match |
| 248 | | self.assertEqual(msginfo['RFC822'], SIMPLE_MESSAGE) |
| 249 | | |
| 250 | | |
| 251 | | def test_flags(self): |
| 252 | | self.client.append('INBOX', SIMPLE_MESSAGE) |
| 253 | | msgid = self.client.search()[0] |
| 254 | | |
| 255 | | def _flagtest(func, args, expected_flags): |
| 256 | | answer = func(msgid, *args) |
| 257 | | self.assertTrue(answer.has_key(msgid)) |
| 258 | | answer_flags = set(answer[msgid]) |
| 259 | | answer_flags.discard(r'\Recent') # Might be present but don't care |
| 260 | | self.assertSetEqual(answer_flags, set(expected_flags)) |
| 261 | | |
| 262 | | base_flags = ['abc', 'def'] |
| 263 | | _flagtest(self.client.set_flags, [base_flags], base_flags) |
| 264 | | _flagtest(self.client.get_flags, [], base_flags) |
| 265 | | _flagtest(self.client.add_flags, ['boo'], base_flags + ['boo']) |
| 266 | | _flagtest(self.client.remove_flags, ['boo'], base_flags) |
| 267 | | |
| 268 | | def test_search(self): |
| 269 | | # Add some test messages |
| 270 | | msg_tmpl = 'Subject: %s\r\n\r\nBody' |
| 271 | | subjects = ('a', 'b', 'c') |
| 272 | | for subject in subjects: |
| 273 | | msg = msg_tmpl % subject |
| 274 | | if subject == 'c': |
| 275 | | flags = (imapclient.DELETED,) |
| 276 | | else: |
| 277 | | flags = () |
| 278 | | self.client.append('INBOX', msg, flags) |
| 279 | | |
| 280 | | # Check we see all messages |
| 281 | | messages_all = self.client.search('ALL') |
| 282 | | if self.is_gmail(): |
| 283 | | # Gmail seems to never return deleted items. |
| 284 | | self.assertEqual(len(messages_all), len(subjects) - 1) |
| 285 | | else: |
| 286 | | self.assertEqual(len(messages_all), len(subjects)) |
| 287 | | self.assertListEqual(self.client.search(), messages_all) # Check default |
| 288 | | |
| 289 | | # Single criteria |
| 290 | | if not self.is_gmail(): |
| 291 | | self.assertEqual(len(self.client.search('DELETED')), 1) |
| 292 | | self.assertEqual(len(self.client.search('NOT DELETED')), len(subjects) - 1) |
| 293 | | self.assertListEqual(self.client.search('NOT DELETED'), self.client.search(['NOT DELETED'])) |
| 294 | | |
| 295 | | # Multiple criteria |
| 296 | | self.assertEqual(len(self.client.search(['NOT DELETED', 'SMALLER 100'])), len(subjects) - 1) |
| 297 | | self.assertEqual(len(self.client.search(['NOT DELETED', 'SUBJECT "a"'])), 1) |
| 298 | | self.assertEqual(len(self.client.search(['NOT DELETED', 'SUBJECT "c"'])), 0) |
| 299 | | |
| 300 | | |
| 301 | | def test_copy(self): |
| 302 | | self.client.select_folder('INBOX') |
| 303 | | self.client.append('INBOX', SIMPLE_MESSAGE) |
| 304 | | self.client.create_folder('target') |
| 305 | | msg_id = self.client.search()[0] |
| 306 | | |
| 307 | | self.client.copy(msg_id, 'target') |
| 308 | | |
| 309 | | self.client.select_folder('target') |
| 310 | | msgs = self.client.search() |
| 311 | | self.assertEqual(len(msgs), 1) |
| 312 | | msg_id = msgs[0] |
| 313 | | self.assertIn('something', self.client.fetch(msg_id, ['RFC822'])[msg_id]['RFC822']) |
| 314 | | |
| 315 | | |
| 316 | | def test_fetch(self): |
| 317 | | self.client.select_folder('INBOX') |
| 318 | | self.client.append('INBOX', MULTIPART_MESSAGE) |
| 319 | | |
| 320 | | fields = ['RFC822', 'FLAGS', 'INTERNALDATE', 'ENVELOPE'] |
| 321 | | msg_id = self.client.search()[0] |
| 322 | | resp = self.client.fetch(msg_id, fields) |
| 323 | | |
| 324 | | self.assertEqual(len(resp), 1) |
| 325 | | msginfo = resp[msg_id] |
| 326 | | |
| 327 | | self.assertSetEqual(set(msginfo.keys()), set(fields + ['SEQ'])) |
| 328 | | self.assertEqual(msginfo['SEQ'], 1) |
| 329 | | self.assertMultiLineEqual(msginfo['RFC822'], MULTIPART_MESSAGE) |
| 330 | | self.assertIsInstance(msginfo['INTERNALDATE'], datetime) |
| 331 | | self.assertIsInstance(msginfo['FLAGS'], tuple) |
| 332 | | self.assertTupleEqual(msginfo['ENVELOPE'], |
| 333 | | ('Tue, 16 Mar 2010 16:45:32 +0000', |
| 334 | | 'A multipart message', |
| 335 | | (('Bob Smith', None, 'bob', 'smith.com'),), |
| 336 | | (('Bob Smith', None, 'bob', 'smith.com'),), |
| 337 | | (('Bob Smith', None, 'bob', 'smith.com'),), |
| 338 | | (('Some One', None, 'some', 'one.com'),), |
| 339 | | None, None, None, |
| 340 | | '<1A472770E042064698CB5ADC83A12ACD39455AAB@ABC>')) |
| 341 | | |
| 342 | | |
| 343 | | def test_partial_fetch(self): |
| 344 | | self.client.append('INBOX', MULTIPART_MESSAGE) |
| 345 | | self.client.select_folder('INBOX') |
| 346 | | msg_id = self.client.search()[0] |
| 347 | | |
| 348 | | resp = self.client.fetch(msg_id, ['BODY[]<0.20>']) |
| 349 | | body = resp[msg_id]['BODY[]<0>'] |
| 350 | | self.assertEqual(len(body), 20) |
| 351 | | self.assertTrue(body.startswith('From: Bob Smith')) |
| 352 | | |
| 353 | | resp = self.client.fetch(msg_id, ['BODY[]<2.25>']) |
| 354 | | body = resp[msg_id]['BODY[]<2>'] |
| 355 | | self.assertEqual(len(body), 25) |
| 356 | | self.assertTrue(body.startswith('om: Bob Smith')) |
| 357 | | |
| 358 | | def test_BODYSTRUCTURE(self): |
| 359 | | self.client.select_folder('INBOX') |
| 360 | | self.client.append('INBOX', SIMPLE_MESSAGE) |
| 361 | | self.client.append('INBOX', MULTIPART_MESSAGE) |
| 362 | | msgs = self.client.search() |
| 363 | | |
| 364 | | fetched = self.client.fetch(msgs, ['BODY', 'BODYSTRUCTURE']) |
| 365 | | |
| 366 | | # The expected test data is the same for BODY and BODYSTRUCTURE |
| 367 | | # since we can't predicate what the server we're testing against |
| 368 | | # will return. |
| 369 | | |
| 370 | | expected = ('text', 'plain', ('charset', 'us-ascii'), None, None, '7bit', 5, 1) |
| 371 | | self.check_BODYSTRUCTURE(expected, fetched[msgs[0]]['BODY'], multipart=False) |
| 372 | | self.check_BODYSTRUCTURE(expected, fetched[msgs[0]]['BODYSTRUCTURE'], multipart=False) |
| 373 | | |
| 374 | | expected = ([('text', 'html', ('charset', 'us-ascii'), None, None, 'quoted-printable', 55, 3), |
| 375 | | ('text', 'plain', ('charset', 'us-ascii'), None, None, '7bit', 26, 1), |
| 376 | | ], |
| 377 | | 'mixed', |
| 378 | | ('boundary', '===============1534046211==')) |
| 379 | | self.check_BODYSTRUCTURE(expected, fetched[msgs[1]]['BODY'], multipart=True) |
| 380 | | self.check_BODYSTRUCTURE(expected, fetched[msgs[1]]['BODYSTRUCTURE'], multipart=True) |
| 381 | | |
| 382 | | def check_BODYSTRUCTURE(self, expected, actual, multipart=None): |
| 383 | | if multipart is not None: |
| 384 | | self.assertEqual(actual.is_multipart, multipart) |
| 385 | | |
| 386 | | # BODYSTRUCTURE lengths can various according to the server so |
| 387 | | # compare up until what is returned |
| 388 | | for e, a in zip(expected, actual): |
| 389 | | if have_matching_types(e, a, (list, tuple)): |
| 390 | | for expected_and_actual in zip(e, a): |
| 391 | | self.check_BODYSTRUCTURE(*expected_and_actual) |
| 392 | | else: |
| 393 | | if e == ('charset', 'us-ascii') and a is None: |
| 394 | | pass # Some servers (eg. Gmail) don't return a charset when it's us-ascii |
| | 290 | self.assertEqual(len(self.client.search('DELETED')), 1) |
| | 291 | self.assertEqual(len(self.client.search('NOT DELETED')), len(subjects) - 1) |
| | 292 | self.assertListEqual(self.client.search('NOT DELETED'), self.client.search(['NOT DELETED'])) |
| | 293 | |
| | 294 | # Multiple criteria |
| | 295 | self.assertEqual(len(self.client.search(['NOT DELETED', 'SMALLER 100'])), len(subjects) - 1) |
| | 296 | self.assertEqual(len(self.client.search(['NOT DELETED', 'SUBJECT "a"'])), 1) |
| | 297 | self.assertEqual(len(self.client.search(['NOT DELETED', 'SUBJECT "c"'])), 0) |
| | 298 | |
| | 299 | |
| | 300 | def test_copy(self): |
| | 301 | self.client.select_folder('INBOX') |
| | 302 | self.client.append('INBOX', SIMPLE_MESSAGE) |
| | 303 | self.client.create_folder('target') |
| | 304 | msg_id = self.client.search()[0] |
| | 305 | |
| | 306 | self.client.copy(msg_id, 'target') |
| | 307 | |
| | 308 | self.client.select_folder('target') |
| | 309 | msgs = self.client.search() |
| | 310 | self.assertEqual(len(msgs), 1) |
| | 311 | msg_id = msgs[0] |
| | 312 | self.assertIn('something', self.client.fetch(msg_id, ['RFC822'])[msg_id]['RFC822']) |
| | 313 | |
| | 314 | |
| | 315 | def test_fetch(self): |
| | 316 | self.client.select_folder('INBOX') |
| | 317 | self.client.append('INBOX', MULTIPART_MESSAGE) |
| | 318 | |
| | 319 | fields = ['RFC822', 'FLAGS', 'INTERNALDATE', 'ENVELOPE'] |
| | 320 | msg_id = self.client.search()[0] |
| | 321 | resp = self.client.fetch(msg_id, fields) |
| | 322 | |
| | 323 | self.assertEqual(len(resp), 1) |
| | 324 | msginfo = resp[msg_id] |
| | 325 | |
| | 326 | self.assertSetEqual(set(msginfo.keys()), set(fields + ['SEQ'])) |
| | 327 | self.assertEqual(msginfo['SEQ'], 1) |
| | 328 | self.assertMultiLineEqual(msginfo['RFC822'], MULTIPART_MESSAGE) |
| | 329 | self.assertIsInstance(msginfo['INTERNALDATE'], datetime) |
| | 330 | self.assertIsInstance(msginfo['FLAGS'], tuple) |
| | 331 | self.assertTupleEqual(msginfo['ENVELOPE'], |
| | 332 | ('Tue, 16 Mar 2010 16:45:32 +0000', |
| | 333 | 'A multipart message', |
| | 334 | (('Bob Smith', None, 'bob', 'smith.com'),), |
| | 335 | (('Bob Smith', None, 'bob', 'smith.com'),), |
| | 336 | (('Bob Smith', None, 'bob', 'smith.com'),), |
| | 337 | (('Some One', None, 'some', 'one.com'),), |
| | 338 | None, None, None, |
| | 339 | '<1A472770E042064698CB5ADC83A12ACD39455AAB@ABC>')) |
| | 340 | |
| | 341 | |
| | 342 | def test_partial_fetch(self): |
| | 343 | self.client.append('INBOX', MULTIPART_MESSAGE) |
| | 344 | self.client.select_folder('INBOX') |
| | 345 | msg_id = self.client.search()[0] |
| | 346 | |
| | 347 | resp = self.client.fetch(msg_id, ['BODY[]<0.20>']) |
| | 348 | body = resp[msg_id]['BODY[]<0>'] |
| | 349 | self.assertEqual(len(body), 20) |
| | 350 | self.assertTrue(body.startswith('From: Bob Smith')) |
| | 351 | |
| | 352 | resp = self.client.fetch(msg_id, ['BODY[]<2.25>']) |
| | 353 | body = resp[msg_id]['BODY[]<2>'] |
| | 354 | self.assertEqual(len(body), 25) |
| | 355 | self.assertTrue(body.startswith('om: Bob Smith')) |
| | 356 | |
| | 357 | def test_BODYSTRUCTURE(self): |
| | 358 | self.client.select_folder('INBOX') |
| | 359 | self.client.append('INBOX', SIMPLE_MESSAGE) |
| | 360 | self.client.append('INBOX', MULTIPART_MESSAGE) |
| | 361 | msgs = self.client.search() |
| | 362 | |
| | 363 | fetched = self.client.fetch(msgs, ['BODY', 'BODYSTRUCTURE']) |
| | 364 | |
| | 365 | # The expected test data is the same for BODY and BODYSTRUCTURE |
| | 366 | # since we can't predicate what the server we're testing against |
| | 367 | # will return. |
| | 368 | |
| | 369 | expected = ('text', 'plain', ('charset', 'us-ascii'), None, None, '7bit', 5, 1) |
| | 370 | self.check_BODYSTRUCTURE(expected, fetched[msgs[0]]['BODY'], multipart=False) |
| | 371 | self.check_BODYSTRUCTURE(expected, fetched[msgs[0]]['BODYSTRUCTURE'], multipart=False) |
| | 372 | |
| | 373 | expected = ([('text', 'html', ('charset', 'us-ascii'), None, None, 'quoted-printable', 55, 3), |
| | 374 | ('text', 'plain', ('charset', 'us-ascii'), None, None, '7bit', 26, 1), |
| | 375 | ], |
| | 376 | 'mixed', |
| | 377 | ('boundary', '===============1534046211==')) |
| | 378 | self.check_BODYSTRUCTURE(expected, fetched[msgs[1]]['BODY'], multipart=True) |
| | 379 | self.check_BODYSTRUCTURE(expected, fetched[msgs[1]]['BODYSTRUCTURE'], multipart=True) |
| | 380 | |
| | 381 | def check_BODYSTRUCTURE(self, expected, actual, multipart=None): |
| | 382 | if multipart is not None: |
| | 383 | self.assertEqual(actual.is_multipart, multipart) |
| | 384 | |
| | 385 | # BODYSTRUCTURE lengths can various according to the server so |
| | 386 | # compare up until what is returned |
| | 387 | for e, a in zip(expected, actual): |
| | 388 | if have_matching_types(e, a, (list, tuple)): |
| | 389 | for expected_and_actual in zip(e, a): |
| | 390 | self.check_BODYSTRUCTURE(*expected_and_actual) |