Say we have a schema like this:

type Subscription {
  objectAddedA: ObjectA
  objectAddedB: ObjectB
}

Can a GraphQL client subscribe to both the objectAddedA and objectAddedB subscriptions at the same time? I'm having a hard time finding good examples of subscriptions on the web, and the GraphQL docs don't seem to mention them at all, unless I'm missing something.

We are designing a system that runs in Kubernetes, where a single pod receives API requests to add, update, or delete configuration. We want to use GraphQL subscriptions to push these changes to any pods that care about them (those pods would be the GraphQL clients). However, there will be many different object types, and potentially several different event types that a pod wants to be notified about at any given time. So I'm not sure whether a client can subscribe to several subscriptions at once, or whether the schema has to be designed so that a single subscription delivers every event a client might need.

bieno002

2 Answers


Actually, the GraphQL specification explicitly says:

Subscription operations must have exactly one root field.

Python's "graphql-core" library enforces this through a validation rule. Libraries that are built on it (graphene, ariadne and strawberry) follow the same rule.

This is what the server says if you attempt multiple subscriptions in one request:

  "error": {
    "message": "Anonymous Subscription must select only one top level field.",

You can remove this validation rule and see what happens, but remember that you're in no-standards land now, and things usually don't end well there... :D

kolypto

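It is possible with graphql-python/gql: each subscription is sent as its own operation, and several of them can run concurrently over the same WebSocket connection.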

See the documentation here

Extract:

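import asyncio

import backoff
from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport

# query1, query2, subscription1 and subscription2 are not shown in this extract;
# they are assumed to be documents built with gql("...").

# First define all your queries using a session argument: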

async def execute_query1(session):
    result = await session.execute(query1)
    print(result)

async def execute_query2(session):
    result = await session.execute(query2)
    print(result)

async def execute_subscription1(session):
    async for result in session.subscribe(subscription1):
        print(result)

async def execute_subscription2(session):
    async for result in session.subscribe(subscription2):
        print(result)

# Then create a coroutine which will connect to your API and run all your queries as tasks.
# We use a `backoff` decorator to reconnect using exponential backoff in case of connection failure.

@backoff.on_exception(backoff.expo, Exception, max_time=300)
async def graphql_connection():

    transport = WebsocketsTransport(url="wss://YOUR_URL")

    client = Client(transport=transport, fetch_schema_from_transport=True)

    async with client as session:
        task1 = asyncio.create_task(execute_query1(session))
        task2 = asyncio.create_task(execute_query2(session))
        task3 = asyncio.create_task(execute_subscription1(session))
        task4 = asyncio.create_task(execute_subscription2(session))

        await asyncio.gather(task1, task2, task3, task4)

asyncio.run(graphql_connection())
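If the client pod wants to handle events from many subscriptions in one place, one option is to forward each stream into a shared queue. The sketch below shows only that pattern, using the standard library: `fake_subscription` is a hypothetical stand-in for `session.subscribe(...)` so that it runs without a server, and `forward` and `collect_events` are illustrative names, not part of gql.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def fake_subscription(name: str, count: int) -> AsyncIterator[dict]:
    """Stand-in for session.subscribe(...): yields `count` fake events."""
    for i in range(count):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield {name: {"id": i}}


async def forward(name: str, stream: AsyncIterator[dict], queue: asyncio.Queue) -> None:
    """Copy every event from one stream into the shared queue, tagged by name."""
    async for event in stream:
        await queue.put((name, event))


async def collect_events() -> List[Tuple[str, dict]]:
    queue: asyncio.Queue = asyncio.Queue()
    streams = {
        "objectAddedA": fake_subscription("objectAddedA", 2),
        "objectAddedB": fake_subscription("objectAddedB", 3),
    }
    # One task per subscription; each operation keeps a single root field.
    tasks = [
        asyncio.create_task(forward(name, stream, queue))
        for name, stream in streams.items()
    ]
    await asyncio.gather(*tasks)

    # The fake streams are finite, so the queue can be drained afterwards.
    received = []
    while not queue.empty():
        received.append(queue.get_nowait())
    return received


if __name__ == "__main__":
    for name, event in asyncio.run(collect_events()):
        print(name, event)
```

Real subscriptions normally never finish, so in practice a separate consumer task would read from the queue while the forwarding tasks keep running, rather than draining it after `gather` returns.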
leszek.hanusz