# Copyright 2014  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=


import email
import os

import distixlib


class TicketHasNoId(distixlib.StructuredError):

    msg = "Can't save ticket without an id"


class TicketAlreadyInStoreError(distixlib.StructuredError):

    msg = 'Ticket {ticket_id} already exists in store {dirname}'


class TicketNotInStoreError(distixlib.StructuredError):

    msg = 'Ticket {ticket_id} is not in store {dirname}'


class TicketStore(object):

    '''A ticket store in a repository.

    This is just one set of tickets in a repository. The design allows
    for many stores. Tickets are stored in a directory. The name of
    the directory is given to the initialiser.

    Each ticket lives in a subdirectory of the store directory, named
    after the ticket id. Tickets that have been loaded or added are
    kept in an in-memory cache, so that repeated lookups return the
    same ticket object.

    '''

    def __init__(self, dirname):
        '''Set up a store for the directory dirname.

        The directory is not touched here; it is created on demand
        when the first ticket is added.

        '''

        self._dirname = dirname
        self._loader = distixlib.TicketLoader()
        self._saver = distixlib.TicketSaver()
        self._ticket_cache = _TicketCache()
        self._dirty = False

    def is_dirty(self):
        '''Has the store, or any cached ticket, got unsaved changes?'''
        return self._dirty or self._any_ticket_is_dirty()

    def _any_ticket_is_dirty(self):
        return any(ticket.is_dirty() for ticket in self._get_cached_tickets())

    def _get_cached_tickets(self):
        return self._ticket_cache.get_all()

    def make_clean(self):
        '''Mark the store and every cached ticket as clean.'''
        for ticket in self._get_cached_tickets():
            ticket.make_clean()
        self._dirty = False

    def get_ticket_ids(self):
        '''Return the ids of the tickets on disk.

        The ids are the names of the entries in the store directory.
        If the directory does not exist, there are no tickets and an
        empty list is returned.

        '''

        if os.path.exists(self._dirname):
            return os.listdir(self._dirname)
        return []

    def clear_ticket_cache(self):  # pragma: no cover
        '''Forget all cached tickets.'''
        self._ticket_cache.clear()

    def get_ticket(self, ticket_id_or_prefix):
        '''Return a distixlib.Ticket for an existing ticket in the store.

        The ticket may be named by its full id or by any prefix of the
        id that matches exactly one ticket. The ticket is loaded from
        disk the first time it is asked for and served from the cache
        after that.

        Raise TicketNotInStoreError if the id or prefix does not match
        exactly one ticket.

        '''

        ticket_id = self._find_ticket_id_from_prefix(ticket_id_or_prefix)
        if ticket_id not in self._ticket_cache:
            ticket_dir = self._get_dir_for_ticket(ticket_id)
            ticket = self._load_ticket(ticket_dir)
            self._ticket_cache.put(ticket)
        return self._ticket_cache.get(ticket_id)

    def _find_ticket_id_from_prefix(self, prefix):
        '''Return the one ticket id that starts with prefix.

        The cached ids are tried first, then the ids on disk. If
        neither set has exactly one match (none, or several), raise
        TicketNotInStoreError.

        '''

        cached_ids = self._ticket_cache.keys()
        matches = self._find_ticket_ids_matching_prefix(cached_ids, prefix)
        if len(matches) == 1:
            return matches[0]

        ticket_ids = self.get_ticket_ids()
        matches = self._find_ticket_ids_matching_prefix(ticket_ids, prefix)
        if len(matches) == 1:
            return matches[0]

        # The message template of TicketNotInStoreError refers to both
        # {ticket_id} and {dirname}, so both must be given here.
        raise TicketNotInStoreError(
            ticket_id=prefix, dirname=self._dirname)

    def _find_ticket_ids_matching_prefix(self, ticket_ids, prefix):
        '''Return those of ticket_ids that start with prefix.

        The comparison ignores case.

        '''

        prefix = prefix.lower()
        return [tid for tid in ticket_ids if tid.lower().startswith(prefix)]

    def _load_ticket(self, ticket_dir):
        return self._loader.load_from_directory(ticket_dir)

    def get_tickets(self):
        '''Return all the tickets in a store.

        Every ticket directory is loaded, but a ticket that is already
        in the cache is returned as the cached object rather than the
        freshly loaded one, so that changes made to it are not lost.

        '''

        ticket_dirs = self._find_ticket_dirs()
        loaded_tickets = [self._load_ticket(x) for x in ticket_dirs]
        self._cache_tickets(loaded_tickets)
        ticket_ids = [ticket.get_ticket_id() for ticket in loaded_tickets]
        return self._load_tickets_from_ticket_cache(ticket_ids)

    def _find_ticket_dirs(self):
        return [
            os.path.join(self._dirname, ticket_id)
            for ticket_id in self.get_ticket_ids()
        ]

    def _cache_tickets(self, tickets):
        # Only cache tickets that are not there yet: an already cached
        # ticket is never replaced by a newly loaded copy.
        for ticket in tickets:
            ticket_id = ticket.get_ticket_id()
            if ticket_id not in self._ticket_cache:
                self._ticket_cache.put(ticket)

    def _load_tickets_from_ticket_cache(self, ticket_ids):
        return [self._ticket_cache.get(ticket_id) for ticket_id in ticket_ids]

    def add_ticket(self, ticket):
        '''Add ticket to the store.

        Return list of pathnames to all the files that got created.

        Raise TicketHasNoId if the ticket has no id, and
        TicketAlreadyInStoreError if a ticket with the same id is
        already on disk. The store directory is created if it does
        not exist yet.

        '''

        ticket_id = ticket.get_ticket_id()
        if ticket_id is None:
            raise TicketHasNoId()
        if ticket_id in self.get_ticket_ids():
            # NOTE(review): the exception is looked up via the
            # distixlib namespace; presumably the package re-exports
            # the TicketAlreadyInStoreError class defined in this
            # module. Confirm before switching to the local name.
            raise distixlib.TicketAlreadyInStoreError(
                ticket_id=ticket_id, dirname=self._dirname)
        self._dirty = True
        self._ticket_cache.put(ticket)
        self._create_store_directory()
        return self._create_ticket(ticket)

    def _create_store_directory(self):
        if not os.path.exists(self._dirname):
            os.mkdir(self._dirname)

    def _create_ticket(self, ticket):
        ticket_dir = self._get_dir_for_ticket(ticket.get_ticket_id())
        return self._saver.create_ticket_on_disk(ticket, ticket_dir)

    def _get_dir_for_ticket(self, ticket_id):
        return os.path.join(self._dirname, ticket_id)

    def _get_ticket_id(self, ticket):  # pragma: no cover
        # Like ticket.get_ticket_id(), but gives an empty string
        # instead of None for a ticket without an id.
        ticket_id = ticket.get_ticket_id()
        if ticket_id is None:
            return ''
        return ticket_id

    def save_changes(self):
        '''Save every dirty cached ticket, then mark everything clean.

        Return a list of the filenames reported by the saver for the
        tickets that were saved.

        '''

        filenames = []
        for ticket in self._get_cached_tickets():
            if ticket.is_dirty():
                filenames.extend(self._save_ticket(ticket))
        self.make_clean()
        return filenames

    def _save_ticket(self, ticket):
        ticket_dir = self._get_dir_for_ticket(ticket.get_ticket_id())
        return self._saver.save_changes_to_ticket(ticket, ticket_dir)

    def ticket_has_message_with_text(
            self, ticket_id, msg_text):  # pragma: no cover
        '''Does any message file of the ticket contain msg_text?

        ticket_id must be the full ticket id; prefixes are not
        resolved here.

        '''

        filenames = self.get_message_filenames(ticket_id)
        for filename in filenames:
            if self._file_contains(filename, msg_text):
                return True
        return False

    def get_message_filenames(self, ticket_id):  # pragma: no cover
        '''Return pathnames of all files under the ticket's maildir.'''
        ticket_dir = self._get_dir_for_ticket(ticket_id)
        maildir_pathname = self._saver._get_maildir_pathname(ticket_dir)
        message_filenames = []
        for dirname, subdirs, filenames in os.walk(maildir_pathname):
            # Removing the entry in place stops os.walk from
            # descending into a subdirectory called .empty.
            if '.empty' in subdirs:
                subdirs.remove('.empty')
            for filename in filenames:
                message_filenames.append(os.path.join(dirname, filename))
        return message_filenames

    def _file_contains(self, filename, data):  # pragma: no cover
        # Note that the email.Message class may reformat headers, so
        # that msg.as_string() doesn't return the pristine original
        # message text. Thus we can't just compare file content with
        # data. Instead we load the file content into a message, let
        # any reformatting happen, and then compare msg.as_string()
        # with data.
        with open(filename) as f:
            msg = email.message_from_file(f)
            return msg.as_string() == data


class _TicketCache(object):

    '''An in-memory mapping from ticket id to ticket object.'''

    def __init__(self):
        self._known_tickets = {}

    def keys(self):
        '''Return the ids of all cached tickets.'''
        return self._known_tickets.keys()

    def put(self, ticket):
        '''Store ticket under its own id, replacing any previous entry.'''
        self._known_tickets[ticket.get_ticket_id()] = ticket

    def __contains__(self, ticket_id):
        return ticket_id in self._known_tickets

    def get(self, ticket_id):
        '''Return the cached ticket; raise KeyError if it is not cached.'''
        return self._known_tickets[ticket_id]

    def get_all(self):
        '''Return all cached ticket objects.'''
        return self._known_tickets.values()

    def clear(self):  # pragma: no cover
        self._known_tickets.clear()