High-level view of the problem
I have X sources that contain info about assets (hostname, IPs, MACs, os, etc.) in our environment. The sources contain anywhere from 1500 to 150k entries (at least the ones I use now). My script is supposed to query each of them, gather that data, deduplicate it by merging info about the same assets from different sources, and return unified list of all entries. My current implementation does work, but it's slow for bigger datasets. I'm curious if there is better way to accomplish what I'm trying to do.
Universal problem
Deduplication of data by merging similar entries with the caveat that merging two assets might change whether the resulting asset will be similar to the third asset that was similar to the first two before merging.
Example:
~ similarity, + merging
(before) A ~ B ~ C
(after) (A+B) ~ C or (A+B) !~ C
I tried looking for people having the same issue, I only found What is an elegant way to remove duplicate mutable objects in a list in Python?, but it didn't include merging of data which is crucial in my case.
The classes used
Simplified for ease of reading and understanding with unneeded parts removed - general functionality is intact.
class Entry:
def __init__(self, source: List[str], mac: List[str] = [], ip: List[str] = [], hostname: List[str] = [], os: OS = OS.UNKNOWN, details: dict = {}):
# SO: Sorting and sanitization removed for simplicity
self.source = source
self.mac = mac
self.ip = ip
self.hostname = hostname
self.os = os
self.details = details
def __eq__(self, other):
if isinstance(other, Entry):
return (self.source == other.source and
self.os == other.os and
self.hostname == other.hostname and
self.mac == other.mac and
self.ip == other.ip)
return NotImplemented
def is_similar(self, other) -> bool:
def same_entry(l1: list, l2: list) -> bool:
return not set(l1).isdisjoint(l2)
if isinstance(other, Entry):
if self.os == OS.UNKNOWN or other.os == OS.UNKNOWN or self.os == other.os:
empty_hostnames = self.hostname == [] or other.hostname == []
empty_macs = self.mac == [] or other.mac == []
return (same_entry(self.hostname, other.hostname) or
(empty_hostnames and same_entry(self.mac, other.mac)) or
(empty_hostnames and empty_macs and same_entry(self.ip, other.ip)))
return False
def merge(self, other: 'Entry'):
self.source = _merge_lists(self.source, other.source)
self.hostname = _merge_lists(self.hostname, other.hostname)
self.mac = _merge_lists(self.mac, other.mac)
self.ip = _merge_lists(self.ip, other.ip)
self.os = self.os if self.os != OS.UNKNOWN else other.os
self.details = _merge_dicts(self.details, other.details)
def representation(self) -> str:
# Might be useful if anyone wishes to run the code
return f'<Entry from {self.source}: hostname={self.hostname}, MAC={self.mac}, IP={self.ip}, OS={self.os.value}, details={self.details}>'
def _merge_lists(l1: list, l2: list):
return list(set(l1) | set(l2))
def _merge_dicts(d1: dict, d2: dict):
"""
Merge two dicts without overwriting any data.
"""
# If either is empty, return the other one
if not d1:
return d2
if not d2:
return d1
if d1 == d2:
return d1
result = d1
for k, v in d2.items():
if k in result:
result[k + '_'] = v
else:
result[k] = v
return result
class OS(Enum):
'''
Enum specifying the operating system of the asset.
'''
UNKNOWN = 'Unknown'
WINDOWS = 'Windows'
LINUX = 'Linux'
MACOS = 'MacOS'
Algorithms
Eeach algorithm take a list of lists of entries from different sources, eq:
entries = [[entries from source A], [entries from source B], ..., [entries from source Z]]
Main deduplication function
It's the main function used in each algorithm. It takes list of entries from 2 different sources and combines that into list containing assets with information merged if needed.
It's probably the part I need help the most. It's the only way I could think of. Because of that, I focused on how to run this function multiple times faster, but making this one faster would be the best in terms of reducing runtime.
def deduplicate(en1: List[Entry], en2: List[Entry]) -> List[Entry]:
"""
Deduplicates entries from provided lists by merging similar entries.
Entries in the lists are supposed to be already deduplicated.
"""
# If either is empty, return the other one
if not en1:
return en2
if not en2:
return en1
result = []
# Iterate over longer and check for similar in shorter
if len(en2) > len(en1):
en1, en2 = en2, en1
for e in en1:
# walrus operator in Python 3.8 or newer
while (similar := next((y for y in en2 if y.is_similar(e)), None)) is not None:
e.merge(similar)
en2.remove(similar)
del similar
result.append(e)
result.extend(en2)
return result
A reason why normal deduplication (eg. using sets) isn't applicable here is because of merging one entry with another new entries might become similar, eg.:
In [2]: e1 = Entry(['SRC_A'], [], ['1.1.1.1'], [], OS.UNKNOWN)
In [3]: e2 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], ['1.1.1.1'], [], OS.UNKNOWN)
In [4]: e3 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], [], [], OS.UNKNOWN)
In [5]: e1.is_similar(e2)
Out[5]: True
In [6]: e1.is_similar(e3) # at first it's not similar
Out[6]: False
In [7]: e1.merge(e2)
In [8]: e1.is_similar(e3) # but after merging it is
Out[8]: True
1st approach - sequential
My first idea was the simplest one, just simple recursion.
def dedup_multiple(lists: List[List[Entry]]) -> List[Entry]:
"""Deduplication helper allowing for providing more than 2 sources."""
if len(lists) == 1:
return lists[0]
return deduplicate(lists[0], dedup_multiple(lists[1:]))
2nd approach - multithreading using Pool
That's the approach I'm using at the moment. So far it's the fastest one and fairly simple.
def async_dedup(lists: List[List[Entry]]) -> List[Entry]:
"""Asynchronous deduplication helper allowing for providing more than 2 sources."""
with mp.Pool() as pool:
while len(lists) > 1:
if len(lists) % 2 == 1:
lists.append([])
data = [(lists[i], lists[i+1]) for i in range(0, len(lists), 2)]
lists = pool.map_async(_internal_deduplication, data).get()
return lists[0]
def _internal_deduplication(en):
return deduplicate(*en)
But I realized really fast that if one task takes much longer than the rest (for example because deduplicating the biggest source), everything else wait instead of working.
3rd approach - multithreading using Queue and Process
As I was trying to speed up 2nd approach I came across How to use python multiprocessing pool in continuous loop and Filling a queue and managing multiprocessing in python, and I came up with the following solution.
def async_dedup2(lists: List[List[Entry]]) -> List[Entry]:
tasks_number = min(os.cpu_count(), len(lists) // 2)
args = lists[:tasks_number]
with mp.Manager() as manager:
queue = manager.Queue()
for l in lists[tasks_number:]:
queue.put(l)
processes = []
for arg in args:
proc = mp.Process(target=test, args=(queue, arg, ))
proc.start()
processes.append(proc)
for proc in processes:
proc.join()
return queue.get()
def test(queue: mp.Queue, arg: List[Entry]):
while not queue.empty():
try:
arg2: List[Entry] = queue.get()
except Empty:
continue
arg = deduplicate(arg, arg2)
queue.put(arg)
I thought it would be the best solution as there wouldn't be a moment when a data isn't processed if possible, but after testing it was almost always slightly slower than 2nd approach.
Runtime comparison
Source A 1510
Source B 1509
Source C 5000
Source D 4460
Source E 5000
Source F 2084
Deduplicating.....
SYNC - Execution time: 188.6127771000 - Count: 13540
ASYNC - Execution time: 68.249583 - Count: 13532
ASYNC2 - Execution time: 69.416046 - Count: 13532
Source A 1510
Source B 1509
Source C 11821
Source D 13871
Source E 5001
Source F 2333
Deduplicating.....
ASYNC - Execution time: 424.405793 - Count: 26229
ASYNC2 - Execution time: 522.697551 - Count: 26405