1

I am trying to obtain data from a redis channel by using a subscription on my client application. I am using python with asyncio and aioredis for this purpose.

I would like to use my subscription to have a variable of my main application updated when this one changes on the server, but I cannot manage to pass the data received from the subscription to my main thread.

According to aioredis website, I implemented my Subscription with:

sub = await aioredis.create_redis(
     'redis://localhost')

ch1 = await sub.subscribe('channel:1')
assert isinstance(ch1, aioredis.Channel)

async def async_reader(channel, globarVar):
    while await channel.wait_message():
        msg = await channel.get(encoding='utf-8')
        # ... process message ...
        globarVar = float(msg)
        print("message in {}: {}".format(channel.name, msg))

tsk1 = asyncio.ensure_future(async_reader(ch1, upToDateValue))

But I cannot get to update the global variable, I guess python pass just the current value as argument (which I expected to, but wanted to be sure).

Is there any viable option to get data out of a subscription? or to pass a reference to a shared variable or queue I could use?

DoJeey
  • 33
  • 4

1 Answers1

1

You should redesign your code so you don't need a global variable. All of your processing should occur when receiving the message. However to modify a global variable you need to declare it in the function with the global keyword. You don't pass global variables around - you just use them.

Sub:

import aioredis
import asyncio
import json

gvar = 2

# Do everything you need here or call another function
# based on the message.  Don't use a global variable.
async def process_message(msg):
  global gvar
  gvar = msg

async def async_reader(channel):
  while await channel.wait_message():
    j = await channel.get(encoding='utf-8')
    msg = json.loads(j)
    if msg == "stop":
      break
    print(gvar)
    await process_message(msg)
    print(gvar)

async def run(loop):
  sub = await aioredis.create_redis('redis://localhost')
  res = await sub.subscribe('channel:1')
  ch1 = res[0]
  assert isinstance(ch1, aioredis.Channel)

  await async_reader(ch1)

  await sub.unsubscribe('channel:1')
  sub.close()

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

publisher:

import asyncio
import aioredis

async def main():
    pub = await aioredis.create_redis('redis://localhost')

    res = await pub.publish_json('channel:1', ["Hello", "world"])
    await asyncio.sleep(1)
    res = await pub.publish_json('channel:1', "stop")

    pub.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
MarkReedZ
  • 1,421
  • 4
  • 10
  • Thanks you MarkReedZ, very good example and explanation. I would like to mention as well for those who might try your code, please be sure you have installed tornado==4.5.3 and not a newer version of it, or else you may run into an error: ["asyncio: RuntimeError this event loop is already running"](https://stackoverflow.com/questions/53248431/asyncio-runtimeerror-this-event-loop-is-already-running) – DoJeey Feb 18 '19 at 03:03