
This error has been showing up for quite some time, though not very frequently, and it's time I squashed it.

It appears only when I have more than a single thread. The application is quite extensive, so I'm posting just the relevant code snippet below.

I'm using datetime.datetime.strptime to parse a field of my message into a datetime object. When I call it within a multithreaded function, the error arises on one thread while the other thread works perfectly fine.

The error

Exception in thread Thread-7:
Traceback (most recent call last):
  File "C:\Users\kdarling\AppData\Local\Continuum\anaconda3\envs\lobsandbox\lib\threading.py", line 801, in __bootstrap_inner
    self.run()
  File "C:\Users\kdarling\AppData\Local\Continuum\anaconda3\envs\lobsandbox\lib\threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "d:\kdarling_lob\gryphon_eagle\proj_0012_s2_lobcorr\main\lobx\src\correlator\handlers\inputhandler.py", line 175, in _read_socket
    parsed_submit_time = datetime.datetime.strptime(message['msg']['submit_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
AttributeError: 'module' object has no attribute '_strptime'

Alright, so this is a bit strange: the error was raised on the first thread, but the second thread works completely fine using the same datetime.datetime call.

Thoughts

  • Am I overwriting anything? It is Python, after all. Nope, I don't use the name datetime for anything else.
  • I am using inheritance through ABC, so how about the parent? Nope, this bug was happening long before, and I don't overwrite anything in the parent.

My next thought was "Is this Python black magic with the datetime module?", so I decided to try time.strptime instead.

The error

Exception in thread Thread-6:
Traceback (most recent call last):
  File "C:\Users\kdarling\AppData\Local\Continuum\anaconda3\envs\lobsandbox\lib\threading.py", line 801, in __bootstrap_inner
    self.run()
  File "C:\Users\kdarling\AppData\Local\Continuum\anaconda3\envs\lobsandbox\lib\threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "d:\kdarling_lob\gryphon_eagle\proj_0012_s2_lobcorr\main\lobx\src\correlator\handlers\inputhandler.py", line 176, in _read_socket
    parsed_submit_time = time.strptime(message['msg']['submit_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
AttributeError: 'module' object has no attribute '_strptime_time'

The code

import datetime
import heapq
import json
import os
import socket
import sys
import time

from io import BytesIO
from threading import Thread

from handler import Handler


class InputStreamHandler(Handler):

    def __init__(self, configuration, input_message_heap):
        """
        Initialization function for InputStreamHandler.
        :param configuration: Configuration object that stores specific information.
        :type configuration: Configuration
        :param input_message_heap: Message heap that consumers thread will populate.
        :type input_message_heap: Heap
        """
        super(InputStreamHandler, self).__init__()
        self.release_size = configuration.get_release_size()
        self.input_src = configuration.get_input_source()
        self.input_message_heap = input_message_heap
        self.root_path = os.path.join(configuration.get_root_log_directory(), 'input', 'sensor_data')
        self.logging = configuration.get_logger()
        self.Status = configuration.Status

        self.get_input_status_fn = configuration.get_input_functioning_status
        self.update_input_status = configuration.set_input_functioning_status

        if configuration.get_input_state() == self.Status.ONLINE:
            self._input_stream = Thread(target=self._spinup_sockets)
        elif configuration.get_input_state() == self.Status.OFFLINE:
            self._input_stream = Thread(target=self._read_files)

    def start(self):
        """
        Starts the input stream thread to begin consuming data from the sensors connected.
        :return: True if thread hasn't been started, else False on multiple start fail.
        """
        try: 
            self.update_input_status(self.Status.ONLINE)
            self._input_stream.start()
            self.logging.info('Successfully started Input Handler.')
        except RuntimeError:
            return False
        return True

    def status(self):
        """
        Displays the status of the thread, useful for offline reporting.
        """
        return self.get_input_status_fn()

    def stop(self):
        """
        Stops the input stream thread by ending the looping process.
        """
        if self.get_input_status_fn() == self.Status.ONLINE:
            self.logging.info('Closing Input Handler execution thread.')
            self.update_input_status(self.Status.OFFLINE)
            self._input_stream.join()

    def _read_files(self):
        pass

    def _spinup_sockets(self):
        """
        Enacts sockets onto their own thread to collect messages.
        Ensures that blocking doesn't occur on the main thread.
        """
        active_threads = {}
        while self.get_input_status_fn() == self.Status.ONLINE:
            # Check if any are online
            if all([value['state'] == self.Status.OFFLINE for value in self.input_src.values()]):
                self.update_input_status(self.Status.OFFLINE)
                for active_thread in active_threads.values():
                    active_thread.join()
                break

            for key in self.input_src.keys():
                # Check if key exists, if not, spin up call
                if (key not in active_threads or not active_threads[key].isAlive()) and self.input_src[key]['state'] == self.Status.ONLINE:
                    active_threads[key] = Thread(target=self._read_socket, args=(key, active_threads,))
                    active_threads[key].start()
        print(self.input_src)

    def _read_socket(self, key, cache):
        """
        Reads data from a socket, places message into the queue, and pop the key.
        :param key: Key corresponding to socket.
        :type key: UUID String
        :param cache: Key cache that corresponds the key and various others.
        :type cache: Dictionary
        """
        message = None
        try:
            sensor_socket = self.input_src[key]['sensor']

        ...

            message = json.loads(stream.getvalue().decode('utf-8'))
            if 'submit_time' in message['msg'].keys():
                # Inherited function
                self.write_to_log_file(self.root_path + key, message, self.release_size)
                message['key'] = key
                parsed_submit_time = time.strptime(message['msg']['submit_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
                heapq.heappush(self.input_message_heap, (parsed_submit_time, message))
            cache.pop(key)
        except:
            pass

Additional thought

  • Even when this error wasn't being thrown, the two threads still shared a common function, write_to_log_file (seen in the code above). Sometimes, when write_to_log_file checked whether a specific directory existed on the OS, the check would return False even though the directory was there. Could this be caused by Python threads accessing the same function at the same time, even for outside modules? That error was never consistent either.

Overall, this error won't arise when only running a single thread/connection.

Kyle
  • Possibly related: https://stackoverflow.com/questions/32245560/module-object-has-no-attribute-strptime-with-several-threads-python, https://stackoverflow.com/questions/51619040/attributeerror-module-object-has-no-attribute-strptime-threading-time-str – snakecharmerb Jul 20 '19 at 08:32
  • @snakecharmerb Exactly the problem, crazy Python 2 has this problem. – Kyle Jul 22 '19 at 12:37
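
For anyone landing here, this is a minimal sketch of the workaround usually suggested for this problem. It assumes the cause is the one the comments point to: in Python 2, both datetime.datetime.strptime and time.strptime import the internal _strptime module lazily on their first call, and that import is not thread safe. Importing _strptime once at module level, before any thread starts, avoids the race. The names TIME_FORMAT, parse_submit_time and parse_in_threads are hypothetical and used only for illustration; they are not part of the code above.

```python
# Workaround sketch: force the lazy import to happen once, in the main
# thread, before any worker thread can call strptime.
import _strptime  # noqa: F401  (imported only for its side effect)

import datetime
import threading

# Same format string as in the question.
TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


def parse_submit_time(raw):
    """Parse a submit_time string into a datetime object (hypothetical helper)."""
    return datetime.datetime.strptime(raw, TIME_FORMAT)


def parse_in_threads(raw_values):
    """Parse each value on its own thread and return the results in order."""
    results = [None] * len(raw_values)

    def worker(index, raw):
        results[index] = parse_submit_time(raw)

    threads = [
        threading.Thread(target=worker, args=(index, raw))
        for index, raw in enumerate(raw_values)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

In the handler from the question, the equivalent change would be adding the import _strptime line next to the other imports at the top of inputhandler.py. Calling strptime once in the main thread before starting any threads should have the same effect.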
