| 1 | import re |
|---|
| 2 | import os |
|---|
| 3 | import anydbm |
|---|
| 4 | import cPickle as pickle |
|---|
| 5 | from copy import deepcopy |
|---|
| 6 | from pyndexter import * |
|---|
| 7 | from pyndexter.util import CacheDict |
|---|
| 8 | try: |
|---|
| 9 | set = set |
|---|
| 10 | except NameError: |
|---|
| 11 | from sets import Set as set |
|---|
| 12 | |
|---|
| 13 | |
|---|
class PersistentDict(object):
    """ A persistent, lazy, caching, dictionary. Uses the anydbm module for
    persistence.

    Keys are unicode strings, stored UTF-8 encoded; values are pickled.
    Assignments are buffered in memory (``_flush``) and only written to the
    database by sync(). Reads go through a bounded CacheDict (``_cache``). """

    def __init__(self, file, mode='c', cache=2048):
        # Remember the bound so sync() can rebuild an equally bounded cache.
        self._cache_size = cache
        self._cache = CacheDict(cache)
        # Pending writes: encoded key -> value, written out by sync().
        self._flush = {}
        self.mode = mode == READWRITE and 'c' or 'r'
        self.file = file
        self._dbm = anydbm.open(self.file, self.mode)

    def __contains__(self, key):
        key = key.encode('utf-8')
        # Check pending writes too: the bounded cache may have evicted an
        # unsynced entry that the database does not contain yet.
        return key in self._flush or key in self._cache or key in self._dbm

    def __getitem__(self, key):
        """ Return the value for ``key``: pending write, cached value, or
        unpickled from the database (raises KeyError if absent). """
        key = key.encode('utf-8')
        # Pending writes win: they may have been evicted from the cache and
        # are not in the database until sync().
        if key in self._flush:
            return self._flush[key]
        if key in self._cache:
            return self._cache[key]
        return self._cache.setdefault(key, pickle.loads(self._dbm[key]))

    def __setitem__(self, key, value):
        key = key.encode('utf-8')
        self._cache[key] = self._flush[key] = value

    def __delitem__(self, key):
        """ Remove ``key`` from the cache, pending writes and database.
        Raises KeyError if it was in none of them. """
        found = False
        key = key.encode('utf-8')
        for data in (self._cache, self._flush, self._dbm):
            if key in data:
                del data[key]
                found = True
        if not found:
            raise KeyError(key)

    def keys(self):
        """ Return all keys (decoded to unicode), including unsynced ones. """
        keys = set(self._flush.keys())
        keys.update(self._cache.keys())
        keys.update(self._dbm.keys())
        return [key.decode('utf-8') for key in keys]

    def sync(self):
        """ Write all pending values to the database and reset the cache. """
        for key, value in self._flush.iteritems():
            self._dbm[key] = pickle.dumps(value, 2)
        # Not every anydbm backend exposes sync() (e.g. the ndbm-style one).
        dbm_sync = getattr(self._dbm, 'sync', None)
        if dbm_sync is not None:
            dbm_sync()
        self._flush = {}
        # Rebuild a bounded CacheDict; a plain dict here would grow without
        # limit for the rest of the object's life.
        self._cache = CacheDict(self._cache_size)
|---|
| 59 | |
|---|
| 60 | |
|---|
def synchronised(func):
    """ Locking decorator.

    Wraps an indexer method so it runs while holding a portalocker lock on
    ``indexer.lock_file``: a shared lock for READONLY indexers, an exclusive
    lock otherwise. If ``indexer._locked`` is already set (a decorated method
    calling another one), the call runs directly without re-locking. """
    import pyndexter.portalocker as portalocker
    from time import strftime, localtime, time

    def locked_call(indexer, *args, **kwargs):
        if indexer._locked:
            return func(indexer, *args, **kwargs)

        if indexer.mode == READONLY:
            mode = portalocker.LOCK_SH
        else:
            mode = portalocker.LOCK_EX
        lockfile = open(indexer.lock_file, 'w+')
        try:
            portalocker.lock(lockfile, mode)
            # XXX This is not atomic: _locked is a plain per-instance flag, so
            # one indexer must not be shared between threads.
            indexer._locked = True
            try:
                # Informational: record when the lock was taken.
                lockfile.write(strftime('%x %X', localtime(time())))
                return func(indexer, *args, **kwargs)
            finally:
                # Always clear the flag, otherwise later calls would skip locking.
                indexer._locked = False
        finally:
            # Closing the handle is what releases the lock (the original code
            # relied on this too), so it must happen even if locking failed.
            lockfile.close()

    # Keep the wrapped method's docstring/name for introspection.
    locked_call.__doc__ = func.__doc__
    try:
        locked_call.__name__ = func.__name__
    except (TypeError, AttributeError):
        pass  # function __name__ is read-only on very old Pythons
    return locked_call
|---|
| 85 | |
|---|
| 86 | |
|---|
class DefaultIndexer(Indexer):
    """ Default indexer, using bigrams.

    The index lives in four PersistentDicts under ``path``:

    * ``words``      -- padded word -> set of URIs the word occurs in
    * ``bigrams``    -- bigram -> set of padded words containing it
    * ``uris``       -- URI -> set of padded words the URI was indexed under
    * ``attributes`` -- URI -> deep copy of the document's attributes

    A "padded" word is the lower-cased word with a space on either side, so
    the bigrams " x" and "x " anchor matches to word boundaries. """

    capabilities = CAP_READONLY | CAP_HITCOUNT | CAP_UNION | \
                   CAP_INTERSECTION | CAP_ITERATION | CAP_LIST | \
                   CAP_ATTRIBUTES | CAP_WHOLEWORD | CAP_ASTERISK

    # re.UNICODE: without it (Python 2) \w matches only ASCII, which would
    # split words containing accented letters into fragments.
    _tokeniser = re.compile(r'\w+', re.UNICODE)

    def __init__(self, path, source=None, mode=READWRITE, flush_every=128):
        Indexer.__init__(self, source, mode, os.path.join(path, 'state.db'))
        self.path = path
        self._init_env(self.path)
        self.lock_file = os.path.join(self.path, 'lock')
        # Must exist before _open(), which is wrapped by synchronised.
        self._locked = False
        self._open(mode)
        # index() syncs all stores every ``flush_every`` documents.
        self._flush_every = flush_every
        self._index_count = 0

    def index(self, document):
        """ Add ``document`` to the index.

        ``document`` is a document object or a URI string, in which case it
        is fetched with ``self.fetch``. Re-indexing a URI replaces its previous
        entry. The stores are synced every ``flush_every`` calls. """
        self._assert_rw()
        if isinstance(document, basestring):
            document = self.fetch(document)
        uri = document.uri
        if uri in self.uris:
            # Drop the old links so words removed from the document no longer
            # match it.
            self._unlink_uri(uri)

        node_words = set()
        for word in set(self._tokeniser.findall(document.content)):
            # Lower-case to agree with search(), which lower-cases query terms.
            word = u' ' + word.lower() + u' '
            node_words.add(word)

            # Split word into bigrams and add to the bigram LUT. Values are
            # re-assigned after mutation so PersistentDict queues them for sync.
            for bigram in self._bigram_word(word):
                if bigram in self.bigrams:
                    words = self.bigrams[bigram]
                    words.add(word)
                    self.bigrams[bigram] = words
                else:
                    self.bigrams[bigram] = set([word])

            # Update word:uri mapping
            if word in self.words:
                uris = self.words[word]
                uris.add(uri)
                self.words[word] = uris
            else:
                self.words[word] = set([uri])

        # Update attributes
        self.attributes[uri] = deepcopy(document.attributes)
        # uri:words mapping, needed by discard() and re-indexing.
        self.uris[uri] = node_words

        self._index_count += 1
        if not self._index_count % self._flush_every:
            self.words.sync()
            self.bigrams.sync()
            self.uris.sync()
            self.attributes.sync()

    index = synchronised(index)

    def discard(self, document):
        """ Remove ``document`` (a document object or a URI string) from the
        index. Raises KeyError if the URI is not indexed. """
        self._assert_rw()
        if isinstance(document, basestring):
            uri = document
        else:
            uri = document.uri
        self._unlink_uri(uri)

    discard = synchronised(discard)

    def search(self, phrase, order_by=None, order_ascending=True,
               order_type=str, intersection=True):
        """ Return a DefaultSearch over the URIs matching ``phrase``.

        Terms are whitespace-separated and matched case-insensitively against
        whole words. A leading ``*`` lets a term match the end of a longer
        word, and a trailing ``*`` lets it match the start. A term that is only
        asterisks is ignored. With ``intersection`` true (the default) a URI
        must match every term; otherwise any term suffices. ``order_by``,
        ``order_ascending`` and ``order_type`` are accepted but not used.
        Note: a term of a single character wrapped in ``*`` on both sides has
        no bigrams and so matches nothing. """
        terms = []
        all_words = {}
        # First, find all possible words that each search term matches.
        for term in [word.lower() for word in phrase.split()]:
            core = term
            if core.startswith('*'):
                core = core[1:]
                head = u''
            else:
                head = u' '
            if core.endswith('*'):
                core = core[:-1]
                tail = u''
            else:
                tail = u' '
            if not core:
                continue
            word = head + core + tail
            terms.append(word)
            if word not in all_words:
                candidates = self._bigram_search(self._bigram_word(word))
                # Bigram hits are only candidates; confirm the substring.
                all_words[word] = set([w for w in candidates if word in w])

        # Next, combine the URIs of each term by intersection or union.
        all_uris = None
        for word in terms:
            word_uris = set()
            for fullword in all_words[word]:
                word_uris.update(self.words[fullword])
            if all_uris is None:
                all_uris = word_uris
            elif intersection:
                all_uris.intersection_update(word_uris)
            else:
                all_uris.update(word_uris)
            if intersection and not all_uris:
                break  # nothing can match any more
        if all_uris is None:
            all_uris = set()
        return DefaultSearch(self, phrase, all_uris)

    search = synchronised(search)

    def close(self):
        """ Sync and drop references to all stores. Calling it twice is a no-op. """
        if self.words is None:
            return
        self.sync()
        self.words = self.bigrams = self.uris = self.attributes = None

    def sync(self):
        """ In READWRITE mode, flush all four stores and the source state. """
        if self.mode == READWRITE:
            self.words.sync()
            self.bigrams.sync()
            self.uris.sync()
            self.attributes.sync()
            self._sync_source_state()

    sync = synchronised(sync)

    # Internal methods
    def _open(self, mode):
        # word:uri mapping
        self.words = PersistentDict(os.path.join(self.path, 'words.db'), mode, 8192)
        # bigram:word
        self.bigrams = PersistentDict(os.path.join(self.path, 'bigrams.db'), mode, 4096)
        # uri:words mapping
        self.uris = PersistentDict(os.path.join(self.path, 'uris.db'), mode, 32)
        # uri:attribute mapping
        self.attributes = PersistentDict(os.path.join(self.path, 'attributes.db'), mode, 32)

    _open = synchronised(_open)

    def _bigram_word(word):
        """ Yield each overlapping two-character slice of ``word``. """
        for start in range(0, len(word) - 1):
            yield word[start:start + 2]

    _bigram_word = staticmethod(_bigram_word)

    def _bigram_search(self, bigrams):
        """ Find all words containing matching bigrams.

        Returns a new set of the words containing every bigram. The set is
        empty if any bigram is unknown or none were given. The result is a copy
        because the sets stored in ``self.bigrams`` are live cached objects,
        and mutating them would corrupt the LUT. """
        words = None
        for bigram in bigrams:
            if bigram not in self.bigrams:
                return set()
            if words is None:
                words = set(self.bigrams[bigram])
            else:
                words.intersection_update(self.bigrams[bigram])
        if words is None:
            return set()
        return words

    _bigram_search = synchronised(_bigram_search)

    def _unlink_uri(self, uri):
        """ Remove ``uri`` from every word it was indexed under and drop its
        ``uris``/``attributes`` entries. The caller must already hold the
        lock. Raises KeyError if ``uri`` is not indexed. """
        node_words = self.uris[uri]
        for word in node_words:
            # Word entries are kept even when empty: the bigram LUT still
            # refers to them and search() looks them up.
            if word in self.words:
                word_uris = self.words[word]
                word_uris.discard(uri)
                self.words[word] = word_uris
        del self.uris[uri]
        if uri in self.attributes:
            del self.attributes[uri]
|---|
| 239 | |
|---|
| 240 | |
|---|
class DefaultSearch(Search):
    """ Results of DefaultIndexer.search(): ``self.context`` holds the
    matching URIs, and each URI is turned into a Hit built from the
    indexer's stored attributes. """

    def _uri_list(self):
        """ Return the matching URIs as a list. The list is built once and
        cached, so indexing is O(1) and agrees with iteration order.
        ``context`` is a set, which cannot be indexed directly. """
        try:
            return self._ordered_uris
        except AttributeError:
            self._ordered_uris = list(self.context)
            return self._ordered_uris

    def _hit(self, uri):
        """ Build the Hit for ``uri`` from its stored attributes. """
        return Hit(**self.indexer.attributes[uri])

    def __iter__(self):
        for uri in self._uri_list():
            yield self._hit(uri)

    def __len__(self):
        return len(self.context)

    def __getitem__(self, index):
        """ Return the Hit at ``index``, or a list of Hits for a slice,
        in the same order as iteration. """
        uris = self._uri_list()
        if isinstance(index, slice):
            return [self._hit(uri) for uri in uris[index]]
        return self._hit(uris[index])
|---|