
The variable SCRIPT_ENV gets set correctly in the main block, but that value does not propagate to the other functions. Here's my full code:

import argparse
import json
import time
from multiprocessing import Pool

from kafka import KafkaProducer  # kafka-python package

import settings


def set_brokers_and_cert_path():
    brokers = None
    cert_full_path = None
    print("I am here man\n\n {0}".format(settings.SCRIPT_ENV))
    if settings.SCRIPT_ENV == "some value":
        brokers = # use these brokers
        cert_full_path = settings.BASE_CERT_PATH + "test_env/"
    if settings.SCRIPT_ENV == "some other value":
        brokers = # use those brokers
        cert_full_path = settings.BASE_CERT_PATH + "new_env/"
    return brokers, cert_full_path


def func_b(partition):
    kafka_brokers, cert_full_path = set_brokers_and_cert_path()
    producer = KafkaProducer(bootstrap_servers=kafka_brokers,
                             security_protocol='SSL',
                             ssl_check_hostname=True,
                             ssl_cafile=cert_full_path + 'cacert.pem',
                             ssl_certfile=cert_full_path + 'certificate.pem',
                             ssl_keyfile=cert_full_path + 'key.pem',
                             max_block_ms=1200000,
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                             key_serializer=str.encode
                             )
    try:
        producer.send(settings.KAFKA_TOPIC,
                      value="some val",
                      key="some key",
                      timestamp_ms=int(time.time() * 1000),  # epoch milliseconds, not seconds
                      headers=[some headers],
                      partition=partition)
        producer.flush()
    except AssertionError as e:
        print("Error in partition: {0}, {1}".format(partition, e))


def main():
    with Pool(settings.NUM_PROCESSES) as p:
        p.map(func_b, range(settings.NUM_PARTITIONS))

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", help="Environment against which the script needs to run")
    args = parser.parse_args()
    if args.env:
        settings.SCRIPT_ENV = args.env
        main()
    else:
        raise Exception("Please pass env argument. Ex: --env test/accept")

The "I am here man" line prints None as the value of SCRIPT_ENV. SCRIPT_ENV is set correctly in the if __name__ == "__main__" block, but inside set_brokers_and_cert_path() (previously named func_a()) it is None.

Contents of settings.py:

KAFKA_TOPIC = "some topic"
NUM_PROCESSES = 8
NUM_MESSAGES = 1000
SCRIPT_ENV = None
NUM_PARTITIONS = 24
TEST_BROKERS = [some brokers]
ACCEPT_BROKERS = [some brokers]
BASE_CERT_PATH = "base path"

I run it like this:

python <script.py> --env <value>
Rajat Bhardwaj

1 Answer


Your code example is missing a few references, so it is not executable as posted. First, we should narrow down which parts of the program lead to the behavior you describe. I do not think argparse is crucial: essentially, you change an attribute on the settings module and then call main(), so the last part of the program can be reduced to two lines:

settings.SCRIPT_ENV = "test"
main()

main() distributes func_b() across multiple worker processes. func_b() itself performs the following steps:

  1. Call set_brokers_and_cert_path() (which you earlier named func_a())
  2. Create an instance of KafkaProducer (producer)
  3. Send some data via the producer

I cannot see how the created producer instance would affect settings.SCRIPT_ENV, so I reduce func_b() as well and keep only the first function call (for simplicity, I use the original name func_a()).

func_a() (aka set_brokers_and_cert_path()) likewise has no apparent influence on settings.SCRIPT_ENV, since it only builds a few strings. For debugging purposes, I kept the print of settings.SCRIPT_ENV in the function.
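For this kind of debugging it can also help to log which process printed the value and how that process was started. A minimal sketch, assuming the process start method might matter here (that is my assumption, the question does not say which platform it runs on); describe_process() is a hypothetical helper, not part of your code:

```python
import multiprocessing
import os


def describe_process(script_env):
    """Return a one-line report for the current process (hypothetical helper)."""
    return "pid={0} start_method={1} SCRIPT_ENV={2!r}".format(
        os.getpid(),
        multiprocessing.get_start_method(),  # e.g. "fork" or "spawn"
        script_env,
    )


if __name__ == "__main__":
    # In the real script this call would sit inside func_a(),
    # with settings.SCRIPT_ENV as the argument.
    print(describe_process("Test"))
```

Printing this line from inside func_a() shows whether the value is read in the parent process or in a worker, and which start method is in use.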

Altogether, I arrive at the following executable minimal example, which does not reproduce the problem you describe for me (see the output). You can take this example and move it back toward your code piece by piece to see which part of the program causes the problem.

import settings
from multiprocessing import Pool


def func_a():
    print("I am here man\n{0}\n".format(settings.SCRIPT_ENV))


def func_b(partition):
    func_a()


def main():
    with Pool(settings.NUM_PROCESSES) as p:
        p.map(func_b, range(24))


settings.SCRIPT_ENV = "Test"
main()

Output

I am here man
Test

I am here man
Test

I am here man
Test

I am here man
Test

...
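
If the value still shows up as None on your machine, one thing worth ruling out is the process start method. This is an assumption on my part, since the question does not name the platform: with the "spawn" start method (the default on Windows and macOS in current Python versions), each worker imports the main module afresh and does not execute the if __name__ == "__main__" block, so an assignment made there is not visible in the workers. A minimal sketch that hands the value to every worker explicitly through Pool's initializer; init_worker(), run() and _SCRIPT_ENV are hypothetical names, and func_b() is only a stand-in for yours:

```python
import argparse
from multiprocessing import Pool

# Per-process copy of the environment name; filled in by init_worker().
_SCRIPT_ENV = None


def init_worker(env):
    """Runs once in every worker process, right after it starts."""
    global _SCRIPT_ENV
    _SCRIPT_ENV = env


def func_b(partition):
    # Stand-in for the real func_b(): report what this worker sees.
    return partition, _SCRIPT_ENV


def run(env, num_processes=2, num_partitions=4):
    # initializer/initargs pass the value to each worker explicitly,
    # so the result does not depend on the start method.
    with Pool(num_processes, initializer=init_worker, initargs=(env,)) as pool:
        return pool.map(func_b, range(num_partitions))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", help="Environment against which the script needs to run")
    args = parser.parse_args()
    if args.env:
        print(run(args.env))
    else:
        parser.print_usage()
```

Run it as python <script.py> --env <value>; every (partition, env) pair in the printed list should then carry the value from the command line. Passing the value as part of each task's arguments would work as well; the initializer just avoids repeating it per task.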

@RajatBhardwaj, at this point, it is up to you to decide whether you want to participate in finding a solution.

Everyone else who is not interested in a solution would do well not to criticize an answer without having understood its intention.

Andreas
  • This isn't OP's code at all. Their code never calls `func_a`. – Axe319 Dec 30 '22 at 13:42
  • @Axe319, it's a simplified version of the code. His `set_brokers_and_cert_path()` called in `func_b()` is my `func_a()`. No reason for a down vote! – Andreas Dec 30 '22 at 14:41
  • That makes a massive difference if `set_brokers_and_cert_path` is in another module. The point is, OP's code isn't working and this code is. That means there's at least something functionally different about this code. We don't know what that is until they include more details in their post. Even if it would be exactly the same as yours, how is "it works on my machine" an answer? – Axe319 Dec 30 '22 at 19:50