
I'm writing a program which periodically dumps old data from a RethinkDB database into a file and removes it from the database. Currently, the data is dumped into a single file which grows without limit. I'd like to change this so that the maximum file size is, say, 250 MB, and the program starts writing to a new output file just before this size is exceeded.

It seems like Python's RotatingFileHandler class for loggers does approximately what I want; however, I'm not sure whether logging can be applied to any JSON-dumpable object or just to strings.

Another possible approach would be to use (a variant of) Mike Pennington's RotatingFile class (see python: outfile to another text file if exceed certain file size).
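
Roughly, what I have in mind is a size-limited writer along the following lines (a minimal sketch of my own; the class name, constructor arguments, and rollover logic are placeholders rather than the code from that answer):

import os

class RotatingFile(object):
    '''Sketch of a size-limited writer: once the current file would exceed
    max_bytes, close it and continue writing to the next numbered file.'''
    def __init__(self, basename='archived_sensor_data.json', max_bytes=250 * 1024 * 1024):
        self.basename = basename
        self.max_bytes = max_bytes
        self.index = 0
        self._open_current()

    def _filename(self):
        return '{}.{}'.format(self.basename, self.index)

    def _open_current(self):
        self.file = open(self._filename(), 'a')
        self.size = os.path.getsize(self._filename())

    def write(self, text):
        if self.size + len(text) > self.max_bytes:    # Roll over just before the limit would be exceeded
            self.file.close()
            self.index += 1
            self._open_current()
        self.file.write(text)
        self.size += len(text)

    def close(self):
        self.file.close()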

Which of these approaches is likely to be the most fruitful?

For reference, my current program is as follows:

import os
import sys
import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta
import schedule
import time
import functools
from iclib import RethinkDB
import msgpack

''' The purpose of the Controller is to periodically archive data from the "sensor_data" table so that it does not grow without limit.'''

class Controller(RethinkDB):
    def __init__(self, db_address=(os.environ['DB_ADDR'], int(os.environ['DB_PORT'])), db_name=os.environ['DB_NAME']):
        super(Controller, self).__init__(db_address=db_address, db_name=db_name)                              # Initialize the IperCronComponent with the default logger name (in this case, "Controller")
        self.db_table = RethinkDB.SENSOR_DATA_TABLE                  # The table name is "sensor_data" and is stored as a class variable in RethinkDBMixIn

    def generate_archiving_query(self, retention_period=timedelta(days=3)):
        expiry_time = r.now() - retention_period.total_seconds()        # Timestamp before which data is to be archived

        if "timestamp" in r.table(self.db_table).index_list().run(self.db):       # If "timestamp" is a secondary index
            beginning_of_time = r.time(1400, 1, 1, 'Z')                 # The minimum time of a ReQL time object (i.e., the year 1400 in the UTC timezone)
            data_to_archive = r.table(self.db_table).between(beginning_of_time, expiry_time, index="timestamp")         # Generate query using "between" (faster)
        else:
            data_to_archive = r.table(self.db_table).filter(r.row['timestamp'] < expiry_time)                           # Generate the same query using "filter" (slower, but does not require "timestamp" to be a secondary index)

        return data_to_archive

    def archiving_job(self, data_to_archive=None, output_file="archived_sensor_data.json"):
        if data_to_archive is None:
            data_to_archive = self.generate_archiving_query()               # By default, call the "generate_archiving_query" method to generate the query
        old_data = data_to_archive.run(self.db, time_format="raw")        # Without time_format="raw" the output does not dump to JSON
        with open(output_file, 'a') as f:
            ids_to_delete = []
            for item in old_data:
                print item
                # msgpack.dump(item, f)
                json.dump(item, f)
                f.write('\n')                                               # Separate each document by a new line
                ids_to_delete.append(item['id'])

        r.table(self.db_table).get_all(r.args(ids_to_delete)).delete().run(self.db)        # Delete based on ID. It is preferred to delete the entire batch in a single operation rather than to delete them one by one in the for loop.

def test_job_1():
    db_name = "ipercron"
    table_name = "sensor_data"
    port_offset = 1         # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
    conn = r.connect("localhost", 28015 + port_offset)
    r.db(db_name).table(table_name).delete().run(conn)
    import rethinkdb_add_data
    controller = Controller(db_address=("localhost", 28015+port_offset))
    archiving_job = functools.partial(controller.archiving_job, data_to_archive=controller.generate_archiving_query())
    return archiving_job

if __name__ == "__main__":

    archiving_job = test_job_1()
    schedule.every(0.1).minutes.do(archiving_job)

    while True:
        schedule.run_pending()
        time.sleep(1)               # Sleep between checks to avoid a busy loop

It is not completely 'runnable' from the part shown, but the key point is that I would like to replace the line

json.dump(item, f)

with a similar line in which f is a rotating, and not fixed, file object.
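
For example, with a wrapper like the RotatingFile sketch above, the archiving loop might become (hypothetical usage):

f = RotatingFile('archived_sensor_data.json', max_bytes=250 * 1024 * 1024)
ids_to_delete = []
for item in old_data:
    f.write(json.dumps(item) + '\n')    # One JSON document per line
    ids_to_delete.append(item['id'])
f.close()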

    You can use [json.dumps](https://docs.python.org/3/library/json.html#json.dumps) to get the JSON as a `str`; it should work with `RotatingFileHandler`. – Stanislav Ivanov Oct 17 '16 at 15:42

1 Answer


Following Stanislav Ivanov's suggestion, I used json.dumps to convert each RethinkDB document to a string and wrote it to a RotatingFileHandler:

import os
import sys
import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta
import schedule
import time
import functools
from iclib import RethinkDB
import msgpack
import logging
from logging.handlers import RotatingFileHandler
from random_data_generator import RandomDataGenerator

''' The purpose of the Controller is to periodically archive data from the "sensor_data" table so that it does not grow without limit.'''

os.environ['DB_ADDR'] = 'localhost'
os.environ['DB_PORT'] = '28015'
os.environ['DB_NAME'] = 'ipercron'

class Controller(RethinkDB):
    def __init__(self, db_address=None, db_name=None):
        if db_address is None:
            db_address = (os.environ['DB_ADDR'], int(os.environ['DB_PORT']))            # The default host ("rethinkdb") and port (28015) are stored as environment variables
        if db_name is None:
            db_name = os.environ['DB_NAME']                                             # The default database is "ipercron" and is stored as an environment variable
        super(Controller, self).__init__(db_address=db_address, db_name=db_name)        # Initialize the instance of the RethinkDB class. IperCronComponent will be initialized with its default logger name (in this case, "Controller")
        self.db_name = db_name
        self.db_table = RethinkDB.SENSOR_DATA_TABLE                                     # The table name is "sensor_data" and is stored as a class variable of RethinkDBMixIn
        self.table = r.db(self.db_name).table(self.db_table)
        self.archiving_logger = logging.getLogger("archiving_logger")
        self.archiving_logger.setLevel(logging.DEBUG)
        self.archiving_handler = RotatingFileHandler("archived_sensor_data.log", maxBytes=2000, backupCount=10)
        self.archiving_logger.addHandler(self.archiving_handler)

    def generate_archiving_query(self, retention_period=timedelta(days=3)):
        expiry_time = r.now() - retention_period.total_seconds()        # Timestamp before which data is to be archived

        if "timestamp" in self.table.index_list().run(self.db):
            beginning_of_time = r.time(1400, 1, 1, 'Z')                                                     # The minimum time of a ReQL time object (namely, the year 1400 in UTC)
            data_to_archive = self.table.between(beginning_of_time, expiry_time, index="timestamp")         # Generate query using "between" (faster, requires "timestamp" to be a secondary index)
        else:
            data_to_archive = self.table.filter(r.row['timestamp'] < expiry_time)                           # Generate query using "filter" (slower, but does not require "timestamp" to be a secondary index)

        return data_to_archive

    def archiving_job(self, data_to_archive=None):
        if data_to_archive is None:
            data_to_archive = self.generate_archiving_query()               # By default, call the "generate_archiving_query" method to generate the query
        old_data = data_to_archive.run(self.db, time_format="raw")        # Without time_format="raw" the output does not dump to JSON or msgpack

        ids_to_delete = []
        for item in old_data:
            print item
            self.dump(item)
            ids_to_delete.append(item['id'])

        self.table.get_all(r.args(ids_to_delete)).delete().run(self.db)             # Delete based on ID. It is preferred to delete the entire batch in a single operation rather than to delete them one by one in the for-loop.

    def dump(self, item, mode='json'):
        if mode == 'json':
            dump_string = json.dumps(item)
        elif mode == 'msgpack':
            dump_string = msgpack.packb(item)
        self.archiving_logger.debug(dump_string)


def populate_database(db_name, table_name, conn):

    if db_name not in r.db_list().run(conn):
        r.db_create(db_name).run(conn)                          # Create the database if it does not yet exist

    if table_name not in r.db(db_name).table_list().run(conn):
        r.db(db_name).table_create(table_name).run(conn)        # Create the table if it does not yet exist

    r.db(db_name).table(table_name).delete().run(conn)          # Empty the table to start with a clean slate

    # Generate random data with timestamps uniformly distributed over the past 6 days
    random_data_time_interval = timedelta(days=6)
    start_random_data = datetime.utcnow().replace(tzinfo=pytz.utc) - random_data_time_interval

    random_generator = RandomDataGenerator(seed=0)
    packets = random_generator.packets(N=100, start=start_random_data)
    # print packets
    print "Adding data to the database..."
    r.db(db_name).table(table_name).insert(packets).run(conn)


if __name__ == "__main__":
    db_name = "ipercron"
    table_name = "sensor_data"
    port_offset = 1         # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
    host = "localhost"
    port = 28015 + port_offset
    conn = r.connect(host, port)        # RethinkDB connection object

    populate_database(db_name, table_name, conn)

    # import rethinkdb_add_data
    controller = Controller(db_address=(host, port))
    archiving_job = functools.partial(controller.archiving_job, data_to_archive=controller.generate_archiving_query())      # This ensures that the query is only generated once. (This is sufficient since r.now() is re-evaluated every time the query is run.)

    schedule.every(0.1).minutes.do(archiving_job)

    while True:
        schedule.run_pending()
        time.sleep(1)               # Sleep between checks to avoid a busy loop
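
A couple of details worth noting: archiving_handler is created without a Formatter, so each record is written as just the raw message, which means every line of the log file is exactly the json.dumps string. The rotated files are named archived_sensor_data.log, archived_sensor_data.log.1, and so on, up to the backupCount of 10. To make the "message only" format explicit, a formatter could be attached in __init__:

# Optional: make the "message only" output format explicit
# (a handler without an explicit formatter produces the same result)
self.archiving_handler.setFormatter(logging.Formatter('%(message)s'))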

In this context the RethinkDB class does little other than define the class variable SENSOR_DATA_TABLE and open the RethinkDB connection, self.db = r.connect(self.address[0], self.address[1]) (a minimal stand-in is sketched below, after the generator module). This is run together with a module for generating fake data, random_data_generator.py:

import random
import faker
from datetime import datetime, timedelta
import pytz
import rethinkdb as r

class RandomDataGenerator(object):
    def __init__(self, seed=None):
        self._seed = seed
        self._random = random.Random()
        self._random.seed(seed)
        self.fake = faker.Faker()
        self.fake.random.seed(seed)

    def __getattr__(self, x):
        return getattr(self._random, x)

    def name(self):
        return self.fake.name()

    def datetime(self, start=None, end=None):
        if start is None:
            start = datetime(2000, 1, 1, tzinfo=pytz.utc)  # Jan 1st 2000
        if end is None:
            end = datetime.utcnow().replace(tzinfo=pytz.utc)

        if isinstance(end, datetime):
            dt = end - start
        elif isinstance(end, timedelta):
            dt = end
        assert isinstance(dt, timedelta)

        random_dt = timedelta(microseconds=self._random.randrange(int(dt.total_seconds() * (10 ** 6))))
        return start + random_dt

    def packets(self, N=1, start=None, end=None):
        return [{'name': self.name(), 'timestamp': self.datetime(start=start, end=end)} for _ in range(N)]
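
For completeness, here is a minimal stand-in for the iclib RethinkDB base class referred to above; apart from SENSOR_DATA_TABLE and the self.db connection, which are as described, the attribute names are guesses:

import rethinkdb as r

class RethinkDB(object):
    '''Minimal stand-in for iclib.RethinkDB: store the connection parameters,
    open the connection, and define the table-name constant.'''
    SENSOR_DATA_TABLE = "sensor_data"

    def __init__(self, db_address, db_name):
        self.address = db_address
        self.db_name = db_name
        self.db = r.connect(self.address[0], self.address[1])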

When I run the controller, it produces several rolled-over output logs, each at most 2 kB in size, as expected.
