I'm using mitmproxy to intercept some requests/responses from my mobile devices. I have a block of code to start it, shown below:
import asyncio
from mitmproxy import proxy, options, ctx
from mitmproxy.tools.dump import DumpMaster
from mitmproxy import http
class AdjustBody:
    """Addon that overwrites the body of responses for Google URLs."""

    def response(self, flow: http.HTTPFlow) -> None:
        """Replace the response body when the request URL contains "google".

        The response text is printed before and after the replacement.
        Flows whose request URL does not contain "google" are left untouched.
        """
        # Guard clause: nothing to do for unrelated traffic.
        if "google" not in flow.request.url:
            return

        print("Before intercept: %s" % flow.response.text)
        replacement = "This is replacement response".encode("UTF-8")
        flow.response.content = replacement
        print("After intercept: %s" % flow.response.text)
def start(
    listen_host: str = "192.168.1.224",
    listen_port: int = 8888,
    confdir: str = "/Users/hienphan/.mitmproxy",
) -> None:
    """Run a mitmproxy DumpMaster with the AdjustBody addon installed.

    The defaults reproduce the previously hard-coded configuration, so
    calling ``start()`` with no arguments behaves as before.

    Args:
        listen_host: Address the proxy listens on.
        listen_port: Port the proxy listens on.
        confdir: Directory passed to mitmproxy as its ``confdir`` option.
    """
    add_on = AdjustBody()
    opts = options.Options(
        listen_host=listen_host,
        listen_port=listen_port,
        confdir=confdir,
    )
    # Keep the original construction order: proxy config first, then master.
    proxy_conf = proxy.config.ProxyConfig(opts)
    dump_master = DumpMaster(opts)
    dump_master.server = proxy.server.ProxyServer(proxy_conf)
    dump_master.addons.add(add_on)
    try:
        # Schedule the delayed shutdown before handing control to run();
        # the coroutine only makes progress once the event loop is running.
        asyncio.ensure_future(stop())
        dump_master.run()
    except KeyboardInterrupt:
        # Ctrl-C: shut the master down explicitly.
        dump_master.shutdown()
async def stop(delay: float = 10) -> None:
    """Shut down the mitmproxy master after waiting *delay* seconds.

    Args:
        delay: Seconds to sleep before shutting down, leaving time for the
            interception to happen. Defaults to 10, the previously fixed wait.
    """
    # Sleep to leave time for the interception to happen.
    await asyncio.sleep(delay)
    ctx.master.shutdown()
if __name__ == "__main__":
    # Only launch the proxy when this file is executed as a script, so that
    # importing the module does not start a server as a side effect.
    start()
I can start it properly, but it runs a run_forever() event loop, and I don't know how to stop it programmatically. What I tried here is just to sleep for 10 seconds, to give myself time to do what I want before shutting it down. Is there any way to wait until my interception is done before shutting down the proxy?