root/pyndexter/trunk/pyndexter/default.py @ 332

Revision 332, 8.0 KB (checked in by athomas, 4 years ago)

pyndexter:

  • Added a horribly inefficient built-in indexer, default.DefaultIndexer. There seems to be a memory leak somewhere, so on large datasets the indexer will consume large amounts of memory.
  • Added a util module with CacheDict, currently used by the default indexer.
  • Added some more CAP_ bits.
  • Source state data is now accumulated by the Source classes __iter__() method, no longer requiring a full walk of the source to collect state. This means an Indexer.update() will automagically do the right thing.
  • Factored out some common environment initialisation code into Indexer._init_env().
  • Factored FileSource include/exclude/predicate code into base Source class so it can be reused.
Line 
1import re
2import os
3import anydbm
4import cPickle as pickle
5from copy import deepcopy
6from pyndexter import *
7from pyndexter.util import CacheDict
8try:
9    set = set
10except NameError:
11    from sets import Set as set
12
13
14class PersistentDict(object):
15    """ A persistent, lazy, caching, dictionary. Uses the anydbm module for
16    persistence. """
17    def __init__(self, file, mode='c', cache=2048):
18        self._cache = CacheDict(cache)
19        self._flush = {}
20        self.mode = mode == READWRITE and 'c' or 'r'
21        self.file = file
22        self._dbm = anydbm.open(self.file, self.mode)
23
24    def __contains__(self, key):
25        key = key.encode('utf-8')
26        return key in self._cache or key in self._dbm
27
28    def __getitem__(self, key):
29        key = key.encode('utf-8')
30        if key in self._cache:
31            return self._cache[key]
32        return self._cache.setdefault(key, pickle.loads(self._dbm[key]))
33
34    def __setitem__(self, key, value):
35        key = key.encode('utf-8')
36        self._cache[key] = self._flush[key] = value
37
38    def __delitem__(self, key):
39        found = False
40        key = key.encode('utf-8')
41        for data in (self._cache, self._flush, self._dbm):
42            if key in data:
43                del data[key]
44                found = True
45        if not found:
46            raise KeyError(key)
47
48    def keys(self):
49        keys = set(self._cache.keys())
50        keys.update(self._dbm.keys())
51        return [key.decode('utf-8') for key in keys]
52
53    def sync(self):
54        for key, value in self._flush.iteritems():
55            self._dbm[key] = pickle.dumps(value, 2)
56        self._dbm.sync()
57        self._flush = {}
58        self._cache = {}
59
60
61def synchronised(func):
62    """ Locking decorator. """
63    import pyndexter.portalocker as portalocker
64    from time import strftime, localtime, time
65
66    def synchronised(indexer, *args, **kwargs):
67        if indexer._locked:
68            return func(indexer, *args, **kwargs)
69
70        if indexer.mode == READONLY:
71            mode = portalocker.LOCK_SH
72        else:
73            mode = portalocker.LOCK_EX
74        lockfile = open(indexer.lock_file, 'w+')
75        portalocker.lock(lockfile, mode)
76        # XXX This is not atomic
77        indexer._locked = True
78        lockfile.write(strftime('%x %X', localtime(time())))
79        try:
80            return func(indexer, *args, **kwargs)
81        finally:
82            indexer._locked = False
83            lockfile.close()
84    return synchronised
85
86
87class DefaultIndexer(Indexer):
88    """ Default indexer, using bigrams. """
89
90    capabilities = CAP_READONLY | CAP_HITCOUNT | CAP_UNION | \
91                   CAP_INTERSECTION | CAP_ITERATION | CAP_LIST | \
92                   CAP_ATTRIBUTES | CAP_WHOLEWORD | CAP_ASTERISK
93
94    _tokeniser = re.compile(r'\w+')
95
96    def __init__(self, path, source=None, mode=READWRITE, flush_every=128):
97        Indexer.__init__(self, source, mode, os.path.join(path, 'state.db'))
98        self.path = path
99        self._init_env(self.path)
100        self.lock_file = os.path.join(self.path, 'lock')
101        self._locked = False
102        self._open(mode)
103        self._flush_every = flush_every
104        self._index_count = 0
105
106    def index(self, document):
107        self._assert_rw()
108        if isinstance(document, basestring):
109            document = self.fetch(document)
110        node_words = set()
111        for word in set(self._tokeniser.findall(document.content)):
112            word = u' ' + word + u' '
113            # Split word into bigrams and add to the bigram LUT
114            for bigram in self._bigram_word(word):
115                if bigram in self.bigrams:
116                    words = self.bigrams[bigram]
117                    words.add(word)
118                    self.bigrams[bigram] = words
119                else:
120                    self.bigrams[bigram] = set([word])
121
122            # Update word:uri mapping
123            if word in self.words:
124                uris = self.words[word]
125                uris.add(document.uri)
126                self.words[word] = uris
127            else:
128                self.words[word] = set([document.uri])
129           
130            # Update attributes
131            self.attributes[document.uri] = deepcopy(document.attributes)
132
133        self.uris[document.uri] = node_words
134
135        self._index_count += 1
136        if not self._index_count % self._flush_every:
137            self.words.sync()
138            self.bigrams.sync()
139            self.uris.sync()
140            self.attributes.sync()
141           
142    index = synchronised(index)
143
144    def discard(self, document):
145        self._assert_rw()
146        for word in self.uris[document.uri]:
147            word_uris = self.words[word]
148            word_uris.discard(document.uri)
149            self.words[word] = word_uris
150    discard = synchronised(discard)
151
152    def search(self, phrase, order_by=None, order_ascending=True,
153               order_type=str, intersection=True):
154        all_words = {}
155        words = [word.lower() for word in phrase.split()]
156        # First, find all possible words that each search word matches
157        for idx, word in enumerate(words):
158            if word[0] == '*':
159                word = word[1:]
160            else:
161                word = u' ' + word
162            if word[-1] == '*':
163                word = word[:-1]
164            else:
165                word = word + u' '
166            words[idx] = word
167            bigrams = self._bigram_word(word)
168            all_words[word] = set([w for w in self._bigram_search(bigrams)
169                                   if word in w])
170
171#        # Next, find the intersection/union of all files that all words appear
172#        # in
173#        if not intersection:
174#            all_uris = set()
175#            for word in words:
176#                all_files.update(self.words[fullword
177
178        first_set = 1
179        all_uris = set()
180        for word in words:
181            # Find all uris that word appears in
182            word_uris = set()
183            for fullword in all_words[word]:
184                word_uris.update(set(self.words[fullword]))
185
186            if first_set:
187                all_uris = word_uris
188                first_set = 0
189            else:
190                all_uris.intersection_update(word_uris)
191        return DefaultSearch(self, phrase, all_uris)
192    search = synchronised(search)
193
194    def close(self):
195        self.sync()
196        self.words = self.bigrams = self.uris = None
197
198    def sync(self):
199        if self.mode == READWRITE:
200            self.words.sync()
201            self.bigrams.sync()
202            self.uris.sync()
203            self.attributes.sync()
204            self._sync_source_state()
205    sync = synchronised(sync)
206
207    # Internal methods
208    def _open(self, mode):
209        # word:uri mapping
210        self.words = PersistentDict(os.path.join(self.path, 'words.db'), mode, 8192)
211        # bigram:word
212        self.bigrams = PersistentDict(os.path.join(self.path, 'bigrams.db'), mode, 4096)
213        # uri:words mapping
214        self.uris = PersistentDict(os.path.join(self.path, 'uris.db'), mode, 32)
215        # uri:attribute mapping
216        self.attributes = PersistentDict(os.path.join(self.path, 'attributes.db'), mode, 32)
217    _open = synchronised(_open)
218
219    def _bigram_word(word):
220        for start in range(0, len(word) - 1):
221            yield word[start:start + 2]
222    _bigram_word = staticmethod(_bigram_word)
223
224    def _bigram_search(self, bigrams):
225        """ Find all words containing matching bigrams. """
226        first_hit = 1
227        words = set()
228        for bigram in bigrams:
229            if bigram in self.bigrams:
230                if first_hit:
231                    words = self.bigrams[bigram]
232                    first_hit = 0
233                else:
234                    words.intersection_update(set(self.bigrams[bigram]))
235            else:
236                return ()
237        return words
238    _bigram_search = synchronised(_bigram_search)
239
240
241class DefaultSearch(Search):
242    def __iter__(self):
243        for uri in self.context:
244            yield Hit(**self.indexer.attributes[uri])
245
246    def __len__(self):
247        return len(self.context)
248
249    def __getitem__(self, index):
250        return self.context[index]
Note: See TracBrowser for help on using the browser.