I am trying to use Google's Python API for Cloud Logging (here's their library repo and their documentation).
I am also trying to use parallelism (specifically a multiprocessing Pool) to speed up the processing of a list. The issue is that after using the logger in the pool's workers, if I use it again in my main function it fails (or hangs indefinitely).
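Despite the threading vocabulary, multiprocessing.Pool runs its tasks in separate worker processes, while multiprocessing.pool.ThreadPool offers the same map interface backed by threads inside the current process. The minimal sketch below records which process runs each task so the difference is visible. The helpers current_pid and worker_pids are hypothetical names. It also assumes the "fork" start method is available (as on Linux). The ForkPoolWorker names in this question's traceback suggest that fork is the method in use.

```python
import os
from multiprocessing import get_context
from multiprocessing.pool import ThreadPool


def current_pid(_item):
    """Return the ID of the process that ran this task."""
    return os.getpid()


def worker_pids(pool_factory, items):
    """Map current_pid over items with a 2-worker pool; return the set of PIDs seen."""
    with pool_factory(2) as pool:
        return set(pool.map(current_pid, items))


# Assumption: a platform with the "fork" start method (e.g. Linux).
fork_ctx = get_context("fork")

process_pids = worker_pids(fork_ctx.Pool, [1, 2, 3, 4, 5])  # separate worker processes
thread_pids = worker_pids(ThreadPool, [1, 2, 3, 4, 5])      # threads in this process

print("parent PID:     ", os.getpid())
print("Pool PIDs:      ", sorted(process_pids))  # never includes the parent PID
print("ThreadPool PIDs:", sorted(thread_pids))   # only the parent PID
```

Whether a ThreadPool would be enough depends on the workload. CPU-bound pure-Python code gains little from threads because of the GIL, but I/O-bound work often does.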
Example code
Here's some example code that fails for me:
    from google.cloud import logging
    from multiprocessing import Pool
    from threading import Lock

    GENERAL_LOGGER = 'something'
    log_client = logging.Client()
    logger = log_client.logger(GENERAL_LOGGER)
    logger_lock = Lock()


    def main() -> None:
        global logger
        logger.log_struct({
            "msg": "This works",
        })
        items = [1, 2, 3, 4, 5]
        with Pool(2) as p:
            p.map(test_thread, items)
        logger.log_struct({
            "msg": "This fails",
        })


    def test_thread(num) -> None:
        global logger
        # do some intense task...
        # then log
        logger_lock.acquire()
        logger.log_struct({"number": num})  # this succeeds
        logger_lock.release()


    if __name__ == '__main__':
        main()
For the above code, once it tries to log "This fails", it does not log and instead hangs indefinitely. I then have to press ^C, which yields the output below:
    E0503 18:54:34.153646870 786847 ssl_transport_security.cc:510] Corruption detected.
    E0503 18:54:34.154107944 786847 ssl_transport_security.cc:486] error:1e000065:Cipher functions:OPENSSL_internal:BAD_DECRYPT
    E0503 18:54:34.154475815 786847 ssl_transport_security.cc:486] error:1000008b:SSL routines:OPENSSL_internal:DECRYPTION_FAILED_OR_BAD_RECORD_MAC
    E0503 18:54:34.154670516 786847 secure_endpoint.cc:208] Decryption error: TSI_DATA_CORRUPTED
    E0503 18:54:34.154883721 786847 ssl_transport_security.cc:537] SSL_write failed with error SSL_ERROR_SSL.
    ^C
    Process ForkPoolWorker-2:
    Process ForkPoolWorker-1:
    Traceback (most recent call last):
      File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
        self.run()
      File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
        result = (True, func(*args, **kwds))
      File "/usr/lib/python3.8/multiprocessing/pool.py", line 48, in mapstar
        return list(map(*args))
      File "so.py", line 38, in test_thread
        logger_lock.acquire()
    KeyboardInterrupt
    Traceback (most recent call last):
      File "so.py", line 44, in <module>
        main()
      File "so.py", line 25, in main
        p.map(test_thread, items)
      File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
        return self._map_async(func, iterable, mapstar, chunksize).get()
      File "/usr/lib/python3.8/multiprocessing/pool.py", line 765, in get
        self.wait(timeout)
      File "/usr/lib/python3.8/multiprocessing/pool.py", line 762, in wait
        self._event.wait(timeout)
      File "/usr/lib/python3.8/threading.py", line 558, in wait
        signaled = self._cond.wait(timeout)
      File "/usr/lib/python3.8/threading.py", line 302, in wait
        waiter.acquire()
    KeyboardInterrupt
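With the fork start method, each Pool worker starts as a copy of the parent process, so module-level objects such as logger and logger_lock in the example are duplicated rather than shared. In particular, a threading.Lock only coordinates threads within a single process, so it does not serialize writes across workers. For a real Cloud Logging client, the copied state presumably includes the parent's already-open gRPC/TLS connection. That would be consistent with the "Corruption detected" and BAD_DECRYPT messages above, but it is an assumption, not something confirmed here. The sketch below uses a hypothetical RecordingLogger in place of the real client and shows only the copying.

```python
import os
from multiprocessing import get_context


class RecordingLogger:
    """Hypothetical stand-in for a Cloud Logging logger: it just records entries."""

    def __init__(self):
        self.entries = []

    def log_struct(self, info):
        self.entries.append((os.getpid(), info))


# Created at import time, before the pool starts, like `logger` in the question.
logger = RecordingLogger()


def log_number(num):
    """Pool task: logs through whatever `logger` object this process holds."""
    logger.log_struct({"number": num})
    return os.getpid(), len(logger.entries)


logger.log_struct({"msg": "This works"})

# Assumption: the "fork" start method is available (e.g. Linux).
with get_context("fork").Pool(2) as pool:
    results = pool.map(log_number, [1, 2, 3, 4, 5])

logger.log_struct({"msg": "After the pool"})

# Each worker appended to its own inherited copy of `logger`, so the parent
# still holds only the two entries it logged itself.
print(logger.entries)
print(results)
```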
My real error
The actual code I am having an issue with is too long to post here. However, its printed error may help, since it differs slightly:
    E0503 18:36:42.763982000 769264 ssl_transport_security.cc:510] Corruption detected.
    E0503 18:36:42.764023851 769264 ssl_transport_security.cc:486] error:1e000065:Cipher functions:OPENSSL_internal:BAD_DECRYPT
    E0503 18:36:42.764032243 769264 ssl_transport_security.cc:486] error:1000008b:SSL routines:OPENSSL_internal:DECRYPTION_FAILED_OR_BAD_RECORD_MAC
    E0503 18:36:42.764038246 769264 secure_endpoint.cc:208] Decryption error: TSI_DATA_CORRUPTED
    E0503 18:36:42.764096712 769264 ssl_transport_security.cc:537] SSL_write failed with error SSL_ERROR_SSL.
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/google/api_core/grpc_helpers.py", line 73, in error_remapped_callable
        return callable_(*args, **kwargs)
      File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 946, in __call__
        return _end_unary_response_blocking(state, call, False, None)
      File "/usr/local/lib/python3.8/dist-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
        raise _InactiveRpcError(state)
    grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Stream removed"
        debug_error_string = "{"created":"@1620067002.764071153","description":"Error received from peer ipv4:172.217.11.202:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Stream removed","grpc_status":2}"
    >
    Traceback (most recent call last):
      File "/opt/scraper", line 331, in <module>
        main()
      File "/opt/scraper", line 162, in main
        logger.log_struct({
      File "/usr/local/lib/python3.8/dist-packages/google/cloud/logging_v2/logger.py", line 181, in log_struct
        self._do_log(client, StructEntry, info, **kw)
      File "/usr/local/lib/python3.8/dist-packages/google/cloud/logging_v2/logger.py", line 134, in _do_log
        client.logging_api.write_entries([api_repr])
      File "/usr/local/lib/python3.8/dist-packages/google/cloud/logging_v2/_gapic.py", line 149, in write_entries
        self._gapic_api.write_log_entries(request=request)
      File "/usr/local/lib/python3.8/dist-packages/google/cloud/logging_v2/services/logging_service_v2/client.py", line 591, in write_log_entries
        response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
      File "/usr/local/lib/python3.8/dist-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
        return wrapped_func(*args, **kwargs)
      File "/usr/local/lib/python3.8/dist-packages/google/api_core/retry.py", line 281, in retry_wrapped_func
        return retry_target(
      File "/usr/local/lib/python3.8/dist-packages/google/api_core/retry.py", line 184, in retry_target
        return target()
      File "/usr/local/lib/python3.8/dist-packages/google/api_core/grpc_helpers.py", line 75, in error_remapped_callable
        six.raise_from(exceptions.from_grpc_error(exc), exc)
      File "<string>", line 3, in raise_from
    google.api_core.exceptions.Unknown: None Stream removed
It may help to note that as long as I don't use the Pool, everything works as expected. Something about using it (specifically together with Google's logger) causes this to break.
I am not super familiar with Python threading, so maybe this is a novice mistake. Many thanks for any help in advance.
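If the Pool is what triggers the failure, one commonly suggested pattern for gRPC-based clients is to avoid sharing a client across a fork at all. Each worker gets its own logger through the Pool's initializer argument, and the parent creates its logger only after the pool has finished, so no open connection exists at fork time. The minimal sketch below assumes the module-level log_client and logger from the question are removed. FakeLogger, make_logger, init_worker and process_item are hypothetical names, and the fake class stands in for the real client so the sketch runs without credentials. If the parent must also log before the pool starts, get_context("spawn") is a more conservative variant, because spawned workers inherit no parent state. Neither variant has been verified here against the Cloud Logging library itself.

```python
import os
from multiprocessing import get_context


class FakeLogger:
    """Hypothetical stand-in so the sketch runs without GCP credentials.

    In real code, make_logger() might instead return
    google.cloud.logging.Client().logger(GENERAL_LOGGER).
    """

    def __init__(self):
        self.pid = os.getpid()  # process that created this logger
        self.entries = []

    def log_struct(self, info):
        self.entries.append(info)


def make_logger():
    return FakeLogger()


worker_logger = None  # assigned separately inside each worker by init_worker()


def init_worker():
    """Pool initializer: runs once in every worker process when it starts."""
    global worker_logger
    worker_logger = make_logger()


def process_item(num):
    # ... do some intense task, then log through this worker's own logger ...
    worker_logger.log_struct({"number": num})
    return worker_logger.pid == os.getpid()  # True if the logger was made in this process


def main():
    # No logger exists in the parent while the pool is forking its workers.
    with get_context("fork").Pool(2, initializer=init_worker) as pool:
        own_logger_flags = pool.map(process_item, [1, 2, 3, 4, 5])

    # The parent creates its own logger only after the pool has shut down.
    parent_logger = make_logger()
    parent_logger.log_struct({"msg": "This should now work"})
    return own_logger_flags, parent_logger


if __name__ == "__main__":
    flags, parent_logger = main()
    print(flags)  # one True per item
    print(parent_logger.entries)
```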