I have the following scenario:

  • multithreaded application
  • I am not in control of thread creation. This is managed by the framework (in this case celery)
  • I have some objects which are expensive to instantiate, and not thread safe. Making them thread safe is not an option.
  • The objects can be instantiated in multiple places, but if I am reinstantiating the same object in one thread where it has already been defined, the object should be reused.

I have come up with the following pattern:

#!/usr/bin/env python

import threading
import time

class MyObj1:
    def __init__(self, name):
        self.name = name

local = threading.local()
def get_local_obj(key, create_obj, *pars, **kwargs):
    d = local.__dict__
    if key in d:
        obj = d[key]
    else:
        obj = create_obj(*pars, **kwargs)
        d[key] = obj
    return obj

class Worker(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        myobj1 = get_local_obj('obj1', MyObj1, self.name)
        for _ in xrange(3):
            print myobj1.name
            time.sleep(1)

def test():
    ths = [Worker() for _ in xrange(2)]
    for t in ths:
        t.start()

test()

Here I am myself creating the threads, since this is just a test, but as said, in the real application I am not in control of the threads.

What I am interested in is the function get_local_obj. I have several questions:

  1. Will this logic guarantee that the objects are not shared between threads?
  2. Will this logic guarantee that the objects are not instantiated more than once in a thread?
  3. Will this leak memory?
  4. Do you have any general comments about this approach? Any better suggestion for the scenario described above?

EDIT

Just to clarify: my application is multithreaded, but it is not me who is creating the threads. I am simply creating some objects, which happen to run inside threads created by the framework. Some of my objects are not thread safe, so I need to create them only once per thread. Hence get_local_obj.

EDIT

local = threading.local() must be defined on the global scope.
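
A minimal sketch of why the scope matters, written for Python 3 (the code in this question is Python 2). The helper names get_with_fresh_local and get_with_shared_local are hypothetical and exist only for this illustration. A threading.local() created inside the function starts out empty on every call, so nothing is ever reused; a single module-level instance keeps one namespace per thread.

```python
import threading

# One shared threading.local, created once at module level.
_shared = threading.local()

def get_with_fresh_local(key, factory):
    # Anti-pattern: a brand-new threading.local() is empty on every call,
    # so the lookup always fails and the factory always runs again.
    fresh = threading.local()
    try:
        return getattr(fresh, key)
    except AttributeError:
        obj = factory()
        setattr(fresh, key, obj)
        return obj

def get_with_shared_local(key, factory):
    # The module-level instance keeps a separate namespace per thread,
    # so a second call from the same thread finds the stored object.
    try:
        return getattr(_shared, key)
    except AttributeError:
        obj = factory()
        setattr(_shared, key, obj)
        return obj

results = {}

def worker():
    a = get_with_fresh_local("obj", object)
    b = get_with_fresh_local("obj", object)
    results["fresh_reused"] = a is b
    c = get_with_shared_local("obj", object)
    d = get_with_shared_local("obj", object)
    results["shared_reused"] = c is d

t = threading.Thread(target=worker)
t.start()
t.join()
print(results)
```

Only the shared, module-level variant reports that the second call reused the first object.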

blueFast

3 Answers

What about this one?

import threading

class Worker(threading.Thread):
  def __init__(self):
    super(Worker,self).__init__()
    self.m_local = threading.local()

  def get_my_obj(self):
    try:
      obj = self.m_local.my_object
    except AttributeError:
      self.m_local.my_object = create_object()
      obj = self.m_local.my_object
    return obj

  def run(self):
    my_obj = self.get_my_obj()
    # ...

In the end it is similar to your example, just cleaner. All the thread-specific code stays in one place: the run function "does not know" anything about the initialization, it gets my_obj from the getter, and the getter creates the object only once. threading.local guarantees that the data is thread-specific; that's its job.

I don't see any reason for a memory leak there. In the end, you need to sweat a bit to get a leak in Python :)
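
The thread-specific claim can be checked with a small sketch, written for Python 3. It assumes a module-level threading.local and uses a ThreadPoolExecutor as a stand-in for threads that a framework creates and reuses; Expensive, get_my_obj and task are hypothetical names for illustration only.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()

class Expensive:
    """Hypothetical stand-in for an expensive, non-thread-safe object."""
    def __init__(self):
        # Remember which thread created this instance.
        self.owner = threading.get_ident()

def get_my_obj():
    # Same try/except getter as above, but as a plain function.
    try:
        return _local.my_object
    except AttributeError:
        _local.my_object = Expensive()
        return _local.my_object

def task(_):
    # Runs in whichever pool thread picks it up.
    return threading.get_ident(), get_my_obj()

with ThreadPoolExecutor(max_workers=3) as pool:
    seen = list(pool.map(task, range(20)))

# Every task received an object created by the thread that ran it.
isolated = all(ident == obj.owner for ident, obj in seen)

# Every thread handed out exactly one object across all of its tasks.
per_thread = {}
for ident, obj in seen:
    per_thread.setdefault(ident, set()).add(id(obj))
once_per_thread = all(len(ids) == 1 for ids in per_thread.values())

print(isolated, once_per_thread)
```

Both checks should hold no matter how the pool distributes the 20 tasks over its threads.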

Jakub M.
  • Thanks, but that won't do. I am not really creating threads in the application (WorkerThread is not mine). The framework is. And I need a *generic* way of creating thread local objects. Please see my edit. – blueFast Dec 21 '12 at 16:30
  • But your approach to creating the object is interesting. Why do you go the `try / except` route instead of the `__dict__` route that I chose? Any specific advantage? – blueFast Dec 21 '12 at 16:34
  • I just don't like the idea of accessing fields that are meant to be private – Jakub M. Dec 21 '12 at 16:49
  • I see your point. But since I am using a variable key, and not a fixed attribute, I cannot use the `try / except` approach. Doing `obj = local[key]` and `local[key] = obj` raises a `TypeError: 'thread._local' object does not support item assignment` – blueFast Dec 21 '12 at 17:51
  • `x = threading.local()`, after that `print x.hello` raises `AttributeError`, but `x.hello = 100; print x.hello` prints the value; this is how you use it. – Jakub M. Dec 21 '12 at 22:48
  • Sure, but my attribute has no fixed name. It is variable. I mean, can I do `x.key = 100` when key is a variable? I think I have to use setattr for that. – blueFast Dec 22 '12 at 08:33
  • You can always do `x=threading.local(); x.my_container = dict()` and then do whatever you want with this dictionary – Jakub M. Dec 22 '12 at 09:14
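
The dictionary-container idea from the last comment could look roughly like the sketch below (Python 3). One assumption worth stating: the dict has to be created lazily in each thread, because an attribute assigned on a threading.local in one thread is not visible in the others. The names _container and make are hypothetical.

```python
import threading

_local = threading.local()

def _container():
    # Each thread lazily gets its own dict the first time it asks.
    try:
        return _local.my_container
    except AttributeError:
        _local.my_container = {}
        return _local.my_container

def get_local_obj(key, factory, *args, **kwargs):
    # Variable keys are ordinary dict keys, so no __dict__ or setattr is needed.
    container = _container()
    if key not in container:
        container[key] = factory(*args, **kwargs)
    return container[key]

calls = []

def make(name):
    # Hypothetical factory that records every real instantiation.
    calls.append(name)
    return [name]

a = get_local_obj("obj1", make, "first")
b = get_local_obj("obj1", make, "second")   # reused, factory not called
c = get_local_obj("obj2", make, "third")    # different key, new object

other = []
t = threading.Thread(
    target=lambda: other.append(get_local_obj("obj1", make, "other-thread")))
t.start()
t.join()
```

Here the second lookup of "obj1" in the same thread returns the first object, while the other thread builds its own.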

FWIW, here's a modified version of your code, streamlined somewhat based on two answers to related questions. It's still basically the same pattern, though.

#!/usr/bin/env python
import threading
import time
threadlocal = threading.local()

class MyObj1(object):
    def __init__(self, name):
        print 'in MyObj1.__init__(), name =', name
        self.name = name

def get_local_obj(varname, factory, *args, **kwargs):
    try:
        return getattr(threadlocal, varname)
    except AttributeError:
        obj = factory(*args, **kwargs)
        setattr(threadlocal, varname, obj)
        return obj

class Worker(threading.Thread):
    def __init__(self):
        super(Worker, self).__init__()

    def run(self):
        myobj1 = get_local_obj('obj1', MyObj1, self.name)
        for _ in xrange(3):
            print myobj1.name
            time.sleep(1)

def test():
    ths = [Worker() for _ in xrange(3)]
    for t in ths:
        t.start()

test()

Actually, as long as each thread creates the object only once (as run() does here), it's possible to do the same thing without get_local_obj():

#!/usr/bin/env python
import threading
import time
threadlocal = threading.local()

class MyObj1(object):
    def __init__(self, name):
        print 'in MyObj1.__init__(), name =', name
        self.name = name

class Worker(threading.Thread):
    def __init__(self):
        super(Worker, self).__init__()

    def run(self):
        threadlocal.myobj1 = MyObj1(self.name)
        for _ in xrange(3):
            print threadlocal.myobj1.name
            time.sleep(1)

def test():
    ths = [Worker() for _ in xrange(3)]
    for t in ths:
        t.start()

test()
martineau
  • I like the `setattr / getattr`. It is cleaner than using `__dict__` and it allows for variable attribute names, which is what I need. The problem with your code, (and my code and Jakub M's code), is that it does not work, as I found the hard way. We need to instantiate only once `threadLocal = threading.local()`, on the global scope, on the main thread. See this answer: http://stackoverflow.com/a/13240093/647991. I will modify my question accordingly. – blueFast Dec 22 '12 at 08:32
  • Yeah, the attribute getting/setting way seems a little more succinct. I wondered about the scoping of the `threading.local()`; fortunately that's easy to fix, thanks for pointing out the problem. I have another idea that I'm trying to figure out how to implement...and will post another answer if it pans out. – martineau Dec 22 '12 at 09:52

Here's another, different answer that uses an idea I had of having thread-level singletons. It completely gets rid of your get_local_obj() function. I haven't done a lot of testing, but so far it seems to work. It may be more than you want because it literally implements what you said you wanted in your last bullet point:

  • The objects can be instantiated in multiple places, but if I am reinstantiating the same object in one thread where it has already been defined, the object should be reused.

#!/usr/bin/env python
import threading
import time
threadlocal = threading.local()

class ThreadSingleton(type):
    # called when instances of client classes are created
    def __call__(cls, *args, **kwargs):
        instances = threadlocal.__dict__.setdefault(cls.__name__+'.instances', {})
        if cls not in instances:
            instances[cls] = super(ThreadSingleton, cls).__call__(*args, **kwargs)
        return instances[cls]

class MyClass(object):
    __metaclass__ = ThreadSingleton
    def __init__(self, name):
        print 'in MyClass.__init__(), name =', name
        self.name = name

class Worker(threading.Thread):
    def __init__(self):
        super(Worker, self).__init__()

    def run(self):
        myobj1 = MyClass(self.name)
        for _ in xrange(3):
            print 'myobj1.name:', myobj1.name
            myobj2 = MyClass(self.name+'#2') # this returns myobj1
            print 'myobj2.name:', myobj2.name # so this prints myobj1.name
            time.sleep(1)

def test():
    ths = [Worker() for _ in xrange(3)]
    for t in ths:
        t.start()

test()

Note that the output will be somewhat jumbled because it is generated by different threads. This can be fixed, but I decided not to complicate the essence of this answer by adding the fix.
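
The code above is Python 2; in Python 3 the `__metaclass__` class attribute has no effect, so the singleton behaviour would silently disappear. A rough Python 3 sketch of the same idea follows. It is an adaptation rather than the original code: it uses the `metaclass=` keyword and stores the per-thread instances under a plain attribute (the name `instances` is arbitrary) instead of going through `__dict__`.

```python
import threading

threadlocal = threading.local()

class ThreadSingleton(type):
    """Metaclass: at most one instance of each client class per thread."""
    def __call__(cls, *args, **kwargs):
        # Fetch (or lazily create) this thread's class -> instance mapping.
        try:
            instances = threadlocal.instances
        except AttributeError:
            instances = threadlocal.instances = {}
        if cls not in instances:
            instances[cls] = super().__call__(*args, **kwargs)
        return instances[cls]

class MyClass(metaclass=ThreadSingleton):
    def __init__(self, name):
        self.name = name

results = {}

def run(label):
    first = MyClass(label)
    second = MyClass(label + "#2")  # arguments are ignored on reuse
    results[label] = (first is second, second.name)

threads = [threading.Thread(target=run, args=("t%d" % i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```

As in the original, a second MyClass(...) call in the same thread returns the existing instance and its constructor arguments are discarded, which may or may not be what the caller expects.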
martineau