Spill here means writing the in-memory data to disk, which hurts PySpark's performance, since writing to disk is far slower than keeping the data in memory.
Why psutil
To check the used memory of the node.
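Installing psutil (pip install psutil) is enough to silence the warning. As a minimal standalone illustration, not PySpark code, of what the helper below relies on, psutil can report the resident memory of the current process:

import os
import psutil

# RSS (resident set size) of the current Python process, in bytes
rss_bytes = psutil.Process(os.getpid()).memory_info().rss

# shifting right by 20 bits divides by 2**20, i.e. converts bytes to MB
print("used memory: %d MB" % (rss_bytes >> 20))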
This is the original snippet from the PySpark source file shuffle.py (taken from here) that throws the warning. The code below defines get_used_memory, which returns the used memory of the process: it uses psutil if it is installed, and otherwise falls back to reading /proc/self/status on Linux, or resource.getrusage on macOS after emitting the warning.
Importing psutil and defining get_used_memory
# module-level imports used by this snippet (shuffle.py imports these at the top)
import os
import platform
import warnings

try:
    import psutil

    def get_used_memory():
        """ Return the used memory in MB """
        process = psutil.Process(os.getpid())
        if hasattr(process, "memory_info"):
            info = process.memory_info()
        else:
            # older psutil versions expose get_memory_info() instead
            info = process.get_memory_info()
        return info.rss >> 20

except ImportError:

    def get_used_memory():
        """ Return the used memory in MB """
        if platform.system() == 'Linux':
            for line in open('/proc/self/status'):
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) >> 10
        else:
            warnings.warn("Please install psutil to have better "
                          "support with spilling")
            if platform.system() == "Darwin":
                import resource
                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                return rss >> 20
            # TODO: support windows
        return 0
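The bit shifts above are just unit conversions: psutil reports RSS in bytes, so >> 20 divides by 2**20 to get MB, while /proc/self/status reports VmRSS in kB, so >> 10 divides by 2**10. A quick sanity check, with made-up example numbers:

rss_bytes = 3 * 1024 * 1024 * 1024             # e.g. psutil reports 3 GiB in bytes
assert rss_bytes >> 20 == rss_bytes // (1024 * 1024) == 3072   # -> 3072 MB

vmrss_kb = 524288                              # e.g. /proc reports 512 MiB as kB
assert vmrss_kb >> 10 == vmrss_kb // 1024 == 512               # -> 512 MB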
Writing to disk
The code below triggers the write to disk when the node's used memory exceeds the pre-set limit: mergeCombiners checks the memory usage once every batch items and calls _spill() when get_used_memory() is above self.memory_limit. A standalone sketch of this check-and-spill pattern follows the snippet.
def mergeCombiners(self, iterator, check=True):
    """ Merge (K,V) pair by mergeCombiner """
    iterator = iter(iterator)
    # speedup attribute lookup
    d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
    c = 0
    for k, v in iterator:
        d[k] = comb(d[k], v) if k in d else v
        if not check:
            continue

        c += 1
        if c % batch == 0 and get_used_memory() > self.memory_limit:
            self._spill()
            self._partitioned_mergeCombiners(iterator, self._next_limit())
            break
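To make the check-and-spill pattern concrete, here is a minimal standalone sketch, not PySpark code. It reuses the get_used_memory helper from above; the function name, the JSON spill format, and the temp-file handling are my own simplifications for illustration.

import json
import tempfile

def merge_with_spill(pairs, combine, memory_limit_mb, batch=1000):
    """Fold (k, v) pairs into a dict, spilling partial results when memory is high."""
    data, spilled_files, seen = {}, [], 0
    for k, v in pairs:
        data[k] = combine(data[k], v) if k in data else v
        seen += 1
        # pay the cost of a memory check only once every `batch` items
        if seen % batch == 0 and get_used_memory() > memory_limit_mb:
            f = tempfile.NamedTemporaryFile("w", suffix=".json", delete=False)
            json.dump(list(data.items()), f)   # write the partial result to disk
            f.close()
            spilled_files.append(f.name)
            data.clear()                       # free the in-memory buffer
    return data, spilled_files

The real merger is smarter than this: instead of dumping everything into one file, it hashes each key to a partition and writes one file per partition, as _spill below shows.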
Spill
This method actually writes, i.e. spills, the in-memory data to disk when the used memory is above the preset limit. It relies on the module-level os and gc imports and the global MemoryBytesSpilled / DiskBytesSpilled counters defined elsewhere in shuffle.py.
def _spill(self):
    """
    dump already partitioned data into disks.

    It will dump the data in batch for better performance.
    """
    global MemoryBytesSpilled, DiskBytesSpilled
    path = self._get_spill_dir(self.spills)
    if not os.path.exists(path):
        os.makedirs(path)

    used_memory = get_used_memory()
    if not self.pdata:
        # The data has not been partitioned; it will iterate over the
        # dataset once, write the items into different files, and use no
        # additional memory. It is only called when the memory goes
        # above the limit for the first time.

        # open all the files for writing
        streams = [open(os.path.join(path, str(i)), 'w')
                   for i in range(self.partitions)]

        for k, v in self.data.iteritems():
            h = self._partition(k)
            # put one item in batch, make it compatible with load_stream
            # it will increase the memory if dump them in batch
            self.serializer.dump_stream([(k, v)], streams[h])

        for s in streams:
            DiskBytesSpilled += s.tell()
            s.close()

        self.data.clear()
        self.pdata = [{} for i in range(self.partitions)]

    else:
        for i in range(self.partitions):
            p = os.path.join(path, str(i))
            with open(p, "w") as f:
                # dump items in batch
                self.serializer.dump_stream(self.pdata[i].iteritems(), f)
            self.pdata[i].clear()
            DiskBytesSpilled += os.path.getsize(p)

    self.spills += 1
    gc.collect()  # release the memory as much as possible
    MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
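In practice the memory limit checked above comes from the Python worker's aggregation budget, which Spark documents as spark.python.worker.memory. A hedged usage sketch follows; the app name, dataset size, and "1g" value are arbitrary examples:

from pyspark import SparkConf, SparkContext

# Assumption for this sketch: spark.python.worker.memory is the per-worker
# aggregation budget compared against used memory before spilling.
conf = (SparkConf()
        .setAppName("spill-demo")                      # arbitrary app name
        .set("spark.python.worker.memory", "1g"))      # raise the spill threshold
sc = SparkContext(conf=conf)

# A shuffle-heavy aggregation like reduceByKey goes through the merger shown above
counts = (sc.parallelize(range(1000000))
            .map(lambda x: (x % 1000, 1))
            .reduceByKey(lambda a, b: a + b)
            .take(5))
print(counts)
sc.stop()

Raising this budget makes spilling, and therefore the slow disk writes described here, less likely, at the cost of more memory per Python worker.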