
I'm beginning to learn Spark using pyspark and wonder what the meaning of the following log message is?

UserWarning: Please install psutil to have better support with spilling

The operation that is causing the spill is a join between two RDDs:

print(user_types.join(user_genres).collect())

This may sound somewhat obvious, but my first question is: what does "spilling" actually mean here, and what is Spark doing when it happens?

I did indeed install psutil, and the warning went away, but I'd like to understand what exactly is occurring. There is a very similar question here, but the OP was mostly asking how to install psutil.

  • This warning doesn't indicate a spill (it [is invoked on module import](https://github.com/apache/spark/blob/0ebb0c0d4dd3e192464dc5e0e6f01efa55b945ed/python/pyspark/shuffle.py#L34-L69) before any real processing takes place) - it just means that you don't use GNU/Linux and don't have recommended dependencies installed. Spills, if present, will be visible in a log and / or Spark web UI. – Alper t. Turker Jul 07 '18 at 22:35

1 Answer


Spill here means writing the in-memory data to disk, which reduces PySpark's performance, since writing to disk is slow.
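As I understand it, the limit that triggers spilling on the Python side comes from the spark.python.worker.memory setting (512m by default), so one way to reduce spilling is to give the Python workers more headroom. A minimal sketch, assuming you create your own SparkContext and your machines have the memory to spare:

from pyspark import SparkConf, SparkContext

# Sketch only: raise the per-Python-worker memory used during aggregation
# before data is spilled to disk (the default is 512m).
conf = SparkConf().set("spark.python.worker.memory", "1g")
sc = SparkContext(conf=conf)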

Why psutil

To check the memory used by the node, i.e. the resident memory of the current Python worker process.

This is the original snippet from the PySpark source file shuffle.py, taken from here, that emits the warning. The code below defines get_used_memory(): it uses psutil if it is installed, falls back to reading /proc/self/status on Linux, and otherwise emits the warning you saw.

importing psutil and defining get_used_memory

# These module-level imports from the top of shuffle.py are used below:
import os
import platform
import warnings

try:
    import psutil
    def get_used_memory():
        """ Return the used memory in MB """
        process = psutil.Process(os.getpid())
        if hasattr(process, "memory_info"):
            info = process.memory_info()
        else:
            info = process.get_memory_info()
        return info.rss >> 20
except ImportError:
    def get_used_memory():
        """ Return the used memory in MB """
        if platform.system() == 'Linux':
            for line in open('/proc/self/status'):
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) >> 10
        else:
            warnings.warn("Please install psutil to have better "
                          "support with spilling")
            if platform.system() == "Darwin":
                import resource
                rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                return rss >> 20
            # TODO: support windows
        return 0
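For comparison, the psutil branch boils down to something you can run on its own. A minimal sketch (assuming psutil is installed) that prints the resident memory of the current process in MB, just like get_used_memory():

import os
import psutil

# Resident set size (RSS) of this process, shifted from bytes down to MB
rss_mb = psutil.Process(os.getpid()).memory_info().rss >> 20
print("used memory: %d MB" % rss_mb)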

Writing to disk

The code below triggers writing the in-memory data to disk when the memory used by the node goes above the preset limit; note that the check only runs every self.batch records.

def mergeCombiners(self, iterator, check=True):
    """ Merge (K,V) pair by mergeCombiner """
    iterator = iter(iterator)
    # speedup attribute lookup
    d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
    c = 0
    for k, v in iterator:
        d[k] = comb(d[k], v) if k in d else v
        if not check:
            continue
        c += 1
        if c % batch == 0 and get_used_memory() > self.memory_limit:
            self._spill()
            self._partitioned_mergeCombiners(iterator, self._next_limit())
            break
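The key point above is that the memory check only happens once every batch records, so it stays cheap. Here is a minimal, Spark-independent sketch of the same pattern; the names (merge_with_spill, used_mb, spill) are mine for illustration, not PySpark's, and psutil is assumed to be installed:

import os
import psutil

def used_mb():
    # same idea as get_used_memory(): current resident memory in MB
    return psutil.Process(os.getpid()).memory_info().rss >> 20

def merge_with_spill(pairs, limit_mb=512, batch=1000, spill=lambda d: d.clear()):
    # accumulate (k, v) pairs, checking memory every `batch` records
    # and handing the buffer to `spill` when the limit is exceeded
    data, count = {}, 0
    for k, v in pairs:
        data[k] = data.get(k, 0) + v
        count += 1
        if count % batch == 0 and used_mb() > limit_mb:
            spill(data)  # in PySpark this would be _spill(): write partitions to disk
    return data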

Spill

This code actually writes, a.k.a. spills, the data to disk when the memory in use is greater than the preset limit.

def _spill(self):
    """
    dump already partitioned data into disks.
    It will dump the data in batch for better performance.
    """
    global MemoryBytesSpilled, DiskBytesSpilled
    path = self._get_spill_dir(self.spills)
    if not os.path.exists(path):
        os.makedirs(path)
    used_memory = get_used_memory()
    if not self.pdata:
        # The data has not been partitioned; iterate the dataset once and
        # write the items into different files, using no additional memory.
        # This branch is only taken the first time the memory goes above
        # the limit.
        # open all the files for writing
        streams = [open(os.path.join(path, str(i)), 'w')
                   for i in range(self.partitions)]
        for k, v in self.data.iteritems():
            h = self._partition(k)
            # put one item per batch to keep it compatible with load_stream;
            # dumping them in batches would increase memory usage
            self.serializer.dump_stream([(k, v)], streams[h])
        for s in streams:
            DiskBytesSpilled += s.tell()
            s.close()
        self.data.clear()
        self.pdata = [{} for i in range(self.partitions)]
    else:
        for i in range(self.partitions):
            p = os.path.join(path, str(i))
            with open(p, "w") as f:
                # dump items in batch
                self.serializer.dump_stream(self.pdata[i].iteritems(), f)
            self.pdata[i].clear()
            DiskBytesSpilled += os.path.getsize(p)
    self.spills += 1
    gc.collect()  # release the memory as much as possible
    MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
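If you want to watch this happen, one option (a sketch, assuming you can afford a throwaway SparkContext) is to shrink the worker memory so the merger spills almost immediately, and then look for spill-related messages in the executor logs and the shuffle spill columns in the Spark web UI:

from pyspark import SparkConf, SparkContext

# Deliberately tiny worker memory, only for experimentation, so that the
# aggregation below spills to disk almost immediately.
conf = SparkConf().set("spark.python.worker.memory", "16m")
sc = SparkContext(conf=conf)

pairs = sc.parallelize(range(1000000)).map(lambda i: (i % 1000, i))
print(pairs.reduceByKey(lambda a, b: a + b).count())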