A search engine with page rank, in 200 lines
~12 mins read
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import re
import math
import operator
import logging
from collections import defaultdict, Counter
import numpy as np
class Tokenizer:
def __init__(self, stop_words, ):
self.stop_words = stop_words
def token_generator(self, s):
letters_and_digits_only = re.compile('[a-z0-9]+')
for token in letters_and_digits_only.findall(s.lower()):
if token not in self.stop_words:
yield token
class Scorer:
@staticmethod
def get_tf_idf(tf, number_of_pages, df):
weighted_tf = 1 + math.log10(tf)
idf = math.log10(number_of_pages / df)
return weighted_tf * idf
class Indexer:
def __init__(self, tokenizer, scorer):
self.tokenizer = tokenizer
self.scorer = scorer
def get_count_index(self, pages):
# count terms
count_index = defaultdict(list)
for url, content, links in pages:
counts = Counter(self.tokenizer.token_generator(content))
for token, count in counts.items():
count_index[token].append((url, count))
return count_index
def get_weighted_index(self, count_index, number_of_pages):
# add tf-idf weights
weighted_index = defaultdict(list)
for token, docs in count_index.items():
df = len(docs)
for url, count in docs:
weight = self.scorer.get_tf_idf(count, number_of_pages, df)
weighted_index[token].append((url, weight))
return weighted_index
def get_normalized_index(self, weighted_index):
# normalize tf-idf weights
weights_by_url = defaultdict(list)
for docs in weighted_index.values():
for url, weight in docs:
weights_by_url[url].append(weight)
normalized_index = defaultdict(list)
for term, docs in weighted_index.items():
for url, weight in docs:
doc_norm = np.linalg.norm(weights_by_url.get(url))
if doc_norm == 0:
normalized_weight = weight
else:
normalized_weight = round(weight / doc_norm, 3)
normalized_index[term].append((url, normalized_weight))
return normalized_index
def get_index(self, pages):
number_of_pages = len(pages)
count_index = self.get_count_index(pages)
weighted_index = self.get_weighted_index(count_index, number_of_pages)
normalized_index = self.get_normalized_index(weighted_index)
return normalized_index
class PageRank:
def __init__(self, scorer):
self.max_iterations = 10000
self.teleport_rate = 0.1
self.page_rank = None
self.scorer = scorer
def get_cosine_similarity_scores(self, index, number_of_pages, query):
query_terms = set(query.split())
related_terms = set(term for term in query_terms if term in index)
query_tf_idfs = {term: self.scorer.get_tf_idf(1, number_of_pages, len(index[term])) for term in related_terms}
query_vector_norm = np.linalg.norm(list(query_tf_idfs.values()))
scores = defaultdict(int)
for term in related_terms:
query_term_weight = query_tf_idfs.get(term) / query_vector_norm
for url, weight in index[term]:
scores[url] += weight * query_term_weight
return sorted(scores.items(), key=operator.itemgetter(1), reverse=True)
@staticmethod
def split_link_weight(number_of_pages, url, links):
if not links:
return 1 / number_of_pages
if url in links:
return 1 / len(links)
else:
return 0
def create_transition_matrix(self, pages):
urls = [url for url, content, links in pages]
number_of_pages = len(pages)
transition_probabilities = [
[
self.split_link_weight(number_of_pages, url, links) for url in urls
]
for url, content, links in pages
]
transition_matrix = np.matrix(transition_probabilities)
markov_transition_matrix = self.teleport(number_of_pages, transition_matrix)
return markov_transition_matrix
def teleport(self, number_of_pages, transition_matrix):
return transition_matrix * (1 - self.teleport_rate) + self.teleport_rate / number_of_pages
def power_method(self, number_of_pages, transition_matrix):
# initial ranking score is 1/N for every page
ranks = [1 / number_of_pages] * number_of_pages
for i in range(self.max_iterations):
new_ranks = ranks * transition_matrix
logging.debug(f"step {i} {new_ranks}")
if np.allclose(ranks, new_ranks):
break
ranks = new_ranks
return ranks.reshape(-1, ).tolist().pop()
def create_page_rank(self, pages):
transition_matrix = self.create_transition_matrix(pages)
logging.debug(f"transition_matrix {transition_matrix}")
ranks = self.power_method(len(pages), transition_matrix)
urls = [page[0] for page in pages]
page_rank = {
url: round(rank, 3)
for url, rank in zip(urls, ranks)
}
return page_rank
class SearchEngine():
def __init__(self, indexer, ranker):
self.indexer = indexer
self.ranker = ranker
self.index = self.page_rank = self.number_of_pages = None
def start(self, pages):
self.index = self.indexer.get_index(pages)
self.page_rank = self.ranker.create_page_rank(pages)
self.number_of_pages = len(pages)
def search(self, query):
cosine_similarity_scores = self.ranker.get_cosine_similarity_scores(self.index, self.number_of_pages, query)
logging.debug(f"cosine_similarity_scores {cosine_similarity_scores}")
final_scores = [(url, score * self.page_rank.get(url)) for url, score in cosine_similarity_scores]
results = sorted(final_scores, key=operator.itemgetter(1), reverse=True)
return results
def test_search_engine(pages):
stop_words = {"the", "a", "an", "is", "this", "to"}
tokenizer = Tokenizer(stop_words)
scorer = Scorer()
indexer = Indexer(tokenizer, scorer)
ranker = PageRank(scorer)
engine = SearchEngine(indexer, ranker)
engine.start(pages)
logging.debug(f"page_rank {engine.page_rank}")
logging.debug(engine.index)
while True:
query = input("search > ")
if query:
results = engine.search(query)
if results:
print(results)
else:
print("no results")
if __name__ == "__main__":
pages = [
("a.com", "oh romeo wherefore art thou?", ["b.com", "d.com", "e.com"]),
("b.com", "These Violent Delights Have Violent Ends", ["d.com", "c.com"]),
("c.com", "The fool doth think he is wise, but the wise man knows himself to be a fool.", ["d.com", "b.com"]),
("d.com", "Love all, trust a few, do wrong to none.", ["a.com", "b.com"]),
("e.com", "Though this be madness, yet there is method in't.", ["c.com", "a.com"]),
]
logging.getLogger().setLevel("DEBUG")
test_search_engine(pages)