I am trying to use Google's Python API for Cloud Logging (here's their library repo and their documentation).

I am also trying to use Python threads (specifically the multiprocessing Pool) to speed up the processing of a list. The issue is that after using the logger in the thread pool, if I use it again in my main function, it fails (or hangs indefinitely).

Example code

Here's some example code that fails for me:

from google.cloud import logging
from multiprocessing import Pool
from threading import Lock

GENERAL_LOGGER = 'something'

log_client = logging.Client()
logger = log_client.logger(GENERAL_LOGGER)

logger_lock = Lock()


def main() -> None:
    global logger

    logger.log_struct({
        "msg": "This works",
    })

    items = [1, 2, 3, 4, 5]

    with Pool(2) as p:
        p.map(test_thread, items)

    logger.log_struct({
        "msg": "This fails",
    })


def test_thread(num) -> None:
    global logger

    # do some intense task...

    # then log
    logger_lock.acquire()
    logger.log_struct({"number": num}) # this succeeds
    logger_lock.release()


if __name__ == '__main__':
    main()

For the above code, once it tries to log "This fails", it logs nothing and hangs indefinitely. I then have to press ^C, which yields the output below:

E0503 18:54:34.153646870  786847 ssl_transport_security.cc:510] Corruption detected.
E0503 18:54:34.154107944  786847 ssl_transport_security.cc:486] error:1e000065:Cipher functions:OPENSSL_internal:BAD_DECRYPT
E0503 18:54:34.154475815  786847 ssl_transport_security.cc:486] error:1000008b:SSL routines:OPENSSL_internal:DECRYPTION_FAILED_OR_BAD_RECORD_MAC
E0503 18:54:34.154670516  786847 secure_endpoint.cc:208]     Decryption error: TSI_DATA_CORRUPTED
E0503 18:54:34.154883721  786847 ssl_transport_security.cc:537] SSL_write failed with error SSL_ERROR_SSL.
^C
Process ForkPoolWorker-2:
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "so.py", line 38, in test_thread
    logger_lock.acquire()
KeyboardInterrupt
Traceback (most recent call last):
  File "so.py", line 44, in <module>
    main()
  File "so.py", line 25, in main
    p.map(test_thread, items)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 765, in get
    self.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 762, in wait
    self._event.wait(timeout)
  File "/usr/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()
KeyboardInterrupt

My real error

The actual code I am having an issue with is too long to put here. However, its printed error may be helpful, since it differs slightly:

E0503 18:36:42.763982000  769264 ssl_transport_security.cc:510] Corruption detected.
E0503 18:36:42.764023851  769264 ssl_transport_security.cc:486] error:1e000065:Cipher functions:OPENSSL_internal:BAD_DECRYPT
E0503 18:36:42.764032243  769264 ssl_transport_security.cc:486] error:1000008b:SSL routines:OPENSSL_internal:DECRYPTION_FAILED_OR_BAD_RECORD_MAC
E0503 18:36:42.764038246  769264 secure_endpoint.cc:208]     Decryption error: TSI_DATA_CORRUPTED
E0503 18:36:42.764096712  769264 ssl_transport_security.cc:537] SSL_write failed with error SSL_ERROR_SSL.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/google/api_core/grpc_helpers.py", line 73, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Stream removed"
        debug_error_string = "{"created":"@1620067002.764071153","description":"Error received from peer ipv4:172.217.11.202:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Stream removed","grpc_status":2}"
>

Traceback (most recent call last):
  File "/opt/scraper", line 331, in <module>
    main()
  File "/opt/scraper", line 162, in main
    logger.log_struct({
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/logging_v2/logger.py", line 181, in log_struct
    self._do_log(client, StructEntry, info, **kw)
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/logging_v2/logger.py", line 134, in _do_log
    client.logging_api.write_entries([api_repr])
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/logging_v2/_gapic.py", line 149, in write_entries
    self._gapic_api.write_log_entries(request=request)
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/logging_v2/services/logging_service_v2/client.py", line 591, in write_log_entries
    response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
  File "/usr/local/lib/python3.8/dist-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/google/api_core/retry.py", line 281, in retry_wrapped_func
    return retry_target(
  File "/usr/local/lib/python3.8/dist-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/usr/local/lib/python3.8/dist-packages/google/api_core/grpc_helpers.py", line 75, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.Unknown: None Stream removed

It is potentially helpful to note that as long as I don't use threads everything works great, as expected. Something about using threads (specifically with Google's logger) causes this to break.

I am not super familiar with Python threading, so maybe this is a novice mistake. Many thanks for any help in advance.

1 Answer

Figured it out (at least enough for my use case).

It seems that my issue was a misunderstanding of how the multiprocessing Pool is meant to work. Apparently, "in multiprocessing, an interpreter is created for every child process. The situation where threads fighting for GIL simple doesn’t exist as there is always only a main thread in every process" [1]. (The GIL is explained in the article.)

This would be helpful if my code were CPU bound, but since I am I/O bound, "threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously" [2]. This seems to apply.

It seems that since multiprocessing runs each worker in its own process with its own interpreter, the workers and the parent could not safely share whatever RPC connection the Google logging library uses, causing it to fail.
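
To illustrate the difference, here is a minimal sketch (not taken from the logging library) that compares the process IDs seen by a process-based Pool and a thread-based ThreadPool. The names worker_pid and compare_pools are made up for this example, and the "fork" start method is an assumption based on the ForkPoolWorker names in the traceback above; it is not available on Windows.

```python
import os
from multiprocessing import get_context
from multiprocessing.pool import ThreadPool


def worker_pid(_):
    # Report the ID of the process that actually ran this task.
    return os.getpid()


def compare_pools():
    parent = os.getpid()

    # Assumption: "fork" start method, as suggested by the ForkPoolWorker
    # names in the traceback. Each worker is a separate child process.
    with get_context("fork").Pool(2) as procs:
        process_pids = set(procs.map(worker_pid, range(4)))

    # ThreadPool offers the same map() interface, but its workers are
    # threads inside the current process.
    with ThreadPool(2) as threads:
        thread_pids = set(threads.map(worker_pid, range(4)))

    return parent, process_pids, thread_pids
```

The process pool reports PIDs that differ from the parent's, while the thread pool reports only the parent's PID. Anything created in the parent before the pool starts (such as the logging client) is copied into each forked worker rather than shared with it.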

I was able to find another SO question that references a Thread Pool API in Python that uses threads instead of multiple processes.

Using this along with Locks I was able to solve my issue!
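
For reference, here is a rough sketch of what that change could look like, assuming the thread pool in question is multiprocessing.pool.ThreadPool. StandInLogger is a hypothetical in-memory replacement for the Cloud Logging logger so the snippet runs without credentials, and run_with_thread_pool is a name invented for this example; in the real script the logger would come from log_client.logger(GENERAL_LOGGER) as in the question.

```python
from multiprocessing.pool import ThreadPool
from threading import Lock


class StandInLogger:
    """Hypothetical stand-in for the Cloud Logging logger (keeps entries in memory)."""

    def __init__(self):
        self.entries = []

    def log_struct(self, info):
        self.entries.append(info)


def run_with_thread_pool(logger, items):
    logger_lock = Lock()

    def log_item(num):
        # do some intense (I/O-bound) task...

        # then log; the lock lets only one thread use the logger at a time
        with logger_lock:
            logger.log_struct({"number": num})

    logger.log_struct({"msg": "before the pool"})

    # Threads run inside the current process, so they use the same
    # logger object as the main thread instead of a forked copy.
    with ThreadPool(2) as p:
        p.map(log_item, items)

    logger.log_struct({"msg": "after the pool"})
```

Calling run_with_thread_pool(StandInLogger(), [1, 2, 3, 4, 5]) logs once before the pool, once per item from the worker threads, and once more after the pool has finished. Using "with logger_lock:" instead of explicit acquire() and release() calls also releases the lock if log_struct raises.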

[1] https://hackernoon.com/concurrent-programming-in-python-is-not-what-you-think-it-is-b6439c3f3e6a

[2] https://docs.python.org/3/library/threading.html#threading.Thread