def add_items(self, data):
for vec in data:
# 1. 寻找最近的 ef 个候选节点 (search_layer)
frontier = MinHeap(entry_point)
results = MaxHeap(entry_point)
visited = {entry_point}
while frontier:
curr_dist, curr = frontier.pop()
# 剪枝:如果当前最近的候选 > 结果集中最远的
if curr_dist > results.furthest():
break # 局部最优,停止向该方向扩展
for nb in curr.neighbors:
if nb not in visited:
visited.add(nb)
d = dist(vec, nb)
if d < results.furthest() or len < ef:
frontier.push(nb); results.push(nb)
# 保持 results 大小不超过 ef
if len > ef: results.pop_furthest()
# 2. 选出 M 个最近并连接
neighbors = pick_top_m(results, M)
for nb in neighbors: connect(vec, nb)
def knn_query(self, query, k):
# 搜索逻辑与建图几乎一致,只是不建立连接
frontier = MinHeap(entry); results = MaxHeap(entry)
while frontier:
d, curr = frontier.pop()
# 剪枝:不再深入更远的区域
if d > results.furthest(): break
for nb in curr.neighbors:
if nb not in visited:
dist = calc_dist(query, nb)
if dist < results.furthest():
frontier.push(nb); results.push(nb)
return results.top_k(k)