I want to buffer output packets originating from a container's network interface. This netlink library file named sch_plug.c (https://code.woboq.org/linux/linux/net/sched/sch_plug.c.html) looks like it can solve the problem, but I'm finding it hard to use. How should I call these functions to actually make it work? How do I get hold of the parameters, such as struct netlink_ext_ack *extack, struct sk_buff *skb, etc., that are passed to the functions defined in the source code?

user13145713

1 Answer

Command line

Qdiscs can be controlled with the commands nl-qdisc-add, nl-qdisc-delete and nl-qdisc-list (part of libnl). The --help flag shows some usage examples (link):

  • Create the plug qdisc with a buffer of size 32KB for the network interface eth0:

    # nl-qdisc-add --dev=eth0 --parent=root plug --limit=32768
    
  • By default, the plug qdisc will be in buffered mode (meaning it holds back all outgoing traffic). You can switch between buffered and release mode with the following commands:

    • Switch to release mode:

      # nl-qdisc-add --dev=eth0 --parent=root --update plug --release-indefinite
      
    • Switch back to buffered mode:

      # nl-qdisc-add --dev=eth0 --parent=root --update plug --buffer
      
  • You can inspect the active qdiscs with:

    # nl-qdisc-list --kind=plug --details --stats

    This will also tell you the id of each qdisc.

  • Based on the id, you can remove a qdisc again:

    # nl-qdisc-delete --id <id>

From code

The code of the tools used above can be inspected to write a custom implementation (link):

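#include <linux/netlink.h>

#include <netlink/netlink.h>
#include <netlink/route/link.h>
#include <netlink/route/qdisc.h>
#include <netlink/route/qdisc/plug.h>
#include <netlink/route/tc.h>
#include <netlink/socket.h>

#include <atomic>
#include <csignal>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <string>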

/**
 * Netlink route socket.
 */
struct Socket {
  Socket() : handle{nl_socket_alloc()} {

    if (handle == nullptr) {
      throw std::runtime_error{"Failed to allocate socket!"};
    }

    if (int err = nl_connect(handle, NETLINK_ROUTE); err < 0) {
      throw std::runtime_error{"Unable to connect netlink socket: " +
                               std::string{nl_geterror(err)}};
    }
  }

  Socket(const Socket &) = delete;
  Socket &operator=(const Socket &) = delete;
  Socket(Socket &&) = delete;
  Socket &operator=(Socket &&) = delete;

  ~Socket() { nl_socket_free(handle); }

  struct nl_sock *handle;
};

/**
 * Read all links from netlink socket.
 */
struct LinkCache {
  explicit LinkCache(Socket *socket) : handle{nullptr} {
    if (int err = rtnl_link_alloc_cache(socket->handle, AF_UNSPEC, &handle);
        err < 0) {
      throw std::runtime_error{"Unable to allocate link cache: " +
                               std::string{nl_geterror(err)}};
    }
  }

  LinkCache(const LinkCache &) = delete;
  LinkCache &operator=(const LinkCache &) = delete;
  LinkCache(LinkCache &&) = delete;
  LinkCache &operator=(LinkCache &&) = delete;

  ~LinkCache() { nl_cache_free(handle); }

  struct nl_cache *handle;
};

/**
 * Link (such as "eth0" or "wlan0").
 */
struct Link {
  Link(LinkCache *link_cache, const std::string &iface)
      : handle{rtnl_link_get_by_name(link_cache->handle, iface.c_str())} {

    if (handle == nullptr) {
      throw std::runtime_error{"Link does not exist: " + iface};
    }
  }

  Link(const Link &) = delete;
  Link &operator=(const Link &) = delete;
  Link(Link &&) = delete;
  Link &operator=(Link &&) = delete;

  ~Link() { rtnl_link_put(handle); }

  struct rtnl_link *handle;
};

/**
 * Queuing discipline.
 */
struct QDisc {
  QDisc(const std::string &iface, const std::string &kind)
      : handle{rtnl_qdisc_alloc()} {
    if (handle == nullptr) {
      throw std::runtime_error{"Failed to allocate qdisc!"};
    }

    struct rtnl_tc *tc = TC_CAST(handle);

    // Set link
    LinkCache link_cache{&socket};
    Link link{&link_cache, iface};
    rtnl_tc_set_link(tc, link.handle);

    // Set parent qdisc
    uint32_t parent = 0;

    if (int err = rtnl_tc_str2handle("root", &parent); err < 0) {
      throw std::runtime_error{"Unable to parse handle: " +
                               std::string{nl_geterror(err)}};
    }

    rtnl_tc_set_parent(tc, parent);

    // Set kind (e.g. "plug")
    if (int err = rtnl_tc_set_kind(tc, kind.c_str()); err < 0) {
      throw std::runtime_error{"Unable to set kind: " +
                               std::string{nl_geterror(err)}};
    }
  }

  QDisc(const QDisc &) = delete;
  QDisc &operator=(const QDisc &) = delete;
  QDisc(QDisc &&) = delete;
  QDisc &operator=(QDisc &&) = delete;

  ~QDisc() {
    if (int err = rtnl_qdisc_delete(socket.handle, handle); err < 0) {
      std::cerr << "Unable to delete qdisc: " << nl_geterror(err) << std::endl;
    }

    rtnl_qdisc_put(handle);
  }

  void send_msg() {
    int flags = NLM_F_CREATE;

    if (int err = rtnl_qdisc_add(socket.handle, handle, flags); err < 0) {
      throw std::runtime_error{"Unable to add qdisc: " +
                               std::string{nl_geterror(err)}};
    }
  }

  Socket socket;
  struct rtnl_qdisc *handle;
};

/**
 * Queuing discipline for plugging traffic.
 */
class Plug {
public:
  Plug(const std::string &iface, uint32_t limit, bool enabled)
      : qdisc_{iface, "plug"}, enabled_{enabled} {

    rtnl_qdisc_plug_set_limit(qdisc_.handle, limit);
    qdisc_.send_msg();

    set_enabled(enabled_);
  }

  void set_enabled(bool enabled) {
    if (enabled) {
      rtnl_qdisc_plug_buffer(qdisc_.handle);
    } else {
      rtnl_qdisc_plug_release_indefinite(qdisc_.handle);
    }

    qdisc_.send_msg();
    enabled_ = enabled;
  }

  bool is_enabled() const { return enabled_; }

private:
  QDisc qdisc_;

  bool enabled_;
};

std::atomic<bool> quit{false};

void exit_handler(int /*signal*/) { quit = true; }

int main() {
  std::string iface{"eth0"};
  constexpr uint32_t buffer_size = 32768;
  bool enabled = true;

  Plug plug{iface, buffer_size, enabled};

  /**
   * Set custom exit handler to ensure destructor runs to delete qdisc.
   */
  struct sigaction sa {};
  sa.sa_handler = exit_handler;
  sigfillset(&sa.sa_mask);
  sigaction(SIGINT, &sa, nullptr);

  while (!quit) {
    std::cout << "Plug set to " << plug.is_enabled() << std::endl;
    std::cout << "Press <Enter> to continue.";
    std::cin.get();

    plug.set_enabled(!plug.is_enabled());
  }

  return EXIT_SUCCESS;
}

Set the network interface you want to use in the main function (e.g. eth0 or wlan0). The program can then be compiled and run as root with:

# g++ -std=c++17 -Wall -Wextra -pedantic netbuf.cpp $( pkg-config --cflags --libs libnl-3.0 libnl-route-3.0 )
# ./a.out 
Plug set to 1
Press <Enter> to continue.
Plug set to 0
Press <Enter> to continue.
Plug set to 1
Press <Enter> to continue.

(Exit with Ctrl+c.)
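
A note on the initial state: a freshly created plug qdisc already starts in buffered mode, yet the `Plug` constructor above sends an extra buffer request right after creating it. The following is a minimal sketch of a constructor that skips that extra request. To keep it compilable without libnl, it talks to a hypothetical `RecordingQDisc` stand-in that only records which requests would be sent; `RecordingQDisc`, `PlugSketch`, `request_buffer` and `request_release_indefinite` are illustrative names, not part of libnl or of the program above.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the QDisc wrapper: it records the requests that
// would be sent instead of talking to the kernel.
struct RecordingQDisc {
  std::vector<std::string> sent;
  std::string pending = "create";

  void request_buffer() { pending = "buffer"; }
  void request_release_indefinite() { pending = "release_indefinite"; }
  void send_msg() { sent.push_back(pending); }
};

// Sketch of the Plug class with a constructor that does not buffer twice.
template <class QDiscT>
class PlugSketch {
public:
  PlugSketch(QDiscT &qdisc, bool enabled) : qdisc_(qdisc), enabled_(true) {
    // Creating the qdisc already leaves it in buffered mode.
    qdisc_.send_msg();

    // Only send a second request when the caller wants traffic to flow.
    if (!enabled) {
      set_enabled(false);
    }
  }

  void set_enabled(bool enabled) {
    if (enabled) {
      qdisc_.request_buffer();
    } else {
      qdisc_.request_release_indefinite();
    }
    qdisc_.send_msg();
    enabled_ = enabled;
  }

  bool is_enabled() const { return enabled_; }

private:
  QDiscT &qdisc_;
  bool enabled_;
};

// Usage: starting in buffered mode sends exactly one request.
void example_initial_state() {
  RecordingQDisc qdisc;
  PlugSketch<RecordingQDisc> plug(qdisc, true);
  assert(qdisc.sent.size() == 1);
  assert(plug.is_enabled());
}
```

In the real program the same condition would go into the `Plug` constructor, guarding the initial `set_enabled` call. This should only matter when individual epochs are released one at a time; when toggling between buffer and release-indefinite the extra request is harmless.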

  • 1
    Thank you very much for answering. At one point I thought I would be stuck on this forever. It would be very interesting to know how you figured all this out from that link. I have spent almost 2 weeks on it (including today) and still didn't understand what the flow of the program should be when implementing it. – user13145713 Oct 04 '20 at 07:47
  • 2
    It would be helpful to know how you approached the question and solved it. What struck you as worth looking at? You even mentioned two links that I hadn't come across while searching around this topic. – user13145713 Oct 04 '20 at 07:50
  • 1
    One thing about the answer: the page I linked in the question has a diagram showing a sequence of two plug operations and one unplug operation. Can this solve that case as well? I mean, do I have to use another qdisc to buffer another epoch, or can the same one be used with some modifications to the code? – user13145713 Oct 04 '20 at 07:54
  • 3
    I'm glad I could help you! I started learning about netlink last week, because I wanted to gather information about active socket connections (specifically bluetooth sockets, but unfortunately the kernel does not expose information about those via netlink). I looked at the [netlink manpage](https://man7.org/linux/man-pages/man7/netlink.7.html) and I analyzed the source code of [`ss`](https://github.com/shemminger/iproute2/blob/main/misc/ss.c) (part of iproute2), which uses netlink to dump socket statistics. – f9c69e9781fa194211448473495534 Oct 05 '20 at 00:09
  • 2
    That gave me a basic understanding of how to use netlink, but I also learned that using the raw netlink protocol is tedious, and that some abstraction layer should be used on top of it. iproute2 has its own internal library [`libnetlink`](https://github.com/CumulusNetworks/iproute2/blob/master/lib/libnetlink.c), and then as stand-alone libraries there seem to be `libnl` and `libmnl`. – f9c69e9781fa194211448473495534 Oct 05 '20 at 00:10
  • 2
    When I read your question, I first wondered how the functions from the file you linked were used in the kernel. I searched for `plug_enqueue` in the kernel code, and found no match in any other files. I then looked at the bottom of the file where the `plug_enqueue` function is placed into a `plug_qdisc_ops` struct, which is passed to a function `register_qdisc`. I wanted to know what qdiscs are, which led me [here](https://tldp.org/HOWTO/Traffic-Control-HOWTO/components.html). – f9c69e9781fa194211448473495534 Oct 05 '20 at 00:12
  • 2
    Then I wanted to know how to access qdiscs with netlink, so I googled `netlink qdiscs`, which took me [here](https://www.infradead.org/~tgr/libnl/doc/api/group__qdisc.html) and on that page is a link for the plug qdisc ([here](https://www.infradead.org/~tgr/libnl/doc/api/group__qdisc__plug.html)). From there I mostly went through the libnl source code, which led me to the `nl-qdisc-add` commands and how they are implemented. – f9c69e9781fa194211448473495534 Oct 05 '20 at 00:13
  • 2
    Regarding your question about the diagram of two plug operations and one release operation: that should be possible with the `--release-one` flag for `nl-qdisc-add`. I think the same example is mentioned in the usage example [here](https://github.com/tgraf/libnl/blob/a17970b974bb3896f253817f98a9fa6176fcd422/lib/cli/qdisc/plug.c#L29) (see lines 37 to 46). In the code, I think you would have to use the `rtnl_qdisc_plug_release_one` function (instead of `rtnl_qdisc_plug_release_indefinite`). – f9c69e9781fa194211448473495534 Oct 05 '20 at 00:17
  • 2
    While compiling, I was getting many undefined reference errors for functions like `nl_socket_alloc`, `nl_connect` and several others. This can be corrected by writing the filename in front of the libraries in the compilation command. You might want to add this to the answer, i.e.: `sudo g++ -std=c++17 -Wall -Wextra -pedantic netbuf.cpp $( pkg-config --cflags --libs libnl-3.0 libnl-route-3.0 )` – user13145713 Oct 08 '20 at 15:02
  • 2
    Ok thank you for pointing this out. I will edit the answer to contain this fix. It could maybe be caused by your `libnl` version using a static library and my version using a shared library. – f9c69e9781fa194211448473495534 Oct 08 '20 at 18:51
  • 1
    From the command line, a qdisc is in buffer mode by default after it is created. But in the code you call the `set_enabled` function even after creating the qdisc. So is it as if two buffering plugs are created at the start? – user13145713 Oct 08 '20 at 19:20
  • You are right, the command line example and the code example differ slightly in this regard and the code would create two plugs. This should not make a difference when only switching between buffer and release_indefinite, but should make a difference when using release_one. In the latter case, you could modify the `Plug` constructor to only call the initial `set_enabled` when `enabled_` is false. – f9c69e9781fa194211448473495534 Oct 12 '20 at 23:54
  • OK, I guess that's why `release_one` was not working. I modified the code, but only one specific sequence worked: 1, 1, 2, 1, 1, 2 and so on, where 1 indicates buffering and 2 indicates releasing. No other sequence worked (e.g. 1, 1, 1, 2; I don't know why). Basically my aim was to call release-one on receiving an external signal and to call buffer periodically, say every `30ms`. – user13145713 Oct 14 '20 at 03:49
  • @f9c69e9781fa194211448473495534 I've written a program that uses this `qdisc` in the way it was designed to be used, as described at https://www.infradead.org/~tgr/libnl/doc/api/group__qdisc__plug.html#details. After a checkpoint is created, I send it to the backup asynchronously: I created another thread and a job queue, where the main thread inserts the checkpoint ID and the job thread takes the ID from the queue and transfers the checkpoint associated with it. The problem is that the job thread is responsible for unplugging the `qdisc`... (continued) – y_159 Nov 02 '20 at 11:14
  • The job thread calls the `rtnl_qdisc_plug_release_one` function each time, but this gives a `Message sequence mismatch error`. If I remove the job thread function and call `release_one` in the main thread, it works. Can you help with this? – y_159 Nov 02 '20 at 11:20
  • If the checkpoint part seems complex or confusing, forget about it. Just take it that calling `release_one` from another thread gives the above error. – y_159 Nov 02 '20 at 12:06
  • Sorry, I was a bit busy the last few days. I answered here: https://stackoverflow.com/questions/64669383/cannot-understand-this-message-sequence-mismatch-error/64774247 – f9c69e9781fa194211448473495534 Nov 10 '20 at 17:58
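
The buffer and release-one behaviour discussed in the comments above can be explored with a small user-space model. This is a simplified sketch based on one reading of the linked sch_plug.c, not kernel code: packets are plain integers, the byte limit is ignored, and the class, member and counter names are assumptions chosen for illustration. The key point it tries to show is that a buffer request remembers only the size of the epoch that just closed, so two buffer requests with no traffic between them leave nothing for release-one to release.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Simplified model of the plug qdisc's epoch counters (illustrative only).
class PlugModel {
public:
  // A packet arrives from the network stack.
  void enqueue(int pkt) {
    if (!unplug_indefinite_) {
      ++pkts_current_epoch_;
    }
    queue_.push_back(pkt);
  }

  // The device asks for the next packet; returns false while plugged.
  bool dequeue(int &pkt) {
    if (queue_.empty()) {
      return false;
    }
    if (!unplug_indefinite_) {
      if (pkts_to_release_ == 0) {
        return false;
      }
      --pkts_to_release_;
    }
    pkt = queue_.front();
    queue_.pop_front();
    return true;
  }

  // Close the current epoch, remembering only its size.
  void buffer() {
    pkts_last_epoch_ = pkts_current_epoch_;
    pkts_current_epoch_ = 0;
    unplug_indefinite_ = false;
  }

  // Allow the packets of the last closed epoch to leave.
  void release_one() {
    pkts_to_release_ += pkts_last_epoch_;
    pkts_last_epoch_ = 0;
  }

  // Let everything through until the next buffer().
  void release_indefinite() {
    unplug_indefinite_ = true;
    pkts_to_release_ = 0;
    pkts_last_epoch_ = 0;
    pkts_current_epoch_ = 0;
  }

private:
  std::deque<int> queue_;
  std::size_t pkts_current_epoch_ = 0;
  std::size_t pkts_last_epoch_ = 0;
  std::size_t pkts_to_release_ = 0;
  bool unplug_indefinite_ = false;
};

// Counts how many packets can currently leave the model.
std::size_t drain(PlugModel &plug) {
  std::size_t released = 0;
  int pkt = 0;
  while (plug.dequeue(pkt)) {
    ++released;
  }
  return released;
}

// Two plugs and one release: only the first epoch leaves.
void example_two_plugs_one_release() {
  PlugModel plug;
  for (int i = 0; i < 3; ++i) plug.enqueue(i); // epoch 1
  plug.buffer();                               // close epoch 1
  for (int i = 0; i < 2; ++i) plug.enqueue(i); // epoch 2
  plug.release_one();
  assert(drain(plug) == 3);
}
```

Under this model, calling `buffer()` twice in a row before `release_one()` releases nothing, which would be consistent with the initial double buffer request interfering with release-one. Whether the real qdisc behaves identically in every corner case should be checked against the kernel source.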