# Copyright 2014 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# =*= License: GPL-3+ =*=
import email
import os
import distixlib
class TicketHasNoId(distixlib.StructuredError):
msg = "Can't save ticket without an id"
class TicketAlreadyInStoreError(distixlib.StructuredError):
msg = 'Ticket {ticket_id} already exists in store {dirname}'
class TicketNotInStoreError(distixlib.StructuredError):
msg = 'Ticket {ticket_id} is not in store {dirname}'
class TicketStore(object):
'''A ticket store in a repository.
This is just one set of tickets in a repository. The design allows
for many stores.
Tickets are stored in a directory. The name of the directory is
given to the initialiser.
'''
def __init__(self, dirname):
self._dirname = dirname
self._loader = distixlib.TicketLoader()
self._saver = distixlib.TicketSaver()
self._ticket_cache = _TicketCache()
self._dirty = False
def is_dirty(self):
return self._dirty or self._any_ticket_is_dirty()
def _any_ticket_is_dirty(self):
return any(ticket.is_dirty() for ticket in self._get_cached_tickets())
def _get_cached_tickets(self):
return self._ticket_cache.get_all()
def make_clean(self):
for ticket in self._get_cached_tickets():
ticket.make_clean()
self._dirty = False
def get_ticket_ids(self):
if os.path.exists(self._dirname):
return os.listdir(self._dirname)
return []
def clear_ticket_cache(self): # pragma: no cover
self._ticket_cache.clear()
def get_ticket(self, ticket_id_or_prefix):
'''Return a distixlib.Ticket for an existing ticket in the store.'''
ticket_id = self._find_ticket_id_from_prefix(ticket_id_or_prefix)
if ticket_id not in self._ticket_cache:
ticket_dir = self._get_dir_for_ticket(ticket_id)
ticket = self._load_ticket(ticket_dir)
self._ticket_cache.put(ticket)
return self._ticket_cache.get(ticket_id)
def _find_ticket_id_from_prefix(self, prefix):
cached_ids = self._ticket_cache.keys()
matches = self._find_ticket_ids_matching_prefix(cached_ids, prefix)
if len(matches) == 1:
return matches[0]
ticket_ids = self.get_ticket_ids()
matches = self._find_ticket_ids_matching_prefix(ticket_ids, prefix)
if len(matches) == 1:
return matches[0]
raise TicketNotInStoreError(ticket_id=prefix)
def _find_ticket_ids_matching_prefix(self, ticket_ids, prefix):
prefix = prefix.lower()
return [tid for tid in ticket_ids if tid.lower().startswith(prefix)]
def _load_ticket(self, ticket_dir):
return self._loader.load_from_directory(ticket_dir)
def get_tickets(self):
'''Return all the tickets in a store.'''
ticket_dirs = self._find_ticket_dirs()
loaded_tickets = [self._load_ticket(x) for x in ticket_dirs]
self._cache_tickets(loaded_tickets)
ticket_ids = [ticket.get_ticket_id() for ticket in loaded_tickets]
return self._load_tickets_from_ticket_cache(ticket_ids)
def _find_ticket_dirs(self):
return [
os.path.join(self._dirname, ticket_id)
for ticket_id in self.get_ticket_ids()
]
def _cache_tickets(self, tickets):
for ticket in tickets:
ticket_id = ticket.get_ticket_id()
if ticket_id not in self._ticket_cache:
self._ticket_cache.put(ticket)
def _load_tickets_from_ticket_cache(self, ticket_ids):
return [self._ticket_cache.get(ticket_id) for ticket_id in ticket_ids]
def add_ticket(self, ticket):
'''Add ticket to the store.
Return list of pathnames to all the files that got created.
'''
ticket_id = ticket.get_ticket_id()
if ticket_id is None:
raise TicketHasNoId()
if ticket_id in self.get_ticket_ids():
raise distixlib.TicketAlreadyInStoreError(
ticket_id=ticket_id, dirname=self._dirname)
self._dirty = True
self._ticket_cache.put(ticket)
self._create_store_directory()
return self._create_ticket(ticket)
def _create_store_directory(self):
if not os.path.exists(self._dirname):
os.mkdir(self._dirname)
def _create_ticket(self, ticket):
ticket_dir = self._get_dir_for_ticket(ticket.get_ticket_id())
return self._saver.create_ticket_on_disk(ticket, ticket_dir)
def _get_dir_for_ticket(self, ticket_id):
return os.path.join(self._dirname, ticket_id)
def _get_ticket_id(self, ticket): # pragma: no cover
ticket_id = ticket.get_ticket_id()
if ticket_id is None:
return ''
return ticket_id
def save_changes(self):
filenames = []
for ticket in self._get_cached_tickets():
if ticket.is_dirty():
filenames.extend(self._save_ticket(ticket))
self.make_clean()
return filenames
def _save_ticket(self, ticket):
ticket_dir = self._get_dir_for_ticket(ticket.get_ticket_id())
return self._saver.save_changes_to_ticket(ticket, ticket_dir)
def ticket_has_message_with_text(
self, ticket_id, msg_text): # pragma: no cover
filenames = self.get_message_filenames(ticket_id)
for filename in filenames:
if self._file_contains(filename, msg_text):
return True
return False
def get_message_filenames(self, ticket_id): # pragma: no cover
ticket_dir = self._get_dir_for_ticket(ticket_id)
maildir_pathname = self._saver._get_maildir_pathname(ticket_dir)
message_filenames = []
for dirname, subdirs, filenames in os.walk(maildir_pathname):
if '.empty' in subdirs:
subdirs.remove('.empty')
for filename in filenames:
message_filenames.append(os.path.join(dirname, filename))
return message_filenames
def _file_contains(self, filename, data): # pragma: no cover Note
# that the email.Message class may reformat headers, so that
# msg.as_string() doesn't return the pristine original message
# text. Thus we can't just compare file content with data.
# Instead we load the file content into a message, let any
# reformatting happen, and then compare msg.as_string() with
# data.
with open(filename) as f:
msg = email.message_from_file(f)
return msg.as_string() == data
class _TicketCache(object):
def __init__(self):
self._known_tickets = {}
def keys(self):
return self._known_tickets.keys()
def put(self, ticket):
self._known_tickets[ticket.get_ticket_id()] = ticket
def __contains__(self, ticket_id):
return ticket_id in self._known_tickets
def get(self, ticket_id):
return self._known_tickets[ticket_id]
def get_all(self):
return self._known_tickets.values()
def clear(self): # pragma: no cover
self._known_tickets.clear()