from collections import defaultdict
from operator import itemgetter
import time

from rdflib import Graph, namespace


def merge_tuples(a_tup, b_tup, b_excl):
    """Concatenate two rows, dropping the join column of the right-hand row.

    :param a_tup: left row; kept whole.
    :param b_tup: right row.
    :param b_excl: position in ``b_tup`` of its join column, which duplicates
        the matching value already present in ``a_tup``.
    :return: a new tuple: all of ``a_tup`` followed by ``b_tup`` without
        ``b_tup[b_excl]``.
    """
    b_list = list(b_tup)
    del b_list[b_excl]
    return tuple(list(a_tup) + b_list)


def hash_join(a_table, b_table, a_index=1, b_index=0):
    """Equi-join two tables with a hash join.

    Rows ``a`` and ``b`` match when ``a[a_index] == b[b_index]``; each match
    yields ``merge_tuples(a, b, b_index)``.

    ``a_table`` is the build side (bucketed by its join column) and
    ``b_table`` is streamed as the probe side. Output order follows
    ``b_table``; within one ``b`` row, matching ``a`` rows keep their
    ``a_table`` order.
    """
    buckets = defaultdict(list)
    for a_tup in a_table:
        buckets[a_tup[a_index]].append(a_tup)
    # .get() rather than [] so probing a key that has no match does not
    # insert an empty bucket into the defaultdict for every such b row.
    return [merge_tuples(a_tup, b_tup, b_index)
            for b_tup in b_table
            for a_tup in buckets.get(b_tup[b_index], ())]


class SortMergeJoin:
    """Equi-join two tables with the sort-merge algorithm.

    Usage: ``SortMergeJoin(a_table, b_table, a_index, b_index).join()``.
    Produces the same rows as ``hash_join`` called with the same arguments
    (in sorted-key order rather than probe order): every pair with
    ``a[a_index] == b[b_index]``, merged via ``merge_tuples(a, b, b_index)``.
    Join-column values must be mutually orderable, since both inputs are
    sorted on them.
    """

    def __init__(self, a_table, b_table, a_index=1, b_index=0):
        # Sort phase: both inputs are sorted up front on their join column;
        # join() then walks the two sorted lists in lockstep.
        self.a_rows = sorted(a_table, key=itemgetter(a_index))
        self.b_rows = sorted(b_table, key=itemgetter(b_index))
        # Join-column positions: True -> a side, False -> b side.
        self.indexes = {True: a_index, False: b_index}
        # Filled by join(); empty inputs simply produce an empty result.
        self.result = []

    def join(self):
        """Run the merge phase; return the joined rows (also kept in ``self.result``)."""
        a_rows, b_rows = self.a_rows, self.b_rows
        a_index, b_index = self.indexes[True], self.indexes[False]
        a_len, b_len = len(a_rows), len(b_rows)
        result = []
        i = j = 0
        while i < a_len and j < b_len:
            key = a_rows[i][a_index]
            b_key = b_rows[j][b_index]
            if key < b_key:
                i += 1
            elif b_key < key:
                j += 1
            else:
                # Equal keys: find the whole run of this key on both sides and
                # emit the cross product. Advancing a single cursor per match
                # would drop pairs whenever the key repeats on both sides.
                i_end = i + 1
                while i_end < a_len and a_rows[i_end][a_index] == key:
                    i_end += 1
                j_end = j + 1
                while j_end < b_len and b_rows[j_end][b_index] == key:
                    j_end += 1
                result.extend(merge_tuples(a_tup, b_tup, b_index)
                              for a_tup in a_rows[i:i_end]
                              for b_tup in b_rows[j:j_end])
                i, j = i_end, j_end
        self.result = result
        return result


def nested_loop_join(a_table, b_table, a_index=1, b_index=0):
    """Equi-join two tables by testing every pair of rows.

    Cost is proportional to ``len(a_table) * len(b_table)``. ``b_table`` is
    iterated once per ``a`` row, so it must be re-iterable (e.g. a list).
    Keys are compared with ``==``: equal values such as strings built
    separately per triple are generally distinct objects, so an identity
    test would miss matches.
    """
    return [merge_tuples(a_tup, b_tup, b_index)
            for a_tup in a_table
            for b_tup in b_table
            if a_tup[a_index] == b_tup[b_index]]


def _sort_merge_join(a_table, b_table, a_index=1, b_index=0):
    """Adapt SortMergeJoin to the function signature shared by the other joins."""
    return SortMergeJoin(a_table, b_table, a_index, b_index).join()


def _join_chain(join, tables):
    """Evaluate follow -> friend -> like -> review with the given join function.

    Each step joins the last column of the accumulated rows with column 0 of
    the next table, so every step widens the rows by one column.
    """
    step1 = join(tables["follow"], tables["friend"])
    step2 = join(step1, tables["like"], 2)
    return join(step2, tables["review"], 3)


def compare(graph, *, run_nested_loop=True):
    """Time the hash, sort-merge and nested-loop joins on one rdflib graph.

    Extracts (subject, object) pairs for four predicates, evaluates the same
    four-table join chain with each strategy, and prints the elapsed time and
    result size for each.

    :param graph: the rdflib ``Graph`` to query.
    :param run_nested_loop: pass False to skip the nested-loop join, whose
        cost grows with the product of the table sizes.
    """
    print(f"{len(graph)} records loaded")
    # NOTE(review): these namespaces are the literal strings "wsdbm:" and
    # "rev:", so this assumes the input files spell predicates that way.
    wsdbm = namespace.Namespace("wsdbm:")
    rev = namespace.Namespace("rev:")
    properties = {
        "follow": wsdbm.follows,
        "friend": wsdbm.friendOf,
        "like": wsdbm.likes,
        "review": rev.hasReview,
    }
    # n3() renders each term as a string, so rows are plain string tuples.
    comp_tables = {
        name: [(s.n3(), o.n3())
               for s, _, o in graph.triples((None, predicate, None))]
        for name, predicate in properties.items()
    }

    strategies = (
        ("Hash Join", hash_join),
        ("Sort Merge Join", _sort_merge_join),
        ("Nested Loop Join", nested_loop_join),
    )
    for label, join in strategies:
        if join is nested_loop_join and not run_nested_loop:
            print(f"Skipped {label}")
            continue
        # perf_counter is the monotonic, high-resolution clock for intervals.
        start = time.perf_counter()
        result = _join_chain(join, comp_tables)
        print(f"{time.perf_counter() - start}s for {label} ({len(result)} items)")


def main():
    """Run the benchmark on the small and the large WatDiv dumps."""
    # The nested-loop join is skipped on the 10M-triple dump: its pairwise
    # comparison count is impractical at that scale.
    runs = (("watdiv-url100k.txt", True), ("watdiv.10M.nt", False))
    for path, run_nested_loop in runs:
        graph = Graph()
        graph.parse(path, format="nt")
        compare(graph, run_nested_loop=run_nested_loop)


if __name__ == "__main__":
    main()