Here is the code that I am working with. My goal is for it to divide each chunk into smaller parts based on the number of threads, which should provide better load balancing across the threads. I am new to using threading in Python, so I am not sure whether what I am doing is optimal.
Here is the code I have created:
#so far this is the most optimal
import heapq
import re
import time
import psutil
from collections import defaultdict
import requests
import tracemalloc
from concurrent.futures import ThreadPoolExecutor
import os
# Load the NLTK English stop-word list once at module import time.
# NOTE(review): this performs a network request on import — consider caching
# the list locally or handling a download failure explicitly.
stop_words_url = "https://gist.githubusercontent.com/sebleier/554280/raw/7e0e4a1ce04c2bb7bd41089c9821dbcf6d0c786c/NLTK's%2520list%2520of%2520english%2520stopwords"
# BUG FIX: requests.get has no default timeout; without one a stalled host
# would hang the whole program forever before any processing starts.
stop_words = set(requests.get(stop_words_url, timeout=10).text.split())
def tokenize(line):
    """Return every word-like token (maximal run of word characters) in *line*."""
    word_pattern = r'\b\w+\b'
    return re.findall(word_pattern, line)
def count_words(chunk, stopwords=None):
    """Count occurrences of non-stop-words in an iterable of text lines.

    Parameters
    ----------
    chunk : iterable of str
        Lines of text to tokenize and count.
    stopwords : set of str, optional
        Words to exclude, compared against the lowercased token. Defaults
        to the module-level ``stop_words`` set downloaded at import time
        (generalized so the function is usable and testable without the
        network-loaded global).

    Returns
    -------
    collections.defaultdict
        Mapping of word -> occurrence count.
        NOTE(review): the filter lowercases each token but the tally keeps
        the original case, so "The" and "the" are counted as two distinct
        words — confirm that is intended.
    """
    if stopwords is None:
        stopwords = stop_words  # module-level default set
    word_count = defaultdict(int)
    for line in chunk:
        for word in tokenize(line):
            if word.lower() not in stopwords:
                word_count[word] += 1
    return word_count
def top_k_words(word_count, k):
    """Return the k highest-count (word, count) pairs, best first."""
    # Equivalent to heapq.nlargest(k, items, key=...), which the stdlib
    # documents as sorted(iterable, key=key, reverse=True)[:k].
    ranked = sorted(word_count.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:k]
def _merge_counts(target, partial):
    """Fold the counts from *partial* into *target* in place."""
    for word, count in partial.items():
        target[word] += count


def _count_chunk(executor, chunk, num_threads, word_count):
    """Count the words of *chunk* on the pool and merge into *word_count*.

    The chunk is split into ``num_threads`` interleaved slices
    (``chunk[i::num_threads]``) so every worker receives a similar number
    of lines — the load balancing the author asked about.
    """
    futures = [executor.submit(count_words, chunk[i::num_threads])
               for i in range(num_threads)]
    for future in futures:
        _merge_counts(word_count, future.result())


def analyze_performance(file_path, k=10, chunk_size=10000, num_threads=None):
    """Count words in *file_path* chunk by chunk and report performance.

    Parameters
    ----------
    file_path : str
        Path of the UTF-8 text file to analyze.
    k : int
        Number of top words to report.
    chunk_size : int
        Number of LINES per chunk (not bytes).
    num_threads : int, optional
        Worker-thread count; defaults to ``os.cpu_count()`` at call time
        (the original evaluated it once at import time as a default arg).

    Returns
    -------
    tuple
        (elapsed_seconds, cpu_percent, peak_traced_bytes).
    """
    if num_threads is None:
        num_threads = os.cpu_count()
    start_time = time.time()
    tracemalloc.start()
    word_count = defaultdict(int)
    # FIX: create the pool ONCE. The original built (and tore down) a new
    # ThreadPoolExecutor for every chunk, paying thread startup costs each
    # time, and duplicated the whole submit/merge code for the tail chunk.
    # NOTE(review): the counting itself is CPU-bound pure-Python work, so
    # under the GIL threads mostly serialize; a ProcessPoolExecutor would
    # give real parallelism — worth benchmarking.
    with open(file_path, 'r', encoding='utf-8') as file, \
            ThreadPoolExecutor(max_workers=num_threads) as executor:
        chunk = []
        for line in file:
            chunk.append(line)
            if len(chunk) == chunk_size:
                _count_chunk(executor, chunk, num_threads, word_count)
                chunk = []
        if chunk:  # leftover partial chunk at end of file
            _count_chunk(executor, chunk, num_threads, word_count)
    top_k = top_k_words(word_count, k)
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    elapsed_time = time.time() - start_time
    # NOTE(review): cpu_percent() without an interval reports usage since
    # the previous call, so the first reading per process is unreliable.
    cpu_percent = psutil.cpu_percent()
    print(f"Top {k} words: {top_k}")
    print(f"Elapsed time: {elapsed_time:.2f} seconds")
    print(f"CPU usage: {cpu_percent}%")
    return elapsed_time, cpu_percent, peak
if __name__ == "__main__":
file_paths = [
"small_50MB_dataset.txt",
]
chunk_sizes = [10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000]
for file_path in file_paths:
print(f"Processing {file_path}")
for chunk_size in chunk_sizes:
for numThreads in range(1, os.cpu_count() + 1):
print("Partition Size:", chunk_size / (1024 * 1024), "MB", "chunk size of:", chunk_size)
elapsed_time, cpu_usage, memory_usage = analyze_performance(file_path, chunk_size=chunk_size, num_threads=numThreads)
print("\n")
result = {
"chunk_size": chunk_size,
"num_threads": numThreads,
"elapsed_time": elapsed_time,
"cpu_usage": cpu_usage,
"memory_usage": float(memory_usage / 10**6)
}
results.append(result)
# Create pandas DataFrame from the results
df2 = pd.DataFrame(results)
Here is the output I got:
+----+--------------+---------------+----------------+-------------+----------------+
| | chunk_size | num_threads | elapsed_time | cpu_usage | memory_usage |
|----+--------------+---------------+----------------+-------------+----------------|
| 40 | 10 | 1 | 51.2827 | 24.8 | 8.01863 |
| 41 | 10 | 2 | 60.1906 | 65.5 | 8.3454 |
| 42 | 100 | 1 | 32.4096 | 64.4 | 8.11009 |
| 43 | 100 | 2 | 33.402 | 60 | 8.16907 |
| 44 | 1000 | 1 | 25.7621 | 62.5 | 8.48084 |
| 45 | 1000 | 2 | 31.2087 | 65 | 9.02304 |
| 46 | 10000 | 1 | 24.5674 | 70.6 | 12.702 |
| 47 | 10000 | 2 | 23.1408 | 63.7 | 13.9474 |
| 48 | 100000 | 1 | 19.4707 | 58.7 | 43.1203 |
| 49 | 100000 | 2 | 21.5641 | 64.6 | 42.0958 |
| 50 | 1e+06 | 1 | 21.23 | 61.9 | 99.1393 |
| 51 | 1e+06 | 2 | 21.2195 | 60.7 | 104.215 |
| 52 | 1e+07 | 1 | 21.5565 | 64.3 | 99.153 |
| 53 | 1e+07 | 2 | 22.712 | 66.1 | 104.216 |
| 54 | 1e+08 | 1 | 20.8239 | 61.9 | 99.1389 |
| 55 | 1e+08 | 2 | 22.5298 | 63.9 | 104.217 |
| 56 | 1e+09 | 1 | 21.4913 | 64.3 | 99.1535 |
| 57 | 1e+09 | 2 | 20.9633 | 58.6 | 104.232 |
| 58 | 1e+10 | 1 | 21.4864 | 64.6 | 99.1389 |
| 59 | 1e+10 | 2 | 22.0327 | 63.9 | 104.216 |
+----+--------------+---------------+----------------+-------------+----------------+
Any advice on where to improve, or what to change to get a more optimal solution, would be greatly appreciated.