I am trying to run two different Discord bots from a single Python script using cogs. But when I try to run the second bot, it throws an ImportError even though I didn't use that specific library. The reaction roles bot works fine without the anti-spam bot. Here's my code. FYI, I am working inside a virtual environment.

main.py

if __name__ == "__main__":
    try:
        reaction_role_bot = commands.Bot(command_prefix=config["reaction_role_bot"]["bot_prefix"], intents=discord.Intents.all())
        reaction_slash = SlashCommand(reaction_role_bot, sync_commands=True)
        reaction_role_bot.load_extension(f"cogs.{str(os.path.basename('cogs/reaction_roles.py')[:-3])}")
        
        anti_spam_bot = commands.Bot(command_prefix=config["anti_spam_bot"]["bot_prefix"], intents=discord.Intents.default())
        spam_slash = SlashCommand(anti_spam_bot, sync_commands=True)   
        anti_spam_bot.load_extension(f"cogs.{str(os.path.basename('cogs/anti_spam.py')[:-3])}")
        
        event_loop = asyncio.get_event_loop()
        event_loop.create_task(reaction_role_bot.run(config["reaction_role_bot"]["token"]))
        event_loop.create_task(anti_spam_bot.run(config["anti_spam_bot"]["token"]))
        event_loop.run_forever()
        
    except Exception as e:
        print(e)

anti_spam.py

import platform
import os

import discord
from discord.ext import commands

from antispam import AntiSpamHandler
from antispam.plugins import AntiSpamTracker, Options

class AntiSpamBot(commands.Cog):
    def __init__(self, client):
        self.client = client
        
        # Initialize the AntiSpamHandler
        self.client.handler = AntiSpamHandler(self.client, options=Options(no_punish=True))
        # 3 Being how many 'punishment requests' before is_spamming returns True
        self.client.tracker = AntiSpamTracker(self.client.handler, 3) 
        self.client.handler.register_extension(self.client.tracker)
        
    @commands.Cog.listener()
    async def on_ready(self):
        print("---------------------------------")
        print(f"Logged in as {str(self.client.user)}")
        print(f"Discord.py API version: {discord.__version__}")
        print(f"Python version: {platform.python_version()}")
        print(f"Running on: {platform.system()} {platform.release()} ({os.name})")
        await self.client.change_presence(status=discord.Status.idle, activity=discord.Game(name="Head of Security"))
        print("---------------------------------\n")
        
    # The code in this event is executed every time a valid command raises an error
    @commands.Cog.listener()
    async def on_command_error(self, context, error):
        raise error
    
    @commands.Cog.listener()
    async def on_message(self, message):
        await self.client.handler.propagate(message)
        
        if self.client.tracker.is_spamming(message):
            await message.delete()
            await message.channel.send(f"{message.author.mention} has been automatically kicked for spamming.")
            await message.author.kick()
        
        await self.client.process_commands(message)
    
        
def setup(client):
    client.add_cog(AntiSpamBot(client))

Error

Extension 'cogs.anti_spam' raised an error: ImportError: cannot import name 'AsyncMock' from 'unittest.mock' (/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/unittest/mock.py)

I have no experience using cogs and they are a bit confusing to me. Any kind of help sorting this out would be appreciated. Thanks in advance!

LW001
nimendra

1 Answer

I do not believe this is a cog registration issue. I believe this is an import error coming from one of the dependencies in your cog file. I googled your error and found something similar; I recommend checking it out here for some more information.

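As a blanket statement, I would double-check that you have mock installed, and that you're installing it for the version of Python you think you are. It can get wonky if you have multiple Python versions installed. Note also that the path in your traceback points at Python 3.7, while unittest.mock.AsyncMock was only added in Python 3.8, so that import cannot succeed on a 3.7 interpreter.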
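
To see which interpreter is actually running the script and whether its standard library provides AsyncMock, a quick check along these lines may help. This is only a diagnostic sketch; the helper name async_mock_available is made up for illustration. Run it with the same interpreter (inside the same virtual environment) that runs the bot.

```python
import sys
import unittest.mock


def async_mock_available():
    """Return True if this interpreter's unittest.mock exposes AsyncMock."""
    # AsyncMock was added to the standard library in Python 3.8.
    return hasattr(unittest.mock, "AsyncMock")


# Which interpreter is this, and which version is it?
print("interpreter:", sys.executable)
print("version:", ".".join(map(str, sys.version_info[:3])))
print("AsyncMock available:", async_mock_available())
```

If the last line reports False, the interpreter is older than 3.8, and any library that imports AsyncMock from unittest.mock will fail on import there.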

Also, on an unrelated note: it is best to avoid running multiple bot instances in one Python file, but I can help you do it the best way possible.

For starters, you have to realize that Client.run is an abstraction over a couple of lower-level concepts. It is a blocking call, not a coroutine, so it cannot be handed to create_task.

There is Client.login, which logs in the client, and then Client.connect, which actually runs the processing. These are coroutines.
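
To make the login-then-connect split concrete without needing real tokens, here is a rough sketch using a stand-in class. StandInClient, run_both, and the token strings are all hypothetical and are not part of discord.py; the stand-in only mimics the shape of login, connect, and a start coroutine that chains the two.

```python
import asyncio


class StandInClient:
    """Hypothetical stand-in for discord.Client; not part of discord.py."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    async def login(self, token):
        # The real client would authenticate with Discord here.
        self.log.append(f"{self.name} logged in")

    async def connect(self):
        # The real client would process gateway events until it is closed.
        for step in range(2):
            await asyncio.sleep(0)  # yield so the other client gets a turn
            self.log.append(f"{self.name} step {step}")

    async def start(self, token):
        # Roughly what a blocking run() wraps: login, then connect.
        await self.login(token)
        await self.connect()


async def run_both(log):
    first = StandInClient("first", log)
    second = StandInClient("second", log)
    # Both coroutines share one event loop and take turns at each await.
    await asyncio.gather(first.start("token-a"), second.start("token-b"))


log = []
asyncio.run(run_both(log))
print(log)
```

The point of the sketch is that two coroutines can make progress on one loop, whereas two blocking run-style calls cannot, because the first one never returns control.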

asyncio lets you schedule coroutines on the event loop, which then runs them whenever it has time to.

For example:

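import asyncio

loop = asyncio.get_event_loop()

async def foo():
  await asyncio.sleep(10)
  loop.stop()  # stop, not close: a running loop cannot be closed

loop.create_task(foo())
loop.run_forever()
loop.close()  # safe to close once run_forever() has returned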

If we want one coroutine to wait for something to happen in another, asyncio provides a synchronisation primitive for that as well: asyncio.Event. You can think of it as a boolean that you wait on until it becomes true:

import asyncio

e = asyncio.Event()
loop = asyncio.get_event_loop()

async def foo():
  await e.wait()
  print('we are done waiting...')
  loop.stop()

async def bar():
  await asyncio.sleep(20)
  e.set()

loop.create_task(bar())
loop.create_task(foo())
loop.run_forever() # foo will stop this event loop when 'e' is set to true
loop.close()
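
The same idea extends to waiting on several events at once, which is what running more than one bot needs. The following is a small sketch using plain asyncio only; the names worker and wait_for_all and the delays are made up for illustration.

```python
import asyncio


async def worker(event, delay, finished, name):
    # Pretend to do some work, then signal completion through the event.
    await asyncio.sleep(delay)
    finished.append(name)
    event.set()


async def wait_for_all(events):
    # Completes only once every event has been set.
    await asyncio.gather(*(e.wait() for e in events))


async def main():
    events = [asyncio.Event(), asyncio.Event()]
    finished = []
    # Keep references to the tasks so they are not garbage collected.
    tasks = [
        asyncio.create_task(worker(events[0], 0.02, finished, "slow")),
        asyncio.create_task(worker(events[1], 0.01, finished, "fast")),
    ]
    await wait_for_all(events)
    return finished


finished = asyncio.run(main())
print(finished)
```

Here wait_for_all returns only after both workers have set their events, regardless of which one finishes first.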

We can apply the same concept to the Discord bots themselves.

import asyncio
import discord
from collections import namedtuple

# First, we must attach an event signalling when the bot has been
# closed to the client itself so we know when to fully close the event loop.
# The token values below are placeholders; substitute each bot's real token.

Entry = namedtuple('Entry', 'client event token')
entries = [
  Entry(client=discord.Client(), event=asyncio.Event(), token='FIRST_BOT_TOKEN'),
  Entry(client=discord.Client(), event=asyncio.Event(), token='SECOND_BOT_TOKEN')
]

# Then, we should log in to all our clients and wrap the connect call
# so it knows when to do the actual full closure

loop = asyncio.get_event_loop()

async def login():
  for e in entries:
    await e.client.login(e.token)

async def wrapped_connect(entry):
  try:
    await entry.client.connect()
  except Exception as e:
    await entry.client.close()
    print('We got an exception: ', e.__class__.__name__, e)
  finally:
    # signal closure whether connect() returned normally or raised
    entry.event.set()

# actually check if we should close the event loop:
async def check_close():
  # gather accepts bare coroutines; passing them to asyncio.wait is deprecated
  await asyncio.gather(*(e.event.wait() for e in entries))

# here is when we actually login
loop.run_until_complete(login())

# now we connect to every client
for entry in entries:
  loop.create_task(wrapped_connect(entry))

# now we're waiting for all the clients to close
loop.run_until_complete(check_close())

# finally, we close the event loop
loop.close()


Iced Chai
  • Thanks for the tip @iced-chai, but the ImportError is still there. I've looked at the question you mentioned above, but no luck. – nimendra Nov 06 '21 at 07:14
  • Also, the specific mock.py doesn't contain any AsyncMock, as mentioned in the exception. – nimendra Nov 06 '21 at 07:18