2

I'm using mitmproxy to intercept some requests/responses from my mobile devices. I have block of code to start it as below:

import asyncio
from mitmproxy import proxy, options, ctx
from mitmproxy.tools.dump import DumpMaster
from mitmproxy import http


class AdjustBody:
    def response(self, flow: http.HTTPFlow) -> None:
        if "google" in flow.request.url:
            print("Before intercept: %s" % flow.response.text)
            flow.response.content = bytes("This is replacement response", "UTF-8")
            print("After intercept: %s" % flow.response.text)


def start():
    add_on = AdjustBody()

    opts = options.Options(listen_host='192.168.1.224', listen_port=8888, confdir="/Users/hienphan/.mitmproxy")
    proxy_conf = proxy.config.ProxyConfig(opts)

    dump_master = DumpMaster(opts)
    dump_master.server = proxy.server.ProxyServer(proxy_conf)
    dump_master.addons.add(add_on)

    try:
        asyncio.ensure_future(stop())
        dump_master.run()
    except KeyboardInterrupt:
        dump_master.shutdown()


async def stop():
    # Sleep 10s to do intercept
    await asyncio.sleep(10)
    ctx.master.shutdown()

start()

I can start it properly, but it's a run_forever() event loop. Then I don't know how to stop it programatically. What I try here is just to sleep 10s to do what I want before shutting it down. Is there any way to wait for my interception is done before shutting down proxy?

Hien Phan
  • 21
  • 2

2 Answers2

0

you can try this

import asyncio
import os
import signal

from mitmproxy import proxy, options
from mitmproxy.tools.dump import DumpMaster
from mitmproxy.http import HTTPFlow


class AddHeader:
    def __init__(self):
        self.num = 0

    def response(self, flow: HTTPFlow):
        self.num = self.num + 1
        flow.response.headers["count"] = str(self.num)


addons = [
    AddHeader()
]

opts = options.Options(listen_host='0.0.0.0', listen_port=8080)
pconf = proxy.config.ProxyConfig(opts)

m = DumpMaster(opts)
m.server = proxy.server.ProxyServer(pconf)
m.addons.add(*addons)

try:
    loop = asyncio.get_event_loop()
    try:
        loop.add_signal_handler(signal.SIGINT, getattr(m, "prompt_for_exit", m.shutdown))
        loop.add_signal_handler(signal.SIGTERM, m.shutdown)
    except NotImplementedError:
        # Not supported on Windows
        pass

    # Make sure that we catch KeyboardInterrupts on Windows.
    # https://stackoverflow.com/a/36925722/934719
    if os.name == "nt":
        async def wakeup():
            while True:
                await asyncio.sleep(0.2)
        asyncio.ensure_future(wakeup())

    m.run()
except (KeyboardInterrupt, RuntimeError):
    pass
mocobk
  • 1
  • 1
0

I developed a script running mitmproxy in the background while handling some other stuff some time ago. I tried keeping control over the main thread but could not find a viable solution that is also easy to understand.

My personal preference is to use Python's threading module to start a side thread shutting down the dump master after 10 seconds.

Example:

import threading
import sleep

...
# Initialise dump_master somewhere here
...

def countdown(dump_master):
    time.sleep(10)
    dump_master.shutdown()

# Start countdown function with dump_master as argument in new thread
timer = threading.Thread(
    target=countdown, args=[dump_master], daemon=True)
timer.start()

# Start dumpmaster
dump_master.run()