5

I'm writing a program in Python that can run untrusted Python code in some kind of sandbox, so I need a way to limit the amount of memory that the untrusted code can allocate. At present I can limit the maximum length of range(), lists, dictionaries, and similar structures by overriding the default Python data structures in my sandboxed environment.

Any ideas?

unutbu
Abzac
  • You probably just about need to do this at the OS level, not inside of Python (e.g., in Windows you might want to use a job object). – Jerry Coffin May 27 '13 at 19:38
  • Would help to specify which OS you need this to work on. – Aya May 27 '13 at 19:47
  • My OS is Windows, but I'm planning to do everything in a cross-platform way. – Abzac May 27 '13 at 19:50
  • You really can't do this without *some* OS-specific code, as existing OSes (Windows, Mac, Linux, etc) just vary too much. You can take a cue from `subprocess`, though: `import sys` and then use code like `mswindows = (sys.platform == "win32")`, and define functions inside `if mswindows: ... else: ...`. – torek May 27 '13 at 21:28
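
The platform check suggested in the last comment could be sketched roughly as follows. The names `IS_WINDOWS` and `limit_own_memory` are hypothetical and not from the thread; the Windows branch is only a placeholder, and the Unix branch assumes the `resource` module is available and that the requested soft limit does not exceed the existing hard limit.

```python
import sys

# Same test that the comment above describes for `subprocess`.
IS_WINDOWS = (sys.platform == "win32")

if IS_WINDOWS:
    def limit_own_memory(max_bytes):
        # Placeholder (assumption): a real Windows version would use a job object.
        raise NotImplementedError("use a job object on Windows")
else:
    import resource  # Unix-only module

    def limit_own_memory(max_bytes):
        # Lower only the soft limit and keep whatever hard limit is already set.
        _, hard = resource.getrlimit(resource.RLIMIT_AS)
        resource.setrlimit(resource.RLIMIT_AS, (max_bytes, hard))
```

Callers then use `limit_own_memory(...)` without caring which branch was defined at import time.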

2 Answers

7

Under Unix, you could use resource.setrlimit(resource.RLIMIT_AS, ...) to restrict "the maximum area (in bytes) of address space which may be taken by the process."

import sys
import resource
soft, hard = 10**7, 10**7
# soft, hard = 10**8, 10**8   # uncommenting this allows program to finish
resource.setrlimit(resource.RLIMIT_AS,(soft, hard))
memory_hog = {}
try:
    for x in range(10000):
        print(x)
        memory_hog[str(x)]='The sky is so blue'
except MemoryError as err:
    sys.exit('memory exceeded')
    # memory exceeded
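
For a sandbox, it may be preferable to cap a child process rather than the interpreter that hosts the sandbox. The following is a minimal sketch under that assumption: `run_limited` is a hypothetical helper, not part of the answer above, and it relies on `subprocess.run` with `preexec_fn`, which is Unix-only and not safe in multithreaded parents.

```python
import resource
import subprocess
import sys


def run_limited(code, max_bytes, timeout=30):
    """Run `code` in a child interpreter whose address space is capped."""

    def apply_limit():
        # Runs in the child between fork() and exec(), so the parent
        # process keeps its own, unrestricted limits.
        resource.setrlimit(resource.RLIMIT_AS, (max_bytes, max_bytes))

    return subprocess.run(
        [sys.executable, "-c", code],
        preexec_fn=apply_limit,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
```

A caller could inspect `returncode` and `stderr` of the result to tell whether the untrusted code ran out of memory. The limit has to leave enough room for the interpreter itself to start.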
unutbu
2

I don't know how it is done under Windows. Hopefully someone else can supply that part of the solution.

Here's some example code to set the limit on Windows using ctypes...

import ctypes

PROCESS_SET_QUOTA = 0x100
PROCESS_TERMINATE = 0x1
JobObjectExtendedLimitInformation = 9
JOB_OBJECT_LIMIT_PROCESS_MEMORY = 0x100


class IO_COUNTERS(ctypes.Structure):
    _fields_ = [('ReadOperationCount', ctypes.c_uint64),
                ('WriteOperationCount', ctypes.c_uint64),
                ('OtherOperationCount', ctypes.c_uint64),
                ('ReadTransferCount', ctypes.c_uint64),
                ('WriteTransferCount', ctypes.c_uint64),
                ('OtherTransferCount', ctypes.c_uint64)]


class JOBOBJECT_BASIC_LIMIT_INFORMATION(ctypes.Structure):
    _fields_ = [('PerProcessUserTimeLimit', ctypes.c_int64),
                ('PerJobUserTimeLimit', ctypes.c_int64),
                ('LimitFlags', ctypes.c_uint32),
                ('MinimumWorkingSetSize', ctypes.c_void_p),
                ('MaximumWorkingSetSize', ctypes.c_void_p),
                ('ActiveProcessLimit', ctypes.c_uint32),
                ('Affinity', ctypes.c_void_p),
                ('PriorityClass', ctypes.c_uint32),
                ('SchedulingClass', ctypes.c_uint32)]


class JOBOBJECT_EXTENDED_LIMIT_INFORMATION(ctypes.Structure):
    _fields_ = [('BasicLimitInformation', JOBOBJECT_BASIC_LIMIT_INFORMATION),
                ('IoInfo', IO_COUNTERS),
                ('ProcessMemoryLimit', ctypes.c_void_p),
                ('JobMemoryLimit', ctypes.c_void_p),
                ('PeakProcessMemoryUsed', ctypes.c_void_p),
                ('PeakJobMemoryUsed', ctypes.c_void_p)]


# Set memory limit for process with specified 'pid', to specified 'size' in bytes
def set_limit(pid, size):

    job_info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION()
    out_size = ctypes.c_uint32()

    job = ctypes.windll.kernel32.CreateJobObjectA(None, None)
    assert job != 0

    success = ctypes.windll.kernel32.QueryInformationJobObject(job,
                                                               JobObjectExtendedLimitInformation,
                                                               ctypes.POINTER(JOBOBJECT_EXTENDED_LIMIT_INFORMATION)(job_info),
                                                               ctypes.sizeof(JOBOBJECT_EXTENDED_LIMIT_INFORMATION),
                                                               ctypes.POINTER(ctypes.c_uint32)(out_size))
    assert success

    job_info.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_PROCESS_MEMORY
    job_info.ProcessMemoryLimit = size
    success = ctypes.windll.kernel32.SetInformationJobObject(job,
                                                             JobObjectExtendedLimitInformation,
                                                             ctypes.POINTER(JOBOBJECT_EXTENDED_LIMIT_INFORMATION)(job_info),
                                                             ctypes.sizeof(JOBOBJECT_EXTENDED_LIMIT_INFORMATION))
    assert success

    process = ctypes.windll.kernel32.OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE,
                                                 False, pid)
    assert process != 0

    success = ctypes.windll.kernel32.AssignProcessToJobObject(job, process)
    assert success


    success = ctypes.windll.kernel32.CloseHandle(job)
    assert success

    success = ctypes.windll.kernel32.CloseHandle(process)
    assert success


if __name__ == '__main__':

    import os

    five_mb = 5 * 1024 * 1024

    def can_we_allocate_five_mb():
        try:
            s = 'x' * five_mb
            return True
        except MemoryError:
            return False

    # print() with a single argument works on both Python 2 and Python 3
    print(can_we_allocate_five_mb())
    set_limit(os.getpid(), five_mb)
    print(can_we_allocate_five_mb())

...although there's probably no need to create a separate job object for each process - you should be able to associate all restricted processes with a single job.

Aya
  • In Python 3 you need to use CreateJobObjectW instead of CreateJobObjectA else only the first character of the job name is seen by Windows. – Roland Pihlakas Jan 09 '18 at 16:55
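
The point in the comment above can be sketched like this. `create_job` is a hypothetical helper, not code from the answer; it assumes the documented Win32 signature of `CreateJobObjectW` (a security-attributes pointer and a wide-string name, returning a handle) and declares `argtypes`/`restype` so the handle is not truncated on 64-bit Python.

```python
import ctypes
import sys


def create_job(name=None):
    """Create a (optionally named) job object using the wide-character API."""
    if sys.platform != "win32":
        raise OSError("job objects are only available on Windows")

    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    # Python 3 str is Unicode, so it must go to the W variant as a wchar_t*.
    kernel32.CreateJobObjectW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p]
    kernel32.CreateJobObjectW.restype = ctypes.c_void_p  # HANDLE is pointer-sized

    job = kernel32.CreateJobObjectW(None, name)
    if not job:
        raise ctypes.WinError(ctypes.get_last_error())
    return job
```

Passing `None` as the name creates an anonymous job, in which case the A/W distinction does not matter.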