summaryrefslogtreecommitdiff
path: root/eocTests.py
diff options
context:
space:
mode:
authorLars Wirzenius <liw@esme>2005-12-03 20:41:56 +0200
committerLars Wirzenius <liw@esme>2005-12-03 20:41:56 +0200
commit0f7cd8b5551e2ec4333cb10561375872dd7e03d3 (patch)
tree217926700638acfa7fe1b523dae7f490308606e9 /eocTests.py
downloadeoc-0f7cd8b5551e2ec4333cb10561375872dd7e03d3.tar.gz
Initial import.
Diffstat (limited to 'eocTests.py')
-rw-r--r--eocTests.py1100
1 files changed, 1100 insertions, 0 deletions
diff --git a/eocTests.py b/eocTests.py
new file mode 100644
index 0000000..acf99b3
--- /dev/null
+++ b/eocTests.py
@@ -0,0 +1,1100 @@
+import unittest
+import shutil
+import os
+import re
+import time
+import string
+
+import eoc
+
+DOTDIR = "dot-dir-for-testing"
+eoc.DOTDIR = DOTDIR
+eoc.quiet = 1
+
+def no_op(*args):
+ pass
+
+class AddressCleaningTestCases(unittest.TestCase):
+
+ def setUp(self):
+ self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
+ "bar@lists.example.com"])
+
+ def verify(self, address, wanted, skip_prefix=None, forced_domain=None):
+ self.ap.set_skip_prefix(skip_prefix)
+ self.ap.set_forced_domain(forced_domain)
+ address = self.ap.clean(address)
+ self.failUnlessEqual(address, wanted)
+
+ def testSimpleAddress(self):
+ self.verify("foo@example.com", "foo@example.com")
+
+ def testUpperCaseAddress(self):
+ self.verify("FOO@EXAMPLE.COM", "foo@example.com")
+
+ def testPrefixRemoval(self):
+ self.verify("foo@example.com", "foo@example.com",
+ skip_prefix="user-")
+ self.verify("user-foo@example.com", "foo@example.com",
+ skip_prefix="user-")
+
+ def testForcedDomain(self):
+ self.verify("foo@example.com", "foo@example.com",
+ forced_domain="example.com")
+ self.verify("foo@whatever.example.com", "foo@example.com",
+ forced_domain="example.com")
+
+ def testPrefixRemovalWithForcedDomain(self):
+ self.verify("foo@example.com", "foo@example.com",
+ skip_prefix="user-",
+ forced_domain="example.com")
+ self.verify("foo@whatever.example.com", "foo@example.com",
+ skip_prefix="user-",
+ forced_domain="example.com")
+ self.verify("user-foo@example.com", "foo@example.com",
+ skip_prefix="user-",
+ forced_domain="example.com")
+ self.verify("user-foo@whatever.example.com", "foo@example.com",
+ skip_prefix="user-",
+ forced_domain="example.com")
+
+class AddressParserTestCases(unittest.TestCase):
+
+ def setUp(self):
+ self.ap = eoc.AddressParser(["foo@EXAMPLE.com",
+ "bar@lists.example.com"])
+
+ def verify_parser(self, address, wanted_listname, wanted_parts):
+ listname, parts = self.ap.parse(address)
+ self.failUnlessEqual(listname, wanted_listname)
+ self.failUnlessEqual(parts, wanted_parts)
+
+ def testParser(self):
+ self.verify_parser("foo@example.com",
+ "foo@EXAMPLE.com",
+ [])
+ self.verify_parser("foo-subscribe@example.com",
+ "foo@EXAMPLE.com",
+ ["subscribe"])
+ self.verify_parser("foo-subscribe-joe=example.com@example.com",
+ "foo@EXAMPLE.com",
+ ["subscribe", "joe=example.com"])
+ self.verify_parser("foo-bounce-123-ABCDEF@example.com",
+ "foo@EXAMPLE.com",
+ ["bounce", "123", "abcdef"])
+
+class ParseRecipientAddressBase(unittest.TestCase):
+
+ def setUp(self):
+ self.lists = ["foo@example.com",
+ "bar@lists.example.com",
+ "foo-announce@example.com"]
+ self.mlm = eoc.MailingListManager(DOTDIR, lists=self.lists)
+
+ def environ(self, sender, recipient):
+ eoc.set_environ({
+ "SENDER": sender,
+ "RECIPIENT": recipient,
+ })
+
+class ParseUnsignedAddressTestCases(ParseRecipientAddressBase):
+
+ def testEmpty(self):
+ self.failUnlessRaises(eoc.UnknownList,
+ self.mlm.parse_recipient_address,
+ "", None, None)
+
+ def verify(self, address, skip_prefix, forced_domain, wanted_dict):
+ dict = self.mlm.parse_recipient_address(address, skip_prefix,
+ forced_domain)
+ self.failUnlessEqual(dict, wanted_dict)
+
+ def testSimpleAddresses(self):
+ self.verify("foo@example.com",
+ None,
+ None,
+ { "name": "foo@example.com", "command": "post" })
+ self.verify("FOO@EXAMPLE.COM",
+ None,
+ None,
+ { "name": "foo@example.com", "command": "post" })
+ self.verify("prefix-foo@example.com",
+ "prefix-",
+ None,
+ { "name": "foo@example.com", "command": "post" })
+ self.verify("bar@example.com",
+ None,
+ "lists.example.com",
+ { "name": "bar@lists.example.com", "command": "post" })
+ self.verify("prefix-bar@example.com",
+ "prefix-",
+ "lists.example.com",
+ { "name": "bar@lists.example.com", "command": "post" })
+
+ def testSubscription(self):
+ self.verify("foo-subscribe@example.com",
+ None,
+ None,
+ { "name": "foo@example.com",
+ "command": "subscribe",
+ "sender": "",
+ })
+ self.verify("foo-subscribe-joe-user=example.com@example.com",
+ None,
+ None,
+ { "name": "foo@example.com",
+ "command": "subscribe",
+ "sender": "joe-user@example.com",
+ })
+ self.verify("foo-unsubscribe@example.com",
+ None,
+ None,
+ { "name": "foo@example.com",
+ "command": "unsubscribe",
+ "sender": "",
+ })
+ self.verify("foo-unsubscribe-joe-user=example.com@example.com",
+ None,
+ None,
+ { "name": "foo@example.com",
+ "command": "unsubscribe",
+ "sender": "joe-user@example.com",
+ })
+
+ def testPost(self):
+ for name in self.lists:
+ self.verify(name, None, None, { "name": name, "command": "post" })
+
+ def testSimpleCommands(self):
+ for name in self.lists:
+ for command in ["help", "list", "owner"]:
+ localpart, domain = name.split("@")
+ address = "%s-%s@%s" % (localpart, command, domain)
+ self.verify(address, None, None,
+ { "name": name,
+ "command": command
+ })
+
+class ParseWellSignedAddressTestCases(ParseRecipientAddressBase):
+
+ def try_good_signature(self, command):
+ s = "foo-announce-%s-1" % command
+ hash = self.mlm.compute_hash("%s@%s" % (s, "example.com"))
+ local_part = "%s-%s" % (s, hash)
+ dict = self.mlm.parse_recipient_address("%s@example.com" % local_part,
+ None, None)
+ self.failUnlessEqual(dict,
+ {
+ "name": "foo-announce@example.com",
+ "command": command,
+ "id": "1",
+ })
+
+ def testProperlySignedCommands(self):
+ self.try_good_signature("subyes")
+ self.try_good_signature("subapprove")
+ self.try_good_signature("subreject")
+ self.try_good_signature("unsubyes")
+ self.try_good_signature("bounce")
+ self.try_good_signature("approve")
+ self.try_good_signature("reject")
+ self.try_good_signature("probe")
+
+class ParseBadlySignedAddressTestCases(ParseRecipientAddressBase):
+
+ def try_bad_signature(self, command_part):
+ self.failUnlessRaises(eoc.BadSignature,
+ self.mlm.parse_recipient_address,
+ "foo-announce-" + command_part +
+ "-123-badhash@example.com",
+ None, None)
+
+ def testBadlySignedCommands(self):
+ self.try_bad_signature("subyes")
+ self.try_bad_signature("subapprove")
+ self.try_bad_signature("subreject")
+ self.try_bad_signature("unsubyes")
+ self.try_bad_signature("bounce")
+ self.try_bad_signature("approve")
+ self.try_bad_signature("reject")
+ self.try_bad_signature("probe")
+
+class DotDirTestCases(unittest.TestCase):
+
+ def setUp(self):
+ self.secret_name = os.path.join(DOTDIR, "secret")
+
+ def tearDown(self):
+ shutil.rmtree(DOTDIR)
+
+ def dotdir_is_ok(self):
+ self.failUnless(os.path.isdir(DOTDIR))
+ self.failUnless(os.path.isfile(self.secret_name))
+
+ def testNoDotDirExists(self):
+ self.failIf(os.path.exists(DOTDIR))
+ mlm = eoc.MailingListManager(DOTDIR)
+ self.dotdir_is_ok()
+
+ def testDotDirDoesExistButSecretDoesNot(self):
+ self.failIf(os.path.exists(DOTDIR))
+ os.makedirs(DOTDIR)
+ self.failUnless(os.path.isdir(DOTDIR))
+ self.failIf(os.path.exists(self.secret_name))
+ mlm = eoc.MailingListManager(DOTDIR)
+ self.dotdir_is_ok()
+
+class ListBase(unittest.TestCase):
+
+ def setUp(self):
+ if os.path.exists(DOTDIR):
+ shutil.rmtree(DOTDIR)
+ self.mlm = eoc.MailingListManager(DOTDIR)
+
+ def tearDown(self):
+ self.mlm = None
+ shutil.rmtree(DOTDIR)
+
+class ListCreationTestCases(ListBase):
+
+ def setUp(self):
+ ListBase.setUp(self)
+ self.names = None
+
+ def listdir(self, listname):
+ return os.path.join(DOTDIR, listname)
+
+ def listdir_has_file(self, listdir, filename):
+ self.failUnless(os.path.isfile(os.path.join(listdir, filename)))
+ self.names.remove(filename)
+
+ def listdir_has_dir(self, listdir, dirname):
+ self.failUnless(os.path.isdir(os.path.join(listdir, dirname)))
+ self.names.remove(dirname)
+
+ def listdir_may_have_dir(self, listdir, dirname):
+ if dirname in self.names:
+ self.listdir_has_dir(listdir, dirname)
+
+ def listdir_is_ok(self, listname):
+ listdir = self.listdir(listname)
+ self.failUnless(os.path.isdir(listdir))
+ self.names = os.listdir(listdir)
+
+ self.listdir_has_file(listdir, "config")
+ self.listdir_has_file(listdir, "subscribers")
+
+ self.listdir_has_dir(listdir, "bounce-box")
+ self.listdir_has_dir(listdir, "subscription-box")
+
+ self.listdir_may_have_dir(listdir, "moderation-box")
+ self.listdir_may_have_dir(listdir, "templates")
+
+ # Make sure there are no extras.
+ self.failUnlessEqual(self.names, [])
+
+ def testCreateNew(self):
+ self.failIf(os.path.exists(self.listdir("foo@example.com")))
+ ml = self.mlm.create_list("foo@example.com")
+
+ self.failUnlessEqual(ml.__class__, eoc.MailingList)
+ self.failUnlessEqual(ml.dirname, self.listdir("foo@example.com"))
+
+ self.listdir_is_ok("foo@example.com")
+
+ def testCreateExisting(self):
+ list = self.mlm.create_list("foo@example.com")
+ self.failUnlessRaises(eoc.ListExists,
+ self.mlm.create_list, "foo@example.com")
+ self.listdir_is_ok("foo@example.com")
+
+class ListOptionTestCases(ListBase):
+
+ def check(self, ml, wanted):
+ self.failUnlessEqual(ml.cp.sections(), ["list"])
+ cpdict = {}
+ for key, value in ml.cp.items("list"):
+ cpdict[key] = value
+ self.failUnlessEqual(cpdict, wanted)
+
+ def testDefaultOptionsOnCreateAndOpenExisting(self):
+ self.mlm.create_list("foo@example.com")
+ ml = self.mlm.open_list("foo@example.com")
+ self.check(ml,
+ {
+ "owners": "",
+ "moderators": "",
+ "subscription": "free",
+ "posting": "free",
+ "archived": "no",
+ "mail-on-subscription-changes": "no",
+ "mail-on-forced-unsubscribe": "no",
+ "ignore-bounce": "no",
+ "language": "",
+ "pristine-headers": "",
+ })
+
+ def testChangeOptions(self):
+ # Create a list, change some options, and save the result.
+ ml = self.mlm.create_list("foo@example.com")
+ self.failUnlessEqual(ml.cp.get("list", "owners"), "")
+ self.failUnlessEqual(ml.cp.get("list", "posting"), "free")
+ ml.cp.set("list", "owners", "owner@example.com")
+ ml.cp.set("list", "posting", "moderated")
+ ml.save_config()
+
+ # Re-open the list and check that the new instance has read the
+ # values from the disk correctly.
+ ml2 = self.mlm.open_list("foo@example.com")
+ self.check(ml2,
+ {
+ "owners": "owner@example.com",
+ "moderators": "",
+ "subscription": "free",
+ "posting": "moderated",
+ "archived": "no",
+ "mail-on-subscription-changes": "no",
+ "mail-on-forced-unsubscribe": "no",
+ "ignore-bounce": "no",
+ "language": "",
+ "pristine-headers": "",
+ })
+
+class SubscriberDatabaseTestCases(ListBase):
+
+ def has_subscribers(self, ml, addrs):
+ subs = ml.subscribers.get_all()
+ subs.sort()
+ self.failUnlessEqual(subs, addrs)
+
+ def testAddAndRemoveSubscribers(self):
+ addrs = ["joe@example.com", "MARY@example.com", "bubba@EXAMPLE.com"]
+ addrs.sort()
+
+ ml = self.mlm.create_list("foo@example.com")
+ self.failUnlessEqual(ml.subscribers.get_all(), [])
+
+ self.failUnless(ml.subscribers.lock())
+ ml.subscribers.add_many(addrs)
+ self.has_subscribers(ml, addrs)
+ ml.subscribers.save()
+ self.failIf(ml.subscribers.locked)
+ ml = None
+
+ ml2 = self.mlm.open_list("foo@example.com")
+ self.has_subscribers(ml2, addrs)
+ ml2.subscribers.lock()
+ ml2.subscribers.remove(addrs[0])
+ self.has_subscribers(ml2, addrs[1:])
+
+ ml2.subscribers.save()
+
+ ml3 = self.mlm.open_list("foo@example.com")
+ self.has_subscribers(ml3, addrs[1:])
+
+ def testSubscribeTwice(self):
+ ml = self.mlm.create_list("foo@example.com")
+ self.failUnlessEqual(ml.subscribers.get_all(), [])
+ ml.subscribers.lock()
+ ml.subscribers.add("user@example.com")
+ ml.subscribers.add("USER@example.com")
+ self.failUnlessEqual(map(string.lower, ml.subscribers.get_all()),
+ map(string.lower, ["USER@example.com"]))
+
+ def testSubscriberAttributesAndGroups(self):
+ addrs = ["joe@example.com", "mary@example.com"]
+ addrs.sort()
+ ml = self.mlm.create_list("foo@example.com")
+ self.failUnlessEqual(ml.subscribers.groups(), [])
+ ml.subscribers.lock()
+ id = ml.subscribers.add_many(addrs)
+ self.failUnlessEqual(ml.subscribers.groups(), ["0"])
+ self.failUnlessEqual(ml.subscribers.get(id, "status"), "ok")
+ ml.subscribers.set(id, "status", "bounced")
+ self.failUnlessEqual(ml.subscribers.get(id, "status"), "bounced")
+ subs = ml.subscribers.in_group(id)
+ subs.sort()
+ self.failUnlessEqual(subs, addrs)
+
+class ModerationBoxTestCases(ListBase):
+
+ def testModerationBox(self):
+ ml = self.mlm.create_list("foo@example.com")
+ listdir = os.path.join(DOTDIR, "foo@example.com")
+ boxdir = os.path.join(listdir, "moderation-box")
+
+ self.failUnlessEqual(boxdir, ml.moderation_box.boxdir)
+ self.failUnless(os.path.isdir(boxdir))
+
+ mailtext = "From: foo\nTo: bar\n\nhello\n"
+ id = ml.moderation_box.add("foo", mailtext)
+ self.failUnless(ml.moderation_box.has(id))
+ self.failUnlessEqual(ml.moderation_box.get_address(id), "foo")
+ self.failUnlessEqual(ml.moderation_box.get(id), mailtext)
+
+ filename = os.path.join(boxdir, id)
+ self.failUnless(os.path.isfile(filename))
+ self.failUnless(os.path.isfile(filename + ".address"))
+
+ ml.moderation_box.remove(id)
+ self.failIf(ml.moderation_box.has(id))
+ self.failUnless(not os.path.exists(filename))
+
+class IncomingBase(unittest.TestCase):
+
+ def setUp(self):
+ if os.path.isdir(DOTDIR):
+ shutil.rmtree(DOTDIR)
+ self.mlm = eoc.MailingListManager(DOTDIR)
+ self.ml = None
+ ml = self.mlm.create_list("foo@EXAMPLE.com")
+ ml.cp.set("list", "owners", "listmaster@example.com")
+ ml.save_config()
+ ml.subscribers.lock()
+ ml.subscribers.add("USER1@example.com")
+ ml.subscribers.add("user2@EXAMPLE.com")
+ ml.subscribers.save()
+ self.write_file_in_listdir(ml, "headers-to-add", "X-Foo: foo\n")
+ self.write_file_in_listdir(ml, "headers-to-remove", "Received\n")
+ self.sent_mail = []
+
+ def tearDown(self):
+ shutil.rmtree(DOTDIR)
+
+ def write_file_in_listdir(self, ml, basename, contents):
+ f = open(os.path.join(ml.dirname, basename), "w")
+ f.write(contents)
+ f.close()
+
+ def configure_list(self, subscription, posting):
+ list = self.mlm.open_list("foo@example.com")
+ list.cp.set("list", "subscription", subscription)
+ list.cp.set("list", "posting", posting)
+ list.save_config()
+
+ def environ(self, sender, recipient):
+ eoc.set_environ({
+ "SENDER": sender,
+ "RECIPIENT": recipient,
+ })
+
+ def catch_sendmail(self, sender, recipients, text):
+ self.sent_mail.append({
+ "sender": sender,
+ "recipients": recipients,
+ "text": text,
+ })
+
+ def send(self, sender, recipient, text="", force_moderation=0,
+ force_posting=0):
+ self.environ(sender, recipient)
+ dict = self.mlm.parse_recipient_address(recipient, None, None)
+ dict["force-moderation"] = force_moderation
+ dict["force-posting"] = force_posting
+ self.ml = self.mlm.open_list(dict["name"])
+ if "\n\n" not in text:
+ text = "\n\n" + text
+ text = "Received: foobar\n" + text
+ self.ml.read_stdin = lambda t=text: t
+ self.mlm.send_mail = self.catch_sendmail
+ self.sent_mail = []
+ self.ml.obey(dict)
+
+ def sender_matches(self, mail, sender):
+ pat = "(?P<address>" + sender + ")"
+ m = re.match(pat, mail["sender"], re.I)
+ if m:
+ return m.group("address")
+ else:
+ return None
+
+ def replyto_matches(self, mail, replyto):
+ pat = "(.|\n)*(?P<address>" + replyto + ")"
+ m = re.match(pat, mail["text"], re.I)
+ if m:
+ return m.group("address")
+ else:
+ return None
+
+ def receiver_matches(self, mail, recipient):
+ return map(string.lower, mail["recipients"]) == [recipient.lower()]
+
+ def body_matches(self, mail, body):
+ if body:
+ pat = re.compile("(.|\n)*" + body + "(.|\n)*")
+ m = re.match(pat, mail["text"])
+ return m
+ else:
+ return 1
+
+ def headers_match(self, mail, header):
+ if header:
+ pat = re.compile("(.|\n)*" + header + "(.|\n)*", re.I)
+ m = re.match(pat, mail["text"])
+ return m
+ else:
+ return 1
+
+ def match(self, sender, replyto, receiver, body=None, header=None,
+ anti_header=None):
+ ret = None
+ for mail in self.sent_mail:
+ if replyto is None:
+ m1 = self.sender_matches(mail, sender)
+ m3 = self.receiver_matches(mail, receiver)
+ m4 = self.body_matches(mail, body)
+ m5 = self.headers_match(mail, header)
+ m6 = self.headers_match(mail, anti_header)
+ no_anti_header = anti_header == None or m6 == None
+ if m1 != None and m3 and m4 and m5 and no_anti_header:
+ ret = m1
+ self.sent_mail.remove(mail)
+ break
+ else:
+ m1 = self.sender_matches(mail, sender)
+ m2 = self.replyto_matches(mail, replyto)
+ m3 = self.receiver_matches(mail, receiver)
+ m4 = self.body_matches(mail, body)
+ m5 = self.headers_match(mail, header)
+ m6 = self.headers_match(mail, anti_header)
+ no_anti_header = anti_header == None or m6 == None
+ if m1 != None and m2 != None and m3 and m4 and m5 and \
+ no_anti_header:
+ ret = m2
+ self.sent_mail.remove(mail)
+ break
+ self.failUnless(ret != None)
+ return ret
+
+ def no_more_mail(self):
+ self.failUnlessEqual(self.sent_mail, [])
+
+
+class SimpleCommandAddressTestCases(IncomingBase):
+
+ def testHelp(self):
+ self.send("outsider@example.com", "foo-help@example.com")
+ self.match("foo-ignore@example.com", None, "outsider@example.com",
+ "Subject: Help for")
+ self.no_more_mail()
+
+ def testOwner(self):
+ self.send("outsider@example.com", "foo-owner@example.com", "abcde")
+ self.match("outsider@example.com", None, "listmaster@example.com",
+ "abcde")
+ self.no_more_mail()
+
+ def testIgnore(self):
+ self.send("outsider@example.com", "foo-ignore@example.com", "abcde")
+ self.no_more_mail()
+
+class OwnerCommandTestCases(IncomingBase):
+
+ def testList(self):
+ self.send("listmaster@example.com", "foo-list@example.com")
+ self.match("foo-ignore@example.com", None, "listmaster@example.com",
+ "[uU][sS][eE][rR][12]@" +
+ "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n" +
+ "[uU][sS][eE][rR][12]@" +
+ "[eE][xX][aA][mM][pP][lL][eE]\\.[cC][oO][mM]\n")
+ self.no_more_mail()
+
+ def testListDenied(self):
+ self.send("outsider@example.com", "foo-list@example.com")
+ self.match("foo-ignore@example.com", None, "outsider@example.com",
+ "Subject: Subscriber list denied")
+ self.no_more_mail()
+
+ def testSetlist(self):
+ self.send("listmaster@example.com", "foo-setlist@example.com",
+ "From: foo\n\nnew1@example.com\nuser1@example.com\n")
+ a = self.match("foo-ignore@example.com",
+ "foo-setlistyes-[^@]*@example.com",
+ "listmaster@example.com",
+ "Subject: Please moderate subscriber list")
+ self.no_more_mail()
+
+ self.send("listmaster@example.com", a)
+ self.match("foo-ignore@example.com", None, "listmaster@example.com",
+ "Subject: Subscriber list has been changed")
+ self.match("foo-ignore@example.com", None, "new1@example.com",
+ "Subject: Welcome to")
+ self.match("foo-ignore@example.com", None, "user2@EXAMPLE.com",
+ "Subject: Goodbye from")
+ self.no_more_mail()
+
+ def testSetlistSilently(self):
+ self.send("listmaster@example.com", "foo-setlistsilently@example.com",
+ "From: foo\n\nnew1@example.com\nuser1@example.com\n")
+ a = self.match("foo-ignore@example.com",
+ "foo-setlistsilentyes-[^@]*@example.com",
+ "listmaster@example.com",
+ "Subject: Please moderate subscriber list")
+ self.no_more_mail()
+
+ self.send("listmaster@example.com", a)
+ self.match("foo-ignore@example.com", None, "listmaster@example.com",
+ "Subject: Subscriber list has been changed")
+ self.no_more_mail()
+
+ def testSetlistDenied(self):
+ self.send("outsider@example.com", "foo-setlist@example.com",
+ "From: foo\n\nnew1@example.com\nnew2@example.com\n")
+ self.match("foo-ignore@example.com",
+ None,
+ "outsider@example.com",
+ "Subject: You can't set the subscriber list")
+ self.no_more_mail()
+
+ def testSetlistBadlist(self):
+ self.send("listmaster@example.com", "foo-setlist@example.com",
+ "From: foo\n\nBlah blah blah.\n")
+ self.match("foo-ignore@example.com",
+ None,
+ "listmaster@example.com",
+ "Subject: Bad address list")
+ self.no_more_mail()
+
+ def testOwnerSubscribesSomeoneElse(self):
+ # Send subscription request. List sends confirmation request.
+ self.send("listmaster@example.com",
+ "foo-subscribe-outsider=example.com@example.com")
+ a = self.match("foo-ignore@example.com",
+ "foo-subyes-[^@]*@example.com",
+ "listmaster@example.com",
+ "Please confirm subscription")
+ self.no_more_mail()
+
+ # Confirm sub. req. List sends welcome.
+ self.send("listmaster@example.com", a)
+ self.match("foo-ignore@example.com",
+ None,
+ "outsider@example.com",
+ "Welcome to the")
+ self.no_more_mail()
+
+ def testOwnerUnubscribesSomeoneElse(self):
+ # Send unsubscription request. List sends confirmation request.
+ self.send("listmaster@example.com",
+ "foo-unsubscribe-outsider=example.com@example.com")
+ a = self.match("foo-ignore@example.com",
+ "foo-unsubyes-[^@]*@example.com",
+ "listmaster@example.com",
+ "Subject: Please confirm UNsubscription")
+ self.no_more_mail()
+
+ # Confirm sub. req. List sends welcome.
+ self.send("listmaster@example.com", a)
+ self.match("foo-ignore@example.com", None, "outsider@example.com",
+ "Goodbye")
+ self.no_more_mail()
+
+class SubscriptionTestCases(IncomingBase):
+
+ def confirm(self, recipient):
+ # List has sent confirmation request. Respond to it.
+ a = self.match("foo-ignore@example.com",
+ "foo-subyes-[^@]*@example.com",
+ recipient,
+ "Please confirm subscription")
+ self.no_more_mail()
+
+ # Confirm sub. req. List response will be analyzed later.
+ self.send("something.random@example.com", a)
+
+ def got_welcome(self, recipient):
+ self.match("foo-ignore@example.com",
+ None,
+ recipient,
+ "Welcome to the")
+ self.no_more_mail()
+
+ def approve(self, user_recipient):
+ self.match("foo-ignore@example.com", None, user_recipient)
+ a = self.match("foo-ignore@example.com",
+ "foo-subapprove-[^@]*@example.com",
+ "listmaster@example.com")
+ self.send("listmaster@example.com", a)
+
+ def reject(self, user_recipient):
+ self.match("foo-ignore@example.com", None, user_recipient)
+ a = self.match("foo-ignore@example.com",
+ "foo-subreject-[^@]*@example.com",
+ "listmaster@example.com")
+ self.send("listmaster@example.com", a)
+
+ def testSubscribeToUnmoderatedWithoutAddressNotOnList(self):
+ self.configure_list("free", "free")
+ self.send("outsider@example.com", "foo-subscribe@example.com")
+ self.confirm("outsider@example.com")
+ self.got_welcome("outsider@example.com")
+
+ def testSubscribeToUnmoderatedWithoutAddressAlreadyOnList(self):
+ self.configure_list("free", "free")
+ self.send("user1@example.com", "foo-subscribe@example.com")
+ self.confirm("user1@example.com")
+ self.got_welcome("user1@example.com")
+
+ def testSubscribeToUnmoderatedWithAddressNotOnList(self):
+ self.configure_list("free", "free")
+ self.send("somebody.else@example.com",
+ "foo-subscribe-outsider=example.com@example.com")
+ self.confirm("outsider@example.com")
+ self.got_welcome("outsider@example.com")
+
+ def testSubscribeToUnmoderatedWithAddressAlreadyOnList(self):
+ self.configure_list("free", "free")
+ self.send("somebody.else@example.com",
+ "foo-subscribe-user1=example.com@example.com")
+ self.confirm("user1@example.com")
+ self.got_welcome("user1@example.com")
+
+ def testSubscribeToModeratedWithoutAddressNotOnListApproved(self):
+ self.configure_list("moderated", "moderated")
+ self.send("outsider@example.com", "foo-subscribe@example.com")
+ self.confirm("outsider@example.com")
+ self.approve("outsider@example.com")
+ self.got_welcome("outsider@example.com")
+
+ def testSubscribeToModeratedWithoutAddressNotOnListRejected(self):
+ self.configure_list("moderated", "moderated")
+ self.send("outsider@example.com", "foo-subscribe@example.com")
+ self.confirm("outsider@example.com")
+ self.reject("outsider@example.com")
+
+ def testSubscribeToModeratedWithoutAddressAlreadyOnListApproved(self):
+ self.configure_list("moderated", "moderated")
+ self.send("user1@example.com", "foo-subscribe@example.com")
+ self.confirm("user1@example.com")
+ self.approve("user1@example.com")
+ self.got_welcome("user1@example.com")
+
+ def testSubscribeToModeratedWithoutAddressAlreadyOnListRejected(self):
+ self.configure_list("moderated", "moderated")
+ self.send("user1@example.com", "foo-subscribe@example.com")
+ self.confirm("user1@example.com")
+ self.reject("user1@example.com")
+
+ def testSubscribeToModeratedWithAddressNotOnListApproved(self):
+ self.configure_list("moderated", "moderated")
+ self.send("somebody.else@example.com",
+ "foo-subscribe-outsider=example.com@example.com")
+ self.confirm("outsider@example.com")
+ self.approve("outsider@example.com")
+ self.got_welcome("outsider@example.com")
+
+ def testSubscribeToModeratedWithAddressNotOnListRejected(self):
+ self.configure_list("moderated", "moderated")
+ self.send("somebody.else@example.com",
+ "foo-subscribe-outsider=example.com@example.com")
+ self.confirm("outsider@example.com")
+ self.reject("outsider@example.com")
+
+ def testSubscribeToModeratedWithAddressAlreadyOnListApproved(self):
+ self.configure_list("moderated", "moderated")
+ self.send("somebody.else@example.com",
+ "foo-subscribe-user1=example.com@example.com")
+ self.confirm("user1@example.com")
+ self.approve("user1@example.com")
+ self.got_welcome("user1@example.com")
+
+ def testSubscribeToModeratedWithAddressAlreadyOnListRejected(self):
+ self.configure_list("moderated", "moderated")
+ self.send("somebody.else@example.com",
+ "foo-subscribe-user1=example.com@example.com")
+ self.confirm("user1@example.com")
+ self.reject("user1@example.com")
+
+class UnsubscriptionTestCases(IncomingBase):
+
+ def confirm(self, recipient):
+ # List has sent confirmation request. Respond to it.
+ a = self.match("foo-ignore@example.com",
+ "foo-unsubyes-[^@]*@example.com",
+ recipient,
+ "Please confirm UNsubscription")
+ self.no_more_mail()
+
+ # Confirm sub. req. List response will be analyzed later.
+ self.send("something.random@example.com", a)
+
+ def got_goodbye(self, recipient):
+ self.match("foo-ignore@example.com",
+ None,
+ recipient,
+ "Goodbye from")
+ self.no_more_mail()
+
+ def testUnsubscribeWithoutAddressNotOnList(self):
+ self.send("outsider@example.com", "foo-unsubscribe@example.com")
+ self.confirm("outsider@example.com")
+ self.got_goodbye("outsider@example.com")
+
+ def testUnsubscribeWithoutAddressOnList(self):
+ self.send("user1@example.com", "foo-unsubscribe@example.com")
+ self.confirm("user1@example.com")
+ self.got_goodbye("user1@example.com")
+
+ def testUnsubscribeWithAddressNotOnList(self):
+ self.send("somebody.else@example.com",
+ "foo-unsubscribe-outsider=example.com@example.com")
+ self.confirm("outsider@example.com")
+ self.got_goodbye("outsider@example.com")
+
+ def testUnsubscribeWithAddressOnList(self):
+ self.send("somebody.else@example.com",
+ "foo-unsubscribe-user1=example.com@example.com")
+ self.confirm("user1@example.com")
+ self.got_goodbye("user1@example.com")
+
+class PostTestCases(IncomingBase):
+
+ msg = u"Subject: something \u00c4\n\nhello, world\n".encode("utf8")
+
+ def approve(self, user_recipient):
+ self.match("foo-ignore@example.com", None, user_recipient)
+ a = self.match("foo-ignore@example.com",
+ "foo-approve-[^@]*@example.com",
+ "listmaster@example.com")
+ self.send("listmaster@example.com", a)
+
+ def reject(self, user_recipient):
+ self.match("foo-ignore@example.com", None, user_recipient)
+ a = self.match("foo-ignore@example.com",
+ "foo-reject-[^@]*@example.com",
+ "listmaster@example.com")
+ self.send("listmaster@example.com", a)
+
+ def check_headers_are_encoded(self):
+ ok_chars = "\t\r\n"
+ for code in range(32, 127):
+ ok_chars = ok_chars + chr(code)
+ for mail in self.sent_mail:
+ text = mail["text"]
+ self.failUnless("\n\n" in text)
+ headers = text.split("\n\n")[0]
+ for c in headers:
+ if c not in ok_chars: print headers
+ self.failUnless(c in ok_chars)
+
+ def check_mail_to_list(self):
+ self.check_headers_are_encoded()
+ self.match("foo-bounce-.*@example.com", None, "USER1@example.com",
+ body="hello, world",
+ header="X-Foo: FOO",
+ anti_header="Received:")
+ self.match("foo-bounce-.*@example.com", None, "user2@EXAMPLE.com",
+ body="hello, world",
+ header="x-foo: foo",
+ anti_header="Received:")
+ self.no_more_mail()
+
+ def check_that_moderation_box_is_empty(self):
+ ml = self.mlm.open_list("foo@example.com")
+ self.failUnlessEqual(os.listdir(ml.moderation_box.boxdir), [])
+
+ def testSubscriberPostsToUnmoderated(self):
+ self.configure_list("free", "free")
+ self.send("user1@example.com", "foo@example.com",
+ self.msg)
+ self.check_mail_to_list()
+
+ def testOutsiderPostsToUnmoderated(self):
+ self.configure_list("free", "free")
+ self.send("outsider@example.com", "foo@example.com", self.msg)
+ self.check_mail_to_list()
+
+ def testSubscriberPostToAutomoderated(self):
+ self.configure_list("free", "auto")
+ self.check_that_moderation_box_is_empty()
+ self.send("user1@example.com", "foo@example.com", self.msg)
+ self.check_mail_to_list()
+ self.check_that_moderation_box_is_empty()
+
+ def testOutsiderPostsToAutomoderatedRejected(self):
+ self.configure_list("free", "auto")
+ self.check_that_moderation_box_is_empty()
+ self.send("outsider@example.com", "foo@example.com", self.msg)
+ self.reject("outsider@example.com")
+ self.check_that_moderation_box_is_empty()
+
+ def testOutsiderPostsToAutomoderatedApproved(self):
+ self.configure_list("free", "auto")
+ self.check_that_moderation_box_is_empty()
+ self.send("outsider@example.com", "foo@example.com", self.msg)
+ self.approve("outsider@example.com")
+ self.check_mail_to_list()
+ self.check_that_moderation_box_is_empty()
+
+ def testSubscriberPostsToModeratedRejected(self):
+ self.configure_list("free", "moderated")
+ self.check_that_moderation_box_is_empty()
+ self.send("user1@example.com", "foo@example.com", self.msg)
+ self.reject("user1@example.com")
+ self.check_that_moderation_box_is_empty()
+
+ def testOutsiderPostsToMderatedApproved(self):
+ self.configure_list("free", "moderated")
+ self.check_that_moderation_box_is_empty()
+ self.send("outsider@example.com", "foo@example.com", self.msg)
+ self.approve("outsider@example.com")
+ self.check_mail_to_list()
+ self.check_that_moderation_box_is_empty()
+
+ def testSubscriberPostsWithRequestToBeModerated(self):
+ self.configure_list("free", "free")
+
+ self.check_that_moderation_box_is_empty()
+ self.send("user1@example.com", "foo@example.com", self.msg,
+ force_moderation=1)
+ self.match("foo-ignore@example.com",
+ None,
+ "user1@example.com",
+ "Subject: Please wait")
+ a = self.match("foo-ignore@example.com",
+ "foo-approve-[^@]*@example.com",
+ "listmaster@example.com")
+ self.no_more_mail()
+
+ self.send("listmaster@example.com", a)
+ self.check_mail_to_list()
+ self.check_that_moderation_box_is_empty()
+
+ def testSubscriberPostsWithModerationOverride(self):
+ self.configure_list("moderated", "moderated")
+ self.send("user1@example.com", "foo@example.com", self.msg,
+ force_posting=1)
+ self.check_mail_to_list()
+ self.check_that_moderation_box_is_empty()
+
+class BounceTestCases(IncomingBase):
+
+ def check_subscriber_status(self, must_be):
+ ml = self.mlm.open_list("foo@example.com")
+ for id in ml.subscribers.groups():
+ self.failUnlessEqual(ml.subscribers.get(id, "status"), must_be)
+
+ def bounce_sent_mail(self):
+ for m in self.sent_mail[:]:
+ self.send("something@example.com", m["sender"], "eek")
+ self.failUnlessEqual(len(self.sent_mail), 0)
+
+ def send_mail_to_list_then_bounce_everything(self):
+ self.send("user@example.com", "foo@example.com", "hello")
+ for m in self.sent_mail[:]:
+ self.send("foo@example.com", m["sender"], "eek")
+ self.failUnlessEqual(len(self.sent_mail), 0)
+
+ def testBounceOnceThenRecover(self):
+ self.check_subscriber_status("ok")
+ self.send_mail_to_list_then_bounce_everything()
+
+ self.check_subscriber_status("bounced")
+
+ ml = self.mlm.open_list("foo@example.com")
+ for id in ml.subscribers.groups():
+ bounce_id = ml.subscribers.get(id, "bounce-id")
+ self.failUnless(bounce_id)
+ self.failUnless(ml.bounce_box.has(bounce_id))
+
+ bounce_ids = []
+ now = time.time()
+ ml = self.mlm.open_list("foo@example.com")
+ ml.subscribers.lock()
+ for id in ml.subscribers.groups():
+ timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
+ self.failUnless(abs(timestamp - now) < 10.0)
+ ml.subscribers.set(id, "timestamp-bounced", "69.0")
+ bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
+ ml.subscribers.save()
+
+ self.mlm.cleaning_woman(no_op)
+ self.check_subscriber_status("probed")
+
+ for bounce_id in bounce_ids:
+ self.failUnless(ml.bounce_box.has(bounce_id))
+
+ self.mlm.cleaning_woman(no_op)
+ ml = self.mlm.open_list("foo@example.com")
+ self.failUnlessEqual(len(ml.subscribers.groups()), 2)
+ self.check_subscriber_status("ok")
+ for bounce_id in bounce_ids:
+ self.failUnless(not ml.bounce_box.has(bounce_id))
+
+ def testBounceProbeAlso(self):
+ self.check_subscriber_status("ok")
+ self.send_mail_to_list_then_bounce_everything()
+ self.check_subscriber_status("bounced")
+
+ ml = self.mlm.open_list("foo@example.com")
+ for id in ml.subscribers.groups():
+ bounce_id = ml.subscribers.get(id, "bounce-id")
+ self.failUnless(bounce_id)
+ self.failUnless(ml.bounce_box.has(bounce_id))
+
+ bounce_ids = []
+ now = time.time()
+ ml = self.mlm.open_list("foo@example.com")
+ ml.subscribers.lock()
+ for id in ml.subscribers.groups():
+ timestamp = float(ml.subscribers.get(id, "timestamp-bounced"))
+ self.failUnless(abs(timestamp - now) < 10.0)
+ ml.subscribers.set(id, "timestamp-bounced", "69.0")
+ bounce_ids.append(ml.subscribers.get(id, "bounce-id"))
+ ml.subscribers.save()
+
+ self.sent_mail = []
+ self.mlm.cleaning_woman(self.catch_sendmail)
+ self.check_subscriber_status("probed")
+ for bounce_id in bounce_ids:
+ self.failUnless(ml.bounce_box.has(bounce_id))
+ self.bounce_sent_mail()
+ self.check_subscriber_status("probebounced")
+
+ self.mlm.cleaning_woman(no_op)
+ ml = self.mlm.open_list("foo@example.com")
+ self.failUnlessEqual(len(ml.subscribers.groups()), 0)
+ for bounce_id in bounce_ids:
+ self.failUnless(not ml.bounce_box.has(bounce_id))
+
+ def testCleaningWomanJoinsAndBounceSplitsGroups(self):
+ # Check that each group contains one address and set the creation
+ # timestamp to an ancient time.
+ ml = self.mlm.open_list("foo@example.com")
+ bouncedir = os.path.join(ml.dirname, "bounce-box")
+ ml.subscribers.lock()
+ for id in ml.subscribers.groups():
+ addrs = ml.subscribers.in_group(id)
+ self.failUnlessEqual(len(addrs), 1)
+ bounce_id = ml.subscribers.get(id, "bounce-id")
+ self.failUnlessEqual(bounce_id, "..notexist..")
+ bounce_id = "bounce-" + id
+ ml.subscribers.set(id, "bounce-id", bounce_id)
+ bounce_path = os.path.join(bouncedir, bounce_id)
+ self.failUnless(not os.path.isfile(bounce_path))
+ f = open(bounce_path, "w")
+ f.close()
+ f = open(bounce_path + ".address", "w")
+ f.close()
+ self.failUnless(os.path.isfile(bounce_path))
+ ml.subscribers.set(id, "timestamp-created", "1")
+ ml.subscribers.save()
+
+ # Check that --cleaning-woman joins the two groups into one.
+ self.failUnlessEqual(len(ml.subscribers.groups()), 2)
+ self.mlm.cleaning_woman(no_op)
+ ml = self.mlm.open_list("foo@example.com")
+ self.failUnlessEqual(len(ml.subscribers.groups()), 1)
+ self.failUnlessEqual(os.listdir(bouncedir), [])
+
+ # Check that a bounce splits the single group.
+ self.send_mail_to_list_then_bounce_everything()
+ ml = self.mlm.open_list("foo@example.com")
+ self.failUnlessEqual(len(ml.subscribers.groups()), 2)
+
+ # Check that a --cleaning-woman immediately after doesn't join.
+ # (The groups are new, thus shouldn't be joined for a week.)
+ self.failUnlessEqual(len(ml.subscribers.groups()), 2)
+ self.mlm.cleaning_woman(no_op)
+ ml = self.mlm.open_list("foo@example.com")
+ self.failUnlessEqual(len(ml.subscribers.groups()), 2)