1

I'm creating a wrapper for the Pushover API called py_pushover. One of the features is the ability to connect directly with their servers using a websocket. I've run into a little snafu when trying to 'test' my asynchrounous code. Running the following:

import py_pushover as py_po
from tests.helpers import app_key, device_id, secret
import time


def print_msg(messages):
    print(messages)

if __name__ == "__main__":
    cm = py_po.client.ClientManager(app_key, secret=secret, device_id=device_id)
    cm.listen_async(print_msg)

    for i in range(10):
        time.sleep(3)
        print("Meh")

    cm.stop_listening()
    cm.clear_server_messages()

Works exactly as desired. However, when I go to wrap the 'callback' function into a test suite:

class TestMessage(unittest.TestCase):
    def setUp(self):
        self.pm = py_po.message.MessageManager(app_key, user_key)
        self.client = py_po.client.ClientManager(app_key, secret=secret, device_id=device_id)
        self.cleanUpClient()
        self.client.listen_async(self.client_message_receieved)  # <--- BREAKS HERE!

    def tearDown(self):
        self.client.stop_listening()
        self.cleanUpClient()

    def cleanUpClient(self):
        self.client.retrieve_message()
        for msg in self.client.messages:
            if msg['priority'] >= py_po.PRIORITIES.EMERGENCY and msg['acked'] != 1:
                self.client.acknowledge_message(msg['receipt'])
        self.client.clear_server_messages()

        self.client.retrieve_message()
        self.assertEquals(len(self.client.messages), 0)

    def client_message_receieved(self, messages):
        self.stored_messages = messages

    def test_val_msg(self):

        # Testing a normal push message
        send_message = 'Testing normal push'
        self.pm.push_message(send_message, device='test_device')
        # self.client.retrieve_message()

        self.assertEquals(send_message, self.stored_messages[0]['message'])

I get the following error:

TypeError: cannot serialize '_io.TextIOWrapper' object

I'm assuming that this has be due to some pickling error, but I'm not quite sure where the issue is coming from. How do I setup a test suite that will allow me to test multi-processing code that calls a callback function?

My client code is as follows (slimmed down to be less of an eye sore):

import websocket
import logging
from multiprocessing import Process, Pipe

from py_pushover import BaseManager, send, base_url

logging.basicConfig(filename='client.log', level=logging.INFO)

class ClientManager(BaseManager):
    """
    Manages the interface between the Pushover Servers and user.  This can be instantiated with or without the user
    secret and device id.  If no secret is provided, the user MUST login before interfacing with the Pushover servers.
    If no device id is provided, the user MUST register this client as a device before interfacing with the Pushover
    servers.
    """
    _login_url = base_url + "users/login.json"
    _register_device_url = base_url + "devices.json"
    _message_url = base_url + "messages.json"
    _del_message_url = base_url + "devices/{device_id}/update_highest_message.json"
    _ack_message_url = base_url + "receipts/{receipt_id}/acknowledge.json"
    _ws_connect_url = "wss://client.pushover.net/push"
    _ws_login = "login:{device_id}:{secret}\n"

    def __init__(self, app_token, secret=None, device_id=None):
        """
        :param str app_token: application id from Pushover API
        :param str secret: (Optional) user secret given after validation of login
        :param str device_id: (Optional) device id of this client
        :return:
        """
        super(ClientManager, self).__init__(app_token)
        self.__secret__ = secret
        self.__device_id__ = device_id
        self.messages = []
        self._ws_app = websocket.WebSocketApp(
            self._ws_connect_url,
            on_open=self._on_ws_open,
            on_message=self._on_ws_message,
            on_error=self._on_ws_error,
            on_close=self._on_ws_close
        )
        self.__on_msg_receipt__ = None
        self.__p__ = Process()

    def listen(self, on_msg_receipt):
        """
        Listens for messages from the server.  When a message is received, a call to the on_msg_receipt function with a
          single parameter representing the messages received.

        :param on_msg_receipt: function to call when a message is received
        """
        self.__on_msg_receipt__ = on_msg_receipt
        self._ws_app.run_forever()

    def listen_async(self, on_msg_receipt):
        """
        Creates a Process for listening to the Pushover server for new messages.  This process then listens for messages
          from the server.  When a message is received, a call to the on_msg_receipt function with a single parameter
          representing the messages received.

        :param on_msg_receipt: function to call when a message is received
        """
        self.__p__ = Process(target=self.listen, args=(on_msg_receipt,))
        self.__p__.start()  # <-- BREAKS HERE!

    def stop_listening(self):
        """
        Stops the listening process from accepting any more messages.
        """
        if self.__p__:
            self.__p__.terminate()
            self.__p__ = None

    def _on_ws_open(self, ws):
        """
        Function used when the websocket is opened for the first time.

        :param ws: the websocket
        """
        logging.info("Opening connection to Pushover server...")
        ws.send(self._ws_login.format(device_id=self.__device_id__, secret=self.__secret__))
        logging.info("----Server Connection Established----")

    def _on_ws_message(self, ws, message):
        """
        Function used for when the websocket recieves a message.  Per the Pushover API guidelines 1 of 4 responses
        will be sent:

            1. `#` - Keep-alive packet, no response needed.
            2. `!` - A new message has arrived; you should perform a sync.
            3. `R` - Reload request; you should drop your connection and re-connect.
            4. `E` - Error; a permanent problem occured and you should not automatically re-connect.
                     Prompt the user to login again or re-enable the device.

        :param ws: the websocket
        :param message: message received from remote server
        """
        message = message.decode("utf-8")
        logging.debug("Message received: " + message)
        if message == "#":
            pass

        elif message == "!":
            self.retrieve_message()
            if self.__on_msg_receipt__:
                self.__on_msg_receipt__(self.messages)

        elif message == "R":
            logging.info("Reconnecting to server (requested from server)...")
            ws.close()
            self.listen(self.__on_msg_receipt__)

        elif message == "E":
            logging.error("Server connection failure!")

        else:  # message isn't of the type expected.  Raise an error.
            raise NotImplementedError  #todo Implement an appropriate exception

    def _on_ws_error(self, ws, error):
        """
        Function used when the websocket encounters an error.  The error is logged

        :param ws: the websocket
        :param error: the error encountered
        """
        logging.error('Error: ' + error)

    def _on_ws_close(self, ws):
        """
        Function used when the websocket closes the connection to the remote server.

        :param ws: the websocket
        """
        logging.info("----Server Connection Closed----")
        self._ws_app = None

Note: I'm not trying to promote my py_pushover module, but rather linked it here in case more code was needed to be looked at.

James Mertz
  • 8,459
  • 11
  • 60
  • 87
  • Guessing the issue is that for whatever reason the `listen_async` call serializes the callback... and it's trying to serialize your entire TestMessage class. Can you move the callback outsize of the class and see what happens? Somewhat of an aside, I'd recommend not testing against live netowork APIs... it will result in slow tests. Better to mock. – Lee Nov 10 '15 at 00:45
  • What do you mean by 'mock'? – James Mertz Nov 10 '15 at 03:58
  • 1
    https://docs.python.org/dev/library/unittest.mock.html – Lee Nov 10 '15 at 16:13

0 Answers0